PostgreSQL Source Code: src/backend/executor/execReplication.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
23#include "catalog/pg_am_d.h"
36
37
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54static int
57{
58 int index_attoff;
59 int skey_attoff = 0;
60 Datum indclassDatum;
63
65 Anum_pg_index_indclass);
67
68
70 index_attoff++)
71 {
72 Oid operator;
73 Oid optype;
74 Oid opfamily;
76 int table_attno = indkey->values[index_attoff];
78
80 {
81
82
83
84
85 continue;
86 }
87
88
89
90
91
96 optype,
97 eq_strategy);
98
100 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
101 eq_strategy, optype, optype, opfamily);
102
104
105
107 index_attoff + 1,
108 eq_strategy,
109 regop,
110 searchslot->tts_values[table_attno - 1]);
111
113
114
115 if (searchslot->tts_isnull[table_attno - 1])
117
118 skey_attoff++;
119 }
120
121
122 Assert(skey_attoff > 0);
123
124 return skey_attoff;
125}
126
127
128
129
130
131
132
133static bool
135{
136 bool refetch = false;
137
138 switch (res)
139 {
141 break;
143
147 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
148 else
151 errmsg("concurrent update, retrying")));
152 refetch = true;
153 break;
155
158 errmsg("concurrent delete, retrying")));
159 refetch = true;
160 break;
162 elog(ERROR, "attempted to lock invisible tuple");
163 break;
164 default:
165 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
166 break;
167 }
168
169 return refetch;
170}
171
172
173
174
175
176
177
178bool
183{
185 int skey_attoff;
190 bool found;
192 bool isIdxSafeToSkipDuplicates;
193
194
196
198
200
201
203
204
205 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
206
207retry:
208 found = false;
209
210 index_rescan(scan, skey, skey_attoff, NULL, 0);
211
212
214 {
215
216
217
218
219 if (!isIdxSafeToSkipDuplicates)
220 {
221 if (eq == NULL)
223
225 continue;
226 }
227
229
232
233
234
235
236
238 {
240 goto retry;
241 }
242
243
244 found = true;
245 break;
246 }
247
248
249 if (found)
250 {
253
255
257 outslot,
259 lockmode,
261 0 ,
262 &tmfd);
263
265
267 goto retry;
268 }
269
271
272
274
275 return found;
276}
277
278
279
280
281static bool
284{
285 int attrnum;
286
289
292
293
295 {
298
300
301
302
303
304
305 if (att->attisdropped || att->attgenerated)
306 continue;
307
308
309
310
311
313 return false;
314
315
316
317
319 continue;
320
321 typentry = eq[attrnum];
322 if (typentry == NULL)
323 {
328 (errcode(ERRCODE_UNDEFINED_FUNCTION),
329 errmsg("could not identify an equality operator for type %s",
331 eq[attrnum] = typentry;
332 }
333
335 att->attcollation,
338 return false;
339 }
340
341 return true;
342}
343
344
345
346
347
348
349
350
351
352
353
354bool
357{
363 bool found;
365
367
369
370
374
375retry:
376 found = false;
377
379
380
382 {
384 continue;
385
386 found = true;
388
391
392
393
394
395
397 {
399 goto retry;
400 }
401
402
403 break;
404 }
405
406
407 if (found)
408 {
411
413
415 outslot,
417 lockmode,
419 0 ,
420 &tmfd);
421
423
425 goto retry;
426 }
427
430
431 return found;
432}
433
434
435
436
437static void
439{
441 {
444
446 continue;
447
448
449
450
451
453
455 }
456}
457
458
459
460
461
462
463
464
465
466
467static bool
471{
476
477 *conflictslot = NULL;
478
479
480
481
482
484
485retry:
487 &conflictTid, &slot->tts_tid,
489 {
490 if (*conflictslot)
492
493 *conflictslot = NULL;
494 return false;
495 }
496
498
500
502 *conflictslot,
506 0 ,
507 &tmfd);
508
510
512 goto retry;
513
514 return true;
515}
516
517
518
519
520
521static void
525{
526 List *conflicttuples = NIL;
528
529
531 {
534 &conflictslot))
535 {
537
538 conflicttuple->slot = conflictslot;
539 conflicttuple->indexoid = uniqueidx;
540
542 &conflicttuple->origin, &conflicttuple->ts);
543
544 conflicttuples = lappend(conflicttuples, conflicttuple);
545 }
546 }
547
548
549 if (conflicttuples)
552 searchslot, remoteslot, conflicttuples);
553}
554
555
556
557
558
559
560
561void
564{
565 bool skip_tuple = false;
567
568
569 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
570
572
573
576 {
578 skip_tuple = true;
579 }
580
581 if (!skip_tuple)
582 {
583 List *recheckIndexes = NIL;
584 List *conflictindexes;
585 bool conflict = false;
586
587
592
593
596 if (rel->rd_rel->relispartition)
598
599
601
603
606 slot, estate, false,
607 conflictindexes ? true : false,
608 &conflict,
609 conflictindexes, false);
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626 if (conflict)
628 recheckIndexes, NULL, slot);
629
630
632 recheckIndexes, NULL);
633
634
635
636
637
638
639
641 }
642}
643
644
645
646
647
648
649
650void
654{
655 bool skip_tuple = false;
658
659
660
661
662
663 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
665
667
668
671 {
673 tid, NULL, slot, NULL, NULL))
674 skip_tuple = true;
675 }
676
677 if (!skip_tuple)
678 {
679 List *recheckIndexes = NIL;
681 List *conflictindexes;
682 bool conflict = false;
683
684
689
690
693 if (rel->rd_rel->relispartition)
695
697 &update_indexes);
698
700
703 slot, estate, true,
704 conflictindexes ? true : false,
705 &conflict, conflictindexes,
707
708
709
710
711
712
713 if (conflict)
715 recheckIndexes, searchslot, slot);
716
717
719 NULL, NULL,
720 tid, NULL, slot,
721 recheckIndexes, NULL, false);
722
724 }
725}
726
727
728
729
730
731
732
733void
737{
738 bool skip_tuple = false;
741
743
744
747 {
749 tid, NULL, NULL, NULL, NULL);
750 }
751
752 if (!skip_tuple)
753 {
754
756
757
759 tid, NULL, NULL, false);
760 }
761}
762
763
764
765
766void
768{
770
771
772
773
774
775 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
776 return;
777
778
780 return;
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
808 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 errmsg("cannot update table \"%s\"",
811 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
814 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 errmsg("cannot update table \"%s\"",
817 errdetail("Column list used by the publication does not cover the replica identity.")));
820 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 errmsg("cannot update table \"%s\"",
823 errdetail("Replica identity must not contain unpublished generated columns.")));
826 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
827 errmsg("cannot delete from table \"%s\"",
829 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
832 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
833 errmsg("cannot delete from table \"%s\"",
835 errdetail("Column list used by the publication does not cover the replica identity.")));
838 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
839 errmsg("cannot delete from table \"%s\"",
841 errdetail("Replica identity must not contain unpublished generated columns.")));
842
843
845 return;
846
847
848 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
849 return;
850
851
852
853
854
855
858 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
861 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
864 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
865 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
867 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
868}
869
870
871
872
873
874
875
876void
879{
880 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
882 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
883 errmsg("cannot use relation \"%s.%s\" as logical replication target",
886}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
#define AttributeNumberIsValid(attributeNumber)
#define PG_USED_FOR_ASSERTS_ONLY
#define OidIsValid(objectId)
bool IsCatalogRelation(Relation relation)
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
@ CT_MULTIPLE_UNIQUE_CONFLICTS
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, ItemPointer tupleid, List *arbiterIndexes)
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
#define palloc0_object(type)
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
char * format_type_be(Oid type_oid)
Assert(PointerIsAligned(start, uint64))
void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)
void index_close(Relation relation, LOCKMODE lockmode)
void index_endscan(IndexScanDesc scan)
Relation index_open(Oid relationId, LOCKMODE lockmode)
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
List * lappend(List *list, void *datum)
void list_free(List *list)
bool list_member_oid(const List *list, Oid datum)
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Oid get_opclass_input_type(Oid opclass)
Oid get_opclass_family(Oid opclass)
RegProcedure get_opcode(Oid opno)
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
void * palloc0(Size size)
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
FormData_pg_attribute * Form_pg_attribute
int errdetail_relkind_not_supported(char relkind)
static int list_length(const List *l)
#define list_make1_oid(x1)
#define foreach_oid(var, lst)
#define ERRCODE_T_R_SERIALIZATION_FAILURE
static bool DatumGetBool(Datum X)
static Pointer DatumGetPointer(Datum X)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define IndexRelationGetNumberOfKeyAttributes(relation)
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Oid RelationGetReplicaIndex(Relation relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Snapshot GetLatestSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
Snapshot GetActiveSnapshot(void)
#define InitDirtySnapshot(snapshotdata)
Oid GetRelationIdentityOrPK(Relation rel)
PublicationActions pubactions
bool cols_valid_for_delete
bool gencols_valid_for_update
bool cols_valid_for_update
bool gencols_valid_for_delete
struct HeapTupleData * rd_indextuple
List * ri_onConflictArbiterIndexes
RelationPtr ri_IndexRelationDescs
TriggerDesc * ri_TrigDesc
IndexInfo ** ri_IndexRelationInfo
bool trig_delete_before_row
bool trig_update_before_row
bool trig_insert_before_row
bool has_generated_stored
TupleDesc tts_tupleDescriptor
int16 values[FLEXIBLE_ARRAY_MEMBER]
Oid values[FLEXIBLE_ARRAY_MEMBER]
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
static void table_endscan(TableScanDesc scan)
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
#define TransactionIdIsValid(xid)
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
static void ExecMaterializeSlot(TupleTableSlot *slot)
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
#define TYPECACHE_EQ_OPR_FINFO
CommandId GetCurrentCommandId(bool used)