PostgreSQL Source Code: src/backend/replication/logical/proto.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
14
22
23
24
25
26#define LOGICALREP_IS_REPLICA_IDENTITY 1
27
28#define MESSAGE_TRANSACTIONAL (1<<0)
29#define TRUNCATE_CASCADE (1<<0)
30#define TRUNCATE_RESTART_SEQS (1<<1)
31
34 PublishGencolsType include_gencols_type);
38 PublishGencolsType include_gencols_type);
41
44
45
46
47
48void
50{
52
53
57}
58
59
60
61
62void
64{
65
68 elog(ERROR, "final_lsn not set in begin message");
71}
72
73
74
75
76
77void
80{
82
84
85
87
88
92}
93
94
95
96
97void
99{
100
102
103 if (flags != 0)
104 elog(ERROR, "unrecognized flags %u in commit message", flags);
105
106
110}
111
112
113
114
115void
117{
119
120
125
126
128}
129
130
131
132
133void
135{
136
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
142 elog(ERROR, "end_lsn not set in begin prepare message");
145
146
148}
149
150
151
152
153
154static void
157{
159
161
162
163
164
165
169
170
172
173
178
179
181}
182
183
184
185
186void
189{
191 txn, prepare_lsn);
192}
193
194
195
196
197
198static void
201{
202
204
205 if (flags != 0)
206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208
211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
214 elog(ERROR, "end_lsn is not set in %s message", msgtype);
218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220
222}
223
224
225
226
227void
229{
231}
232
233
234
235
236void
239{
241
243
244
245
246
247
249
250
252
253
258
259
261}
262
263
264
265
266void
268{
269
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
281 elog(ERROR, "end_lsn is not set in commit prepared message");
284
285
287}
288
289
290
291
292void
296{
298
300
301
302
303
304
306
307
309
310
316
317
319}
320
321
322
323
324void
327{
328
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
344
345
347}
348
349
350
351
352void
356{
358 txn, prepare_lsn);
359}
360
361
362
363
364void
366{
368}
369
370
371
372
373void
376{
378
379
381
382
384}
385
386
387
388
389char *
391{
392
394
395
397}
398
399
400
401
402void
406 PublishGencolsType include_gencols_type)
407{
409
410
413
414
416
417 pq_sendbyte(out, 'N');
419 include_gencols_type);
420}
421
422
423
424
425
426
429{
432
433
435
438 elog(ERROR, "expected new tuple but got %d",
440
442
443 return relid;
444}
445
446
447
448
449void
453 PublishGencolsType include_gencols_type)
454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461
464
465
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O');
472 else
473 pq_sendbyte(out, 'K');
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N');
480 include_gencols_type);
481}
482
483
484
485
490{
493
494
496
497
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
502
503
505 {
507 *has_oldtuple = true;
508
510 }
511 else
512 *has_oldtuple = false;
513
514
516 elog(ERROR, "expected action 'N', got %c",
518
520
521 return relid;
522}
523
524
525
526
527void
531 PublishGencolsType include_gencols_type)
532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539
542
543
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O');
548 else
549 pq_sendbyte(out, 'K');
550
552 include_gencols_type);
553}
554
555
556
557
558
559
562{
565
566
568
569
572 elog(ERROR, "expected action 'O' or 'K', got %c", action);
573
575
576 return relid;
577}
578
579
580
581
582void
585 int nrelids,
586 Oid relids[],
587 bool cascade, bool restart_seqs)
588{
589 int i;
591
593
594
597
599
600
601 if (cascade)
603 if (restart_seqs)
606
607 for (i = 0; i < nrelids; i++)
609}
610
611
612
613
616 bool *cascade, bool *restart_seqs)
617{
618 int i;
619 int nrelids;
622
624
625
629
630 for (i = 0; i < nrelids; i++)
632
633 return relids;
634}
635
636
637
638
639void
641 bool transactional, const char *prefix, Size sz,
642 const char *message)
643{
645
647
648
649 if (transactional)
651
652
655
661}
662
663
664
665
666void
669 PublishGencolsType include_gencols_type)
670{
672
674
675
678
679
681
682
686
687
689
690
692}
693
694
695
696
699{
701
703
704
707
708
710
711
713
714 return rel;
715}
716
717
718
719
720
721
722void
724{
728
730
731
734
737 elog(ERROR, "cache lookup failed for type %u", basetypoid);
739
740
742
743
746
748}
749
750
751
752
753void
755{
757
758
761}
762
763
764
765
766static void
769 PublishGencolsType include_gencols_type)
770{
773 bool *isnull;
774 int i;
775 uint16 nliveatts = 0;
776
778
779 for (i = 0; i < desc->natts; i++)
780 {
782
784 include_gencols_type))
785 continue;
786
787 nliveatts++;
788 }
790
794
795
796 for (i = 0; i < desc->natts; i++)
797 {
801
803 include_gencols_type))
804 continue;
805
806 if (isnull[i])
807 {
809 continue;
810 }
811
813 {
814
815
816
817
818
820 continue;
821 }
822
825 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
827
828
829
830
831 if (binary && OidIsValid(typclass->typsend))
832 {
833 bytea *outputbytes;
835
841 pfree(outputbytes);
842 }
843 else
844 {
845 char *outputstr;
846
850 pfree(outputstr);
851 }
852
854 }
855}
856
857
858
859
860static void
862{
863 int i;
864 int natts;
865
866
868
869
871 tuple->colstatus = (char *) palloc(natts * sizeof(char));
872 tuple->ncols = natts;
873
874
875 for (i = 0; i < natts; i++)
876 {
877 char *buff;
878 char kind;
881
884
885 switch (kind)
886 {
888
889 break;
891
892 break;
896
897
900
901
902
903
904
905
906
907 buff[len] = '\0';
908
910 break;
911 default:
912 elog(ERROR, "unrecognized data representation type '%c'", kind);
913 }
914 }
915}
916
917
918
919
920static void
922 PublishGencolsType include_gencols_type)
923{
925 int i;
926 uint16 nliveatts = 0;
928 bool replidentfull;
929
931
932
933 for (i = 0; i < desc->natts; i++)
934 {
936
938 include_gencols_type))
939 continue;
940
941 nliveatts++;
942 }
944
945
946 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
947 if (!replidentfull)
949
950
951 for (i = 0; i < desc->natts; i++)
952 {
955
957 include_gencols_type))
958 continue;
959
960
961 if (replidentfull ||
963 idattrs))
965
967
968
970
971
973
974
976 }
977
979}
980
981
982
983
984static void
986{
987 int i;
988 int natts;
989 char **attnames;
990 Oid *atttyps;
992
994 attnames = palloc(natts * sizeof(char *));
995 atttyps = palloc(natts * sizeof(Oid));
996
997
998 for (i = 0; i < natts; i++)
999 {
1001
1002
1006
1007
1009
1010
1012
1013
1015 }
1016
1020 rel->natts = natts;
1021}
1022
1023
1024
1025
1026static void
1028{
1029 if (nspid == PG_CATALOG_NAMESPACE)
1031 else
1032 {
1034
1035 if (nspname == NULL)
1036 elog(ERROR, "cache lookup failed for namespace %u",
1038
1040 }
1041}
1042
1043
1044
1045
1046static const char *
1048{
1050
1051 if (nspname[0] == '\0')
1052 nspname = "pg_catalog";
1053
1054 return nspname;
1055}
1056
1057
1058
1059
1060void
1063{
1065
1067
1068
1070
1071
1072 pq_sendbyte(out, first_segment ? 1 : 0);
1073}
1074
1075
1076
1077
1080{
1082
1083 Assert(first_segment);
1084
1087
1088 return xid;
1089}
1090
1091
1092
1093
1094void
1096{
1098}
1099
1100
1101
1102
1103void
1106{
1107 uint8 flags = 0;
1108
1110
1112
1113
1115
1116
1118
1119
1123}
1124
1125
1126
1127
1130{
1133
1135
1136
1138
1139 if (flags != 0)
1140 elog(ERROR, "unrecognized flags %u in commit message", flags);
1141
1142
1146
1147 return xid;
1148}
1149
1150
1151
1152
1153
1154
1155
1156
1157void
1160 TimestampTz abort_time, bool write_abort_info)
1161{
1163
1165
1166
1169
1170 if (write_abort_info)
1171 {
1174 }
1175}
1176
1177
1178
1179
1180
1181
1182
1183void
1186 bool read_abort_info)
1187{
1189
1192
1193 if (read_abort_info)
1194 {
1197 }
1198 else
1199 {
1202 }
1203}
1204
1205
1206
1207
1208const char *
1210{
1211 static char err_unknown[20];
1212
1214 {
1216 return "BEGIN";
1218 return "COMMIT";
1220 return "ORIGIN";
1222 return "INSERT";
1224 return "UPDATE";
1226 return "DELETE";
1228 return "TRUNCATE";
1230 return "RELATION";
1232 return "TYPE";
1234 return "MESSAGE";
1236 return "BEGIN PREPARE";
1238 return "PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1253 }
1254
1255
1256
1257
1258
1259
1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262 return err_unknown;
1263}
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278bool
1280 PublishGencolsType include_gencols_type)
1281{
1282 if (att->attisdropped)
1283 return false;
1284
1285
1286 if (columns)
1288
1289
1290 if (!att->attgenerated)
1291 return true;
1292
1293
1294
1295
1296
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300 return false;
1301}
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define OidIsValid(objectId)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
List * lappend_oid(List *list, Oid datum)
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
Oid getBaseType(Oid typid)
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_type * Form_pg_type
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum ObjectIdGetDatum(Oid X)
unsigned int pq_getmsgint(StringInfo msg, int b)
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
const char * pq_getmsgstring(StringInfo msg)
void pq_sendstring(StringInfo buf, const char *str)
void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)
int pq_getmsgbyte(StringInfo msg)
int64 pq_getmsgint64(StringInfo msg)
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
static void pq_sendint32(StringInfo buf, uint32 i)
static void pq_sendbyte(StringInfo buf, uint8 byt)
static void pq_sendint64(StringInfo buf, uint64 i)
static void pq_sendint8(StringInfo buf, uint8 i)
static void pq_sendint16(StringInfo buf, uint16 i)
static void pq_sendint(StringInfo buf, uint32 i, int b)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
#define TRUNCATE_RESTART_SEQS
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
#define MESSAGE_TRANSACTIONAL
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define LOGICALREP_IS_REPLICA_IDENTITY
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
static const char * logicalrep_read_namespace(StringInfo in)
void logicalrep_write_stream_stop(StringInfo out)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define rbtxn_is_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
union ReorderBufferTXN::@116 xact_time
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
#define InvalidXLogRecPtr