PostgreSQL Source Code: src/backend/replication/logical/proto.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

14

22

23

24

25

26#define LOGICALREP_IS_REPLICA_IDENTITY 1

27

28#define MESSAGE_TRANSACTIONAL (1<<0)

29#define TRUNCATE_CASCADE (1<<0)

30#define TRUNCATE_RESTART_SEQS (1<<1)

31

34 PublishGencolsType include_gencols_type);

38 PublishGencolsType include_gencols_type);

41

44

45

46

47

48void

50{

52

53

57}

58

59

60

61

62void

64{

65

68 elog(ERROR, "final_lsn not set in begin message");

71}

72

73

74

75

76

77void

80{

82

84

85

87

88

92}

93

94

95

96

97void

99{

100

102

103 if (flags != 0)

104 elog(ERROR, "unrecognized flags %u in commit message", flags);

105

106

110}

111

112

113

114

115void

117{

119

120

125

126

128}

129

130

131

132

133void

135{

136

139 elog(ERROR, "prepare_lsn not set in begin prepare message");

142 elog(ERROR, "end_lsn not set in begin prepare message");

145

146

148}

149

150

151

152

153

154static void

157{

159

161

162

163

164

165

169

170

172

173

178

179

181}

182

183

184

185

186void

189{

191 txn, prepare_lsn);

192}

193

194

195

196

197

198static void

201{

202

204

205 if (flags != 0)

206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);

207

208

211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);

214 elog(ERROR, "end_lsn is not set in %s message", msgtype);

218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);

219

220

222}

223

224

225

226

227void

229{

231}

232

233

234

235

236void

239{

241

243

244

245

246

247

249

250

252

253

258

259

261}

262

263

264

265

266void

268{

269

271

272 if (flags != 0)

273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

274

275

278 elog(ERROR, "commit_lsn is not set in commit prepared message");

281 elog(ERROR, "end_lsn is not set in commit prepared message");

284

285

287}

288

289

290

291

292void

296{

298

300

301

302

303

304

306

307

309

310

316

317

319}

320

321

322

323

324void

327{

328

330

331 if (flags != 0)

332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);

333

334

337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");

340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");

344

345

347}

348

349

350

351

352void

356{

358 txn, prepare_lsn);

359}

360

361

362

363

364void

366{

368}

369

370

371

372

373void

376{

378

379

381

382

384}

385

386

387

388

389char *

391{

392

394

395

397}

398

399

400

401

402void

406 PublishGencolsType include_gencols_type)

407{

409

410

413

414

416

417 pq_sendbyte(out, 'N');

419 include_gencols_type);

420}

421

422

423

424

425

426

429{

432

433

435

438 elog(ERROR, "expected new tuple but got %d",

440

442

443 return relid;

444}

445

446

447

448

449void

453 PublishGencolsType include_gencols_type)

454{

456

457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

460

461

464

465

467

468 if (oldslot != NULL)

469 {

470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

471 pq_sendbyte(out, 'O');

472 else

473 pq_sendbyte(out, 'K');

475 include_gencols_type);

476 }

477

478 pq_sendbyte(out, 'N');

480 include_gencols_type);

481}

482

483

484

485

490{

493

494

496

497

500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",

502

503

505 {

507 *has_oldtuple = true;

508

510 }

511 else

512 *has_oldtuple = false;

513

514

516 elog(ERROR, "expected action 'N', got %c",

518

520

521 return relid;

522}

523

524

525

526

527void

531 PublishGencolsType include_gencols_type)

532{

533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

536

538

539

542

543

545

546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

547 pq_sendbyte(out, 'O');

548 else

549 pq_sendbyte(out, 'K');

550

552 include_gencols_type);

553}

554

555

556

557

558

559

562{

565

566

568

569

572 elog(ERROR, "expected action 'O' or 'K', got %c", action);

573

575

576 return relid;

577}

578

579

580

581

582void

585 int nrelids,

586 Oid relids[],

587 bool cascade, bool restart_seqs)

588{

589 int i;

591

593

594

597

599

600

601 if (cascade)

603 if (restart_seqs)

606

607 for (i = 0; i < nrelids; i++)

609}

610

611

612

613

616 bool *cascade, bool *restart_seqs)

617{

618 int i;

619 int nrelids;

622

624

625

629

630 for (i = 0; i < nrelids; i++)

632

633 return relids;

634}

635

636

637

638

639void

641 bool transactional, const char *prefix, Size sz,

642 const char *message)

643{

645

647

648

649 if (transactional)

651

652

655

661}

662

663

664

665

666void

669 PublishGencolsType include_gencols_type)

670{

672

674

675

678

679

681

682

686

687

689

690

692}

693

694

695

696

699{

701

703

704

707

708

710

711

713

714

716

717 return rel;

718}

719

720

721

722

723

724

725void

727{

731

733

734

737

740 elog(ERROR, "cache lookup failed for type %u", basetypoid);

742

743

745

746

749

751}

752

753

754

755

756void

758{

760

761

764}

765

766

767

768

769static void

772 PublishGencolsType include_gencols_type)

773{

776 bool *isnull;

777 int i;

778 uint16 nliveatts = 0;

779

781

782 for (i = 0; i < desc->natts; i++)

783 {

785

787 include_gencols_type))

788 continue;

789

790 nliveatts++;

791 }

793

797

798

799 for (i = 0; i < desc->natts; i++)

800 {

804

806 include_gencols_type))

807 continue;

808

809 if (isnull[i])

810 {

812 continue;

813 }

814

816 {

817

818

819

820

821

823 continue;

824 }

825

828 elog(ERROR, "cache lookup failed for type %u", att->atttypid);

830

831

832

833

834 if (binary && OidIsValid(typclass->typsend))

835 {

836 bytea *outputbytes;

838

844 pfree(outputbytes);

845 }

846 else

847 {

848 char *outputstr;

849

853 pfree(outputstr);

854 }

855

857 }

858}

859

860

861

862

863static void

865{

866 int i;

867 int natts;

868

869

871

872

875 tuple->ncols = natts;

876

877

878 for (i = 0; i < natts; i++)

879 {

880 char *buff;

881 char kind;

884

887

888 switch (kind)

889 {

891

892 break;

894

895 break;

899

900

903

904

905

906

907

908

909

910 buff[len] = '\0';

911

913 break;

914 default:

915 elog(ERROR, "unrecognized data representation type '%c'", kind);

916 }

917 }

918}

919

920

921

922

923static void

925 PublishGencolsType include_gencols_type)

926{

928 int i;

929 uint16 nliveatts = 0;

931 bool replidentfull;

932

934

935

936 for (i = 0; i < desc->natts; i++)

937 {

939

941 include_gencols_type))

942 continue;

943

944 nliveatts++;

945 }

947

948

949 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);

950 if (!replidentfull)

952

953

954 for (i = 0; i < desc->natts; i++)

955 {

958

960 include_gencols_type))

961 continue;

962

963

964 if (replidentfull ||

966 idattrs))

968

970

971

973

974

976

977

979 }

980

982}

983

984

985

986

987static void

989{

990 int i;

991 int natts;

992 char **attnames;

993 Oid *atttyps;

995

999

1000

1001 for (i = 0; i < natts; i++)

1002 {

1004

1005

1009

1010

1012

1013

1015

1016

1018 }

1019

1023 rel->natts = natts;

1024}

1025

1026

1027

1028

1029static void

1031{

1032 if (nspid == PG_CATALOG_NAMESPACE)

1034 else

1035 {

1037

1038 if (nspname == NULL)

1039 elog(ERROR, "cache lookup failed for namespace %u",

1041

1043 }

1044}

1045

1046

1047

1048

1049static const char *

1051{

1053

1054 if (nspname[0] == '\0')

1055 nspname = "pg_catalog";

1056

1057 return nspname;

1058}

1059

1060

1061

1062

1063void

1066{

1068

1070

1071

1073

1074

1075 pq_sendbyte(out, first_segment ? 1 : 0);

1076}

1077

1078

1079

1080

1083{

1085

1086 Assert(first_segment);

1087

1090

1091 return xid;

1092}

1093

1094

1095

1096

1097void

1099{

1101}

1102

1103

1104

1105

1106void

1109{

1110 uint8 flags = 0;

1111

1113

1115

1116

1118

1119

1121

1122

1126}

1127

1128

1129

1130

1133{

1136

1138

1139

1141

1142 if (flags != 0)

1143 elog(ERROR, "unrecognized flags %u in commit message", flags);

1144

1145

1149

1150 return xid;

1151}

1152

1153

1154

1155

1156

1157

1158

1159

1160void

1163 TimestampTz abort_time, bool write_abort_info)

1164{

1166

1168

1169

1172

1173 if (write_abort_info)

1174 {

1177 }

1178}

1179

1180

1181

1182

1183

1184

1185

1186void

1189 bool read_abort_info)

1190{

1192

1195

1196 if (read_abort_info)

1197 {

1200 }

1201 else

1202 {

1205 }

1206}

1207

1208

1209

1210

1211const char *

1213{

1214 static char err_unknown[20];

1215

1217 {

1219 return "BEGIN";

1221 return "COMMIT";

1223 return "ORIGIN";

1225 return "INSERT";

1227 return "UPDATE";

1229 return "DELETE";

1231 return "TRUNCATE";

1233 return "RELATION";

1235 return "TYPE";

1237 return "MESSAGE";

1239 return "BEGIN PREPARE";

1241 return "PREPARE";

1243 return "COMMIT PREPARED";

1245 return "ROLLBACK PREPARED";

1247 return "STREAM START";

1249 return "STREAM STOP";

1251 return "STREAM COMMIT";

1253 return "STREAM ABORT";

1255 return "STREAM PREPARE";

1256 }

1257

1258

1259

1260

1261

1262

1263 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);

1264

1265 return err_unknown;

1266}

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278

1279

1280

1281bool

1283 PublishGencolsType include_gencols_type)

1284{

1285 if (att->attisdropped)

1286 return false;

1287

1288

1289 if (columns)

1291

1292

1293 if (!att->attgenerated)

1294 return true;

1295

1296

1297

1298

1299

1300 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)

1301 return include_gencols_type == PUBLISH_GENCOLS_STORED;

1302

1303 return false;

1304}

void bms_free(Bitmapset *a)

bool bms_is_member(int x, const Bitmapset *a)

Bitmapset * bms_add_member(Bitmapset *a, int x)

static Datum values[MAXATTR]

#define OidIsValid(objectId)

#define palloc_object(type)

#define palloc_array(type, count)

char * OidOutputFunctionCall(Oid functionId, Datum val)

bytea * OidSendFunctionCall(Oid functionId, Datum val)

Assert(PointerIsAligned(start, uint64))

#define HeapTupleIsValid(tuple)

static void * GETSTRUCT(const HeapTupleData *tuple)

List * lappend_oid(List *list, Oid datum)

#define LOGICALREP_COLUMN_UNCHANGED

@ LOGICAL_REP_MSG_TRUNCATE

@ LOGICAL_REP_MSG_STREAM_STOP

@ LOGICAL_REP_MSG_STREAM_PREPARE

@ LOGICAL_REP_MSG_STREAM_ABORT

@ LOGICAL_REP_MSG_BEGIN_PREPARE

@ LOGICAL_REP_MSG_STREAM_START

@ LOGICAL_REP_MSG_PREPARE

@ LOGICAL_REP_MSG_RELATION

@ LOGICAL_REP_MSG_MESSAGE

@ LOGICAL_REP_MSG_ROLLBACK_PREPARED

@ LOGICAL_REP_MSG_COMMIT_PREPARED

@ LOGICAL_REP_MSG_STREAM_COMMIT

#define LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_TEXT

Oid getBaseType(Oid typid)

char * get_namespace_name(Oid nspid)

char * pstrdup(const char *in)

void pfree(void *pointer)

void * palloc0(Size size)

FormData_pg_attribute * Form_pg_attribute

FormData_pg_type * Form_pg_type

size_t strlcpy(char *dst, const char *src, size_t siz)

static Datum ObjectIdGetDatum(Oid X)

static Pointer DatumGetPointer(Datum X)

unsigned int pq_getmsgint(StringInfo msg, int b)

void pq_sendbytes(StringInfo buf, const void *data, int datalen)

const char * pq_getmsgstring(StringInfo msg)

void pq_sendstring(StringInfo buf, const char *str)

void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)

int pq_getmsgbyte(StringInfo msg)

int64 pq_getmsgint64(StringInfo msg)

void pq_sendcountedtext(StringInfo buf, const char *str, int slen)

static void pq_sendint32(StringInfo buf, uint32 i)

static void pq_sendbyte(StringInfo buf, uint8 byt)

static void pq_sendint64(StringInfo buf, uint64 i)

static void pq_sendint8(StringInfo buf, uint8 i)

static void pq_sendint16(StringInfo buf, uint16 i)

static void pq_sendint(StringInfo buf, uint32 i, int b)

void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)

static void logicalrep_write_namespace(StringInfo out, Oid nspid)

#define TRUNCATE_RESTART_SEQS

LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)

void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)

static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)

void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)

void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)

static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)

void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)

LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)

void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)

static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)

List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)

void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)

void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)

#define MESSAGE_TRANSACTIONAL

char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)

void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)

void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)

static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

#define LOGICALREP_IS_REPLICA_IDENTITY

static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)

void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)

void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)

LogicalRepRelation * logicalrep_read_rel(StringInfo in)

void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

const char * logicalrep_message_type(LogicalRepMsgType action)

void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)

static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)

bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)

void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)

void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)

LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)

void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)

static const char * logicalrep_read_namespace(StringInfo in)

void logicalrep_write_stream_stop(StringInfo out)

TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define RelationGetNamespace(relation)

Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)

#define rbtxn_is_prepared(txn)

static void initStringInfoFromString(StringInfo str, char *data, int len)

XLogRecPtr prepare_end_lsn

XLogRecPtr rollback_end_lsn

TimestampTz rollback_time

StringInfoData * colvalues

#define FirstLowInvalidHeapAttributeNumber

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache1(int cacheId, Datum key1)

#define InvalidTransactionId

#define TransactionIdIsValid(xid)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

static void slot_getallattrs(TupleTableSlot *slot)

static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)

static Size VARSIZE(const void *PTR)

static char * VARDATA(const void *PTR)

#define XLogRecPtrIsValid(r)

#define InvalidXLogRecPtr