PostgreSQL Source Code: src/backend/replication/logical/decode.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
28
42
43
50
59
60
61
63
64
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87void
89{
93
96 buf.record = record;
97
99
100
101
102
103
104
106 {
108 txid,
110 buf.origptr);
111 }
112
114
117 else
118 {
119
121 buf.origptr);
122 }
123}
124
125
126
127
128void
130{
133
135 buf->origptr);
136
137 switch (info)
138 {
139
143
144 break;
146
147
148
149
150
151 break;
153 {
156
157
158
159
160
161
162
163
164
165
166
168 {
169
170
171
172
173
176 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
177 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
178 }
179 break;
180 }
191 break;
192 default:
193 elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
194 }
195}
196
197
198
199
200void
202{
207
208
209
210
211
213 return;
214
215 switch (info)
216 {
219 {
224
227
230 else
232
233
234
235
236
237
241
243 break;
244 }
247 {
252
255
258 else
260
261
262
263
264
265
269
271 break;
272 }
274
275
276
277
278
279
280 break;
282 {
285
288
289
290
291
292
293
295 {
298 buf->origptr,
300 invals->msgs);
302 buf->origptr);
303 }
307 invals->msgs);
308
309 break;
310 }
312 {
315
316
319 xlrec, &parsed);
320
321
322
323
324
325
328 {
330 buf->origptr);
331 break;
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346
348 break;
349 }
350 default:
351 elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
352 }
353}
354
355
356
357
358void
360{
364
366
367 switch (info)
368 {
370 {
372
374
375
376
377
378
379
380
381
382
383
385 }
386 break;
388 break;
390
391
392
393
394
395 break;
396 default:
397 elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
398 }
399}
400
401
402
403
404void
406{
410
412
413
414
415
416
417
418
419
420
422 return;
423
424 switch (info)
425 {
430 break;
433 {
435
438
439 break;
440 }
442
443
444
445
446
447
448 break;
449
450
451
452
453
459 break;
460 default:
461 elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
462 }
463}
464
465
466
467
468void
470{
474
476
477
478
479
480
481
482
483
484
486 return;
487
488 switch (info)
489 {
494 break;
495
496
497
498
499
500
506 break;
507
512 break;
513
518 break;
519
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537 break;
538
543 break;
544
546
547 break;
548
549 default:
550 elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
551 break;
552 }
553}
554
555
556
557
558
559static inline bool
561 const char *gid)
562{
563
564
565
566
567
568
570 return true;
571
572
573
574
575
577 return false;
578
580}
581
582static inline bool
584{
586 return false;
587
589}
590
591
592
593
594void
596{
604
606 elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
607
609
610
612 return;
613
615
618 return;
619
622 return;
626 return;
627
628
629
630
631
632
634 {
635
636
637
638
639
640
641
644
645 return;
646 }
647
648
649
650
651
652
653
654
655
658
661 message->message,
662
665}
666
667
668
669
670
671
672
673
674
675static void
679{
683 int i;
684
686 {
689 }
690
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
713 {
715 {
717 }
719
720 return;
721 }
722
723
725 {
727 buf->origptr, buf->endptr);
728 }
729
730
731
732
733
735 {
738 commit_time, origin_id, origin_lsn,
740 }
741 else
742 {
744 commit_time, origin_id, origin_lsn);
745 }
746
747
748
749
750
751
752
754}
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771static void
774{
779 int i;
781
784
785
786
787
788
790 buf->endptr, prepare_time, origin_id,
791 origin_lsn))
792 return;
793
794
796 {
798 return;
799 }
800
801
802
803
804
805
806
807
808
809
810
811
812
814 {
817 return;
818 }
819
820
822 {
824 buf->origptr, buf->endptr);
825 }
826
827
829
830
831
832
833
834
835
837}
838
839
840
841
842
843
844
845
846static void
850{
851 int i;
855 bool skip_xact;
856
858 {
861 }
862
863
864
865
866
867
869
870
871
872
873
875 {
878 abort_time, origin_id, origin_lsn,
880 }
881 else
882 {
884 {
886 buf->record->EndRecPtr, abort_time);
887 }
888
890 abort_time);
891 }
892
893
895}
896
897
898
899
900
901
902static void
904{
905 Size datalen;
906 char *tupledata;
907 Size tuplelen;
912
914
915
916
917
918
920 return;
921
922
925 return;
926
927
929 return;
930
934 else
937
939
942
945
947
949
951 change,
953}
954
955
956
957
958
959
960
961static void
963{
969
971
972
975 return;
976
977
979 return;
980
985
987 {
988 Size datalen;
989 Size tuplelen;
990
992
994
997
999 }
1000
1002 {
1003 Size datalen;
1004 Size tuplelen;
1005
1006
1010
1013
1015 }
1016
1018
1020 change, false);
1021}
1022
1023
1024
1025
1026
1027
1028static void
1030{
1035
1037
1038
1041 return;
1042
1043
1045 return;
1046
1048
1051 else
1053
1055
1057
1058
1060 {
1063
1065
1068
1071 }
1072
1074
1076 change, false);
1077}
1078
1079
1080
1081
1082static void
1084{
1088
1090
1091
1093 return;
1094
1095
1097 return;
1098
1112 buf->origptr, change, false);
1113}
1114
1115
1116
1117
1118
1119
1120static void
1122{
1125 int i;
1127 char *tupledata;
1128 Size tuplelen;
1130
1132
1133
1134
1135
1136
1138 return;
1139
1140
1143 return;
1144
1145
1147 return;
1148
1149
1150
1151
1152
1154 Assert(tupledata != NULL);
1155
1156 data = tupledata;
1157 for (i = 0; i < xlrec->ntuples; i++)
1158 {
1161 int datalen;
1164
1168
1170
1173 datalen = xlhdr->datalen;
1174
1177
1179 header = tuple->t_data;
1180
1181
1183
1184
1185
1186
1188
1190
1192
1197
1198
1199
1200
1201
1202
1206 else
1208
1210 buf->origptr, change, false);
1211
1212
1213 data += datalen;
1214 }
1215 Assert(data == tupledata + tuplelen);
1216}
1217
1218
1219
1220
1221
1222
1223
1224static void
1226{
1230
1231
1234 return;
1235
1236
1238 return;
1239
1243
1245
1247
1249 change, false);
1250}
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260static void
1262{
1266
1267 Assert(datalen >= 0);
1268
1270 header = tuple->t_data;
1271
1272
1274
1275
1277
1278
1280
1282
1285 datalen);
1286
1290}
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304static bool
1307{
1311 return true;
1312
1313
1314
1315
1316
1317
1319 {
1321 return true;
1322 }
1323
1324 return false;
1325}
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id)
void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid, bool two_phase)
static bool FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase)
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_prepare *parsed)
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
Assert(PointerIsAligned(start, uint64))
#define XLH_INSERT_ON_TOAST_RELATION
#define XLOG_HEAP2_MULTI_INSERT
#define XLOG_HEAP_HOT_UPDATE
#define XLH_INSERT_IS_SPECULATIVE
#define XLOG_HEAP2_REWRITE
#define XLOG_HEAP_TRUNCATE
#define XLH_UPDATE_CONTAINS_NEW_TUPLE
#define XLH_INSERT_LAST_IN_MULTI
#define XLH_UPDATE_CONTAINS_OLD
#define XLH_TRUNCATE_RESTART_SEQS
#define XLH_DELETE_CONTAINS_OLD
#define XLOG_HEAP2_PRUNE_VACUUM_SCAN
#define XLOG_HEAP_INPLACE
#define XLOG_HEAP2_LOCK_UPDATED
#define SizeOfMultiInsertTuple
#define XLOG_HEAP2_PRUNE_ON_ACCESS
#define XLOG_HEAP2_NEW_CID
#define XLOG_HEAP2_PRUNE_VACUUM_CLEANUP
#define XLH_TRUNCATE_CASCADE
#define XLH_DELETE_IS_SUPER
#define XLOG_HEAP2_VISIBLE
#define XLH_INSERT_CONTAINS_NEW_TUPLE
#define XLOG_HEAP_CONFIRM
#define SizeofHeapTupleHeader
static void ItemPointerSetInvalid(ItemPointerData *pointer)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
#define XLOG_LOGICAL_MESSAGE
#define XLOG_RESTORE_POINT
#define XLOG_CHECKPOINT_REDO
#define XLOG_OVERWRITE_CONTRECORD
#define XLOG_FPI_FOR_HINT
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_PARAMETER_CHANGE
#define XLOG_CHECKPOINT_ONLINE
#define XLOG_END_OF_RECOVERY
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
@ REORDER_BUFFER_CHANGE_TRUNCATE
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts, uint32 xinfo)
void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)
void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
@ SNAPBUILD_FULL_SNAPSHOT
#define XLOG_INVALIDATIONS
#define XLOG_STANDBY_LOCK
#define XLOG_RUNNING_XACTS
struct SnapBuild * snapshot_builder
OutputPluginCallbacks callbacks
struct ReorderBuffer * reorder
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
bool clear_toast_afterwards
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@111 tp
ReplicationSlotPersistentData data
void(* rm_decode)(struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf)
Oid relids[FLEXIBLE_ARRAY_MEMBER]
char message[FLEXIBLE_ARRAY_MEMBER]
TransactionId oldestRunningXid
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]
TransactionId twophase_xid
char twophase_gid[GIDSIZE]
TimestampTz origin_timestamp
TransactionId twophase_xid
TimestampTz origin_timestamp
char twophase_gid[GIDSIZE]
#define TransactionIdIsValid(xid)
#define XLOG_XACT_COMMIT_PREPARED
#define XLOG_XACT_INVALIDATIONS
#define XACT_XINFO_HAS_ORIGIN
#define XLOG_XACT_PREPARE
#define XLOG_XACT_ASSIGNMENT
#define XLOG_XACT_ABORT_PREPARED
void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed)
bool RecoveryInProgress(void)
static RmgrData GetRmgr(RmgrId rmid)
#define InvalidXLogRecPtr
char * XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
#define XLogRecGetOrigin(decoder)
#define XLogRecGetDataLen(decoder)
#define XLogRecGetInfo(decoder)
#define XLogRecGetRmid(decoder)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)
#define XLogRecGetTopXid(decoder)