PostgreSQL Source Code: src/backend/replication/logical/proto.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

14

22

23

24

25

26#define LOGICALREP_IS_REPLICA_IDENTITY 1

27

28#define MESSAGE_TRANSACTIONAL (1<<0)

29#define TRUNCATE_CASCADE (1<<0)

30#define TRUNCATE_RESTART_SEQS (1<<1)

31

34 PublishGencolsType include_gencols_type);

38 PublishGencolsType include_gencols_type);

41

44

45

46

47

48void

50{

52

53

57}

58

59

60

61

62void

64{

65

68 elog(ERROR, "final_lsn not set in begin message");

71}

72

73

74

75

76

77void

80{

82

84

85

87

88

92}

93

94

95

96

97void

99{

100

102

103 if (flags != 0)

104 elog(ERROR, "unrecognized flags %u in commit message", flags);

105

106

110}

111

112

113

114

115void

117{

119

120

125

126

128}

129

130

131

132

133void

135{

136

139 elog(ERROR, "prepare_lsn not set in begin prepare message");

142 elog(ERROR, "end_lsn not set in begin prepare message");

145

146

148}

149

150

151

152

153

154static void

157{

159

161

162

163

164

165

169

170

172

173

178

179

181}

182

183

184

185

186void

189{

191 txn, prepare_lsn);

192}

193

194

195

196

197

198static void

201{

202

204

205 if (flags != 0)

206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);

207

208

211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);

214 elog(ERROR, "end_lsn is not set in %s message", msgtype);

218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);

219

220

222}

223

224

225

226

227void

229{

231}

232

233

234

235

236void

239{

241

243

244

245

246

247

249

250

252

253

258

259

261}

262

263

264

265

266void

268{

269

271

272 if (flags != 0)

273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

274

275

278 elog(ERROR, "commit_lsn is not set in commit prepared message");

281 elog(ERROR, "end_lsn is not set in commit prepared message");

284

285

287}

288

289

290

291

292void

296{

298

300

301

302

303

304

306

307

309

310

316

317

319}

320

321

322

323

324void

327{

328

330

331 if (flags != 0)

332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);

333

334

337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");

340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");

344

345

347}

348

349

350

351

352void

356{

358 txn, prepare_lsn);

359}

360

361

362

363

364void

366{

368}

369

370

371

372

373void

376{

378

379

381

382

384}

385

386

387

388

389char *

391{

392

394

395

397}

398

399

400

401

402void

406 PublishGencolsType include_gencols_type)

407{

409

410

413

414

416

417 pq_sendbyte(out, 'N');

419 include_gencols_type);

420}

421

422

423

424

425

426

429{

432

433

435

438 elog(ERROR, "expected new tuple but got %d",

440

442

443 return relid;

444}

445

446

447

448

449void

453 PublishGencolsType include_gencols_type)

454{

456

457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

460

461

464

465

467

468 if (oldslot != NULL)

469 {

470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

471 pq_sendbyte(out, 'O');

472 else

473 pq_sendbyte(out, 'K');

475 include_gencols_type);

476 }

477

478 pq_sendbyte(out, 'N');

480 include_gencols_type);

481}

482

483

484

485

490{

493

494

496

497

500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",

502

503

505 {

507 *has_oldtuple = true;

508

510 }

511 else

512 *has_oldtuple = false;

513

514

516 elog(ERROR, "expected action 'N', got %c",

518

520

521 return relid;

522}

523

524

525

526

527void

531 PublishGencolsType include_gencols_type)

532{

533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

536

538

539

542

543

545

546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

547 pq_sendbyte(out, 'O');

548 else

549 pq_sendbyte(out, 'K');

550

552 include_gencols_type);

553}

554

555

556

557

558

559

562{

565

566

568

569

572 elog(ERROR, "expected action 'O' or 'K', got %c", action);

573

575

576 return relid;

577}

578

579

580

581

582void

585 int nrelids,

586 Oid relids[],

587 bool cascade, bool restart_seqs)

588{

589 int i;

591

593

594

597

599

600

601 if (cascade)

603 if (restart_seqs)

606

607 for (i = 0; i < nrelids; i++)

609}

610

611

612

613

616 bool *cascade, bool *restart_seqs)

617{

618 int i;

619 int nrelids;

622

624

625

629

630 for (i = 0; i < nrelids; i++)

632

633 return relids;

634}

635

636

637

638

639void

641 bool transactional, const char *prefix, Size sz,

642 const char *message)

643{

645

647

648

649 if (transactional)

651

652

655

661}

662

663

664

665

666void

669 PublishGencolsType include_gencols_type)

670{

672

674

675

678

679

681

682

686

687

689

690

692}

693

694

695

696

699{

701

703

704

707

708

710

711

713

714 return rel;

715}

716

717

718

719

720

721

722void

724{

728

730

731

734

737 elog(ERROR, "cache lookup failed for type %u", basetypoid);

739

740

742

743

746

748}

749

750

751

752

753void

755{

757

758

761}

762

763

764

765

766static void

769 PublishGencolsType include_gencols_type)

770{

773 bool *isnull;

774 int i;

775 uint16 nliveatts = 0;

776

778

779 for (i = 0; i < desc->natts; i++)

780 {

782

784 include_gencols_type))

785 continue;

786

787 nliveatts++;

788 }

790

794

795

796 for (i = 0; i < desc->natts; i++)

797 {

801

803 include_gencols_type))

804 continue;

805

806 if (isnull[i])

807 {

809 continue;

810 }

811

813 {

814

815

816

817

818

820 continue;

821 }

822

825 elog(ERROR, "cache lookup failed for type %u", att->atttypid);

827

828

829

830

831 if (binary && OidIsValid(typclass->typsend))

832 {

833 bytea *outputbytes;

835

841 pfree(outputbytes);

842 }

843 else

844 {

845 char *outputstr;

846

850 pfree(outputstr);

851 }

852

854 }

855}

856

857

858

859

860static void

862{

863 int i;

864 int natts;

865

866

868

869

871 tuple->colstatus = (char *) palloc(natts * sizeof(char));

872 tuple->ncols = natts;

873

874

875 for (i = 0; i < natts; i++)

876 {

877 char *buff;

878 char kind;

881

884

885 switch (kind)

886 {

888

889 break;

891

892 break;

896

897

900

901

902

903

904

905

906

907 buff[len] = '\0';

908

910 break;

911 default:

912 elog(ERROR, "unrecognized data representation type '%c'", kind);

913 }

914 }

915}

916

917

918

919

920static void

922 PublishGencolsType include_gencols_type)

923{

925 int i;

926 uint16 nliveatts = 0;

928 bool replidentfull;

929

931

932

933 for (i = 0; i < desc->natts; i++)

934 {

936

938 include_gencols_type))

939 continue;

940

941 nliveatts++;

942 }

944

945

946 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);

947 if (!replidentfull)

949

950

951 for (i = 0; i < desc->natts; i++)

952 {

955

957 include_gencols_type))

958 continue;

959

960

961 if (replidentfull ||

963 idattrs))

965

967

968

970

971

973

974

976 }

977

979}

980

981

982

983

984static void

986{

987 int i;

988 int natts;

989 char **attnames;

990 Oid *atttyps;

992

994 attnames = palloc(natts * sizeof(char *));

995 atttyps = palloc(natts * sizeof(Oid));

996

997

998 for (i = 0; i < natts; i++)

999 {

1001

1002

1006

1007

1009

1010

1012

1013

1015 }

1016

1020 rel->natts = natts;

1021}

1022

1023

1024

1025

1026static void

1028{

1029 if (nspid == PG_CATALOG_NAMESPACE)

1031 else

1032 {

1034

1035 if (nspname == NULL)

1036 elog(ERROR, "cache lookup failed for namespace %u",

1038

1040 }

1041}

1042

1043

1044

1045

1046static const char *

1048{

1050

1051 if (nspname[0] == '\0')

1052 nspname = "pg_catalog";

1053

1054 return nspname;

1055}

1056

1057

1058

1059

1060void

1063{

1065

1067

1068

1070

1071

1072 pq_sendbyte(out, first_segment ? 1 : 0);

1073}

1074

1075

1076

1077

1080{

1082

1083 Assert(first_segment);

1084

1087

1088 return xid;

1089}

1090

1091

1092

1093

1094void

1096{

1098}

1099

1100

1101

1102

1103void

1106{

1107 uint8 flags = 0;

1108

1110

1112

1113

1115

1116

1118

1119

1123}

1124

1125

1126

1127

1130{

1133

1135

1136

1138

1139 if (flags != 0)

1140 elog(ERROR, "unrecognized flags %u in commit message", flags);

1141

1142

1146

1147 return xid;

1148}

1149

1150

1151

1152

1153

1154

1155

1156

1157void

1160 TimestampTz abort_time, bool write_abort_info)

1161{

1163

1165

1166

1169

1170 if (write_abort_info)

1171 {

1174 }

1175}

1176

1177

1178

1179

1180

1181

1182

1183void

1186 bool read_abort_info)

1187{

1189

1192

1193 if (read_abort_info)

1194 {

1197 }

1198 else

1199 {

1202 }

1203}

1204

1205

1206

1207

1208const char *

1210{

1211 static char err_unknown[20];

1212

1214 {

1216 return "BEGIN";

1218 return "COMMIT";

1220 return "ORIGIN";

1222 return "INSERT";

1224 return "UPDATE";

1226 return "DELETE";

1228 return "TRUNCATE";

1230 return "RELATION";

1232 return "TYPE";

1234 return "MESSAGE";

1236 return "BEGIN PREPARE";

1238 return "PREPARE";

1240 return "COMMIT PREPARED";

1242 return "ROLLBACK PREPARED";

1244 return "STREAM START";

1246 return "STREAM STOP";

1248 return "STREAM COMMIT";

1250 return "STREAM ABORT";

1252 return "STREAM PREPARE";

1253 }

1254

1255

1256

1257

1258

1259

1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);

1261

1262 return err_unknown;

1263}

1264

1265

1266

1267

1268

1269

1270

1271

1272

1273

1274

1275

1276

1277

1278bool

1280 PublishGencolsType include_gencols_type)

1281{

1282 if (att->attisdropped)

1283 return false;

1284

1285

1286 if (columns)

1288

1289

1290 if (!att->attgenerated)

1291 return true;

1292

1293

1294

1295

1296

1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)

1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;

1299

1300 return false;

1301}

void bms_free(Bitmapset *a)

bool bms_is_member(int x, const Bitmapset *a)

Bitmapset * bms_add_member(Bitmapset *a, int x)

static Datum values[MAXATTR]

#define OidIsValid(objectId)

char * OidOutputFunctionCall(Oid functionId, Datum val)

bytea * OidSendFunctionCall(Oid functionId, Datum val)

Assert(PointerIsAligned(start, uint64))

#define HeapTupleIsValid(tuple)

static void * GETSTRUCT(const HeapTupleData *tuple)

List * lappend_oid(List *list, Oid datum)

#define LOGICALREP_COLUMN_UNCHANGED

@ LOGICAL_REP_MSG_TRUNCATE

@ LOGICAL_REP_MSG_STREAM_STOP

@ LOGICAL_REP_MSG_STREAM_PREPARE

@ LOGICAL_REP_MSG_STREAM_ABORT

@ LOGICAL_REP_MSG_BEGIN_PREPARE

@ LOGICAL_REP_MSG_STREAM_START

@ LOGICAL_REP_MSG_PREPARE

@ LOGICAL_REP_MSG_RELATION

@ LOGICAL_REP_MSG_MESSAGE

@ LOGICAL_REP_MSG_ROLLBACK_PREPARED

@ LOGICAL_REP_MSG_COMMIT_PREPARED

@ LOGICAL_REP_MSG_STREAM_COMMIT

#define LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_TEXT

Oid getBaseType(Oid typid)

char * get_namespace_name(Oid nspid)

char * pstrdup(const char *in)

void pfree(void *pointer)

void * palloc0(Size size)

FormData_pg_attribute * Form_pg_attribute

FormData_pg_type * Form_pg_type

size_t strlcpy(char *dst, const char *src, size_t siz)

static Datum ObjectIdGetDatum(Oid X)

unsigned int pq_getmsgint(StringInfo msg, int b)

void pq_sendbytes(StringInfo buf, const void *data, int datalen)

const char * pq_getmsgstring(StringInfo msg)

void pq_sendstring(StringInfo buf, const char *str)

void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)

int pq_getmsgbyte(StringInfo msg)

int64 pq_getmsgint64(StringInfo msg)

void pq_sendcountedtext(StringInfo buf, const char *str, int slen)

static void pq_sendint32(StringInfo buf, uint32 i)

static void pq_sendbyte(StringInfo buf, uint8 byt)

static void pq_sendint64(StringInfo buf, uint64 i)

static void pq_sendint8(StringInfo buf, uint8 i)

static void pq_sendint16(StringInfo buf, uint16 i)

static void pq_sendint(StringInfo buf, uint32 i, int b)

void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)

static void logicalrep_write_namespace(StringInfo out, Oid nspid)

#define TRUNCATE_RESTART_SEQS

LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)

void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)

static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)

void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)

void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)

static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)

void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)

LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)

void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)

static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)

List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)

void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)

void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)

#define MESSAGE_TRANSACTIONAL

char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)

void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)

void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)

static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

#define LOGICALREP_IS_REPLICA_IDENTITY

static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)

void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)

void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)

LogicalRepRelation * logicalrep_read_rel(StringInfo in)

void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

const char * logicalrep_message_type(LogicalRepMsgType action)

void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)

static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)

bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)

void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)

void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)

void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)

LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)

void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)

static const char * logicalrep_read_namespace(StringInfo in)

void logicalrep_write_stream_stop(StringInfo out)

TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define RelationGetNamespace(relation)

Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)

#define rbtxn_is_prepared(txn)

static void initStringInfoFromString(StringInfo str, char *data, int len)

XLogRecPtr prepare_end_lsn

XLogRecPtr rollback_end_lsn

TimestampTz rollback_time

StringInfoData * colvalues

union ReorderBufferTXN::@116 xact_time

#define FirstLowInvalidHeapAttributeNumber

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache1(int cacheId, Datum key1)

#define InvalidTransactionId

#define TransactionIdIsValid(xid)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

static void slot_getallattrs(TupleTableSlot *slot)

#define VARATT_IS_EXTERNAL_ONDISK(PTR)

#define InvalidXLogRecPtr