PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
159
177
178#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
179
180
181
182
183
184
185#define PARALLEL_APPLY_KEY_SHARED 1
186#define PARALLEL_APPLY_KEY_MQ 2
187#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
188
189
190#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
191
192
193
194
195
196
197
198#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
199
200
201
202
203
204
205
206#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
207
208
209
210
211
212#define PARALLEL_APPLY_LOCK_STREAM 0
213#define PARALLEL_APPLY_LOCK_XACT 1
214
215
216
217
223
224
225
226
227
229
230
231
232
233
234
235
236
238
239
240
241
243
244
245
246
247
249
250
251
252
253
254
256
257
259
263
264
265
266
267static bool
269{
270
272 return false;
273
274
275
276
277
278
279
280
281
282
284
285
286
287
288
289
291 return false;
292
293
294
295
296
297
298
299
300
301
302
303
304
306 return false;
307
308
309
310
311
312
313
314
316 return false;
317
318 return true;
319}
320
321
322
323
324
325
326
327
328
329static bool
331{
340
341
342
343
344
345
346
347
348
349
350
355
358
359
361 if (!seg)
362 return false;
363
366
367
370
375
377
378
382
383
385
386
391
392
394
395
397 winfo->shared = shared;
398
399 return true;
400}
401
402
403
404
405
408{
413
414
416 {
418
420 return winfo;
421 }
422
423
424
425
426
427
428
430
432
433
435 {
439 }
440
448 false);
449
451 {
453 }
454 else
455 {
457 winfo = NULL;
458 }
459
461
462 return winfo;
463}
464
465
466
467
468
469
470
471
472
473void
475{
476 bool found;
479
481 return;
482
484 if (!winfo)
485 return;
486
487
489 {
491
496
498 16, &ctl,
500 }
501
502
504 if (found)
505 elog(ERROR, "hash table corrupted");
506
507
512
513 winfo->in_use = true;
515 entry->winfo = winfo;
516}
517
518
519
520
523{
524 bool found;
526
529
532
533
536
537
539 if (found)
540 {
541
543 return entry->winfo;
544 }
545
547}
548
549
550
551
552
553
554
555
556
557
558
559static void
561{
565
567 elog(ERROR, "hash table corrupted");
568
569
570
571
572
573
574
575
576
577
578
579
583 {
586
587 return;
588 }
589
590 winfo->in_use = false;
592}
593
594
595
596
597
598static void
600{
602
605
608
609
612
615
616
618
620}
621
622
623
624
625void
627{
629
631 {
633
635 {
638 }
639 }
640}
641
642
643
644
645static bool
647{
649
651
652 return (fileset_state != FS_EMPTY);
653}
654
655
656
657
658
659
660
661static bool
663{
665
667
668 if (fileset_state == FS_EMPTY)
669 return false;
670
671
672
673
674
675
676
677
678
679
680
681
683 {
686
688 }
689
690
691
692
693
694
695
696
698 {
700 }
701 else if (fileset_state == FS_READY)
702 {
707 }
708
709 return true;
710}
711
712
713
714
715static void
717{
719
721 {
723 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
725
727 }
728
730 {
733 }
734}
735
736
737static void
739{
743
744
745
746
747
749 "ApplyMessageContext",
751
752
753
754
755
759
760 for (;;)
761 {
764
766
767
769
771
773 {
775 int c;
776
777 if (len == 0)
778 elog(ERROR, "invalid message length");
779
781
782
783
784
785
788 elog(ERROR, "unexpected message \"%c\"", c);
789
790
791
792
793
794
795
796
797
798
800
802 }
804 {
805
807 {
808 int rc;
809
810
813 1000L,
815
818 }
819 }
820 else
821 {
823
826 errmsg("lost connection to the logical replication apply worker")));
827 }
828
831 }
832
833
835
837}
838
839
840
841
842
843
844
845
846
847static void
856
857
858
859
860void
862{
873
875
876
877
878
879
880
881
882
883
887
888
889
890
891
892
893
894
897 if (!seg)
900 errmsg("could not map dynamic shared memory segment")));
901
903 if (!toc)
906 errmsg("invalid magic number in dynamic shared memory segment")));
907
908
911
912
913
914
918
919
920
921
922
923
925
926
927
928
929
930
932
937
938
939
940
944
948
951
953
955
956
961
962
963
964
965
969
970
971
972
973
977
979
981
982
983
984
985
986
987
989}
990
991
992
993
994
995
996
997
998void
1005
1006
1007
1008
1009
1010static void
1012{
1014
1016
1018 {
1020 {
1022
1023
1025
1026
1027
1028
1029
1030
1031
1032 if (edata.context)
1034 _("logical replication parallel apply worker"));
1035 else
1036 edata.context = pstrdup(_("logical replication parallel apply worker"));
1037
1038
1039
1040
1041
1043
1044
1045
1046
1047
1050 errmsg("logical replication parallel apply worker exited due to error"),
1052 }
1053
1054
1055
1056
1057
1058
1061 break;
1062
1063 default:
1064 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1066 }
1067}
1068
1069
1070
1071
1072void
1074{
1077
1079
1080
1081
1082
1083
1084
1085
1086
1088
1089
1090
1091
1092
1093
1094 if ()
1096 "ProcessParallelApplyMessages",
1098 else
1100
1102
1104
1106 {
1108 Size nbytes;
1111
1112
1113
1114
1115
1116
1117
1119 continue;
1120
1122
1124 continue;
1126 {
1128
1133 }
1134 else
1137 errmsg("lost connection to the logical replication parallel apply worker")));
1138 }
1139
1141
1142
1144
1146}
1147
1148
1149
1150
1151
1152
1153
1154
1155bool
1157{
1158 int rc;
1161
1164
1165
1166
1167
1168
1170 return false;
1171
1172
1173
1174
1175
1176
1177
1178#define SHM_SEND_RETRY_INTERVAL_MS 1000
1179#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1180
1181 for (;;)
1182 {
1184
1186 return true;
1190 errmsg("could not send data to shared-memory queue")));
1191
1193
1194
1199
1201 {
1204 }
1205
1206 if (startTime == 0)
1210 return false;
1211 }
1212}
1213
1214
1215
1216
1217
1218
1219
1220void
1223{
1225 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1227
1228
1229
1230
1231
1232
1234
1235
1237
1238
1239
1240
1241
1242
1245
1247}
1248
1249
1250
1251
1252
1253static void
1256{
1257 for (;;)
1258 {
1259
1260
1261
1262
1264 break;
1265
1266
1269 10L,
1271
1272
1274
1275
1277 }
1278}
1279
1280
1281
1282
1283static void
1285{
1286
1287
1288
1289
1290
1291
1293
1294
1295
1296
1297
1298
1301
1302
1303
1304
1305
1306
1310 errmsg("lost connection to the logical replication parallel apply worker")));
1311}
1312
1313
1314
1315
1316void
1319{
1321 wshared->xact_state = xact_state;
1323}
1324
1325
1326
1327
1330{
1332
1334 xact_state = wshared->xact_state;
1336
1337 return xact_state;
1338}
1339
1340
1341
1342
1343void
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357static void
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371void
1373{
1376 {
1379
1382
1383 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1384
1385
1387 {
1390
1393 }
1394
1396
1397
1398
1399
1400
1401
1403
1407 }
1408}
1409
1410
1411void
1413{
1414
1415
1416
1417
1419}
1420
1421
1422
1423
1424
1425void
1427{
1430
1431
1432
1433
1434
1437
1438
1439
1440
1441
1442 if (subxid == xid)
1443 {
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1457
1459
1461 {
1464 }
1465
1467
1469 }
1470 else
1471 {
1472
1473 int i;
1475
1477
1478 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1479
1480
1481
1482
1483
1484
1485
1486
1488 {
1490
1492 {
1496 break;
1497 }
1498 }
1499 }
1500}
1501
1502
1503
1504
1505
1506
1507void
1510{
1512 wshared->fileset_state = fileset_state;
1513
1515 {
1519 }
1520
1522}
1523
1524
1525
1526
1529{
1531
1533
1537
1538 return fileset_state;
1539}
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549void
1555
1556void
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582void
1588
1589void
1595
1596
1597
1598
1599
1600void
1602{
1604
1605
1606
1607
1608
1610 {
1612 return;
1613
1614 elog(ERROR, "invalid pending streaming chunk 0");
1615 }
1616
1618 {
1621 }
1622}
1623
1624
1625
1626
1627void
1629{
1631
1632
1633
1634
1635
1637
1638
1639
1640
1641
1642
1644
1647
1649}
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
void ProcessParallelApplyMessages(void)
#define SHM_SEND_TIMEOUT_MS
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static void ProcessParallelApplyMessage(StringInfo msg)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static bool pa_has_spooled_message_pending(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
bool InitializingApplyWorker
void apply_dispatch(StringInfo s)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
ErrorContextCallback * apply_error_context_stack
void stream_start_internal(TransactionId xid, bool first_segment)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void BackgroundWorkerUnblockSignals(void)
#define Assert(condition)
#define MemSet(start, val, len)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
#define ereport(elevel,...)
#define palloc0_object(type)
volatile sig_atomic_t InterruptPending
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
void logicalrep_worker_attach(int slot)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
LogicalRepWorker * MyLogicalRepWorker
int max_parallel_apply_workers_per_subscription
List * list_delete_ptr(List *list, void *datum)
List * lappend(List *list, void *datum)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
List * list_truncate(List *list, int new_size)
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
#define AccessExclusiveLock
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
ReplOriginXactState replorigin_xact_state
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
void replorigin_session_setup(ReplOriginId node, int acquired_by)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static ListCell * list_nth_cell(const List *list, int n)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
int pq_getmsgbyte(StringInfo msg)
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_APPLY_MESSAGE
#define PqReplMsg_WALData
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
char bgw_extra[BGW_EXTRALEN]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TimestampTz last_recv_time
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
TimestampTz origin_timestamp
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
bool AllTablesyncsReady(void)
#define TransactionIdIsValid(xid)
#define WL_EXIT_ON_PM_DEATH
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_PARALLEL_APPLY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
#define XLogRecPtrIsValid(r)
#define InvalidXLogRecPtr