PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c File Reference (original) (raw)
Go to the source code of this file.
Macros | |
---|---|
#define | PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067 |
#define | PARALLEL_APPLY_KEY_SHARED 1 |
#define | PARALLEL_APPLY_KEY_MQ 2 |
#define | PARALLEL_APPLY_KEY_ERROR_QUEUE 3 |
#define | DSM_QUEUE_SIZE (16 * 1024 * 1024) |
#define | DSM_ERROR_QUEUE_SIZE (16 * 1024) |
#define | SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz)) |
#define | PARALLEL_APPLY_LOCK_STREAM 0 |
#define | PARALLEL_APPLY_LOCK_XACT 1 |
#define | SHM_SEND_RETRY_INTERVAL_MS 1000 |
#define | SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) |
Variables | |
---|---|
static HTAB * | ParallelApplyTxnHash = NULL |
static List * | ParallelApplyWorkerPool = NIL |
ParallelApplyWorkerShared * | MyParallelShared = NULL |
volatile sig_atomic_t | ParallelApplyMessagePending = false |
static ParallelApplyWorkerInfo * | stream_apply_worker = NULL |
static List * | subxactlist = NIL |
◆ DSM_ERROR_QUEUE_SIZE
#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
◆ DSM_QUEUE_SIZE
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
◆ PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
◆ PARALLEL_APPLY_KEY_MQ
#define PARALLEL_APPLY_KEY_MQ 2
◆ PARALLEL_APPLY_KEY_SHARED
#define PARALLEL_APPLY_KEY_SHARED 1
◆ PARALLEL_APPLY_LOCK_STREAM
#define PARALLEL_APPLY_LOCK_STREAM 0
◆ PARALLEL_APPLY_LOCK_XACT
#define PARALLEL_APPLY_LOCK_XACT 1
◆ PG_LOGICAL_APPLY_SHM_MAGIC
#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
◆ SHM_SEND_RETRY_INTERVAL_MS
#define SHM_SEND_RETRY_INTERVAL_MS 1000
◆ SHM_SEND_TIMEOUT_MS
◆ SIZE_STATS_MESSAGE
◆ ParallelApplyWorkerEntry
◆ HandleParallelApplyMessageInterrupt()
void HandleParallelApplyMessageInterrupt | ( | void | ) |
---|
◆ LogicalParallelApplyLoop()
static void LogicalParallelApplyLoop ( shm_mq_handle * mqh) | static |
---|
Definition at line 734 of file applyparallelworker.c.
735{
739
740
741
742
743
745 "ApplyMessageContext",
747
748
749
750
751
755
756 for (;;)
757 {
760
762
763
765
767
769 {
771 int c;
772
773 if (len == 0)
774 elog(ERROR, "invalid message length");
775
777
778
779
780
781
783 if (c != 'w')
784 elog(ERROR, "unexpected message \"%c\"", c);
785
786
787
788
789
790
791
792
793
794
796
798 }
800 {
801
803 {
804 int rc;
805
806
809 1000L,
810 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
811
814 }
815 }
816 else
817 {
819
821 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
822 errmsg("lost connection to the logical replication apply worker")));
823 }
824
827 }
828
829
831
833}
static void ProcessParallelApplyInterrupts(void)
#define SIZE_STATS_MESSAGE
static bool pa_process_spooled_messages_if_required(void)
MemoryContext ApplyMessageContext
void apply_dispatch(StringInfo s)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
Assert(PointerIsAligned(start, uint64))
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void MemoryContextReset(MemoryContext context)
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
int pq_getmsgbyte(StringInfo msg)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
#define WL_EXIT_ON_PM_DEATH
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, Assert(), ErrorContextCallback::callback, CurrentMemoryContext, StringInfoData::cursor, data, elog, ereport, errcode(), errmsg(), ERROR, error_context_stack, initReadOnlyStringInfo(), len, MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, pa_process_spooled_messages_if_required(), pq_getmsgbyte(), ErrorContextCallback::previous, ProcessParallelApplyInterrupts(), ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SIZE_STATS_MESSAGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by ParallelApplyWorkerMain().
◆ pa_allocate_worker()
Definition at line 470 of file applyparallelworker.c.
471{
472 bool found;
475
477 return;
478
480 if (!winfo)
481 return;
482
483
485 {
487
492
494 16, &ctl,
496 }
497
498
500 if (found)
501 elog(ERROR, "hash table corrupted");
502
503
508
509 winfo->in_use = true;
511 entry->winfo = winfo;
512}
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static HTAB * ParallelApplyTxnHash
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define MemSet(start, val, len)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
ParallelApplyWorkerInfo * winfo
ParallelApplyWorkerShared * shared
References ApplyContext, ctl, elog, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.
Referenced by apply_handle_stream_start().
◆ pa_can_start()
static bool pa_can_start ( void ) | static |
---|
◆ pa_decr_and_wait_stream_block()
void pa_decr_and_wait_stream_block | ( | void | ) |
---|
Definition at line 1591 of file applyparallelworker.c.
1592{
1594
1595
1596
1597
1598
1600 {
1602 return;
1603
1604 elog(ERROR, "invalid pending streaming chunk 0");
1605 }
1606
1608 {
1611 }
1612}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_has_spooled_message_pending()
ParallelApplyWorkerShared * MyParallelShared
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
static bool am_parallel_apply_worker(void)
References AccessShareLock, am_parallel_apply_worker(), Assert(), elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.
Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().
◆ pa_detach_all_error_mq()
void pa_detach_all_error_mq | ( | void | ) |
---|
◆ pa_find_worker()
◆ pa_free_worker()
Definition at line 556 of file applyparallelworker.c.
557{
561
563 elog(ERROR, "hash table corrupted");
564
565
566
567
568
569
570
571
572
573
574
575
579 {
582
583 return;
584 }
585
586 winfo->in_use = false;
588}
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
int max_parallel_apply_workers_per_subscription
static int list_length(const List *l)
@ PARALLEL_TRANS_FINISHED
References am_parallel_apply_worker(), Assert(), elog, ERROR, HASH_REMOVE, hash_search(), ParallelApplyWorkerInfo::in_use, list_length(), logicalrep_pa_worker_stop(), max_parallel_apply_workers_per_subscription, pa_free_worker_info(), pa_get_xact_state(), PARALLEL_TRANS_FINISHED, ParallelApplyTxnHash, ParallelApplyWorkerPool, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.
Referenced by pa_xact_finish().
◆ pa_free_worker_info()
Definition at line 595 of file applyparallelworker.c.
596{
598
601
604
605
608
611
612
614
616}
void stream_cleanup_files(Oid subid, TransactionId xid)
void dsm_detach(dsm_segment *seg)
List * list_delete_ptr(List *list, void *datum)
void pfree(void *pointer)
shm_mq_handle * mq_handle
References Assert(), dsm_detach(), ParallelApplyWorkerInfo::dsm_seg, ParallelApplyWorkerInfo::error_mq_handle, list_delete_ptr(), ParallelApplyWorkerInfo::mq_handle, MyLogicalRepWorker, ParallelApplyWorkerPool, pfree(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, shm_mq_detach(), stream_cleanup_files(), LogicalRepWorker::subid, and ParallelApplyWorkerShared::xid.
Referenced by pa_free_worker(), and pa_launch_parallel_worker().
◆ pa_get_fileset_state()
◆ pa_get_xact_state()
◆ pa_has_spooled_message_pending()
static bool pa_has_spooled_message_pending ( ) | static |
---|
◆ pa_launch_parallel_worker()
Definition at line 404 of file applyparallelworker.c.
405{
407 bool launched;
410
411
413 {
415
417 return winfo;
418 }
419
420
421
422
423
424
425
427
429
430
432 {
435 return NULL;
436 }
437
445
446 if (launched)
447 {
449 }
450 else
451 {
453 winfo = NULL;
454 }
455
457
458 return winfo;
459}
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
dsm_handle dsm_segment_handle(dsm_segment *seg)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
List * lappend(List *list, void *datum)
void * palloc0(Size size)
@ WORKERTYPE_PARALLEL_APPLY
References ApplyContext, LogicalRepWorker::dbid, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_handle(), ParallelApplyWorkerInfo::in_use, InvalidOid, lappend(), lfirst, logicalrep_worker_launch(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pa_free_worker_info(), pa_setup_dsm(), palloc0(), ParallelApplyWorkerPool, pfree(), LogicalRepWorker::userid, and WORKERTYPE_PARALLEL_APPLY.
Referenced by pa_allocate_worker().
◆ pa_lock_stream()
Definition at line 1540 of file applyparallelworker.c.
1541{
1544}
#define PARALLEL_APPLY_LOCK_STREAM
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
References LockApplyTransactionForSession(), MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, and LogicalRepWorker::subid.
Referenced by apply_handle_stream_abort(), apply_handle_stream_stop(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().
◆ pa_lock_transaction()
◆ pa_process_spooled_messages_if_required()
static bool pa_process_spooled_messages_if_required ( void ) | static |
---|
Definition at line 658 of file applyparallelworker.c.
659{
661
663
664 if (fileset_state == FS_EMPTY)
665 return false;
666
667
668
669
670
671
672
673
674
675
676
677
679 {
682
684 }
685
686
687
688
689
690
691
692
694 {
696 }
697 else if (fileset_state == FS_READY)
698 {
703 }
704
705 return true;
706}
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
@ FS_SERIALIZE_IN_PROGRESS
#define InvalidXLogRecPtr
References AccessShareLock, apply_spooled_messages(), ParallelApplyWorkerShared::fileset, FS_EMPTY, FS_READY, FS_SERIALIZE_DONE, FS_SERIALIZE_IN_PROGRESS, InvalidXLogRecPtr, MyParallelShared, pa_get_fileset_state(), pa_lock_stream(), pa_set_fileset_state(), pa_unlock_stream(), and ParallelApplyWorkerShared::xid.
Referenced by LogicalParallelApplyLoop().
◆ pa_reset_subtrans()
void pa_reset_subtrans | ( | void | ) |
---|
◆ pa_savepoint_name()
◆ pa_send_data()
Definition at line 1146 of file applyparallelworker.c.
1147{
1148 int rc;
1151
1154
1155
1156
1157
1158
1160 return false;
1161
1162
1163
1164
1165
1166
1167
1168#define SHM_SEND_RETRY_INTERVAL_MS 1000
1169#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1170
1171 for (;;)
1172 {
1174
1176 return true;
1179 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1180 errmsg("could not send data to shared-memory queue")));
1181
1183
1184
1188 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1189
1191 {
1194 }
1195
1196 if (startTime == 0)
1200 return false;
1201 }
1202}
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
#define CHECK_FOR_INTERRUPTS()
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
bool IsTransactionState(void)
References Assert(), CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().
◆ pa_set_fileset_state()
Definition at line 1498 of file applyparallelworker.c.
1500{
1503
1505 {
1509 }
1510
1512}
References am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::fileset, ParallelApplyWorkerShared::fileset_state, FS_SERIALIZE_DONE, ParallelApplyWorkerShared::mutex, MyLogicalRepWorker, SpinLockAcquire, SpinLockRelease, and LogicalRepWorker::stream_fileset.
Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().
◆ pa_set_stream_apply_worker()
◆ pa_set_xact_state()
◆ pa_setup_dsm()
Definition at line 327 of file applyparallelworker.c.
328{
330 Size segsize;
337
338
339
340
341
342
343
344
345
346
347
352
355
356
358 if (!seg)
359 return false;
360
362 segsize);
363
364
367
372
374
375
379
380
382
383
385 error_queue_size);
388
389
391
392
394 winfo->shared = shared;
395
396 return true;
397}
#define DSM_ERROR_QUEUE_SIZE
#define PARALLEL_APPLY_KEY_SHARED
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
#define SpinLockInit(lock)
References dsm_create(), DSM_ERROR_QUEUE_SIZE, DSM_QUEUE_SIZE, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_address(), ParallelApplyWorkerInfo::error_mq_handle, ParallelApplyWorkerShared::fileset_state, FS_EMPTY, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, ParallelApplyWorkerInfo::mq_handle, ParallelApplyWorkerShared::mutex, MyProc, PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PARALLEL_TRANS_UNKNOWN, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_init_u32(), PG_LOGICAL_APPLY_SHM_MAGIC, ParallelApplyWorkerInfo::shared, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit, and ParallelApplyWorkerShared::xact_state.
Referenced by pa_launch_parallel_worker().
◆ pa_shutdown()
static void pa_shutdown ( int code, Datum arg ) | static |
---|
◆ pa_start_subtrans()
Definition at line 1362 of file applyparallelworker.c.
1363{
1364 if (current_xid != top_xid &&
1366 {
1369
1371 spname, sizeof(spname));
1372
1373 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1374
1375
1377 {
1380
1383 }
1384
1386
1387
1388
1389
1390
1391
1393
1397 }
1398}
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
MemoryContext TopTransactionContext
void DefineSavepoint(const char *name)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.
Referenced by handle_streamed_transaction().
◆ pa_stream_abort()
Definition at line 1416 of file applyparallelworker.c.
1417{
1420
1421
1422
1423
1424
1427
1428
1429
1430
1431
1432 if (subxid == xid)
1433 {
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1447
1449
1451 {
1454 }
1455
1457
1459 }
1460 else
1461 {
1462
1463 int i;
1465
1467
1468 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1469
1470
1471
1472
1473
1474
1475
1476
1478 {
1480
1481 if (xid_tmp == subxid)
1482 {
1486 break;
1487 }
1488 }
1489 }
1490}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pgstat_report_activity(BackendState state, const char *cmd_str)
List * list_truncate(List *list, int new_size)
#define AccessExclusiveLock
TimestampTz replorigin_session_origin_timestamp
XLogRecPtr replorigin_session_origin_lsn
static ListCell * list_nth_cell(const List *list, int n)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.
Referenced by apply_handle_stream_abort().
◆ pa_switch_to_partial_serialize()
Definition at line 1211 of file applyparallelworker.c.
1213{
1215 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1217
1218
1219
1220
1221
1222
1224
1225
1227
1228
1229
1230
1231
1232
1233 if (!stream_locked)
1235
1237}
void stream_start_internal(TransactionId xid, bool first_segment)
References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.
Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().
◆ pa_unlock_stream()
Definition at line 1547 of file applyparallelworker.c.
1548{
1551}
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
References MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, LogicalRepWorker::subid, and UnlockApplyTransactionForSession().
Referenced by apply_handle_stream_abort(), apply_handle_stream_start(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_xact_finish().
◆ pa_unlock_transaction()
◆ pa_wait_for_xact_finish()
Definition at line 1274 of file applyparallelworker.c.
1275{
1276
1277
1278
1279
1280
1281
1283
1284
1285
1286
1287
1288
1291
1292
1293
1294
1295
1296
1299 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300 errmsg("lost connection to the logical replication parallel apply worker")));
1301}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
References AccessShareLock, ereport, errcode(), errmsg(), ERROR, pa_get_xact_state(), pa_lock_transaction(), pa_unlock_transaction(), pa_wait_for_xact_state(), PARALLEL_TRANS_FINISHED, PARALLEL_TRANS_STARTED, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.
Referenced by pa_xact_finish().
◆ pa_wait_for_xact_state()
◆ pa_xact_finish()
Definition at line 1618 of file applyparallelworker.c.
1619{
1621
1622
1623
1624
1625
1627
1628
1629
1630
1631
1632
1634
1637
1639}
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.
Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().
◆ ParallelApplyWorkerMain()
void ParallelApplyWorkerMain | ( | Datum | main_arg | ) |
---|
Definition at line 857 of file applyparallelworker.c.
858{
869
871
872
877
878
879
880
881
882
883
884
887 if (!seg)
889 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
890 errmsg("could not map dynamic shared memory segment")));
891
893 if (!toc)
895 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 errmsg("invalid magic number in dynamic shared memory segment")));
897
898
901
902
903
904
908
909
910
911
912
913
915
916
917
918
919
920
922
927
928
929
930
934
938
941
943
945
946
949 originname, sizeof(originname));
951
952
953
954
955
959
960
961
962
963
967
969
971
972
973
974
975
976
977
979}
static void pa_shutdown(int code, Datum arg)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
bool InitializingApplyWorker
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
void InitializeLogRepWorker(void)
void BackgroundWorkerUnblockSignals(void)
dsm_segment * dsm_attach(dsm_handle h)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void logicalrep_worker_attach(int slot)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_session_origin
void replorigin_session_setup(RepOriginId node, int acquired_by)
static Datum PointerGetDatum(const void *X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
char bgw_extra[BGW_EXTRALEN]
TimestampTz last_recv_time
TimestampTz last_send_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, invalidate_syncing_table_states(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SpinLockAcquire, SpinLockRelease, and StartTransactionCommand().
◆ ProcessParallelApplyInterrupts()
static void ProcessParallelApplyInterrupts ( void ) | static |
---|
Definition at line 712 of file applyparallelworker.c.
713{
715
717 {
719 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
721
723 }
724
726 {
729 }
730}
void ProcessConfigFile(GucContext context)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, MySubscription, Subscription::name, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), and ShutdownRequestPending.
Referenced by LogicalParallelApplyLoop().
◆ ProcessParallelApplyMessage()
static void ProcessParallelApplyMessage ( StringInfo msg) | static |
---|
Definition at line 1001 of file applyparallelworker.c.
1002{
1003 char msgtype;
1004
1006
1007 switch (msgtype)
1008 {
1009 case 'E':
1010 {
1012
1013
1015
1016
1017
1018
1019
1020
1021
1024 _("logical replication parallel apply worker"));
1025 else
1026 edata.context = pstrdup(_("logical replication parallel apply worker"));
1027
1028
1029
1030
1031
1033
1034
1035
1036
1037
1039 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040 errmsg("logical replication parallel apply worker exited due to error"),
1042 }
1043
1044
1045
1046
1047
1048
1049 case 'N':
1050 case 'A':
1051 break;
1052
1053 default:
1054 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1055 msgtype, msg->len);
1056 }
1057}
ErrorContextCallback * apply_error_context_stack
char * pstrdup(const char *in)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
char * psprintf(const char *fmt,...)
References _, apply_error_context_stack, ErrorData::context, elog, ereport, errcode(), errcontext, errmsg(), ERROR, error_context_stack, StringInfoData::len, pq_getmsgbyte(), pq_parse_errornotice(), psprintf(), and pstrdup().
Referenced by ProcessParallelApplyMessages().
◆ ProcessParallelApplyMessages()
void ProcessParallelApplyMessages | ( | void | ) |
---|
Definition at line 1063 of file applyparallelworker.c.
1064{
1067
1069
1070
1071
1072
1073
1074
1075
1076
1078
1079
1080
1081
1082
1083
1084 if (!hpam_context)
1086 "ProcessParallelApplyMessages",
1088 else
1090
1092
1094
1096 {
1098 Size nbytes;
1101
1102
1103
1104
1105
1106
1107
1109 continue;
1110
1112
1114 continue;
1116 {
1118
1123 }
1124 else
1126 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127 errmsg("lost connection to the logical replication parallel apply worker")));
1128 }
1129
1131
1132
1134
1136}
static void ProcessParallelApplyMessage(StringInfo msg)
MemoryContext TopMemoryContext
#define RESUME_INTERRUPTS()
#define HOLD_INTERRUPTS()
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.
Referenced by ProcessInterrupts().
◆ MyParallelShared
◆ ParallelApplyMessagePending
volatile sig_atomic_t ParallelApplyMessagePending = false
◆ ParallelApplyTxnHash
HTAB* ParallelApplyTxnHash = NULL | static |
---|
◆ ParallelApplyWorkerPool
List* ParallelApplyWorkerPool = NIL | static |
---|