PostgreSQL Source Code: src/backend/replication/logical/proto.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
14
22
23
24
25
26#define LOGICALREP_IS_REPLICA_IDENTITY 1
27
28#define MESSAGE_TRANSACTIONAL (1<<0)
29#define TRUNCATE_CASCADE (1<<0)
30#define TRUNCATE_RESTART_SEQS (1<<1)
31
34 PublishGencolsType include_gencols_type);
38 PublishGencolsType include_gencols_type);
41
44
45
46
47
48void
50{
52
53
57}
58
59
60
61
62void
64{
65
68 elog(ERROR, "final_lsn not set in begin message");
71}
72
73
74
75
76
77void
80{
82
84
85
87
88
92}
93
94
95
96
97void
99{
100
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106
110}
111
112
113
114
115void
117{
119
120
125
126
128}
129
130
131
132
133void
135{
136
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
142 elog(ERROR, "end_lsn not set in begin prepare message");
145
146
148}
149
150
151
152
153
154static void
157{
159
161
162
163
164
165
169
170
172
173
178
179
181}
182
183
184
185
186void
189{
191 txn, prepare_lsn);
192}
193
194
195
196
197
198static void
201{
202
204
205 if (flags != 0)
206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208
211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
214 elog(ERROR, "end_lsn is not set in %s message", msgtype);
218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220
222}
223
224
225
226
227void
229{
231}
232
233
234
235
236void
239{
241
243
244
245
246
247
249
250
252
253
258
259
261}
262
263
264
265
266void
268{
269
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
281 elog(ERROR, "end_lsn is not set in commit prepared message");
284
285
287}
288
289
290
291
292void
296{
298
300
301
302
303
304
306
307
309
310
316
317
319}
320
321
322
323
324void
327{
328
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
344
345
347}
348
349
350
351
352void
356{
358 txn, prepare_lsn);
359}
360
361
362
363
364void
366{
368}
369
370
371
372
373void
376{
378
379
381
382
384}
385
386
387
388
389char *
391{
392
394
395
397}
398
399
400
401
402void
406 PublishGencolsType include_gencols_type)
407{
409
410
413
414
416
417 pq_sendbyte(out, 'N');
419 include_gencols_type);
420}
421
422
423
424
425
426
429{
432
433
435
438 elog(ERROR, "expected new tuple but got %d",
440
442
443 return relid;
444}
445
446
447
448
449void
453 PublishGencolsType include_gencols_type)
454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461
464
465
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O');
472 else
473 pq_sendbyte(out, 'K');
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N');
480 include_gencols_type);
481}
482
483
484
485
490{
493
494
496
497
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
502
503
505 {
507 *has_oldtuple = true;
508
510 }
511 else
512 *has_oldtuple = false;
513
514
516 elog(ERROR, "expected action 'N', got %c",
518
520
521 return relid;
522}
523
524
525
526
527void
531 PublishGencolsType include_gencols_type)
532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539
542
543
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O');
548 else
549 pq_sendbyte(out, 'K');
550
552 include_gencols_type);
553}
554
555
556
557
558
559
562{
565
566
568
569
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
575
576 return relid;
577}
578
579
580
581
582void
585 int nrelids,
586 Oid relids[],
587 bool cascade, bool restart_seqs)
588{
589 int i;
591
593
594
597
599
600
601 if (cascade)
603 if (restart_seqs)
606
607 for (i = 0; i < nrelids; i++)
609}
610
611
612
613
616 bool *cascade, bool *restart_seqs)
617{
618 int i;
619 int nrelids;
622
624
625
629
630 for (i = 0; i < nrelids; i++)
632
633 return relids;
634}
635
636
637
638
639void
641 bool transactional, const char *prefix, Size sz,
642 const char *message)
643{
645
647
648
649 if (transactional)
651
652
655
661}
662
663
664
665
666void
669 PublishGencolsType include_gencols_type)
670{
672
674
675
678
679
681
682
686
687
689
690
692}
693
694
695
696
699{
701
703
704
707
708
710
711
713
714
716
717 return rel;
718}
719
720
721
722
723
724
725void
727{
731
733
734
737
740 elog(ERROR, "cache lookup failed for type %u", basetypoid);
742
743
745
746
749
751}
752
753
754
755
756void
758{
760
761
764}
765
766
767
768
769static void
772 PublishGencolsType include_gencols_type)
773{
776 bool *isnull;
777 int i;
778 uint16 nliveatts = 0;
779
781
782 for (i = 0; i < desc->natts; i++)
783 {
785
787 include_gencols_type))
788 continue;
789
790 nliveatts++;
791 }
793
797
798
799 for (i = 0; i < desc->natts; i++)
800 {
804
806 include_gencols_type))
807 continue;
808
809 if (isnull[i])
810 {
812 continue;
813 }
814
816 {
817
818
819
820
821
823 continue;
824 }
825
828 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
830
831
832
833
834 if (binary && OidIsValid(typclass->typsend))
835 {
836 bytea *outputbytes;
838
844 pfree(outputbytes);
845 }
846 else
847 {
848 char *outputstr;
849
853 pfree(outputstr);
854 }
855
857 }
858}
859
860
861
862
863static void
865{
866 int i;
867 int natts;
868
869
871
872
875 tuple->ncols = natts;
876
877
878 for (i = 0; i < natts; i++)
879 {
880 char *buff;
881 char kind;
884
887
888 switch (kind)
889 {
891
892 break;
894
895 break;
899
900
903
904
905
906
907
908
909
910 buff[len] = '\0';
911
913 break;
914 default:
915 elog(ERROR, "unrecognized data representation type '%c'", kind);
916 }
917 }
918}
919
920
921
922
923static void
925 PublishGencolsType include_gencols_type)
926{
928 int i;
929 uint16 nliveatts = 0;
931 bool replidentfull;
932
934
935
936 for (i = 0; i < desc->natts; i++)
937 {
939
941 include_gencols_type))
942 continue;
943
944 nliveatts++;
945 }
947
948
949 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
950 if (!replidentfull)
952
953
954 for (i = 0; i < desc->natts; i++)
955 {
958
960 include_gencols_type))
961 continue;
962
963
964 if (replidentfull ||
966 idattrs))
968
970
971
973
974
976
977
979 }
980
982}
983
984
985
986
987static void
989{
990 int i;
991 int natts;
992 char **attnames;
993 Oid *atttyps;
995
999
1000
1001 for (i = 0; i < natts; i++)
1002 {
1004
1005
1009
1010
1012
1013
1015
1016
1018 }
1019
1023 rel->natts = natts;
1024}
1025
1026
1027
1028
1029static void
1031{
1032 if (nspid == PG_CATALOG_NAMESPACE)
1034 else
1035 {
1037
1038 if (nspname == NULL)
1039 elog(ERROR, "cache lookup failed for namespace %u",
1041
1043 }
1044}
1045
1046
1047
1048
1049static const char *
1051{
1053
1054 if (nspname[0] == '\0')
1055 nspname = "pg_catalog";
1056
1057 return nspname;
1058}
1059
1060
1061
1062
1063void
1066{
1068
1070
1071
1073
1074
1075 pq_sendbyte(out, first_segment ? 1 : 0);
1076}
1077
1078
1079
1080
1083{
1085
1086 Assert(first_segment);
1087
1090
1091 return xid;
1092}
1093
1094
1095
1096
1097void
1099{
1101}
1102
1103
1104
1105
1106void
1109{
1110 uint8 flags = 0;
1111
1113
1115
1116
1118
1119
1121
1122
1126}
1127
1128
1129
1130
1133{
1136
1138
1139
1141
1142 if (flags != 0)
1143 elog(ERROR, "unrecognized flags %u in commit message", flags);
1144
1145
1149
1150 return xid;
1151}
1152
1153
1154
1155
1156
1157
1158
1159
1160void
1163 TimestampTz abort_time, bool write_abort_info)
1164{
1166
1168
1169
1172
1173 if (write_abort_info)
1174 {
1177 }
1178}
1179
1180
1181
1182
1183
1184
1185
1186void
1189 bool read_abort_info)
1190{
1192
1195
1196 if (read_abort_info)
1197 {
1200 }
1201 else
1202 {
1205 }
1206}
1207
1208
1209
1210
1211const char *
1213{
1214 static char err_unknown[20];
1215
1217 {
1219 return "BEGIN";
1221 return "COMMIT";
1223 return "ORIGIN";
1225 return "INSERT";
1227 return "UPDATE";
1229 return "DELETE";
1231 return "TRUNCATE";
1233 return "RELATION";
1235 return "TYPE";
1237 return "MESSAGE";
1239 return "BEGIN PREPARE";
1241 return "PREPARE";
1243 return "COMMIT PREPARED";
1245 return "ROLLBACK PREPARED";
1247 return "STREAM START";
1249 return "STREAM STOP";
1251 return "STREAM COMMIT";
1253 return "STREAM ABORT";
1255 return "STREAM PREPARE";
1256 }
1257
1258
1259
1260
1261
1262
1263 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1264
1265 return err_unknown;
1266}
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281bool
1283 PublishGencolsType include_gencols_type)
1284{
1285 if (att->attisdropped)
1286 return false;
1287
1288
1289 if (columns)
1291
1292
1293 if (!att->attgenerated)
1294 return true;
1295
1296
1297
1298
1299
1300 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1301 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1302
1303 return false;
1304}
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define OidIsValid(objectId)
#define palloc_object(type)
#define palloc_array(type, count)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
List * lappend_oid(List *list, Oid datum)
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
Oid getBaseType(Oid typid)
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_type * Form_pg_type
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
unsigned int pq_getmsgint(StringInfo msg, int b)
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
const char * pq_getmsgstring(StringInfo msg)
void pq_sendstring(StringInfo buf, const char *str)
void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)
int pq_getmsgbyte(StringInfo msg)
int64 pq_getmsgint64(StringInfo msg)
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
static void pq_sendint32(StringInfo buf, uint32 i)
static void pq_sendbyte(StringInfo buf, uint8 byt)
static void pq_sendint64(StringInfo buf, uint64 i)
static void pq_sendint8(StringInfo buf, uint8 i)
static void pq_sendint16(StringInfo buf, uint16 i)
static void pq_sendint(StringInfo buf, uint32 i, int b)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
#define TRUNCATE_RESTART_SEQS
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
#define MESSAGE_TRANSACTIONAL
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define LOGICALREP_IS_REPLICA_IDENTITY
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
static const char * logicalrep_read_namespace(StringInfo in)
void logicalrep_write_stream_stop(StringInfo out)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define rbtxn_is_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
static Size VARSIZE(const void *PTR)
static char * VARDATA(const void *PTR)
#define XLogRecPtrIsValid(r)
#define InvalidXLogRecPtr