PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c File Reference

Go to the source code of this file.

Macros
#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
#define PARALLEL_APPLY_KEY_SHARED 1
#define PARALLEL_APPLY_KEY_MQ 2
#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
#define PARALLEL_APPLY_LOCK_STREAM 0
#define PARALLEL_APPLY_LOCK_XACT 1
#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
Functions
static void pa_free_worker_info (ParallelApplyWorkerInfo *winfo)
static ParallelTransState pa_get_xact_state (ParallelApplyWorkerShared *wshared)
static PartialFileSetState pa_get_fileset_state (void)
static bool pa_can_start (void)
static bool pa_setup_dsm (ParallelApplyWorkerInfo *winfo)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker (void)
void pa_allocate_worker (TransactionId xid)
ParallelApplyWorkerInfo * pa_find_worker (TransactionId xid)
static void pa_free_worker (ParallelApplyWorkerInfo *winfo)
void pa_detach_all_error_mq (void)
static bool pa_has_spooled_message_pending ()
static bool pa_process_spooled_messages_if_required (void)
static void ProcessParallelApplyInterrupts (void)
static void LogicalParallelApplyLoop (shm_mq_handle *mqh)
static void pa_shutdown (int code, Datum arg)
void ParallelApplyWorkerMain (Datum main_arg)
void HandleParallelApplyMessageInterrupt (void)
static void ProcessParallelApplyMessage (StringInfo msg)
void ProcessParallelApplyMessages (void)
bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_wait_for_xact_state (ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
static void pa_wait_for_xact_finish (ParallelApplyWorkerInfo *winfo)
void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)
static void pa_savepoint_name (Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
void pa_reset_subtrans (void)
void pa_stream_abort (LogicalRepStreamAbortData *abort_data)
void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)
void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block (void)
void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
Variables
static HTAB * ParallelApplyTxnHash = NULL
static List * ParallelApplyWorkerPool = NIL
ParallelApplyWorkerShared * MyParallelShared = NULL
volatile sig_atomic_t ParallelApplyMessagePending = false
static ParallelApplyWorkerInfo * stream_apply_worker = NULL
static List * subxactlist = NIL

DSM_ERROR_QUEUE_SIZE

#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

DSM_QUEUE_SIZE

#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

PARALLEL_APPLY_KEY_ERROR_QUEUE

#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

PARALLEL_APPLY_KEY_MQ

#define PARALLEL_APPLY_KEY_MQ 2

PARALLEL_APPLY_KEY_SHARED

#define PARALLEL_APPLY_KEY_SHARED 1

PARALLEL_APPLY_LOCK_STREAM

#define PARALLEL_APPLY_LOCK_STREAM 0

PARALLEL_APPLY_LOCK_XACT

#define PARALLEL_APPLY_LOCK_XACT 1

PG_LOGICAL_APPLY_SHM_MAGIC

#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

SHM_SEND_RETRY_INTERVAL_MS

#define SHM_SEND_RETRY_INTERVAL_MS 1000

SHM_SEND_TIMEOUT_MS

#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

SIZE_STATS_MESSAGE

#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

ParallelApplyWorkerEntry

HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void )

LogicalParallelApplyLoop()

static void LogicalParallelApplyLoop ( shm_mq_handle * mqh )

Definition at line 734 of file applyparallelworker.c.

735{

739

740

741

742

743

745 "ApplyMessageContext",

747

748

749

750

751

755

756 for (;;)

757 {

760

762

763

765

767

769 {

771 int c;

772

773 if (len == 0)

774 elog(ERROR, "invalid message length");

775

777

778

779

780

781

783 if (c != 'w')

784 elog(ERROR, "unexpected message \"%c\"", c);

785

786

787

788

789

790

791

792

793

794

796

798 }

800 {

801

803 {

804 int rc;

805

806

809 1000L,

810 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

811

814 }

815 }

816 else

817 {

819

821 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

822 errmsg("lost connection to the logical replication apply worker")));

823 }

824

827 }

828

829

831

833}

static void ProcessParallelApplyInterrupts(void)

#define SIZE_STATS_MESSAGE

static bool pa_process_spooled_messages_if_required(void)

MemoryContext ApplyMessageContext

void apply_dispatch(StringInfo s)

MemoryContext ApplyContext

void apply_error_callback(void *arg)

ErrorContextCallback * error_context_stack

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

Assert(PointerIsAligned(start, uint64))

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

void MemoryContextReset(MemoryContext context)

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

int pq_getmsgbyte(StringInfo msg)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

static void initReadOnlyStringInfo(StringInfo str, char *data, int len)

struct ErrorContextCallback * previous

void(* callback)(void *arg)

#define WL_EXIT_ON_PM_DEATH

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, Assert(), ErrorContextCallback::callback, CurrentMemoryContext, StringInfoData::cursor, data, elog, ereport, errcode(), errmsg(), ERROR, error_context_stack, initReadOnlyStringInfo(), len, MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, pa_process_spooled_messages_if_required(), pq_getmsgbyte(), ErrorContextCallback::previous, ProcessParallelApplyInterrupts(), ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SIZE_STATS_MESSAGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ParallelApplyWorkerMain().

pa_allocate_worker()

void pa_allocate_worker (TransactionId xid)

Definition at line 470 of file applyparallelworker.c.

471{

472 bool found;

475

477 return;

478

480 if (!winfo)

481 return;

482

483

485 {

487

492

494 16, &ctl,

496 }

497

498

500 if (found)

501 elog(ERROR, "hash table corrupted");

502

503

508

509 winfo->in_use = true;

511 entry->winfo = winfo;

512}

struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry

static bool pa_can_start(void)

static HTAB * ParallelApplyTxnHash

static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)

#define MemSet(start, val, len)

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

#define SpinLockRelease(lock)

#define SpinLockAcquire(lock)

ParallelApplyWorkerInfo * winfo

ParallelApplyWorkerShared * shared

References ApplyContext, ctl, elog, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

pa_can_start()

static bool pa_can_start ( void )

pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void )

Definition at line 1591 of file applyparallelworker.c.

1592{

1594

1595

1596

1597

1598

1600 {

1602 return;

1603

1604 elog(ERROR, "invalid pending streaming chunk 0");

1605 }

1606

1608 {

1611 }

1612}

void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)

void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)

static bool pa_has_spooled_message_pending()

ParallelApplyWorkerShared * MyParallelShared

static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)

static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)

static bool am_parallel_apply_worker(void)

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void )

pa_find_worker()

ParallelApplyWorkerInfo * pa_find_worker (TransactionId xid)

pa_free_worker()

static void pa_free_worker (ParallelApplyWorkerInfo *winfo)

Definition at line 556 of file applyparallelworker.c.

557{

561

563 elog(ERROR, "hash table corrupted");

564

565

566

567

568

569

570

571

572

573

574

575

579 {

582

583 return;

584 }

585

586 winfo->in_use = false;

588}

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)

static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)

void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)

int max_parallel_apply_workers_per_subscription

static int list_length(const List *l)

@ PARALLEL_TRANS_FINISHED

References am_parallel_apply_worker(), Assert(), elog, ERROR, HASH_REMOVE, hash_search(), ParallelApplyWorkerInfo::in_use, list_length(), logicalrep_pa_worker_stop(), max_parallel_apply_workers_per_subscription, pa_free_worker_info(), pa_get_xact_state(), PARALLEL_TRANS_FINISHED, ParallelApplyTxnHash, ParallelApplyWorkerPool, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

pa_free_worker_info()

static void pa_free_worker_info (ParallelApplyWorkerInfo *winfo)

Definition at line 595 of file applyparallelworker.c.

596{

598

601

604

605

608

611

612

614

616}

void stream_cleanup_files(Oid subid, TransactionId xid)

void dsm_detach(dsm_segment *seg)

List * list_delete_ptr(List *list, void *datum)

void pfree(void *pointer)

shm_mq_handle * mq_handle

References Assert(), dsm_detach(), ParallelApplyWorkerInfo::dsm_seg, ParallelApplyWorkerInfo::error_mq_handle, list_delete_ptr(), ParallelApplyWorkerInfo::mq_handle, MyLogicalRepWorker, ParallelApplyWorkerPool, pfree(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, shm_mq_detach(), stream_cleanup_files(), LogicalRepWorker::subid, and ParallelApplyWorkerShared::xid.

Referenced by pa_free_worker(), and pa_launch_parallel_worker().

pa_get_fileset_state()

static PartialFileSetState pa_get_fileset_state (void)

pa_get_xact_state()

static ParallelTransState pa_get_xact_state (ParallelApplyWorkerShared *wshared)

pa_has_spooled_message_pending()

static bool pa_has_spooled_message_pending ( )

pa_launch_parallel_worker()

static ParallelApplyWorkerInfo * pa_launch_parallel_worker (void)

Definition at line 404 of file applyparallelworker.c.

405{

407 bool launched;

410

411

413 {

415

417 return winfo;

418 }

419

420

421

422

423

424

425

427

429

430

432 {

435 return NULL;

436 }

437

445

446 if (launched)

447 {

449 }

450 else

451 {

453 winfo = NULL;

454 }

455

457

458 return winfo;

459}

static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)

dsm_handle dsm_segment_handle(dsm_segment *seg)

bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)

List * lappend(List *list, void *datum)

void * palloc0(Size size)

@ WORKERTYPE_PARALLEL_APPLY

References ApplyContext, LogicalRepWorker::dbid, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_handle(), ParallelApplyWorkerInfo::in_use, InvalidOid, lappend(), lfirst, logicalrep_worker_launch(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pa_free_worker_info(), pa_setup_dsm(), palloc0(), ParallelApplyWorkerPool, pfree(), LogicalRepWorker::userid, and WORKERTYPE_PARALLEL_APPLY.

Referenced by pa_allocate_worker().

pa_lock_stream()

void pa_lock_stream (TransactionId xid, LOCKMODE lockmode)

Definition at line 1540 of file applyparallelworker.c.

1541{

1544}

#define PARALLEL_APPLY_LOCK_STREAM

void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

References LockApplyTransactionForSession(), MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, and LogicalRepWorker::subid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_stop(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().

pa_lock_transaction()

void pa_lock_transaction (TransactionId xid, LOCKMODE lockmode)

pa_process_spooled_messages_if_required()

static bool pa_process_spooled_messages_if_required ( void )

Definition at line 658 of file applyparallelworker.c.

659{

661

663

664 if (fileset_state == FS_EMPTY)

665 return false;

666

667

668

669

670

671

672

673

674

675

676

677

679 {

682

684 }

685

686

687

688

689

690

691

692

694 {

696 }

697 else if (fileset_state == FS_READY)

698 {

703 }

704

705 return true;

706}

void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)

void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)

@ FS_SERIALIZE_IN_PROGRESS

#define InvalidXLogRecPtr

References AccessShareLock, apply_spooled_messages(), ParallelApplyWorkerShared::fileset, FS_EMPTY, FS_READY, FS_SERIALIZE_DONE, FS_SERIALIZE_IN_PROGRESS, InvalidXLogRecPtr, MyParallelShared, pa_get_fileset_state(), pa_lock_stream(), pa_set_fileset_state(), pa_unlock_stream(), and ParallelApplyWorkerShared::xid.

Referenced by LogicalParallelApplyLoop().

pa_reset_subtrans()

void pa_reset_subtrans ( void )

pa_savepoint_name()

static void pa_savepoint_name (Oid suboid, TransactionId xid, char *spname, Size szsp)

pa_send_data()

bool pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

Definition at line 1146 of file applyparallelworker.c.

1147{

1148 int rc;

1151

1154

1155

1156

1157

1158

1160 return false;

1161

1162

1163

1164

1165

1166

1167

1168#define SHM_SEND_RETRY_INTERVAL_MS 1000

1169#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

1170

1171 for (;;)

1172 {

1174

1176 return true;

1179 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1180 errmsg("could not send data to shared-memory queue")));

1181

1183

1184

1188 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);

1189

1191 {

1194 }

1195

1196 if (startTime == 0)

1200 return false;

1201 }

1202}

#define SHM_SEND_TIMEOUT_MS

#define SHM_SEND_RETRY_INTERVAL_MS

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

#define CHECK_FOR_INTERRUPTS()

int debug_logical_replication_streaming

@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)

bool IsTransactionState(void)

References Assert(), CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

pa_set_fileset_state()

void pa_set_fileset_state (ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)

Definition at line 1498 of file applyparallelworker.c.

1500{

1503

1505 {

1509 }

1510

1512}

References am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::fileset, ParallelApplyWorkerShared::fileset_state, FS_SERIALIZE_DONE, ParallelApplyWorkerShared::mutex, MyLogicalRepWorker, SpinLockAcquire, SpinLockRelease, and LogicalRepWorker::stream_fileset.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().

pa_set_stream_apply_worker()

void pa_set_stream_apply_worker (ParallelApplyWorkerInfo *winfo)

pa_set_xact_state()

void pa_set_xact_state (ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)

pa_setup_dsm()

static bool pa_setup_dsm (ParallelApplyWorkerInfo *winfo)

Definition at line 327 of file applyparallelworker.c.

328{

330 Size segsize;

337

338

339

340

341

342

343

344

345

346

347

352

355

356

358 if (!seg)

359 return false;

360

362 segsize);

363

364

367

372

374

375

379

380

382

383

385 error_queue_size);

388

389

391

392

394 winfo->shared = shared;

395

396 return true;

397}

#define DSM_ERROR_QUEUE_SIZE

#define PARALLEL_APPLY_KEY_SHARED

#define PARALLEL_APPLY_KEY_ERROR_QUEUE

#define PARALLEL_APPLY_KEY_MQ

#define PG_LOGICAL_APPLY_SHM_MAGIC

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_initialize_estimator(e)

#define shm_toc_estimate_keys(e, cnt)

#define SpinLockInit(lock)

References dsm_create(), DSM_ERROR_QUEUE_SIZE, DSM_QUEUE_SIZE, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_address(), ParallelApplyWorkerInfo::error_mq_handle, ParallelApplyWorkerShared::fileset_state, FS_EMPTY, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, ParallelApplyWorkerInfo::mq_handle, ParallelApplyWorkerShared::mutex, MyProc, PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PARALLEL_TRANS_UNKNOWN, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_init_u32(), PG_LOGICAL_APPLY_SHM_MAGIC, ParallelApplyWorkerInfo::shared, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit, and ParallelApplyWorkerShared::xact_state.

Referenced by pa_launch_parallel_worker().

pa_shutdown()

static void pa_shutdown ( int code, Datum arg )

pa_start_subtrans()

void pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)

Definition at line 1362 of file applyparallelworker.c.

1363{

1364 if (current_xid != top_xid &&

1366 {

1369

1371 spname, sizeof(spname));

1372

1373 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

1374

1375

1377 {

1380

1383 }

1384

1386

1387

1388

1389

1390

1391

1393

1397 }

1398}

static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)

List * lappend_xid(List *list, TransactionId datum)

bool list_member_xid(const List *list, TransactionId datum)

MemoryContext TopTransactionContext

void DefineSavepoint(const char *name)

void StartTransactionCommand(void)

bool IsTransactionBlock(void)

void BeginTransactionBlock(void)

void CommitTransactionCommand(void)

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

pa_stream_abort()

void pa_stream_abort (LogicalRepStreamAbortData *abort_data)

Definition at line 1416 of file applyparallelworker.c.

1417{

1420

1421

1422

1423

1424

1427

1428

1429

1430

1431

1432 if (subxid == xid)

1433 {

1435

1436

1437

1438

1439

1440

1441

1442

1443

1444

1445

1447

1449

1451 {

1454 }

1455

1457

1459 }

1460 else

1461 {

1462

1463 int i;

1465

1467

1468 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

1469

1470

1471

1472

1473

1474

1475

1476

1478 {

1480

1481 if (xid_tmp == subxid)

1482 {

1486 break;

1487 }

1488 }

1489 }

1490}

void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)

void pa_reset_subtrans(void)

void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)

void pgstat_report_activity(BackendState state, const char *cmd_str)

List * list_truncate(List *list, int new_size)

#define AccessExclusiveLock

TimestampTz replorigin_session_origin_timestamp

XLogRecPtr replorigin_session_origin_lsn

static ListCell * list_nth_cell(const List *list, int n)

void RollbackToSavepoint(const char *name)

bool EndTransactionBlock(bool chain)

void AbortCurrentTransaction(void)

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)

Definition at line 1211 of file applyparallelworker.c.

1213{

1215 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",

1217

1218

1219

1220

1221

1222

1224

1225

1227

1228

1229

1230

1231

1232

1233 if (!stream_locked)

1235

1237}

void stream_start_internal(TransactionId xid, bool first_segment)

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

pa_unlock_stream()

void pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)

Definition at line 1547 of file applyparallelworker.c.

1548{

1551}

void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

References MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, LogicalRepWorker::subid, and UnlockApplyTransactionForSession().

Referenced by apply_handle_stream_abort(), apply_handle_stream_start(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_xact_finish().

pa_unlock_transaction()

void pa_unlock_transaction (TransactionId xid, LOCKMODE lockmode)

pa_wait_for_xact_finish()

static void pa_wait_for_xact_finish (ParallelApplyWorkerInfo *winfo)

Definition at line 1274 of file applyparallelworker.c.

1275{

1276

1277

1278

1279

1280

1281

1283

1284

1285

1286

1287

1288

1291

1292

1293

1294

1295

1296

1299 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1300 errmsg("lost connection to the logical replication parallel apply worker")));

1301}

void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)

static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)

References AccessShareLock, ereport, errcode(), errmsg(), ERROR, pa_get_xact_state(), pa_lock_transaction(), pa_unlock_transaction(), pa_wait_for_xact_state(), PARALLEL_TRANS_FINISHED, PARALLEL_TRANS_STARTED, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

pa_wait_for_xact_state()

static void pa_wait_for_xact_state (ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)

pa_xact_finish()

void pa_xact_finish (ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)

Definition at line 1618 of file applyparallelworker.c.

1619{

1621

1622

1623

1624

1625

1627

1628

1629

1630

1631

1632

1634

1637

1639}

static void pa_free_worker(ParallelApplyWorkerInfo *winfo)

static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)

void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum main_arg )

Definition at line 857 of file applyparallelworker.c.

858{

869

871

872

877

878

879

880

881

882

883

884

887 if (!seg)

889 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

890 errmsg("could not map dynamic shared memory segment")));

891

893 if (!toc)

895 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

896 errmsg("invalid magic number in dynamic shared memory segment")));

897

898

901

902

903

904

908

909

910

911

912

913

915

916

917

918

919

920

922

927

928

929

930

934

938

941

943

945

946

949 originname, sizeof(originname));

951

952

953

954

955

959

960

961

962

963

967

969

971

972

973

974

975

976

977

979}

static void pa_shutdown(int code, Datum arg)

static void LogicalParallelApplyLoop(shm_mq_handle *mqh)

bool InitializingApplyWorker

void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)

void set_apply_error_context_origin(char *originname)

void InitializeLogRepWorker(void)

void BackgroundWorkerUnblockSignals(void)

dsm_segment * dsm_attach(dsm_handle h)

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void logicalrep_worker_attach(int slot)

RepOriginId replorigin_by_name(const char *roname, bool missing_ok)

RepOriginId replorigin_session_origin

void replorigin_session_setup(RepOriginId node, int acquired_by)

static Datum PointerGetDatum(const void *X)

static int32 DatumGetInt32(Datum X)

BackgroundWorker * MyBgworkerEntry

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

char bgw_extra[BGW_EXTRALEN]

TimestampTz last_recv_time

TimestampTz last_send_time

void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)

References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, invalidate_syncing_table_states(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SpinLockAcquire, SpinLockRelease, and StartTransactionCommand().

ProcessParallelApplyInterrupts()

static void ProcessParallelApplyInterrupts ( void )

Definition at line 712 of file applyparallelworker.c.

713{

715

717 {

719 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",

721

723 }

724

726 {

729 }

730}

void ProcessConfigFile(GucContext context)

volatile sig_atomic_t ShutdownRequestPending

volatile sig_atomic_t ConfigReloadPending

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, MySubscription, Subscription::name, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), and ShutdownRequestPending.

Referenced by LogicalParallelApplyLoop().

ProcessParallelApplyMessage()

static void ProcessParallelApplyMessage ( StringInfo msg )

Definition at line 1001 of file applyparallelworker.c.

1002{

1003 char msgtype;

1004

1006

1007 switch (msgtype)

1008 {

1009 case 'E':

1010 {

1012

1013

1015

1016

1017

1018

1019

1020

1021

1024 _("logical replication parallel apply worker"));

1025 else

1026 edata.context = pstrdup(_("logical replication parallel apply worker"));

1027

1028

1029

1030

1031

1033

1034

1035

1036

1037

1039 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1040 errmsg("logical replication parallel apply worker exited due to error"),

1042 }

1043

1044

1045

1046

1047

1048

1049 case 'N':

1050 case 'A':

1051 break;

1052

1053 default:

1054 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",

1055 msgtype, msg->len);

1056 }

1057}

ErrorContextCallback * apply_error_context_stack

char * pstrdup(const char *in)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

char * psprintf(const char *fmt,...)

References _, apply_error_context_stack, ErrorData::context, elog, ereport, errcode(), errcontext, errmsg(), ERROR, error_context_stack, StringInfoData::len, pq_getmsgbyte(), pq_parse_errornotice(), psprintf(), and pstrdup().

Referenced by ProcessParallelApplyMessages().

ProcessParallelApplyMessages()

void ProcessParallelApplyMessages ( void )

Definition at line 1063 of file applyparallelworker.c.

1064{

1067

1069

1070

1071

1072

1073

1074

1075

1076

1078

1079

1080

1081

1082

1083

1084 if (!hpam_context)

1086 "ProcessParallelApplyMessages",

1088 else

1090

1092

1094

1096 {

1098 Size nbytes;

1101

1102

1103

1104

1105

1106

1107

1109 continue;

1110

1112

1114 continue;

1116 {

1118

1123 }

1124 else

1126 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1127 errmsg("lost connection to the logical replication parallel apply worker")));

1128 }

1129

1131

1132

1134

1136}

static void ProcessParallelApplyMessage(StringInfo msg)

MemoryContext TopMemoryContext

#define RESUME_INTERRUPTS()

#define HOLD_INTERRUPTS()

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

MyParallelShared

ParallelApplyWorkerShared * MyParallelShared = NULL

ParallelApplyMessagePending

volatile sig_atomic_t ParallelApplyMessagePending = false

ParallelApplyTxnHash

static HTAB * ParallelApplyTxnHash = NULL

ParallelApplyWorkerPool

static List * ParallelApplyWorkerPool = NIL

stream_apply_worker

static ParallelApplyWorkerInfo * stream_apply_worker = NULL

subxactlist

static List * subxactlist = NIL