PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
159
174
175#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
176
177
178
179
180
181
182#define PARALLEL_APPLY_KEY_SHARED 1
183#define PARALLEL_APPLY_KEY_MQ 2
184#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
185
186
187#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
188
189
190
191
192
193
194
195#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
196
197
198
199
200
201
202
203#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
204
205
206
207
208
209#define PARALLEL_APPLY_LOCK_STREAM 0
210#define PARALLEL_APPLY_LOCK_XACT 1
211
212
213
214
216{
220
221
222
223
224
226
227
228
229
230
231
232
233
235
236
237
238
240
241
242
243
244
246
247
248
249
250
251
253
254
256
260
261
262
263
264static bool
266{
267
269 return false;
270
271
272
273
274
275
276
277
278
279
281
282
283
284
285
286
288 return false;
289
290
291
292
293
294
295
296
297
298
299
300
301
303 return false;
304
305
306
307
308
309
310
311
313 return false;
314
315 return true;
316}
317
318
319
320
321
322
323
324
325
326static bool
328{
330 Size segsize;
337
338
339
340
341
342
343
344
345
346
347
352
355
356
358 if (!seg)
359 return false;
360
362 segsize);
363
364
367
372
374
375
379
380
382
383
385 error_queue_size);
388
389
391
392
394 winfo->shared = shared;
395
396 return true;
397}
398
399
400
401
402
405{
407 bool launched;
410
411
413 {
415
417 return winfo;
418 }
419
420
421
422
423
424
425
427
429
430
432 {
435 return NULL;
436 }
437
445
446 if (launched)
447 {
449 }
450 else
451 {
453 winfo = NULL;
454 }
455
457
458 return winfo;
459}
460
461
462
463
464
465
466
467
468
469void
471{
472 bool found;
475
477 return;
478
480 if (!winfo)
481 return;
482
483
485 {
487
492
494 16, &ctl,
496 }
497
498
500 if (found)
501 elog(ERROR, "hash table corrupted");
502
503
508
509 winfo->in_use = true;
511 entry->winfo = winfo;
512}
513
514
515
516
519{
520 bool found;
522
524 return NULL;
525
527 return NULL;
528
529
532
533
535 if (found)
536 {
537
539 return entry->winfo;
540 }
541
542 return NULL;
543}
544
545
546
547
548
549
550
551
552
553
554
555static void
557{
561
563 elog(ERROR, "hash table corrupted");
564
565
566
567
568
569
570
571
572
573
574
575
579 {
582
583 return;
584 }
585
586 winfo->in_use = false;
588}
589
590
591
592
593
594static void
596{
598
601
604
605
608
611
612
614
616}
617
618
619
620
621void
623{
625
627 {
629
631 {
634 }
635 }
636}
637
638
639
640
641static bool
643{
645
647
648 return (fileset_state != FS_EMPTY);
649}
650
651
652
653
654
655
656
657static bool
659{
661
663
664 if (fileset_state == FS_EMPTY)
665 return false;
666
667
668
669
670
671
672
673
674
675
676
677
679 {
682
684 }
685
686
687
688
689
690
691
692
694 {
696 }
697 else if (fileset_state == FS_READY)
698 {
703 }
704
705 return true;
706}
707
708
709
710
711static void
713{
715
717 {
719 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
721
723 }
724
726 {
729 }
730}
731
732
733static void
735{
739
740
741
742
743
745 "ApplyMessageContext",
747
748
749
750
751
755
756 for (;;)
757 {
760
762
763
765
767
769 {
771 int c;
772
773 if (len == 0)
774 elog(ERROR, "invalid message length");
775
777
778
779
780
781
783 if (c != 'w')
784 elog(ERROR, "unexpected message \"%c\"", c);
785
786
787
788
789
790
791
792
793
794
796
798 }
800 {
801
803 {
804 int rc;
805
806
809 1000L,
810 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
811
814 }
815 }
816 else
817 {
819
821 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
822 errmsg("lost connection to the logical replication apply worker")));
823 }
824
827 }
828
829
831
833}
834
835
836
837
838
839
840
841
842
843static void
845{
849
851}
852
853
854
855
856void
858{
869
871
872
877
878
879
880
881
882
883
884
887 if (!seg)
889 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
890 errmsg("could not map dynamic shared memory segment")));
891
893 if (!toc)
895 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 errmsg("invalid magic number in dynamic shared memory segment")));
897
898
901
902
903
904
908
909
910
911
912
913
915
916
917
918
919
920
922
927
928
929
930
934
938
941
943
945
946
949 originname, sizeof(originname));
951
952
953
954
955
959
960
961
962
963
967
969
971
972
973
974
975
976
977
979}
980
981
982
983
984
985
986
987
988void
990{
994}
995
996
997
998
999
1000static void
1002{
1003 char msgtype;
1004
1006
1007 switch (msgtype)
1008 {
1009 case 'E':
1010 {
1012
1013
1015
1016
1017
1018
1019
1020
1021
1024 _("logical replication parallel apply worker"));
1025 else
1026 edata.context = pstrdup(_("logical replication parallel apply worker"));
1027
1028
1029
1030
1031
1033
1034
1035
1036
1037
1039 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040 errmsg("logical replication parallel apply worker exited due to error"),
1042 }
1043
1044
1045
1046
1047
1048
1049 case 'N':
1050 case 'A':
1051 break;
1052
1053 default:
1054 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1055 msgtype, msg->len);
1056 }
1057}
1058
1059
1060
1061
1062void
1064{
1067
1069
1070
1071
1072
1073
1074
1075
1076
1078
1079
1080
1081
1082
1083
1084 if (!hpam_context)
1086 "ProcessParallelApplyMessages",
1088 else
1090
1092
1094
1096 {
1098 Size nbytes;
1101
1102
1103
1104
1105
1106
1107
1109 continue;
1110
1112
1114 continue;
1116 {
1118
1123 }
1124 else
1126 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127 errmsg("lost connection to the logical replication parallel apply worker")));
1128 }
1129
1131
1132
1134
1136}
1137
1138
1139
1140
1141
1142
1143
1144
1145bool
1147{
1148 int rc;
1151
1154
1155
1156
1157
1158
1160 return false;
1161
1162
1163
1164
1165
1166
1167
1168#define SHM_SEND_RETRY_INTERVAL_MS 1000
1169#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1170
1171 for (;;)
1172 {
1174
1176 return true;
1179 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1180 errmsg("could not send data to shared-memory queue")));
1181
1183
1184
1188 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1189
1191 {
1194 }
1195
1196 if (startTime == 0)
1200 return false;
1201 }
1202}
1203
1204
1205
1206
1207
1208
1209
1210void
1212 bool stream_locked)
1213{
1215 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1217
1218
1219
1220
1221
1222
1224
1225
1227
1228
1229
1230
1231
1232
1233 if (!stream_locked)
1235
1237}
1238
1239
1240
1241
1242
1243static void
1246{
1247 for (;;)
1248 {
1249
1250
1251
1252
1254 break;
1255
1256
1259 10L,
1260 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1261
1262
1264
1265
1267 }
1268}
1269
1270
1271
1272
1273static void
1275{
1276
1277
1278
1279
1280
1281
1283
1284
1285
1286
1287
1288
1291
1292
1293
1294
1295
1296
1299 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300 errmsg("lost connection to the logical replication parallel apply worker")));
1301}
1302
1303
1304
1305
1306void
1309{
1313}
1314
1315
1316
1317
1320{
1322
1326
1327 return xact_state;
1328}
1329
1330
1331
1332
1333void
1335{
1337}
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347static void
1349{
1350 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1351}
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361void
1363{
1364 if (current_xid != top_xid &&
1366 {
1369
1371 spname, sizeof(spname));
1372
1373 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1374
1375
1377 {
1380
1383 }
1384
1386
1387
1388
1389
1390
1391
1393
1397 }
1398}
1399
1400
1401void
1403{
1404
1405
1406
1407
1409}
1410
1411
1412
1413
1414
1415void
1417{
1420
1421
1422
1423
1424
1427
1428
1429
1430
1431
1432 if (subxid == xid)
1433 {
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1447
1449
1451 {
1454 }
1455
1457
1459 }
1460 else
1461 {
1462
1463 int i;
1465
1467
1468 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1469
1470
1471
1472
1473
1474
1475
1476
1478 {
1480
1481 if (xid_tmp == subxid)
1482 {
1486 break;
1487 }
1488 }
1489 }
1490}
1491
1492
1493
1494
1495
1496
1497void
1500{
1503
1505 {
1509 }
1510
1512}
1513
1514
1515
1516
1519{
1521
1523
1527
1528 return fileset_state;
1529}
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539void
1541{
1544}
1545
1546void
1548{
1551}
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572void
1574{
1577}
1578
1579void
1581{
1584}
1585
1586
1587
1588
1589
1590void
1592{
1594
1595
1596
1597
1598
1600 {
1602 return;
1603
1604 elog(ERROR, "invalid pending streaming chunk 0");
1605 }
1606
1608 {
1611 }
1612}
1613
1614
1615
1616
1617void
1619{
1621
1622
1623
1624
1625
1627
1628
1629
1630
1631
1632
1634
1637
1639}
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
void ProcessParallelApplyMessages(void)
#define SHM_SEND_TIMEOUT_MS
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static void ProcessParallelApplyMessage(StringInfo msg)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static bool pa_has_spooled_message_pending()
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
bool InitializingApplyWorker
void apply_dispatch(StringInfo s)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
ErrorContextCallback * apply_error_context_stack
void stream_start_internal(TransactionId xid, bool first_segment)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void BackgroundWorkerUnblockSignals(void)
#define MemSet(start, val, len)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile sig_atomic_t InterruptPending
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
void logicalrep_worker_attach(int slot)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
LogicalRepWorker * MyLogicalRepWorker
int max_parallel_apply_workers_per_subscription
List * list_delete_ptr(List *list, void *datum)
List * lappend(List *list, void *datum)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
List * list_truncate(List *list, int new_size)
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
#define AccessExclusiveLock
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_session_origin
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_origin_lsn
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static ListCell * list_nth_cell(const List *list, int n)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
int pq_getmsgbyte(StringInfo msg)
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_APPLY_MESSAGE
char * psprintf(const char *fmt,...)
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
char bgw_extra[BGW_EXTRALEN]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TimestampTz last_recv_time
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
bool AllTablesyncsReady(void)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
#define TransactionIdIsValid(xid)
#define WL_EXIT_ON_PM_DEATH
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_PARALLEL_APPLY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr