PostgreSQL Source Code: src/backend/replication/logical/origin.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
69
71#include <sys/stat.h>
72
92#include "utils/fmgroids.h"
98
99
100#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102
103
105
106
107
108
110{
111
112
113
115
116
117
118
120
121
122
123
124
125
127
128
129
130
132
133
134
135
137
138
139
140
143
144
145
146
148{
152
153
155{
156
158
161
162
166
167
168
169
170
172
173
174
175
177
178
179
180
181
182
183
185
186
187#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188
189static void
191{
194 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196
199 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 errmsg("cannot manipulate replication origins during recovery")));
201}
202
203
204
205
206
207
208static bool
210{
213}
214
215
216
217
218
219
220
221
222
223
224
227{
232
234
237 {
239 roident = ident->roident;
241 }
242 else if (!missing_ok)
244 (errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg("replication origin \"%s\" does not exist",
246 roname)));
247
248 return roident;
249}
250
251
252
253
254
255
258{
259 Oid roident;
266
267
268
269
270
271
274 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 errmsg("replication origin name is too long"),
276 errdetail("Replication origin names must be no longer than %d bytes.",
278
280
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
299
301
302
303
304
305
306
307
308
309
310
312
314 {
315 bool nulls[Natts_pg_replication_origin];
316 Datum values[Natts_pg_replication_origin];
317 bool collides;
318
320
322 Anum_pg_replication_origin_roident,
325
327 true ,
328 &SnapshotDirty,
329 1, &key);
330
332
334
335 if (!collides)
336 {
337
338
339
340
341 memset(&nulls, 0, sizeof(nulls));
342
344 values[Anum_pg_replication_origin_roname - 1] = roname_d;
345
349 break;
350 }
351 }
352
353
355
356 if (tuple == NULL)
358 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 errmsg("could not find free replication origin ID")));
360
362 return roident;
363}
364
365
366
367
368static void
370{
371 int i;
372
373
374
375
376restart:
378
380 {
382
383 if (state->roident == roident)
384 {
385
386 if (state->acquired_by != 0)
387 {
389
390 if (nowait)
392 (errcode(ERRCODE_OBJECT_IN_USE),
393 errmsg("could not drop replication origin with ID %d, in use by PID %d",
395 state->acquired_by)));
396
397
398
399
400
401
402
403
404 cv = &state->origin_cv;
405
407
409 goto restart;
410 }
411
412
413 {
415
420 }
421
422
426 break;
427 }
428 }
431}
432
433
434
435
436
437
438void
440{
444
446
448
450
451
454
457 {
458 if (!missing_ok)
459 elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 roident);
461
462
463
464
468 return;
469 }
470
472
473
474
475
478
480
481
483}
484
485
486
487
488
489
490
491
492bool
494{
497
501
504
506 {
510
511 return true;
512 }
513 else
514 {
515 *roname = NULL;
516
517 if (!missing_ok)
519 (errcode(ERRCODE_UNDEFINED_OBJECT),
520 errmsg("replication origin with ID %d does not exist",
521 roident)));
522
523 return false;
524 }
525}
526
527
528
529
530
531
532
535{
536 Size size = 0;
537
539 return size;
540
542
545 return size;
546}
547
548void
550{
551 bool found;
552
554 return;
555
559 &found);
561
562 if (!found)
563 {
564 int i;
565
567
569
571 {
575 }
576 }
577}
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595void
597{
600 int tmpfd;
601 int i;
604
606 return;
607
609
610
611 if (unlink(tmppath) < 0 && errno != ENOENT)
614 errmsg("could not remove file \"%s\": %m",
615 tmppath)));
616
617
618
619
620
622 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 if (tmpfd < 0)
626 errmsg("could not create file \"%s\": %m",
627 tmppath)));
628
629
630 errno = 0;
631 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 {
633
634 if (errno == 0)
635 errno = ENOSPC;
638 errmsg("could not write to file \"%s\": %m",
639 tmppath)));
640 }
642
643
645
646
648 {
652
654 continue;
655
656
657 memset(&disk_state, 0, sizeof(disk_state));
658
660
662
665
667
668
670
671 errno = 0;
672 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 sizeof(disk_state))
674 {
675
676 if (errno == 0)
677 errno = ENOSPC;
680 errmsg("could not write to file \"%s\": %m",
681 tmppath)));
682 }
683
685 }
686
688
689
691 errno = 0;
692 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 {
694
695 if (errno == 0)
696 errno = ENOSPC;
699 errmsg("could not write to file \"%s\": %m",
700 tmppath)));
701 }
702
706 errmsg("could not close file \"%s\": %m",
707 tmppath)));
708
709
711}
712
713
714
715
716
717
718
719
720
721void
723{
725 int fd;
726 int readBytes;
728 int last_state = 0;
731
732
733#ifdef USE_ASSERT_CHECKING
734 static bool already_started = false;
735
736 Assert(!already_started);
737 already_started = true;
738#endif
739
741 return;
742
744
745 elog(DEBUG2, "starting up replication origin progress state");
746
748
749
750
751
752
753 if (fd < 0 && errno == ENOENT)
754 return;
755 else if (fd < 0)
758 errmsg("could not open file \"%s\": %m",
759 path)));
760
761
762 readBytes = read(fd, &magic, sizeof(magic));
763 if (readBytes != sizeof(magic))
764 {
765 if (readBytes < 0)
768 errmsg("could not read file \"%s\": %m",
769 path)));
770 else
773 errmsg("could not read file \"%s\": read %d of %zu",
774 path, readBytes, sizeof(magic))));
775 }
777
780 (errmsg("replication checkpoint has wrong magic %u instead of %u",
782
783
784
785
786 while (true)
787 {
789
790 readBytes = read(fd, &disk_state, sizeof(disk_state));
791
792
793 if (readBytes == sizeof(crc))
794 {
795
796 file_crc = *(pg_crc32c *) &disk_state;
797 break;
798 }
799
800 if (readBytes < 0)
801 {
804 errmsg("could not read file \"%s\": %m",
805 path)));
806 }
807
808 if (readBytes != sizeof(disk_state))
809 {
812 errmsg("could not read file \"%s\": read %d of %zu",
813 path, readBytes, sizeof(disk_state))));
814 }
815
817
820 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822
823
826 last_state++;
827
829 (errmsg("recovered replication state of node %d to %X/%X",
832 }
833
834
836 if (file_crc != crc)
839 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 crc, file_crc)));
841
845 errmsg("could not close file \"%s\": %m",
846 path)));
847}
848
849void
851{
853
854 switch (info)
855 {
857 {
860
863 xlrec->force ,
864 false );
865 break;
866 }
868 {
870 int i;
871
873
875 {
877
878
880 {
881
885 break;
886 }
887 }
888 break;
889 }
890 default:
891 elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 }
893}
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910void
913 bool go_backward, bool wal_log)
914{
915 int i;
918
920
921
923 return;
924
925
926
927
928
929
930
931
932
934
935
936
937
938
940 {
942
943
945 free_state == NULL)
946 {
947 free_state = curstate;
948 continue;
949 }
950
951
952 if (curstate->roident != node)
953 {
954 continue;
955 }
956
957
958 replication_state = curstate;
959
961
962
964 {
966 (errcode(ERRCODE_OBJECT_IN_USE),
967 errmsg("replication origin with ID %d is already active for PID %d",
968 replication_state->roident,
970 }
971
972 break;
973 }
974
975 if (replication_state == NULL && free_state == NULL)
977 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 errmsg("could not find free replication state slot for replication origin with ID %d",
979 node),
980 errhint("Increase \"max_active_replication_origins\" and try again.")));
981
982 if (replication_state == NULL)
983 {
984
986 replication_state = free_state;
989 replication_state->roident = node;
990 }
991
993
994
995
996
997
998
999 if (wal_log)
1000 {
1002
1005 xlrec.force = go_backward;
1006
1009
1011 }
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021 if (go_backward || replication_state->remote_lsn < remote_commit)
1022 replication_state->remote_lsn = remote_commit;
1024 (go_backward || replication_state->local_lsn < local_commit))
1025 replication_state->local_lsn = local_commit;
1027
1028
1029
1030
1031
1033}
1034
1035
1038{
1039 int i;
1042
1043
1045
1047 {
1049
1051
1052 if (state->roident == node)
1053 {
1055
1056 remote_lsn = state->remote_lsn;
1057 local_lsn = state->local_lsn;
1058
1060
1061 break;
1062 }
1063 }
1064
1066
1069
1070 return remote_lsn;
1071}
1072
1073
1074
1075
1076
1077static void
1079{
1081
1083 return;
1084
1086
1088 {
1090
1093 }
1094
1096
1097 if (cv)
1099}
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119void
1121{
1122 static bool registered_cleanup;
1123 int i;
1124 int free_slot = -1;
1125
1126 if (!registered_cleanup)
1127 {
1129 registered_cleanup = true;
1130 }
1131
1133
1136 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 errmsg("cannot setup replication origin when one is already setup")));
1138
1139
1141
1142
1143
1144
1145
1147 {
1149
1150
1152 free_slot == -1)
1153 {
1154 free_slot = i;
1155 continue;
1156 }
1157
1158
1159 if (curstate->roident != node)
1160 continue;
1161
1162 else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 {
1165 (errcode(ERRCODE_OBJECT_IN_USE),
1166 errmsg("replication origin with ID %d is already active for PID %d",
1168 }
1169
1170
1172 break;
1173 }
1174
1175
1178 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1179 errmsg("could not find free replication state slot for replication origin with ID %d",
1180 node),
1181 errhint("Increase \"max_active_replication_origins\" and try again.")));
1183 {
1184
1189 }
1190
1191
1193
1194 if (acquired_by == 0)
1197 elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1198 node, acquired_by);
1199
1201
1202
1204}
1205
1206
1207
1208
1209
1210
1211
1212void
1214{
1216
1218
1221 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1222 errmsg("no replication origin is configured")));
1223
1225
1229
1231
1233}
1234
1235
1236
1237
1238
1239
1240
1241void
1243{
1246
1253}
1254
1255
1256
1257
1258
1261{
1264
1266
1271
1274
1275 return remote_lsn;
1276}
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1293{
1296
1298
1300
1301
1302
1303
1304
1307 (errcode(ERRCODE_RESERVED_NAME),
1308 errmsg("replication origin name \"%s\" is reserved",
1310 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1311 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1312
1313
1314
1315
1316
1317#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1318 if (strncmp(name, "regress_", 8) != 0)
1319 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1320#endif
1321
1323
1325
1327}
1328
1329
1330
1331
1334{
1336
1338
1340
1342
1344
1346}
1347
1348
1349
1350
1353{
1356
1358
1361
1363
1367}
1368
1369
1370
1371
1374{
1377
1379
1383
1385
1387
1389}
1390
1391
1392
1393
1396{
1398
1400
1404
1406}
1407
1408
1409
1410
1413{
1415
1417}
1418
1419
1420
1421
1422
1423
1424
1425
1426
1429{
1432
1434
1437 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1438 errmsg("no replication origin is configured")));
1439
1441
1444
1446}
1447
1450{
1452
1454
1457 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 errmsg("no replication origin is configured")));
1459
1462
1464}
1465
1468{
1470
1473
1475}
1476
1477
1480{
1484
1486
1487
1489
1491
1492
1493
1494
1495
1496
1498 true , true );
1499
1501
1503}
1504
1505
1506
1507
1508
1509
1510
1511
1512
1515{
1517 bool flush;
1520
1522
1525
1528
1530
1533
1535}
1536
1537
1540{
1542 int i;
1544
1545
1547
1549
1550
1552
1553
1554
1555
1556
1557
1559 {
1563 char *roname;
1564
1566
1567
1569 continue;
1570
1572 memset(nulls, 1, sizeof(nulls));
1573
1575 nulls[0] = false;
1576
1577
1578
1579
1580
1582 &roname))
1583 {
1585 nulls[1] = false;
1586 }
1587
1589
1591 nulls[2] = false;
1592
1594 nulls[3] = false;
1595
1597
1600 }
1601
1603
1604#undef REPLICATION_ORIGIN_PROGRESS_COLS
1605
1606 return (Datum) 0;
1607}
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
bool IsReservedName(const char *name)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
int CloseTransientFile(int fd)
int OpenTransientFile(const char *fileName, int fileFlags)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_DATUM(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
static ReplicationStateCtl * replication_states_ctl
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Size ReplicationOriginShmemSize(void)
RepOriginId replorigin_create(const char *roname)
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
void replorigin_session_reset(void)
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
int max_active_replication_origins
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
static ReplicationState * replication_states
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
static void ReplicationOriginExitCleanup(int code, Datum arg)
void StartupReplicationOrigin(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
static void replorigin_state_clear(RepOriginId roident, bool nowait)
void replorigin_session_setup(RepOriginId node, int acquired_by)
void CheckPointReplicationOrigin(void)
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
static ReplicationState * session_replication_state
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
void ReplicationOriginShmemInit(void)
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
#define REPLICATION_STATE_MAGIC
XLogRecPtr replorigin_session_origin_lsn
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
void replorigin_redo(XLogReaderState *record)
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define InvalidRepOriginId
#define XLOG_REPLORIGIN_DROP
#define XLOG_REPLORIGIN_SET
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
static Datum LSNGetDatum(XLogRecPtr X)
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
static int fd(const char *x, int i)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define InitDirtySnapshot(snapshotdata)
#define BTEqualStrategyNumber
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable origin_cv
Tuplestorestate * setResult
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
#define PG_GETARG_TIMESTAMPTZ(n)
char * text_to_cstring(const text *t)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
bool RecoveryInProgress(void)
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)