PostgreSQL Source Code: src/backend/replication/logical/decode.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

28

42

43

50

59

60

61

63

64

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87void

89{

93

96 buf.record = record;

97

99

100

101

102

103

104

106 {

108 txid,

110 buf.origptr);

111 }

112

114

117 else

118 {

119

121 buf.origptr);

122 }

123}

124

125

126

127

128void

130{

133

135 buf->origptr);

136

137 switch (info)

138 {

139

143

144 break;

146

147

148

149

150

151 break;

153 {

156

157

158

159

160

161

162

163

164

165

166

168 {

169

170

171

172

173

176 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

177 errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));

178 }

179 break;

180 }

191 break;

192 default:

193 elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);

194 }

195}

196

197

198

199

200void

202{

207

208

209

210

211

213 return;

214

215 switch (info)

216 {

219 {

224

227

230 else

232

233

234

235

236

237

241

243 break;

244 }

247 {

252

255

258 else

260

261

262

263

264

265

269

271 break;

272 }

274

275

276

277

278

279

280 break;

282 {

285

288

289

290

291

292

293

295 {

298 buf->origptr,

300 invals->msgs);

302 buf->origptr);

303 }

307 invals->msgs);

308

309 break;

310 }

312 {

315

316

319 xlrec, &parsed);

320

321

322

323

324

325

328 {

330 buf->origptr);

331 break;

332 }

333

334

335

336

337

338

339

340

341

342

343

344

345

346

348 break;

349 }

350 default:

351 elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);

352 }

353}

354

355

356

357

358void

360{

364

366

367 switch (info)

368 {

370 {

372

374

375

376

377

378

379

380

381

382

383

385 }

386 break;

388 break;

390

391

392

393

394

395 break;

396 default:

397 elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);

398 }

399}

400

401

402

403

404void

406{

410

412

413

414

415

416

417

418

419

420

422 return;

423

424 switch (info)

425 {

430 break;

433 {

435

438

439 break;

440 }

442

443

444

445

446

447

448 break;

449

450

451

452

453

459 break;

460 default:

461 elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);

462 }

463}

464

465

466

467

468void

470{

474

476

477

478

479

480

481

482

483

484

486 return;

487

488 switch (info)

489 {

494 break;

495

496

497

498

499

500

506 break;

507

512 break;

513

518 break;

519

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537 break;

538

543 break;

544

546

547 break;

548

549 default:

550 elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);

551 break;

552 }

553}

554

555

556

557

558

559static inline bool

561 const char *gid)

562{

563

564

565

566

567

568

570 return true;

571

572

573

574

575

577 return false;

578

580}

581

582static inline bool

584{

586 return false;

587

589}

590

591

592

593

594void

596{

604

606 elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);

607

609

610

612 return;

613

615

618 return;

619

622 return;

626 return;

627

628

629

630

631

632

634 {

635

636

637

638

639

640

641

644

645 return;

646 }

647

648

649

650

651

652

653

654

655

658

661 message->message,

662

665}

666

667

668

669

670

671

672

673

674

675static void

679{

683 int i;

684

686 {

689 }

690

694

695

696

697

698

699

700

701

702

703

704

705

706

707

708

709

710

711

713 {

715 {

717 }

719

720 return;

721 }

722

723

725 {

727 buf->origptr, buf->endptr);

728 }

729

730

731

732

733

735 {

738 commit_time, origin_id, origin_lsn,

740 }

741 else

742 {

744 commit_time, origin_id, origin_lsn);

745 }

746

747

748

749

750

751

752

754}

755

756

757

758

759

760

761

762

763

764

765

766

767

768

769

770

771static void

774{

779 int i;

781

784

785

786

787

788

790 buf->endptr, prepare_time, origin_id,

791 origin_lsn))

792 return;

793

794

796 {

798 return;

799 }

800

801

802

803

804

805

806

807

808

809

810

811

812

814 {

817 return;

818 }

819

820

822 {

824 buf->origptr, buf->endptr);

825 }

826

827

829

830

831

832

833

834

835

837}

838

839

840

841

842

843

844

845

846static void

850{

851 int i;

855 bool skip_xact;

856

858 {

861 }

862

863

864

865

866

867

869

870

871

872

873

875 {

878 abort_time, origin_id, origin_lsn,

880 }

881 else

882 {

884 {

886 buf->record->EndRecPtr, abort_time);

887 }

888

890 abort_time);

891 }

892

893

895}

896

897

898

899

900

901

902static void

904{

905 Size datalen;

906 char *tupledata;

907 Size tuplelen;

912

914

915

916

917

918

920 return;

921

922

925 return;

926

927

929 return;

930

934 else

937

939

942

945

947

949

951 change,

953}

954

955

956

957

958

959

960

961static void

963{

969

971

972

975 return;

976

977

979 return;

980

985

987 {

988 Size datalen;

989 Size tuplelen;

990

992

994

997

999 }

1000

1002 {

1003 Size datalen;

1004 Size tuplelen;

1005

1006

1010

1013

1015 }

1016

1018

1020 change, false);

1021}

1022

1023

1024

1025

1026

1027

1028static void

1030{

1035

1037

1038

1041 return;

1042

1043

1045 return;

1046

1048

1051 else

1053

1055

1057

1058

1060 {

1063

1065

1068

1071 }

1072

1074

1076 change, false);

1077}

1078

1079

1080

1081

1082static void

1084{

1088

1090

1091

1093 return;

1094

1095

1097 return;

1098

1112 buf->origptr, change, false);

1113}

1114

1115

1116

1117

1118

1119

1120static void

1122{

1125 int i;

1127 char *tupledata;

1128 Size tuplelen;

1130

1132

1133

1134

1135

1136

1138 return;

1139

1140

1143 return;

1144

1145

1147 return;

1148

1149

1150

1151

1152

1154 Assert(tupledata != NULL);

1155

1156 data = tupledata;

1157 for (i = 0; i < xlrec->ntuples; i++)

1158 {

1161 int datalen;

1164

1168

1170

1173 datalen = xlhdr->datalen;

1174

1177

1179 header = tuple->t_data;

1180

1181

1183

1184

1185

1186

1188

1190

1192

1197

1198

1199

1200

1201

1202

1206 else

1208

1210 buf->origptr, change, false);

1211

1212

1213 data += datalen;

1214 }

1215 Assert(data == tupledata + tuplelen);

1216}

1217

1218

1219

1220

1221

1222

1223

1224static void

1226{

1230

1231

1234 return;

1235

1236

1238 return;

1239

1243

1245

1247

1249 change, false);

1250}

1251

1252

1253

1254

1255

1256

1257

1258

1259

1260static void

1262{

1266

1267 Assert(datalen >= 0);

1268

1270 header = tuple->t_data;

1271

1272

1274

1275

1277

1278

1280

1282

1285 datalen);

1286

1290}

1291

1292

1293

1294

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304static bool

1307{

1311 return true;

1312

1313

1314

1315

1316

1317

1319 {

1321 return true;

1322 }

1323

1324 return false;

1325}

static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id)

void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_abort *parsed, TransactionId xid, bool two_phase)

static bool FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)

static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)

static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid, bool two_phase)

static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple)

void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)

static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_prepare *parsed)

static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

Assert(PointerIsAligned(start, uint64))

#define XLH_INSERT_ON_TOAST_RELATION

#define XLOG_HEAP2_MULTI_INSERT

#define XLOG_HEAP_HOT_UPDATE

#define XLH_INSERT_IS_SPECULATIVE

#define XLOG_HEAP2_REWRITE

#define XLOG_HEAP_TRUNCATE

#define XLH_UPDATE_CONTAINS_NEW_TUPLE

#define XLH_INSERT_LAST_IN_MULTI

#define XLH_UPDATE_CONTAINS_OLD

#define XLH_TRUNCATE_RESTART_SEQS

#define XLH_DELETE_CONTAINS_OLD

#define XLOG_HEAP2_PRUNE_VACUUM_SCAN

#define XLOG_HEAP_INPLACE

#define XLOG_HEAP2_LOCK_UPDATED

#define SizeOfMultiInsertTuple

#define XLOG_HEAP2_PRUNE_ON_ACCESS

#define XLOG_HEAP2_NEW_CID

#define XLOG_HEAP2_PRUNE_VACUUM_CLEANUP

#define XLH_TRUNCATE_CASCADE

#define XLH_DELETE_IS_SUPER

#define XLOG_HEAP2_VISIBLE

#define XLH_INSERT_CONTAINS_NEW_TUPLE

#define XLOG_HEAP_CONFIRM

#define SizeofHeapTupleHeader

static void ItemPointerSetInvalid(ItemPointerData *pointer)

void UpdateDecodingStats(LogicalDecodingContext *ctx)

bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)

bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)

#define XLOG_LOGICAL_MESSAGE

#define XLOG_RESTORE_POINT

#define XLOG_CHECKPOINT_REDO

#define XLOG_OVERWRITE_CONTRECORD

#define XLOG_FPI_FOR_HINT

#define XLOG_CHECKPOINT_SHUTDOWN

#define XLOG_PARAMETER_CHANGE

#define XLOG_CHECKPOINT_ONLINE

#define XLOG_END_OF_RECOVERY

void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)

void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)

void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)

void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)

void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)

void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)

HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)

void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)

ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)

void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)

Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)

void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM

@ REORDER_BUFFER_CHANGE_INSERT

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT

@ REORDER_BUFFER_CHANGE_TRUNCATE

@ REORDER_BUFFER_CHANGE_DELETE

@ REORDER_BUFFER_CHANGE_UPDATE

bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)

bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)

XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder)

SnapBuildState SnapBuildCurrentState(SnapBuild *builder)

Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder)

void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)

void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts, uint32 xinfo)

void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec)

void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)

@ SNAPBUILD_FULL_SNAPSHOT

#define XLOG_INVALIDATIONS

#define XLOG_STANDBY_LOCK

#define XLOG_RUNNING_XACTS

struct SnapBuild * snapshot_builder

OutputPluginCallbacks callbacks

struct ReorderBuffer * reorder

LogicalDecodeFilterPrepareCB filter_prepare_cb

LogicalDecodeFilterByOriginCB filter_by_origin_cb

struct ReorderBufferChange::@110::@112 truncate

ReorderBufferChangeType action

bool clear_toast_afterwards

union ReorderBufferChange::@110 data

struct ReorderBufferChange::@110::@111 tp

ReplicationSlotPersistentData data

void(* rm_decode)(struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf)

Oid relids[FLEXIBLE_ARRAY_MEMBER]

char message[FLEXIBLE_ARRAY_MEMBER]

TransactionId oldestRunningXid

SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]

TransactionId twophase_xid

char twophase_gid[GIDSIZE]

TimestampTz origin_timestamp

TransactionId twophase_xid

TimestampTz origin_timestamp

char twophase_gid[GIDSIZE]

#define TransactionIdIsValid(xid)

#define XLOG_XACT_COMMIT_PREPARED

#define XLOG_XACT_INVALIDATIONS

#define XACT_XINFO_HAS_ORIGIN

#define XLOG_XACT_PREPARE

#define XLOG_XACT_ASSIGNMENT

#define XLOG_XACT_ABORT_PREPARED

void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)

void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)

void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed)

bool RecoveryInProgress(void)

static RmgrData GetRmgr(RmgrId rmid)

#define InvalidXLogRecPtr

char * XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)

void XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)

#define XLogRecGetOrigin(decoder)

#define XLogRecGetDataLen(decoder)

#define XLogRecGetInfo(decoder)

#define XLogRecGetRmid(decoder)

#define XLogRecGetData(decoder)

#define XLogRecGetXid(decoder)

#define XLogRecGetTopXid(decoder)