PostgreSQL Source Code: src/backend/replication/logical/tablesync.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

96

125

126typedef enum

127{

132

136

138

139

140

141

144{

145

146

147

148

150 {

153 }

154

155

157

160 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",

164

165

167

168

170}

171

172

173

174

175

176

177

178

179

180

181

182static bool

184{

186

187 for (;;)

188 {

191

193

196 relid, &statelsn);

197

198 if (state == SUBREL_STATE_UNKNOWN)

199 break;

200

201 if (state == expected_state)

202 return true;

203

204

207 false);

209 if (!worker)

210 break;

211

214 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

215

217 }

218

219 return false;

220}

221

222

223

224

225

226

227

228

229

230static bool

232{

233 int rc;

234

235 for (;;)

236 {

238

240

241

242

243

244

246 return true;

247

248

249

250

251

255 if (worker && worker->proc)

258 if (!worker)

259 break;

260

261

262

263

264

267 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

268

271 }

272

273 return false;

274}

275

276

277

278

279void

281{

283}

284

285

286

287

288

289

290

291

292

293static void

295{

297

300 {

304

307

309

310

311

312

315

320

321

322

323

324

326

327

328

329

330

331

332

333

336 syncslotname,

337 sizeof(syncslotname));

338

339

340

341

342

343

345

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

364

367 originname,

368 sizeof(originname));

369

370

371

372

373

378

379

380

381

382

383

384

385

386

388

390 }

391 else

393}

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416static void

418{

419 struct tablesync_start_time_mapping

420 {

421 Oid relid;

423 };

426 bool started_tx = false;

427 bool should_exit = false;

428

430

431

433

434

435

436

437

438

440 {

442

443 ctl.keysize = sizeof(Oid);

444 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);

447 }

448

449

450

451

452

454 {

457 }

458

459

460

461

463 {

465

466 if (rstate->state == SUBREL_STATE_SYNCDONE)

467 {

468

469

470

471

472

473 if (current_lsn >= rstate->lsn)

474 {

476

477 rstate->state = SUBREL_STATE_READY;

478 rstate->lsn = current_lsn;

479 if (!started_tx)

480 {

482 started_tx = true;

483 }

484

485

486

487

488

489

490

491

492

493

494

495

498 originname,

499 sizeof(originname));

501

502

503

504

507 rstate->lsn);

508 }

509 }

510 else

511 {

513

514

515

516

518

520 rstate->relid, false);

521

522 if (syncworker)

523 {

524

528 if (rstate->state == SUBREL_STATE_SYNCWAIT)

529 {

530

531

532

533

534 syncworker->relstate = SUBREL_STATE_CATCHUP;

537 }

539

540

541 if (rstate->state == SUBREL_STATE_SYNCWAIT)

542 {

543

544 if (syncworker->proc)

546

547

549

550 if (started_tx)

551 {

552

553

554

555

556

557

558

561 }

562

563

564

565

566

568 started_tx = true;

569

571 SUBREL_STATE_SYNCDONE);

572 }

573 else

575 }

576 else

577 {

578

579

580

581

582

583 int nsyncworkers =

585

586

588

589

590

591

592

594 {

596 struct tablesync_start_time_mapping *hentry;

597 bool found;

598

601

602 if (!found ||

605 {

613 hentry->last_start_time = now;

614 }

615 }

616 }

617 }

618 }

619

620 if (started_tx)

621 {

622

623

624

625

626

627

628

629

630

631

632

633

635 {

638 {

640 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",

642 should_exit = true;

643 }

644 }

645

648 }

649

650 if (should_exit)

651 {

652

653

654

655

657

659 }

660}

661

662

663

664

665void

667{

669 {

671

672

673

674

675

676

677 break;

678

681 break;

682

685 break;

686

688

689 elog(ERROR, "Unknown worker type");

690 }

691}

692

693

694

695

698{

700 int i;

701

703 {

704 attnamelist = lappend(attnamelist,

706 }

707

708

709 return attnamelist;

710}

711

712

713

714

715

716static int

718{

719 int bytesread = 0;

720 int avail;

721

722

724 if (avail)

725 {

726 if (avail > maxread)

727 avail = maxread;

730 maxread -= avail;

731 bytesread += avail;

732 }

733

734 while (maxread > 0 && bytesread < minread)

735 {

738 char *buf = NULL;

739

740 for (;;)

741 {

742

744

746

747 if (len == 0)

748 break;

749 else if (len < 0)

750 return bytesread;

751 else

752 {

753

757

759 if (avail > maxread)

760 avail = maxread;

762 outbuf = (char *) outbuf + avail;

764 maxread -= avail;

765 bytesread += avail;

766 }

767

768 if (maxread <= 0 || bytesread >= minread)

769 return bytesread;

770 }

771

772

773

774

778 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);

779

781 }

782

783 return bytesread;

784}

785

786

787

788

789

790

791

792

793

794

795static void

797 List **qual, bool *gencol_published)

798{

802 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};

803 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};

804 Oid qualRow[] = {TEXTOID};

805 bool isnull;

806 int natt;

810

813

814

816 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"

817 " FROM pg_catalog.pg_class c"

818 " INNER JOIN pg_catalog.pg_namespace n"

819 " ON (c.relnamespace = n.oid)"

820 " WHERE n.nspname = %s"

821 " AND c.relname = %s",

825 lengthof(tableRow), tableRow);

826

829 (errcode(ERRCODE_CONNECTION_FAILURE),

830 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",

832

836 (errcode(ERRCODE_UNDEFINED_OBJECT),

837 errmsg("table \"%s.%s\" not found on publisher",

839

846

849

850

851

852

853

854

855

856

858 {

861 Oid attrsRow[] = {INT2VECTOROID};

862

863

866

867

868

869

870

873 "SELECT DISTINCT"

874 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"

875 " THEN NULL ELSE gpt.attrs END)"

876 " FROM pg_publication p,"

877 " LATERAL pg_get_publication_tables(p.pubname) gpt,"

878 " pg_class c"

879 " WHERE gpt.relid = %u AND c.oid = gpt.relid"

880 " AND p.pubname IN ( %s )",

882 pub_names->data);

883

885 lengthof(attrsRow), attrsRow);

886

889 (errcode(ERRCODE_CONNECTION_FAILURE),

890 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",

892

893

894

895

896

897

898

899

900

903 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

904 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",

906

907

908

909

910

911

912

915 {

917

918 if (!isnull)

919 {

921 int nelems;

923

927

928 for (natt = 0; natt < nelems; natt++)

929 included_cols = bms_add_member(included_cols, elems[natt]);

930 }

931

933 }

935

937 }

938

939

940

941

944 "SELECT a.attnum,"

945 " a.attname,"

946 " a.atttypid,"

947 " a.attnum = ANY(i.indkey)");

948

949

952

954 " FROM pg_catalog.pg_attribute a"

955 " LEFT JOIN pg_catalog.pg_index i"

956 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"

957 " WHERE a.attnum > 0::pg_catalog.int2"

958 " AND NOT a.attisdropped %s"

959 " AND a.attrelid = %u"

960 " ORDER BY a.attnum",

963 "AND a.attgenerated = ''" : ""),

967

970 (errcode(ERRCODE_CONNECTION_FAILURE),

971 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",

973

974

978

979

980

981

982

983 natt = 0;

986 {

987 char *rel_colname;

989

992

993

995 {

997 continue;

998 }

999

1002

1003 lrel->attnames[natt] = rel_colname;

1006

1009

1010

1011 if (server_version >= 180000 && !(*gencol_published))

1012 {

1015 }

1016

1017

1019 elog(ERROR, "too many columns in remote table \"%s.%s\"",

1021

1023 }

1025

1026 lrel->natts = natt;

1027

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1050 {

1051

1052 Assert(pub_names != NULL);

1053

1054

1057 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"

1058 " FROM pg_publication p,"

1059 " LATERAL pg_get_publication_tables(p.pubname) gpt"

1060 " WHERE gpt.relid = %u"

1061 " AND p.pubname IN ( %s )",

1063 pub_names->data);

1064

1066

1069 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",

1071

1072

1073

1074

1075

1076

1077

1078

1081 {

1083

1084 if (!isnull)

1086 else

1087 {

1088

1089 if (*qual)

1090 {

1092 *qual = NIL;

1093 }

1094 break;

1095 }

1096

1098 }

1100

1103 }

1104

1106}

1107

1108

1109

1110

1111

1112

1113static void

1115{

1122 List *attnamelist;

1125 bool gencol_published = false;

1126

1127

1130 &gencol_published);

1131

1132

1134

1135

1138

1139

1141

1142

1143 if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)

1144 {

1147

1148

1150 {

1152

1153

1154

1155

1156

1157 for (int i = 0; i < lrel.natts; i++)

1158 {

1159 if (i > 0)

1161

1163 }

1164

1166 }

1167

1169 }

1170 else

1171 {

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1184 for (int i = 0; i < lrel.natts; i++)

1185 {

1187 if (i < lrel.natts - 1)

1189 }

1190

1192

1193

1194

1195

1196

1197 if (lrel.relkind == RELKIND_RELATION)

1199

1201

1202 if (qual != NIL)

1203 {

1206

1209 {

1212 }

1214 }

1215

1217 }

1218

1219

1220

1221

1222

1225 {

1229 }

1230

1235 (errcode(ERRCODE_CONNECTION_FAILURE),

1236 errmsg("could not start initial contents copy for table \"%s.%s\": %s",

1239

1241

1244 NULL, false, false);

1245

1248

1249

1251

1253}

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264

1265

1266

1267

1268

1269

1270

1271

1272void

1274 char *syncslotname, Size szslot)

1275{

1278}

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288static char *

1290{

1291 char *slotname;

1292 char *err;

1293 char relstate;

1301 bool must_use_password;

1302 bool run_as_owner;

1303

1304

1308 &relstate_lsn);

1310

1311

1314

1319

1320

1321

1322

1323

1324 switch (relstate)

1325 {

1326 case SUBREL_STATE_SYNCDONE:

1327 case SUBREL_STATE_READY:

1328 case SUBREL_STATE_UNKNOWN:

1330 }

1331

1332

1336 slotname,

1338

1339

1340

1341

1342

1343

1346 must_use_password,

1347 slotname, &err);

1350 (errcode(ERRCODE_CONNECTION_FAILURE),

1351 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",

1353

1357

1358

1361 originname,

1362 sizeof(originname));

1363

1365 {

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1376

1378 }

1380 {

1381

1382

1383

1384

1386

1387

1388

1389

1390

1395

1397

1398 goto copy_table_done;

1399 }

1400

1405

1406

1414

1416

1417

1418

1419

1420

1421

1422

1424

1425

1426

1427

1428

1429

1431 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",

1432 0, NULL);

1435 (errcode(ERRCODE_CONNECTION_FAILURE),

1436 errmsg("table copy could not start transaction on publisher: %s",

1437 res->err)));

1439

1440

1441

1442

1443

1444

1446 slotname, false , false ,

1449

1450

1451

1452

1453

1454

1457 {

1458

1459

1460

1461

1462

1463

1464

1466

1469 true , true );

1471

1474 }

1475 else

1476 {

1479 errmsg("replication origin \"%s\" already exists",

1480 originname)));

1481 }

1482

1483

1484

1485

1486

1488 if (!run_as_owner)

1490

1491

1492

1493

1494

1501

1502

1503

1504

1505

1506

1507

1508

1511 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

1512 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",

1515

1516

1520

1524 (errcode(ERRCODE_CONNECTION_FAILURE),

1525 errmsg("table copy could not finish transaction on publisher: %s",

1526 res->err)));

1528

1529 if (!run_as_owner)

1531

1533

1534

1536

1537

1538

1539

1540

1543 SUBREL_STATE_FINISHEDCOPY,

1545

1547

1548copy_table_done:

1549

1551 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",

1553

1554

1555

1556

1561

1562

1563

1564

1565

1567 return slotname;

1568}

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578static bool

1580{

1581 static bool has_subrels = false;

1582

1583 *started_tx = false;

1584

1586 {

1588 List *rstates;

1591

1593

1594

1597

1599 {

1601 *started_tx = true;

1602 }

1603

1604

1606

1607

1609 foreach(lc, rstates)

1610 {

1614 }

1616

1617

1618

1619

1620

1621

1622

1623

1626

1627

1628

1629

1630

1631

1632

1633

1636 }

1637

1638 return has_subrels;

1639}

1640

1641

1642

1643

1644

1645

1646

1647

1648

1649static void

1651{

1652 char *sync_slotname = NULL;

1653

1655

1657 {

1658

1660 }

1662 {

1665 else

1666 {

1667

1668

1669

1670

1671

1674

1676 }

1677 }

1679

1680

1682 pfree(sync_slotname);

1683}

1684

1685

1686

1687

1688

1689

1690

1691static void

1693{

1696 char *slotname = NULL;

1698

1700

1703 originname,

1704 sizeof(originname));

1705

1707

1709

1711

1712

1714}

1715

1716

1717void

1719{

1721

1723

1725

1727}

1728

1729

1730

1731

1732

1733

1734

1735

1736

1737bool

1739{

1740 bool started_tx = false;

1741 bool has_subrels = false;

1742

1743

1745

1746 if (started_tx)

1747 {

1750 }

1751

1752

1753

1754

1755

1757}

1758

1759

1760

1761

1762void

1764{

1767 bool nulls[Natts_pg_subscription];

1768 bool replaces[Natts_pg_subscription];

1770

1771 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||

1772 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||

1773 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);

1774

1779 "cache lookup failed for subscription oid %u",

1780 suboid);

1781

1782

1784 memset(nulls, false, sizeof(nulls));

1785 memset(replaces, false, sizeof(replaces));

1786

1787

1788 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);

1789 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;

1790

1792 values, nulls, replaces);

1794

1797}

void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)

AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)

#define DatumGetArrayTypeP(X)

void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)

void start_apply(XLogRecPtr origin_startpos)

void DisableSubscriptionAndExit(void)

void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)

void set_apply_error_context_origin(char *originname)

MemoryContext ApplyContext

void SetupApplyOrSyncWorker(int worker_slot)

WalReceiverConn * LogRepWorkerWalRcvConn

Subscription * MySubscription

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

Datum now(PG_FUNCTION_ARGS)

bool bms_is_member(int x, const Bitmapset *a)

Bitmapset * bms_add_member(Bitmapset *a, int x)

static Datum values[MAXATTR]

#define TextDatumGetCString(d)

#define OidIsValid(objectId)

CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)

uint64 CopyFrom(CopyFromState cstate)

#define DSM_HANDLE_INVALID

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

void hash_destroy(HTAB *hashp)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

void err(int eval, const char *fmt,...)

TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

const TupleTableSlotOps TTSOpsMinimalTuple

Assert(PointerIsAligned(start, uint64))

HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)

void heap_freetuple(HeapTuple htup)

#define HeapTupleIsValid(tuple)

#define MaxTupleAttributeNumber

void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)

int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)

void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)

LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)

void logicalrep_worker_wakeup(Oid subid, Oid relid)

static dshash_table * last_start_times

LogicalRepWorker * MyLogicalRepWorker

int max_sync_workers_per_subscription

int logicalrep_sync_worker_count(Oid subid)

void ApplyLauncherForgetWorkerStartTime(Oid subid)

List * lappend(List *list, void *datum)

void list_free_deep(List *list)

void UnlockRelationOid(Oid relid, LOCKMODE lockmode)

void LockRelationOid(Oid relid, LOCKMODE lockmode)

char * get_rel_name(Oid relid)

char * get_namespace_name(Oid nspid)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

DefElem * makeDefElem(char *name, Node *arg, int location)

char * MemoryContextStrdup(MemoryContext context, const char *string)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CacheMemoryContext

#define CHECK_FOR_INTERRUPTS()

char * GetUserNameFromId(Oid roleid, bool noerr)

ObjectType get_relkind_objtype(char relkind)

TimestampTz replorigin_session_origin_timestamp

RepOriginId replorigin_by_name(const char *roname, bool missing_ok)

RepOriginId replorigin_create(const char *roname)

void replorigin_session_reset(void)

void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)

RepOriginId replorigin_session_origin

void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)

void replorigin_session_setup(RepOriginId node, int acquired_by)

XLogRecPtr replorigin_session_get_progress(bool flush)

XLogRecPtr replorigin_session_origin_lsn

#define InvalidRepOriginId

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

ParseState * make_parsestate(ParseState *parentParseState)

ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)

static int server_version

#define for_each_from(cell, lst, N)

List * GetSubscriptionRelations(Oid subid, bool not_ready)

char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)

void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)

void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)

bool HasSubscriptionRelations(Oid subid)

long pgstat_report_stat(bool force)

void pgstat_report_subscription_error(Oid subid, bool is_apply_error)

static bool DatumGetBool(Datum X)

static Oid DatumGetObjectId(Datum X)

static Datum ObjectIdGetDatum(Oid X)

static char DatumGetChar(Datum X)

static int16 DatumGetInt16(Datum X)

static int32 DatumGetInt32(Datum X)

static Datum CharGetDatum(char X)

static int fd(const char *x, int i)

char * quote_literal_cstr(const char *rawstr)

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define RelationGetNamespace(relation)

int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)

char * quote_qualified_identifier(const char *qualifier, const char *ident)

const char * quote_identifier(const char *ident)

Snapshot GetTransactionSnapshot(void)

void PushActiveSnapshot(Snapshot snapshot)

void PopActiveSnapshot(void)

void InvalidateCatalogSnapshot(void)

#define SpinLockRelease(lock)

#define SpinLockAcquire(lock)

void logicalrep_relmap_update(LogicalRepRelation *remoterel)

void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)

LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)

#define ERRCODE_DUPLICATE_OBJECT

void destroyStringInfo(StringInfo str)

StringInfo makeStringInfo(void)

void resetStringInfo(StringInfo str)

void appendStringInfo(StringInfo str, const char *fmt,...)

void appendStringInfoString(StringInfo str, const char *s)

void appendStringInfoChar(StringInfo str, char ch)

void initStringInfo(StringInfo str)

LogicalRepRelation remoterel

LogicalRepWorkerType type

Tuplestorestate * tuplestore

void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)

#define SearchSysCacheCopy1(cacheId, key1)

void table_close(Relation relation, LOCKMODE lockmode)

Relation table_open(Oid relationId, LOCKMODE lockmode)

static List * table_states_not_ready

bool AllTablesyncsReady(void)

static bool wait_for_worker_state_change(char expected_state)

void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)

@ SYNC_TABLE_STATE_REBUILD_STARTED

@ SYNC_TABLE_STATE_NEEDS_REBUILD

static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)

static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)

void TablesyncWorkerMain(Datum main_arg)

static pg_noreturn void finish_sync_worker(void)

static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)

static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)

void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)

static void run_tablesync_worker()

static int copy_read_data(void *outbuf, int minread, int maxread)

static SyncingTablesState table_states_validity

void process_syncing_tables(XLogRecPtr current_lsn)

static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)

static void copy_table(Relation rel)

static bool wait_for_relation_state_change(Oid relid, char expected_state)

static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)

static StringInfo copybuf

static bool FetchTableStates(bool *started_tx)

void UpdateTwoPhaseState(Oid suboid, char new_state)

bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)

int64 tuplestore_tuple_count(Tuplestorestate *state)

static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)

static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)

void SwitchToUntrustedUser(Oid userid, UserContext *context)

void RestoreUserContext(UserContext *context)

String * makeString(char *str)

#define WL_SOCKET_READABLE

#define WL_EXIT_ON_PM_DEATH

#define walrcv_startstreaming(conn, options)

#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)

#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)

static void walrcv_clear_result(WalRcvExecResult *walres)

#define walrcv_server_version(conn)

#define walrcv_endstreaming(conn, next_tli)

#define walrcv_exec(conn, exec, nRetTypes, retTypes)

#define walrcv_receive(conn, buffer, wait_fd)

@ WORKERTYPE_PARALLEL_APPLY

static bool am_tablesync_worker(void)

bool IsTransactionState(void)

void CommandCounterIncrement(void)

void StartTransactionCommand(void)

void CommitTransactionCommand(void)

void AbortOutOfAnyTransaction(void)

uint64 GetSystemIdentifier(void)

XLogRecPtr GetXLogWriteRecPtr(void)

int wal_retrieve_retry_interval

void XLogFlush(XLogRecPtr record)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr