PostgreSQL Source Code: src/backend/executor/execReplication.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
25#include "catalog/pg_am_d.h"
38
39
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56static int
59{
60 int index_attoff;
61 int skey_attoff = 0;
62 Datum indclassDatum;
65
67 Anum_pg_index_indclass);
69
70
72 index_attoff++)
73 {
74 Oid operator;
75 Oid optype;
76 Oid opfamily;
78 int table_attno = indkey->values[index_attoff];
80
82 {
83
84
85
86
87 continue;
88 }
89
90
91
92
93
98 optype,
99 eq_strategy);
100
102 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
103 eq_strategy, optype, optype, opfamily);
104
106
107
109 index_attoff + 1,
110 eq_strategy,
111 regop,
112 searchslot->tts_values[table_attno - 1]);
113
115
116
117 if (searchslot->tts_isnull[table_attno - 1])
119
120 skey_attoff++;
121 }
122
123
124 Assert(skey_attoff > 0);
125
126 return skey_attoff;
127}
128
129
130
131
132
133
134
135static bool
137{
138 bool refetch = false;
139
140 switch (res)
141 {
143 break;
145
149 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
150 else
153 errmsg("concurrent update, retrying")));
154 refetch = true;
155 break;
157
160 errmsg("concurrent delete, retrying")));
161 refetch = true;
162 break;
164 elog(ERROR, "attempted to lock invisible tuple");
165 break;
166 default:
167 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
168 break;
169 }
170
171 return refetch;
172}
173
174
175
176
177
178
179
180bool
185{
187 int skey_attoff;
192 bool found;
194 bool isIdxSafeToSkipDuplicates;
195
196
198
200
202
203
205
206
207 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
208
209retry:
210 found = false;
211
212 index_rescan(scan, skey, skey_attoff, NULL, 0);
213
214
216 {
217
218
219
220
221 if (!isIdxSafeToSkipDuplicates)
222 {
223 if (eq == NULL)
225
226 if ((outslot, searchslot, eq, NULL))
227 continue;
228 }
229
231
234
235
236
237
238
240 {
242 goto retry;
243 }
244
245
246 found = true;
247 break;
248 }
249
250
251 if (found)
252 {
255
257
259 outslot,
261 lockmode,
263 0 ,
264 &tmfd);
265
267
269 goto retry;
270 }
271
273
274
276
277 return found;
278}
279
280
281
282
283
284
285
286static bool
289{
290 int attrnum;
291
294
297
298
300 {
303
305
306
307
308
309
310 if (att->attisdropped || att->attgenerated)
311 continue;
312
313
314
315
316 if (columns &&
318 columns))
319 continue;
320
321
322
323
324
326 return false;
327
328
329
330
332 continue;
333
334 typentry = eq[attrnum];
335 if (typentry == NULL)
336 {
341 (errcode(ERRCODE_UNDEFINED_FUNCTION),
342 errmsg("could not identify an equality operator for type %s",
344 eq[attrnum] = typentry;
345 }
346
348 att->attcollation,
351 return false;
352 }
353
354 return true;
355}
356
357
358
359
360
361
362
363
364
365
366
367bool
370{
376 bool found;
378
380
382
383
387
388retry:
389 found = false;
390
392
393
395 {
396 if ((scanslot, searchslot, eq, NULL))
397 continue;
398
399 found = true;
401
404
405
406
407
408
410 {
412 goto retry;
413 }
414
415
416 break;
417 }
418
419
420 if (found)
421 {
424
426
428 outslot,
430 lockmode,
432 0 ,
433 &tmfd);
434
436
438 goto retry;
439 }
440
443
444 return found;
445}
446
447
448
449
450static void
452{
454 {
457
459 continue;
460
461
462
463
464
466
468 }
469}
470
471
472
473
474
475
476
477static void
483{
487 bool recently_dead = false;
491
493
496
498
499
500
501
502
503
504
505
507 recently_dead = true;
508
510
511 if (!recently_dead)
512 return;
513
516 return;
517
518
521 {
522 *delete_xid = xmax;
523 *delete_time = localts;
524 *delete_origin = localorigin;
525 }
526}
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560bool
566{
572
574
577 *delete_time = 0;
578
579
580
581
582
583
584
585
586
587
590
591
592 if (!indexbitmap)
595
597
598
599
600
601
602
603
606
608
609
611 {
612 if ((scanslot, searchslot, eq, indexbitmap))
613 continue;
614
616 delete_time, delete_origin);
617 }
618
621
622 return *delete_time != 0;
623}
624
625
626
627
628
629bool
636{
639 int skey_attoff;
643 bool isIdxSafeToSkipDuplicates;
645
648
650 *delete_time = 0;
652
654
656
658
659
661
662
663
664
665
666
667
669
670 index_rescan(scan, skey, skey_attoff, NULL, 0);
671
672
674 {
675
676
677
678
679 if (!isIdxSafeToSkipDuplicates)
680 {
681 if (eq == NULL)
683
684 if ((scanslot, searchslot, eq, NULL))
685 continue;
686 }
687
689 delete_time, delete_origin);
690 }
691
693
695
697
698 return *delete_time != 0;
699}
700
701
702
703
704
705
706
707
708
709
710static bool
714{
719
720 *conflictslot = NULL;
721
722
723
724
725
727
728retry:
730 &conflictTid, &slot->tts_tid,
732 {
733 if (*conflictslot)
735
736 *conflictslot = NULL;
737 return false;
738 }
739
741
743
745 *conflictslot,
749 0 ,
750 &tmfd);
751
753
755 goto retry;
756
757 return true;
758}
759
760
761
762
763
764static void
768{
769 List *conflicttuples = NIL;
771
772
774 {
777 &conflictslot))
778 {
780
781 conflicttuple->slot = conflictslot;
782 conflicttuple->indexoid = uniqueidx;
783
785 &conflicttuple->origin, &conflicttuple->ts);
786
787 conflicttuples = lappend(conflicttuples, conflicttuple);
788 }
789 }
790
791
792 if (conflicttuples)
795 searchslot, remoteslot, conflicttuples);
796}
797
798
799
800
801
802
803
804void
807{
808 bool skip_tuple = false;
810
811
812 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
813
815
816
819 {
821 skip_tuple = true;
822 }
823
824 if (!skip_tuple)
825 {
826 List *recheckIndexes = NIL;
827 List *conflictindexes;
828 bool conflict = false;
829
830
835
836
839 if (rel->rd_rel->relispartition)
841
842
844
846
849 slot, estate, false,
850 conflictindexes ? true : false,
851 &conflict,
852 conflictindexes, false);
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869 if (conflict)
871 recheckIndexes, NULL, slot);
872
873
875 recheckIndexes, NULL);
876
877
878
879
880
881
882
884 }
885}
886
887
888
889
890
891
892
893void
897{
898 bool skip_tuple = false;
901
902
903
904
905
906 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
908
910
911
914 {
916 tid, NULL, slot, NULL, NULL, false))
917 skip_tuple = true;
918 }
919
920 if (!skip_tuple)
921 {
922 List *recheckIndexes = NIL;
924 List *conflictindexes;
925 bool conflict = false;
926
927
932
933
936 if (rel->rd_rel->relispartition)
938
940 &update_indexes);
941
943
946 slot, estate, true,
947 conflictindexes ? true : false,
948 &conflict, conflictindexes,
950
951
952
953
954
955
956 if (conflict)
958 recheckIndexes, searchslot, slot);
959
960
962 NULL, NULL,
963 tid, NULL, slot,
964 recheckIndexes, NULL, false);
965
967 }
968}
969
970
971
972
973
974
975
976void
980{
981 bool skip_tuple = false;
984
986
987
990 {
992 tid, NULL, NULL, NULL, NULL, false);
993 }
994
995 if (!skip_tuple)
996 {
997
999
1000
1002 tid, NULL, NULL, false);
1003 }
1004}
1005
1006
1007
1008
1009void
1011{
1013
1014
1015
1016
1017
1018 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1019 return;
1020
1021
1023 return;
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1051 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1052 errmsg("cannot update table \"%s\"",
1054 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1057 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1058 errmsg("cannot update table \"%s\"",
1060 errdetail("Column list used by the publication does not cover the replica identity.")));
1063 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1064 errmsg("cannot update table \"%s\"",
1066 errdetail("Replica identity must not contain unpublished generated columns.")));
1069 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1070 errmsg("cannot delete from table \"%s\"",
1072 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1075 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1076 errmsg("cannot delete from table \"%s\"",
1078 errdetail("Column list used by the publication does not cover the replica identity.")));
1081 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1082 errmsg("cannot delete from table \"%s\"",
1084 errdetail("Replica identity must not contain unpublished generated columns.")));
1085
1086
1088 return;
1089
1090
1091 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1092 return;
1093
1094
1095
1096
1097
1098
1101 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1102 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1104 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1107 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1108 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1110 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1111}
1112
1113
1114
1115
1116
1117
1118
1119
1120void
1122 const char *nspname, const char *relname)
1123{
1124 if (localrelkind != RELKIND_RELATION &&
1125 localrelkind != RELKIND_PARTITIONED_TABLE &&
1126 localrelkind != RELKIND_SEQUENCE)
1128 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1129 errmsg("cannot use relation \"%s.%s\" as logical replication target",
1132
1133
1134
1135
1136
1137
1138 if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
1139 (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
1141 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1142
1143 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
1145 remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
1146 localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
1147}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
#define AttributeNumberIsValid(attributeNumber)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool bms_is_member(int x, const Bitmapset *a)
void LockBuffer(Buffer buffer, BufferLockMode mode)
#define PG_USED_FOR_ASSERTS_ONLY
#define OidIsValid(objectId)
bool IsCatalogRelation(Relation relation)
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
@ CT_MULTIPLE_UNIQUE_CONFLICTS
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, const ItemPointerData *tupleid, List *arbiterIndexes)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
static void update_most_recent_deletion_info(TupleTableSlot *scanslot, TransactionId oldestxmin, TransactionId *delete_xid, TimestampTz *delete_time, RepOriginId *delete_origin)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)
#define palloc0_array(type, count)
#define palloc0_object(type)
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
char * format_type_be(Oid type_oid)
Assert(PointerIsAligned(start, uint64))
@ HEAPTUPLE_RECENTLY_DEAD
HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)
void index_close(Relation relation, LOCKMODE lockmode)
void index_endscan(IndexScanDesc scan)
Relation index_open(Oid relationId, LOCKMODE lockmode)
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
List * lappend(List *list, void *datum)
void list_free(List *list)
bool list_member_oid(const List *list, Oid datum)
void XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid, XLTW_Oper oper)
Oid get_opclass_input_type(Oid opclass)
Oid get_opclass_family(Oid opclass)
RegProcedure get_opcode(Oid opno)
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
#define InvalidRepOriginId
FormData_pg_attribute * Form_pg_attribute
int errdetail_relkind_not_supported(char relkind)
static int list_length(const List *l)
#define list_make1_oid(x1)
#define foreach_oid(var, lst)
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
static bool DatumGetBool(Datum X)
static Pointer DatumGetPointer(Datum X)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define IndexRelationGetNumberOfKeyAttributes(relation)
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Oid RelationGetReplicaIndex(Relation relation)
@ INDEX_ATTR_BITMAP_PRIMARY_KEY
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Snapshot GetLatestSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
Snapshot GetActiveSnapshot(void)
#define InitDirtySnapshot(snapshotdata)
Oid GetRelationIdentityOrPK(Relation rel)
PublicationActions pubactions
bool cols_valid_for_delete
bool gencols_valid_for_update
bool cols_valid_for_update
bool gencols_valid_for_delete
struct HeapTupleData * rd_indextuple
List * ri_onConflictArbiterIndexes
RelationPtr ri_IndexRelationDescs
TriggerDesc * ri_TrigDesc
IndexInfo ** ri_IndexRelationInfo
bool trig_delete_before_row
bool trig_update_before_row
bool trig_insert_before_row
bool has_generated_stored
TupleDesc tts_tupleDescriptor
int16 values[FLEXIBLE_ARRAY_MEMBER]
Oid values[FLEXIBLE_ARRAY_MEMBER]
#define FirstLowInvalidHeapAttributeNumber
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
static void table_endscan(TableScanDesc scan)
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
static void table_rescan(TableScanDesc scan, ScanKeyData *key)
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, ScanKeyData *key)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_update)
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_delete)
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static void ExecMaterializeSlot(TupleTableSlot *slot)
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
#define TYPECACHE_EQ_OPR_FINFO
CommandId GetCurrentCommandId(bool used)