PostgreSQL Source Code: src/backend/replication/logical/origin.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

69

71#include <sys/stat.h>

72

92#include "utils/fmgroids.h"

98

99

100#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"

101#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

102

103

105

106

107

108

110{

111

112

113

115

116

117

118

120

121

122

123

124

125

127

128

129

130

132

133

134

135

137

138

139

140

143

144

145

146

148{

152

153

155{

156

158

161

162

166

167

168

169

170

172

173

174

175

177

178

179

180

181

182

183

185

186

187#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)

188

189static void

191{

194 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

195 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));

196

199 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),

200 errmsg("cannot manipulate replication origins during recovery")));

201}

202

203

204

205

206

207

208static bool

210{

213}

214

215

216

217

218

219

220

221

222

223

224

227{

232

234

237 {

239 roident = ident->roident;

241 }

242 else if (!missing_ok)

244 (errcode(ERRCODE_UNDEFINED_OBJECT),

245 errmsg("replication origin \"%s\" does not exist",

246 roname)));

247

248 return roident;

249}

250

251

252

253

254

255

258{

259 Oid roident;

266

267

268

269

270

271

274 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

275 errmsg("replication origin name is too long"),

276 errdetail("Replication origin names must be no longer than %d bytes.",

278

280

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

299

301

302

303

304

305

306

307

308

309

310

312

314 {

315 bool nulls[Natts_pg_replication_origin];

316 Datum values[Natts_pg_replication_origin];

317 bool collides;

318

320

322 Anum_pg_replication_origin_roident,

325

327 true ,

328 &SnapshotDirty,

329 1, &key);

330

332

334

335 if (!collides)

336 {

337

338

339

340

341 memset(&nulls, 0, sizeof(nulls));

342

344 values[Anum_pg_replication_origin_roname - 1] = roname_d;

345

349 break;

350 }

351 }

352

353

355

356 if (tuple == NULL)

358 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

359 errmsg("could not find free replication origin ID")));

360

362 return roident;

363}

364

365

366

367

368static void

370{

371 int i;

372

373

374

375

376restart:

378

380 {

382

383 if (state->roident == roident)

384 {

385

386 if (state->acquired_by != 0)

387 {

389

390 if (nowait)

392 (errcode(ERRCODE_OBJECT_IN_USE),

393 errmsg("could not drop replication origin with ID %d, in use by PID %d",

395 state->acquired_by)));

396

397

398

399

400

401

402

403

404 cv = &state->origin_cv;

405

407

409 goto restart;

410 }

411

412

413 {

415

420 }

421

422

426 break;

427 }

428 }

431}

432

433

434

435

436

437

438void

440{

444

446

448

450

451

454

457 {

458 if (!missing_ok)

459 elog(ERROR, "cache lookup failed for replication origin with ID %d",

460 roident);

461

462

463

464

468 return;

469 }

470

472

473

474

475

478

480

481

483}

484

485

486

487

488

489

490

491

492bool

494{

497

501

504

506 {

510

511 return true;

512 }

513 else

514 {

515 *roname = NULL;

516

517 if (!missing_ok)

519 (errcode(ERRCODE_UNDEFINED_OBJECT),

520 errmsg("replication origin with ID %d does not exist",

521 roident)));

522

523 return false;

524 }

525}

526

527

528

529

530

531

532

535{

536 Size size = 0;

537

539 return size;

540

542

545 return size;

546}

547

548void

550{

551 bool found;

552

554 return;

555

559 &found);

561

562 if (!found)

563 {

564 int i;

565

567

569

571 {

575 }

576 }

577}

578

579

580

581

582

583

584

585

586

587

588

589

590

591

592

593

594

595void

597{

600 int tmpfd;

601 int i;

604

606 return;

607

609

610

611 if (unlink(tmppath) < 0 && errno != ENOENT)

614 errmsg("could not remove file \"%s\": %m",

615 tmppath)));

616

617

618

619

620

622 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);

623 if (tmpfd < 0)

626 errmsg("could not create file \"%s\": %m",

627 tmppath)));

628

629

630 errno = 0;

631 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))

632 {

633

634 if (errno == 0)

635 errno = ENOSPC;

638 errmsg("could not write to file \"%s\": %m",

639 tmppath)));

640 }

642

643

645

646

648 {

652

654 continue;

655

656

657 memset(&disk_state, 0, sizeof(disk_state));

658

660

662

665

667

668

670

671 errno = 0;

672 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=

673 sizeof(disk_state))

674 {

675

676 if (errno == 0)

677 errno = ENOSPC;

680 errmsg("could not write to file \"%s\": %m",

681 tmppath)));

682 }

683

685 }

686

688

689

691 errno = 0;

692 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))

693 {

694

695 if (errno == 0)

696 errno = ENOSPC;

699 errmsg("could not write to file \"%s\": %m",

700 tmppath)));

701 }

702

706 errmsg("could not close file \"%s\": %m",

707 tmppath)));

708

709

711}

712

713

714

715

716

717

718

719

720

721void

723{

725 int fd;

726 int readBytes;

728 int last_state = 0;

731

732

733#ifdef USE_ASSERT_CHECKING

734 static bool already_started = false;

735

736 Assert(!already_started);

737 already_started = true;

738#endif

739

741 return;

742

744

745 elog(DEBUG2, "starting up replication origin progress state");

746

748

749

750

751

752

753 if (fd < 0 && errno == ENOENT)

754 return;

755 else if (fd < 0)

758 errmsg("could not open file \"%s\": %m",

759 path)));

760

761

762 readBytes = read(fd, &magic, sizeof(magic));

763 if (readBytes != sizeof(magic))

764 {

765 if (readBytes < 0)

768 errmsg("could not read file \"%s\": %m",

769 path)));

770 else

773 errmsg("could not read file \"%s\": read %d of %zu",

774 path, readBytes, sizeof(magic))));

775 }

777

780 (errmsg("replication checkpoint has wrong magic %u instead of %u",

782

783

784

785

786 while (true)

787 {

789

790 readBytes = read(fd, &disk_state, sizeof(disk_state));

791

792

793 if (readBytes == sizeof(crc))

794 {

795

796 file_crc = *(pg_crc32c *) &disk_state;

797 break;

798 }

799

800 if (readBytes < 0)

801 {

804 errmsg("could not read file \"%s\": %m",

805 path)));

806 }

807

808 if (readBytes != sizeof(disk_state))

809 {

812 errmsg("could not read file \"%s\": read %d of %zu",

813 path, readBytes, sizeof(disk_state))));

814 }

815

817

820 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),

821 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));

822

823

826 last_state++;

827

829 (errmsg("recovered replication state of node %d to %X/%X",

832 }

833

834

836 if (file_crc != crc)

839 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",

840 crc, file_crc)));

841

845 errmsg("could not close file \"%s\": %m",

846 path)));

847}

848

849void

851{

853

854 switch (info)

855 {

857 {

860

863 xlrec->force ,

864 false );

865 break;

866 }

868 {

870 int i;

871

873

875 {

877

878

880 {

881

885 break;

886 }

887 }

888 break;

889 }

890 default:

891 elog(PANIC, "replorigin_redo: unknown op code %u", info);

892 }

893}

894

895

896

897

898

899

900

901

902

903

904

905

906

907

908

909

910void

913 bool go_backward, bool wal_log)

914{

915 int i;

918

920

921

923 return;

924

925

926

927

928

929

930

931

932

934

935

936

937

938

940 {

942

943

945 free_state == NULL)

946 {

947 free_state = curstate;

948 continue;

949 }

950

951

952 if (curstate->roident != node)

953 {

954 continue;

955 }

956

957

958 replication_state = curstate;

959

961

962

964 {

966 (errcode(ERRCODE_OBJECT_IN_USE),

967 errmsg("replication origin with ID %d is already active for PID %d",

968 replication_state->roident,

970 }

971

972 break;

973 }

974

975 if (replication_state == NULL && free_state == NULL)

977 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),

978 errmsg("could not find free replication state slot for replication origin with ID %d",

979 node),

980 errhint("Increase \"max_active_replication_origins\" and try again.")));

981

982 if (replication_state == NULL)

983 {

984

986 replication_state = free_state;

989 replication_state->roident = node;

990 }

991

993

994

995

996

997

998

999 if (wal_log)

1000 {

1002

1005 xlrec.force = go_backward;

1006

1009

1011 }

1012

1013

1014

1015

1016

1017

1018

1019

1020

1021 if (go_backward || replication_state->remote_lsn < remote_commit)

1022 replication_state->remote_lsn = remote_commit;

1024 (go_backward || replication_state->local_lsn < local_commit))

1025 replication_state->local_lsn = local_commit;

1027

1028

1029

1030

1031

1033}

1034

1035

1038{

1039 int i;

1042

1043

1045

1047 {

1049

1051

1052 if (state->roident == node)

1053 {

1055

1056 remote_lsn = state->remote_lsn;

1057 local_lsn = state->local_lsn;

1058

1060

1061 break;

1062 }

1063 }

1064

1066

1069

1070 return remote_lsn;

1071}

1072

1073

1074

1075

1076

1077static void

1079{

1081

1083 return;

1084

1086

1088 {

1090

1093 }

1094

1096

1097 if (cv)

1099}

1100

1101

1102

1103

1104

1105

1106

1107

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119void

1121{

1122 static bool registered_cleanup;

1123 int i;

1124 int free_slot = -1;

1125

1126 if (!registered_cleanup)

1127 {

1129 registered_cleanup = true;

1130 }

1131

1133

1136 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1137 errmsg("cannot setup replication origin when one is already setup")));

1138

1139

1141

1142

1143

1144

1145

1147 {

1149

1150

1152 free_slot == -1)

1153 {

1154 free_slot = i;

1155 continue;

1156 }

1157

1158

1159 if (curstate->roident != node)

1160 continue;

1161

1162 else if (curstate->acquired_by != 0 && acquired_by == 0)

1163 {

1165 (errcode(ERRCODE_OBJECT_IN_USE),

1166 errmsg("replication origin with ID %d is already active for PID %d",

1168 }

1169

1170

1172 break;

1173 }

1174

1175

1178 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),

1179 errmsg("could not find free replication state slot for replication origin with ID %d",

1180 node),

1181 errhint("Increase \"max_active_replication_origins\" and try again.")));

1183 {

1184

1189 }

1190

1191

1193

1194 if (acquired_by == 0)

1197 elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",

1198 node, acquired_by);

1199

1201

1202

1204}

1205

1206

1207

1208

1209

1210

1211

1212void

1214{

1216

1218

1221 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1222 errmsg("no replication origin is configured")));

1223

1225

1229

1231

1233}

1234

1235

1236

1237

1238

1239

1240

1241void

1243{

1246

1253}

1254

1255

1256

1257

1258

1261{

1264

1266

1271

1274

1275 return remote_lsn;

1276}

1277

1278

1279

1280

1281

1282

1283

1284

1285

1286

1287

1288

1289

1290

1293{

1296

1298

1300

1301

1302

1303

1304

1307 (errcode(ERRCODE_RESERVED_NAME),

1308 errmsg("replication origin name \"%s\" is reserved",

1310 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",

1311 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));

1312

1313

1314

1315

1316

1317#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS

1318 if (strncmp(name, "regress_", 8) != 0)

1319 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");

1320#endif

1321

1323

1325

1327}

1328

1329

1330

1331

1334{

1336

1338

1340

1342

1344

1346}

1347

1348

1349

1350

1353{

1356

1358

1361

1363

1367}

1368

1369

1370

1371

1374{

1377

1379

1383

1385

1387

1389}

1390

1391

1392

1393

1396{

1398

1400

1404

1406}

1407

1408

1409

1410

1413{

1415

1417}

1418

1419

1420

1421

1422

1423

1424

1425

1426

1429{

1432

1434

1437 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1438 errmsg("no replication origin is configured")));

1439

1441

1444

1446}

1447

1450{

1452

1454

1457 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1458 errmsg("no replication origin is configured")));

1459

1462

1464}

1465

1468{

1470

1473

1475}

1476

1477

1480{

1484

1486

1487

1489

1491

1492

1493

1494

1495

1496

1498 true , true );

1499

1501

1503}

1504

1505

1506

1507

1508

1509

1510

1511

1512

1515{

1517 bool flush;

1520

1522

1525

1528

1530

1533

1535}

1536

1537

1540{

1542 int i;

1544

1545

1547

1549

1550

1552

1553

1554

1555

1556

1557

1559 {

1563 char *roname;

1564

1566

1567

1569 continue;

1570

1572 memset(nulls, 1, sizeof(nulls));

1573

1575 nulls[0] = false;

1576

1577

1578

1579

1580

1582 &roname))

1583 {

1585 nulls[1] = false;

1586 }

1587

1589

1591 nulls[2] = false;

1592

1594 nulls[3] = false;

1595

1597

1600 }

1601

1603

1604#undef REPLICATION_ORIGIN_PROGRESS_COLS

1605

1606 return (Datum) 0;

1607}

static Datum values[MAXATTR]

#define CStringGetTextDatum(s)

#define FLEXIBLE_ARRAY_MEMBER

#define MemSet(start, val, len)

#define OidIsValid(objectId)

bool IsReservedName(const char *name)

bool ConditionVariableCancelSleep(void)

void ConditionVariableBroadcast(ConditionVariable *cv)

void ConditionVariableInit(ConditionVariable *cv)

void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)

int errcode_for_file_access(void)

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

int durable_rename(const char *oldfile, const char *newfile, int elevel)

int CloseTransientFile(int fd)

int OpenTransientFile(const char *fileName, int fileFlags)

#define PG_GETARG_TEXT_PP(n)

#define PG_GETARG_DATUM(n)

#define PG_GETARG_BOOL(n)

#define PG_RETURN_BOOL(x)

void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)

void systable_endscan(SysScanDesc sysscan)

HeapTuple systable_getnext(SysScanDesc sysscan)

SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)

Assert(PointerIsAligned(start, uint64))

HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)

void heap_freetuple(HeapTuple htup)

#define HeapTupleIsValid(tuple)

static void * GETSTRUCT(const HeapTupleData *tuple)

void CatalogTupleInsert(Relation heapRel, HeapTuple tup)

void CatalogTupleDelete(Relation heapRel, ItemPointer tid)

void on_shmem_exit(pg_on_exit_callback function, Datum arg)

void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)

void UnlockRelationOid(Oid relid, LOCKMODE lockmode)

void LockRelationOid(Oid relid, LOCKMODE lockmode)

void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)

#define AccessExclusiveLock

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

void LWLockInitialize(LWLock *lock, int tranche_id)

@ LWTRANCHE_REPLICATION_ORIGIN_STATE

void pfree(void *pointer)

#define CHECK_FOR_INTERRUPTS()

TimestampTz replorigin_session_origin_timestamp

static ReplicationStateCtl * replication_states_ctl

RepOriginId replorigin_by_name(const char *roname, bool missing_ok)

Size ReplicationOriginShmemSize(void)

RepOriginId replorigin_create(const char *roname)

Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)

void replorigin_session_reset(void)

struct ReplicationState ReplicationState

static bool IsReservedOriginName(const char *name)

void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)

bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)

int max_active_replication_origins

Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)

XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE

Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)

static ReplicationState * replication_states

#define PG_REPLORIGIN_CHECKPOINT_FILENAME

Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)

Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)

Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)

Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)

Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)

static void ReplicationOriginExitCleanup(int code, Datum arg)

void StartupReplicationOrigin(void)

void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)

RepOriginId replorigin_session_origin

void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)

static void replorigin_state_clear(RepOriginId roident, bool nowait)

void replorigin_session_setup(RepOriginId node, int acquired_by)

void CheckPointReplicationOrigin(void)

static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)

static ReplicationState * session_replication_state

Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)

#define REPLICATION_ORIGIN_PROGRESS_COLS

XLogRecPtr replorigin_session_get_progress(bool flush)

void ReplicationOriginShmemInit(void)

Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)

#define REPLICATION_STATE_MAGIC

XLogRecPtr replorigin_session_origin_lsn

Datum pg_replication_origin_create(PG_FUNCTION_ARGS)

Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)

void replorigin_redo(XLogReaderState *record)

struct ReplicationStateCtl ReplicationStateCtl

struct ReplicationStateOnDisk ReplicationStateOnDisk

#define InvalidRepOriginId

#define XLOG_REPLORIGIN_DROP

#define XLOG_REPLORIGIN_SET

#define ERRCODE_DATA_CORRUPTED

#define COMP_CRC32C(crc, data, len)

static Datum LSNGetDatum(XLogRecPtr X)

FormData_pg_replication_origin * Form_pg_replication_origin

int pg_strcasecmp(const char *s1, const char *s2)

static Datum ObjectIdGetDatum(Oid X)

static Pointer DatumGetPointer(Datum X)

static int fd(const char *x, int i)

#define RelationGetDescr(relation)

void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

#define InitDirtySnapshot(snapshotdata)

#define BTEqualStrategyNumber

ReplicationState states[FLEXIBLE_ARRAY_MEMBER]

ConditionVariable origin_cv

Tuplestorestate * setResult

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache1(int cacheId, Datum key1)

void table_close(Relation relation, LOCKMODE lockmode)

Relation table_open(Oid relationId, LOCKMODE lockmode)

void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)

#define PG_GETARG_TIMESTAMPTZ(n)

char * text_to_cstring(const text *t)

bool IsTransactionState(void)

void CommandCounterIncrement(void)

bool RecoveryInProgress(void)

void XLogFlush(XLogRecPtr record)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr

XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)

void XLogRegisterData(const void *data, uint32 len)

void XLogBeginInsert(void)

#define XLogRecGetInfo(decoder)

#define XLogRecGetData(decoder)