PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
159
174
175#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
176
177
178
179
180
181
182#define PARALLEL_APPLY_KEY_SHARED 1
183#define PARALLEL_APPLY_KEY_MQ 2
184#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
185
186
187#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
188
189
190
191
192
193
194
195#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
196
197
198
199
200
201
202
203#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
204
205
206
207
208
209#define PARALLEL_APPLY_LOCK_STREAM 0
210#define PARALLEL_APPLY_LOCK_XACT 1
211
212
213
214
216{
220
221
222
223
224
226
227
228
229
230
231
232
233
235
236
237
238
240
241
242
243
244
246
247
248
249
250
251
253
254
256
260
261
262
263
264static bool
266{
267
269 return false;
270
271
272
273
274
275
276
277
278
279
281
282
283
284
285
286
288 return false;
289
290
291
292
293
294
295
296
297
298
299
300
301
303 return false;
304
305
306
307
308
309
310
311
313 return false;
314
315 return true;
316}
317
318
319
320
321
322
323
324
325
326static bool
328{
330 Size segsize;
337
338
339
340
341
342
343
344
345
346
347
352
355
356
358 if (!seg)
359 return false;
360
362 segsize);
363
364
367
372
374
375
379
380
382
383
385 error_queue_size);
388
389
391
392
394 winfo->shared = shared;
395
396 return true;
397}
398
399
400
401
402
405{
407 bool launched;
410
411
413 {
415
417 return winfo;
418 }
419
420
421
422
423
424
425
427
429
430
432 {
435 return NULL;
436 }
437
445 false);
446
447 if (launched)
448 {
450 }
451 else
452 {
454 winfo = NULL;
455 }
456
458
459 return winfo;
460}
461
462
463
464
465
466
467
468
469
470void
472{
473 bool found;
476
478 return;
479
481 if (!winfo)
482 return;
483
484
486 {
488
493
495 16, &ctl,
497 }
498
499
501 if (found)
502 elog(ERROR, "hash table corrupted");
503
504
509
510 winfo->in_use = true;
512 entry->winfo = winfo;
513}
514
515
516
517
520{
521 bool found;
523
525 return NULL;
526
528 return NULL;
529
530
533
534
536 if (found)
537 {
538
540 return entry->winfo;
541 }
542
543 return NULL;
544}
545
546
547
548
549
550
551
552
553
554
555
556static void
558{
562
564 elog(ERROR, "hash table corrupted");
565
566
567
568
569
570
571
572
573
574
575
576
580 {
583
584 return;
585 }
586
587 winfo->in_use = false;
589}
590
591
592
593
594
595static void
597{
599
602
605
606
609
612
613
615
617}
618
619
620
621
622void
624{
626
628 {
630
632 {
635 }
636 }
637}
638
639
640
641
642static bool
644{
646
648
649 return (fileset_state != FS_EMPTY);
650}
651
652
653
654
655
656
657
658static bool
660{
662
664
665 if (fileset_state == FS_EMPTY)
666 return false;
667
668
669
670
671
672
673
674
675
676
677
678
680 {
683
685 }
686
687
688
689
690
691
692
693
695 {
697 }
698 else if (fileset_state == FS_READY)
699 {
704 }
705
706 return true;
707}
708
709
710
711
712static void
714{
716
718 {
720 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
722
724 }
725
727 {
730 }
731}
732
733
734static void
736{
740
741
742
743
744
746 "ApplyMessageContext",
748
749
750
751
752
756
757 for (;;)
758 {
761
763
764
766
768
770 {
772 int c;
773
774 if (len == 0)
775 elog(ERROR, "invalid message length");
776
778
779
780
781
782
785 elog(ERROR, "unexpected message \"%c\"", c);
786
787
788
789
790
791
792
793
794
795
797
799 }
801 {
802
804 {
805 int rc;
806
807
810 1000L,
811 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
812
815 }
816 }
817 else
818 {
820
822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 errmsg("lost connection to the logical replication apply worker")));
824 }
825
828 }
829
830
832
834}
835
836
837
838
839
840
841
842
843
844static void
846{
850
852}
853
854
855
856
857void
859{
870
872
873
874
875
876
877
878
879
880
885
886
887
888
889
890
891
892
895 if (!seg)
897 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
898 errmsg("could not map dynamic shared memory segment")));
899
901 if (!toc)
903 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 errmsg("invalid magic number in dynamic shared memory segment")));
905
906
909
910
911
912
916
917
918
919
920
921
923
924
925
926
927
928
930
935
936
937
938
942
946
949
951
953
954
957 originname, sizeof(originname));
959
960
961
962
963
967
968
969
970
971
975
977
979
980
981
982
983
984
985
987}
988
989
990
991
992
993
994
995
996void
998{
1002}
1003
1004
1005
1006
1007
1008static void
1010{
1011 char msgtype;
1012
1014
1015 switch (msgtype)
1016 {
1018 {
1020
1021
1023
1024
1025
1026
1027
1028
1029
1032 _("logical replication parallel apply worker"));
1033 else
1034 edata.context = pstrdup(_("logical replication parallel apply worker"));
1035
1036
1037
1038
1039
1041
1042
1043
1044
1045
1047 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1048 errmsg("logical replication parallel apply worker exited due to error"),
1050 }
1051
1052
1053
1054
1055
1056
1059 break;
1060
1061 default:
1062 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1063 msgtype, msg->len);
1064 }
1065}
1066
1067
1068
1069
1070void
1072{
1075
1077
1078
1079
1080
1081
1082
1083
1084
1086
1087
1088
1089
1090
1091
1092 if (!hpam_context)
1094 "ProcessParallelApplyMessages",
1096 else
1098
1100
1102
1104 {
1106 Size nbytes;
1109
1110
1111
1112
1113
1114
1115
1117 continue;
1118
1120
1122 continue;
1124 {
1126
1131 }
1132 else
1134 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1135 errmsg("lost connection to the logical replication parallel apply worker")));
1136 }
1137
1139
1140
1142
1144}
1145
1146
1147
1148
1149
1150
1151
1152
1153bool
1155{
1156 int rc;
1159
1162
1163
1164
1165
1166
1168 return false;
1169
1170
1171
1172
1173
1174
1175
1176#define SHM_SEND_RETRY_INTERVAL_MS 1000
1177#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1178
1179 for (;;)
1180 {
1182
1184 return true;
1187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1188 errmsg("could not send data to shared-memory queue")));
1189
1191
1192
1196 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1197
1199 {
1202 }
1203
1204 if (startTime == 0)
1208 return false;
1209 }
1210}
1211
1212
1213
1214
1215
1216
1217
1218void
1220 bool stream_locked)
1221{
1223 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1225
1226
1227
1228
1229
1230
1232
1233
1235
1236
1237
1238
1239
1240
1241 if (!stream_locked)
1243
1245}
1246
1247
1248
1249
1250
1251static void
1254{
1255 for (;;)
1256 {
1257
1258
1259
1260
1262 break;
1263
1264
1267 10L,
1268 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1269
1270
1272
1273
1275 }
1276}
1277
1278
1279
1280
1281static void
1283{
1284
1285
1286
1287
1288
1289
1291
1292
1293
1294
1295
1296
1299
1300
1301
1302
1303
1304
1307 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 errmsg("lost connection to the logical replication parallel apply worker")));
1309}
1310
1311
1312
1313
1314void
1317{
1321}
1322
1323
1324
1325
1328{
1330
1334
1335 return xact_state;
1336}
1337
1338
1339
1340
1341void
1343{
1345}
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355static void
1357{
1358 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1359}
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369void
1371{
1372 if (current_xid != top_xid &&
1374 {
1377
1379 spname, sizeof(spname));
1380
1381 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1382
1383
1385 {
1388
1391 }
1392
1394
1395
1396
1397
1398
1399
1401
1405 }
1406}
1407
1408
1409void
1411{
1412
1413
1414
1415
1417}
1418
1419
1420
1421
1422
1423void
1425{
1428
1429
1430
1431
1432
1435
1436
1437
1438
1439
1440 if (subxid == xid)
1441 {
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1455
1457
1459 {
1462 }
1463
1465
1467 }
1468 else
1469 {
1470
1471 int i;
1473
1475
1476 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1477
1478
1479
1480
1481
1482
1483
1484
1486 {
1488
1489 if (xid_tmp == subxid)
1490 {
1494 break;
1495 }
1496 }
1497 }
1498}
1499
1500
1501
1502
1503
1504
1505void
1508{
1511
1513 {
1517 }
1518
1520}
1521
1522
1523
1524
1527{
1529
1531
1535
1536 return fileset_state;
1537}
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547void
1549{
1552}
1553
1554void
1556{
1559}
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580void
1582{
1585}
1586
1587void
1589{
1592}
1593
1594
1595
1596
1597
1598void
1600{
1602
1603
1604
1605
1606
1608 {
1610 return;
1611
1612 elog(ERROR, "invalid pending streaming chunk 0");
1613 }
1614
1616 {
1619 }
1620}
1621
1622
1623
1624
1625void
1627{
1629
1630
1631
1632
1633
1635
1636
1637
1638
1639
1640
1642
1645
1647}
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
void ProcessParallelApplyMessages(void)
#define SHM_SEND_TIMEOUT_MS
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static void ProcessParallelApplyMessage(StringInfo msg)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static bool pa_has_spooled_message_pending(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
bool InitializingApplyWorker
void apply_dispatch(StringInfo s)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
ErrorContextCallback * apply_error_context_stack
void stream_start_internal(TransactionId xid, bool first_segment)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void BackgroundWorkerUnblockSignals(void)
#define MemSet(start, val, len)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define palloc0_object(type)
volatile sig_atomic_t InterruptPending
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
void logicalrep_worker_attach(int slot)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
LogicalRepWorker * MyLogicalRepWorker
int max_parallel_apply_workers_per_subscription
List * list_delete_ptr(List *list, void *datum)
List * lappend(List *list, void *datum)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
List * list_truncate(List *list, int new_size)
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
#define AccessExclusiveLock
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_session_origin
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_origin_lsn
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static ListCell * list_nth_cell(const List *list, int n)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
int pq_getmsgbyte(StringInfo msg)
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_APPLY_MESSAGE
#define PqReplMsg_WALData
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
char bgw_extra[BGW_EXTRALEN]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TimestampTz last_recv_time
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
bool AllTablesyncsReady(void)
#define TransactionIdIsValid(xid)
#define WL_EXIT_ON_PM_DEATH
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_PARALLEL_APPLY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
#define XLogRecPtrIsValid(r)
#define InvalidXLogRecPtr