PostgreSQL Source Code: src/backend/replication/logical/tablesync.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
96
125
126typedef enum
127{
132
136
138
139
140
141
144{
145
146
147
148
150 {
153 }
154
155
157
160 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
164
165
167
168
170}
171
172
173
174
175
176
177
178
179
180
181
182static bool
184{
186
187 for (;;)
188 {
191
193
196 relid, &statelsn);
197
198 if (state == SUBREL_STATE_UNKNOWN)
199 break;
200
201 if (state == expected_state)
202 return true;
203
204
207 false);
209 if (!worker)
210 break;
211
214 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215
217 }
218
219 return false;
220}
221
222
223
224
225
226
227
228
229
230static bool
232{
233 int rc;
234
235 for (;;)
236 {
238
240
241
242
243
244
246 return true;
247
248
249
250
251
255 if (worker && worker->proc)
258 if (!worker)
259 break;
260
261
262
263
264
267 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268
271 }
272
273 return false;
274}
275
276
277
278
279void
281{
283}
284
285
286
287
288
289
290
291
292
293static void
295{
297
300 {
304
307
309
310
311
312
315
320
321
322
323
324
326
327
328
329
330
331
332
333
336 syncslotname,
337 sizeof(syncslotname));
338
339
340
341
342
343
345
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
364
367 originname,
368 sizeof(originname));
369
370
371
372
373
378
379
380
381
382
383
384
385
386
388
390 }
391 else
393}
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416static void
418{
419 struct tablesync_start_time_mapping
420 {
421 Oid relid;
423 };
426 bool started_tx = false;
427 bool should_exit = false;
428
430
431
433
434
435
436
437
438
440 {
442
443 ctl.keysize = sizeof(Oid);
444 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
447 }
448
449
450
451
452
454 {
457 }
458
459
460
461
463 {
465
466 if (rstate->state == SUBREL_STATE_SYNCDONE)
467 {
468
469
470
471
472
473 if (current_lsn >= rstate->lsn)
474 {
476
477 rstate->state = SUBREL_STATE_READY;
478 rstate->lsn = current_lsn;
479 if (!started_tx)
480 {
482 started_tx = true;
483 }
484
485
486
487
488
489
490
491
492
493
494
495
498 originname,
499 sizeof(originname));
501
502
503
504
507 rstate->lsn);
508 }
509 }
510 else
511 {
513
514
515
516
518
520 rstate->relid, false);
521
522 if (syncworker)
523 {
524
528 if (rstate->state == SUBREL_STATE_SYNCWAIT)
529 {
530
531
532
533
534 syncworker->relstate = SUBREL_STATE_CATCHUP;
537 }
539
540
541 if (rstate->state == SUBREL_STATE_SYNCWAIT)
542 {
543
544 if (syncworker->proc)
546
547
549
550 if (started_tx)
551 {
552
553
554
555
556
557
558
561 }
562
563
564
565
566
568 started_tx = true;
569
571 SUBREL_STATE_SYNCDONE);
572 }
573 else
575 }
576 else
577 {
578
579
580
581
582
583 int nsyncworkers =
585
586
588
589
590
591
592
594 {
596 struct tablesync_start_time_mapping *hentry;
597 bool found;
598
601
602 if (!found ||
605 {
613 hentry->last_start_time = now;
614 }
615 }
616 }
617 }
618 }
619
620 if (started_tx)
621 {
622
623
624
625
626
627
628
629
630
631
632
633
635 {
638 {
640 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
642 should_exit = true;
643 }
644 }
645
648 }
649
650 if (should_exit)
651 {
652
653
654
655
657
659 }
660}
661
662
663
664
665void
667{
669 {
671
672
673
674
675
676
677 break;
678
681 break;
682
685 break;
686
688
689 elog(ERROR, "Unknown worker type");
690 }
691}
692
693
694
695
698{
700 int i;
701
703 {
704 attnamelist = lappend(attnamelist,
706 }
707
708
709 return attnamelist;
710}
711
712
713
714
715
716static int
718{
719 int bytesread = 0;
720 int avail;
721
722
724 if (avail)
725 {
726 if (avail > maxread)
727 avail = maxread;
730 maxread -= avail;
731 bytesread += avail;
732 }
733
734 while (maxread > 0 && bytesread < minread)
735 {
738 char *buf = NULL;
739
740 for (;;)
741 {
742
744
746
747 if (len == 0)
748 break;
749 else if (len < 0)
750 return bytesread;
751 else
752 {
753
757
759 if (avail > maxread)
760 avail = maxread;
762 outbuf = (char *) outbuf + avail;
764 maxread -= avail;
765 bytesread += avail;
766 }
767
768 if (maxread <= 0 || bytesread >= minread)
769 return bytesread;
770 }
771
772
773
774
778 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
779
781 }
782
783 return bytesread;
784}
785
786
787
788
789
790
791
792
793
794
795static void
797 List **qual, bool *gencol_published)
798{
802 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
803 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
804 Oid qualRow[] = {TEXTOID};
805 bool isnull;
806 int natt;
810
813
814
816 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
817 " FROM pg_catalog.pg_class c"
818 " INNER JOIN pg_catalog.pg_namespace n"
819 " ON (c.relnamespace = n.oid)"
820 " WHERE n.nspname = %s"
821 " AND c.relname = %s",
825 lengthof(tableRow), tableRow);
826
829 (errcode(ERRCODE_CONNECTION_FAILURE),
830 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
832
836 (errcode(ERRCODE_UNDEFINED_OBJECT),
837 errmsg("table \"%s.%s\" not found on publisher",
839
846
849
850
851
852
853
854
855
856
858 {
861 Oid attrsRow[] = {INT2VECTOROID};
862
863
866
867
868
869
870
873 "SELECT DISTINCT"
874 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
875 " THEN NULL ELSE gpt.attrs END)"
876 " FROM pg_publication p,"
877 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
878 " pg_class c"
879 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
880 " AND p.pubname IN ( %s )",
882 pub_names->data);
883
885 lengthof(attrsRow), attrsRow);
886
889 (errcode(ERRCODE_CONNECTION_FAILURE),
890 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
892
893
894
895
896
897
898
899
900
903 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
904 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
906
907
908
909
910
911
912
915 {
917
918 if (!isnull)
919 {
921 int nelems;
923
927
928 for (natt = 0; natt < nelems; natt++)
929 included_cols = bms_add_member(included_cols, elems[natt]);
930 }
931
933 }
935
937 }
938
939
940
941
944 "SELECT a.attnum,"
945 " a.attname,"
946 " a.atttypid,"
947 " a.attnum = ANY(i.indkey)");
948
949
952
954 " FROM pg_catalog.pg_attribute a"
955 " LEFT JOIN pg_catalog.pg_index i"
956 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
957 " WHERE a.attnum > 0::pg_catalog.int2"
958 " AND NOT a.attisdropped %s"
959 " AND a.attrelid = %u"
960 " ORDER BY a.attnum",
963 "AND a.attgenerated = ''" : ""),
967
970 (errcode(ERRCODE_CONNECTION_FAILURE),
971 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
973
974
978
979
980
981
982
983 natt = 0;
986 {
987 char *rel_colname;
989
992
993
995 {
997 continue;
998 }
999
1002
1003 lrel->attnames[natt] = rel_colname;
1006
1009
1010
1011 if (server_version >= 180000 && !(*gencol_published))
1012 {
1015 }
1016
1017
1019 elog(ERROR, "too many columns in remote table \"%s.%s\"",
1021
1023 }
1025
1026 lrel->natts = natt;
1027
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1050 {
1051
1052 Assert(pub_names != NULL);
1053
1054
1057 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1058 " FROM pg_publication p,"
1059 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1060 " WHERE gpt.relid = %u"
1061 " AND p.pubname IN ( %s )",
1063 pub_names->data);
1064
1066
1069 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1071
1072
1073
1074
1075
1076
1077
1078
1081 {
1083
1084 if (!isnull)
1086 else
1087 {
1088
1089 if (*qual)
1090 {
1092 *qual = NIL;
1093 }
1094 break;
1095 }
1096
1098 }
1100
1103 }
1104
1106}
1107
1108
1109
1110
1111
1112
1113static void
1115{
1122 List *attnamelist;
1125 bool gencol_published = false;
1126
1127
1130 &gencol_published);
1131
1132
1134
1135
1138
1139
1141
1142
1143 if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1144 {
1147
1148
1150 {
1152
1153
1154
1155
1156
1157 for (int i = 0; i < lrel.natts; i++)
1158 {
1159 if (i > 0)
1161
1163 }
1164
1166 }
1167
1169 }
1170 else
1171 {
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1184 for (int i = 0; i < lrel.natts; i++)
1185 {
1189 }
1190
1192
1193
1194
1195
1196
1197 if (lrel.relkind == RELKIND_RELATION)
1199
1201
1202 if (qual != NIL)
1203 {
1206
1209 {
1212 }
1214 }
1215
1217 }
1218
1219
1220
1221
1222
1225 {
1229 }
1230
1235 (errcode(ERRCODE_CONNECTION_FAILURE),
1236 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1239
1241
1244 NULL, false, false);
1245
1248
1249
1251
1253}
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272void
1274 char *syncslotname, Size szslot)
1275{
1278}
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288static char *
1290{
1291 char *slotname;
1292 char *err;
1293 char relstate;
1301 bool must_use_password;
1302 bool run_as_owner;
1303
1304
1308 &relstate_lsn);
1310
1311
1314
1319
1320
1321
1322
1323
1324 switch (relstate)
1325 {
1326 case SUBREL_STATE_SYNCDONE:
1327 case SUBREL_STATE_READY:
1328 case SUBREL_STATE_UNKNOWN:
1330 }
1331
1332
1336 slotname,
1338
1339
1340
1341
1342
1343
1346 must_use_password,
1347 slotname, &err);
1350 (errcode(ERRCODE_CONNECTION_FAILURE),
1351 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1353
1357
1358
1361 originname,
1362 sizeof(originname));
1363
1365 {
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1378 }
1380 {
1381
1382
1383
1384
1386
1387
1388
1389
1390
1395
1397
1398 goto copy_table_done;
1399 }
1400
1405
1406
1414
1416
1417
1418
1419
1420
1421
1422
1424
1425
1426
1427
1428
1429
1431 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1432 0, NULL);
1435 (errcode(ERRCODE_CONNECTION_FAILURE),
1436 errmsg("table copy could not start transaction on publisher: %s",
1437 res->err)));
1439
1440
1441
1442
1443
1444
1446 slotname, false , false ,
1449
1450
1451
1452
1453
1454
1457 {
1458
1459
1460
1461
1462
1463
1464
1466
1469 true , true );
1471
1474 }
1475 else
1476 {
1479 errmsg("replication origin \"%s\" already exists",
1480 originname)));
1481 }
1482
1483
1484
1485
1486
1488 if (!run_as_owner)
1490
1491
1492
1493
1494
1501
1502
1503
1504
1505
1506
1507
1508
1511 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1512 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1515
1516
1520
1524 (errcode(ERRCODE_CONNECTION_FAILURE),
1525 errmsg("table copy could not finish transaction on publisher: %s",
1526 res->err)));
1528
1529 if (!run_as_owner)
1531
1533
1534
1536
1537
1538
1539
1540
1543 SUBREL_STATE_FINISHEDCOPY,
1545
1547
1548copy_table_done:
1549
1551 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1553
1554
1555
1556
1561
1562
1563
1564
1565
1567 return slotname;
1568}
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578static bool
1580{
1581 static bool has_subrels = false;
1582
1583 *started_tx = false;
1584
1586 {
1588 List *rstates;
1591
1593
1594
1597
1599 {
1601 *started_tx = true;
1602 }
1603
1604
1606
1607
1609 foreach(lc, rstates)
1610 {
1614 }
1616
1617
1618
1619
1620
1621
1622
1623
1626
1627
1628
1629
1630
1631
1632
1633
1636 }
1637
1638 return has_subrels;
1639}
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649static void
1651{
1652 char *sync_slotname = NULL;
1653
1655
1657 {
1658
1660 }
1662 {
1665 else
1666 {
1667
1668
1669
1670
1671
1674
1676 }
1677 }
1679
1680
1682 pfree(sync_slotname);
1683}
1684
1685
1686
1687
1688
1689
1690
1691static void
1693{
1696 char *slotname = NULL;
1698
1700
1703 originname,
1704 sizeof(originname));
1705
1707
1709
1711
1712
1714}
1715
1716
1717void
1719{
1721
1723
1725
1727}
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737bool
1739{
1740 bool started_tx = false;
1741 bool has_subrels = false;
1742
1743
1745
1746 if (started_tx)
1747 {
1750 }
1751
1752
1753
1754
1755
1757}
1758
1759
1760
1761
1762void
1764{
1767 bool nulls[Natts_pg_subscription];
1768 bool replaces[Natts_pg_subscription];
1770
1771 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1772 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1773 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1774
1779 "cache lookup failed for subscription oid %u",
1780 suboid);
1781
1782
1784 memset(nulls, false, sizeof(nulls));
1785 memset(replaces, false, sizeof(replaces));
1786
1787
1788 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1789 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1790
1792 values, nulls, replaces);
1794
1797}
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
#define DSM_HANDLE_INVALID
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char * get_rel_name(Oid relid)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CacheMemoryContext
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
static int server_version
#define for_each_from(cell, lst, N)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
LogicalRepWorkerType type
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
@ SYNC_TABLE_STATE_REBUILD_STARTED
@ SYNC_TABLE_STATE_NEEDS_REBUILD
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
void TablesyncWorkerMain(Datum main_arg)
static pg_noreturn void finish_sync_worker(void)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
static SyncingTablesState table_states_validity
void process_syncing_tables(XLogRecPtr current_lsn)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
static void copy_table(Relation rel)
static bool wait_for_relation_state_change(Oid relid, char expected_state)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
static bool FetchTableStates(bool *started_tx)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ WORKERTYPE_PARALLEL_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr