PostgreSQL Source Code: src/backend/replication/logical/relation.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
19
32
33
35
37
38
39
40
41
42
43
44
45
46
47
48
52{
56
59
60
61
62
63static void
65{
67
68
70 return;
71
73 {
75
77
78
80 {
82 {
85 break;
86 }
87 }
88 }
89 else
90 {
91
93
95
98 }
99}
100
101
102
103
104static void
106{
108
112 "LogicalRepRelMapContext",
114
115
119
122
123
126}
127
128
129
130
131static void
133{
135
137
140
141 if (remoterel->natts > 0)
142 {
143 int i;
144
145 for (i = 0; i < remoterel->natts; i++)
147
150 }
152
155}
156
157
158
159
160
161
162
163void
165{
168 bool found;
169 int i;
170
173
174
175
176
179
180 if (found)
182
184
185
193 for (i = 0; i < remoterel->natts; i++)
194 {
197 }
201}
202
203
204
205
206
207
208static int
210{
211 int i;
212
213 for (i = 0; i < remoterel->natts; i++)
214 {
216 return i;
217 }
218
219 return -1;
220}
221
222
223
224
225
226static char *
228{
230 int attcnt = 0;
231 int i = -1;
232
234
236
238 {
239 attcnt++;
240 if (attcnt > 1)
242
244 }
245
246 return attsbuf.data;
247}
248
249
250
251
252
253
254static void
258{
261 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
262 errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s",
263 "logical replication target relation \"%s.%s\" is missing replicated columns: %s",
268 missingatts)));
269
272 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
273 errmsg_plural("logical replication target relation \"%s.%s\" has incompatible generated column: %s",
274 "logical replication target relation \"%s.%s\" has incompatible generated columns: %s",
279 generatedatts)));
280}
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295static void
297{
300 int i;
301
303
306
307 if (idkey == NULL)
308 {
311
312
313
314
315
316 if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
318 }
319
320 i = -1;
322 {
324
327 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
328 errmsg("logical replication target relation \"%s.%s\" uses "
329 "system columns in REPLICA IDENTITY index",
331
333
336 {
338 break;
339 }
340 }
341}
342
343
344
345
346
347
350{
352 bool found;
354
357
358
361
362 if (!found)
363 elog(ERROR, "no relation map entry for remote relation ID %u",
364 remoteid);
365
367
368
370 elog(ERROR, "remote relation ID %u is already open", remoteid);
371
372
373
374
375
376
377
379 {
382 {
383
385 }
387 {
388
391 }
392 }
393
394
395
396
397
399 {
400 Oid relid;
403 int i;
405 Bitmapset *generatedattrs = NULL;
406
407
409 {
412 }
413
414
417 lockmode, true);
420 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
421 errmsg("logical replication target relation \"%s.%s\" does not exist",
425
426
429
430
431
432
433
434
439
440
442 for (i = 0; i < desc->natts; i++)
443 {
446
447 if (attr->attisdropped)
448 {
450 continue;
451 }
452
455
458 {
459
460 if (attr->attgenerated)
462
464 }
465 }
466
468 generatedattrs);
469
470
473
474
475
476
477
479
480
481
482
483
484
485
488
490 }
491
492 if (entry->state != SUBREL_STATE_READY)
496
497 return entry;
498}
499
500
501
502
503void
505{
508}
509
510
511
512
513
514
515
516
517
518
519
520
521
522static void
524{
526
527
529 return;
530
532 {
534
536
537
539 {
541 {
544 break;
545 }
546 }
547 }
548 else
549 {
550
552
554
557 }
558}
559
560
561
562
563
564
565
566
567
568
569
570void
572{
576
578 return;
579
582 {
584
586 continue;
587
589
591 }
592}
593
594
595
596
597static void
599{
601
605 "LogicalRepPartMapContext",
607
608
609 ctl.keysize = sizeof(Oid);
612
615
616
619}
620
621
622
623
624
625
626
627
628
629
630
631
635{
641 bool found;
643
646
647
649 &partOid,
651
653
654
655
656
657
658
659
660
661
662
664 {
666 return entry;
667 }
668
669
671
672 if (!found)
673 {
675 part_entry->partoid = partOid;
676 }
677
678
680 {
683 }
684
686 {
687 int i;
688
689
696 for (i = 0; i < remoterel->natts; i++)
697 {
700 }
703 }
704
707
708
709
710
711
712
713
714
715
716
717
718
719 if (map)
720 {
722
724 for (attno = 0; attno < entry->attrmap->maplen; attno++)
725 {
727
728
729 if (root_attno == 0)
731 else
733 }
734 }
735 else
736 {
737
741 }
742
743
745
746
748
749
750
751
752
753
754
755
756
757
760
762
763 return entry;
764}
765
766
767
768
769
770
771
772
773
774
775static Oid
777{
779
781 {
782 bool isUsableIdx;
784
788
789
790 if (isUsableIdx)
791 return idxoid;
792 }
793
795}
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820bool
822{
825
826
828 return false;
829
831
834 Anum_pg_index_indclass));
835
836
837 for (int i = 0; i < idxrel->rd_index->indnkeyatts; i++)
838 {
839 Oid opfamily;
840
843 return false;
844 }
845
846
847
848
849
850
851
853 {
855
858 return false;
859 }
860
861
862 keycol = idxrel->rd_index->indkey.values[0];
864 return false;
865
866
867
868
869
870
873 return false;
874
875
876
877
878
880 return false;
881
882 return true;
883}
884
885
886
887
888
889
892{
893 Oid idxoid;
894
896
899
900 return idxoid;
901}
902
903
904
905
906
907static Oid
910{
911 Oid idxoid;
912
913
914
915
916
917 if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
919
920
921
922
925 return idxoid;
926
927 if (remoterel->replident == REPLICA_IDENTITY_FULL)
928 {
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
945 }
946
948}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
IndexAmRoutine * GetIndexAmRoutineByAmId(Oid amoid, bool noerror)
void free_attrmap(AttrMap *map)
AttrMap * make_attrmap(int maplen)
#define AttributeNumberIsValid(attributeNumber)
#define AttrNumberGetAttrOffset(attNum)
#define AttrNumberIsForUserDefinedAttr(attributeNumber)
Subscription * MySubscription
int bms_next_member(const Bitmapset *a, int prevbit)
Bitmapset * bms_add_range(Bitmapset *a, int lower, int upper)
Bitmapset * bms_del_member(Bitmapset *a, int x)
void bms_free(Bitmapset *a)
int bms_num_members(const Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
Bitmapset * bms_copy(const Bitmapset *a)
#define OidIsValid(objectId)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_term(HASH_SEQ_STATUS *status)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
Assert(PointerIsAligned(start, uint64))
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
void index_close(Relation relation, LOCKMODE lockmode)
Relation index_open(Oid relationId, LOCKMODE lockmode)
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
struct LogicalRepRelMapEntry LogicalRepRelMapEntry
Oid get_opclass_family(Oid opclass)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CacheMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RangeVarGetRelid(relation, lockmode, missing_ok)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
#define foreach_oid(var, lst)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
static Pointer DatumGetPointer(Datum X)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
List * RelationGetIndexList(Relation relation)
Oid RelationGetPrimaryKeyIndex(Relation relation, bool deferrable_ok)
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Oid RelationGetReplicaIndex(Relation relation)
@ INDEX_ATTR_BITMAP_PRIMARY_KEY
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
static MemoryContext LogicalRepPartMapContext
void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
static void logicalrep_partmap_init(void)
static void logicalrep_report_missing_or_gen_attrs(LogicalRepRelation *remoterel, Bitmapset *missingatts, Bitmapset *generatedatts)
LogicalRepRelMapEntry * logicalrep_partition_open(LogicalRepRelMapEntry *root, Relation partrel, AttrMap *map)
static void logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
struct LogicalRepPartMapEntry LogicalRepPartMapEntry
static char * logicalrep_get_attrs_str(LogicalRepRelation *remoterel, Bitmapset *atts)
bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap)
static void logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
static HTAB * LogicalRepPartMap
static HTAB * LogicalRepRelMap
static void logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
static MemoryContext LogicalRepRelMapContext
Oid GetRelationIdentityOrPK(Relation rel)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
static void logicalrep_relmap_init(void)
static int logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
static Oid FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
static void logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel, AttrMap *attrMap)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
amgettuple_function amgettuple
LogicalRepRelMapEntry relmapentry
LogicalRepRelation remoterel
struct HeapTupleData * rd_indextuple
Oid values[FLEXIBLE_ARRAY_MEMBER]
#define FirstLowInvalidHeapAttributeNumber
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
#define TYPECACHE_EQ_OPR_FINFO