PostgreSQL Source Code: src/backend/replication/logical/proto.c File Reference (original) (raw)

Go to the source code of this file.

Macros
#define LOGICALREP_IS_REPLICA_IDENTITY 1
#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
Functions
static void logicalrep_write_attrs (StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
static void logicalrep_write_tuple (StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
static void logicalrep_read_attrs (StringInfo in, LogicalRepRelation *rel)
static void logicalrep_read_tuple (StringInfo in, LogicalRepTupleData *tuple)
static void logicalrep_write_namespace (StringInfo out, Oid nspid)
static const char * logicalrep_read_namespace (StringInfo in)
void logicalrep_write_begin (StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_begin (StringInfo in, LogicalRepBeginData *begin_data)
void logicalrep_write_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_read_commit (StringInfo in, LogicalRepCommitData *commit_data)
void logicalrep_write_begin_prepare (StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_begin_prepare (StringInfo in, LogicalRepPreparedTxnData *begin_data)
static void logicalrep_write_prepare_common (StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void logicalrep_read_prepare_common (StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_commit_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_read_commit_prepared (StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
void logicalrep_write_rollback_prepared (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared (StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
void logicalrep_write_stream_prepare (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_stream_prepare (StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_origin (StringInfo out, const char *origin, XLogRecPtr origin_lsn)
char * logicalrep_read_origin (StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_insert (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
LogicalRepRelId logicalrep_read_insert (StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_write_update (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
LogicalRepRelId logicalrep_read_update (StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_delete (StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
LogicalRepRelId logicalrep_read_delete (StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_truncate (StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
List * logicalrep_read_truncate (StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_write_message (StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
void logicalrep_write_rel (StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
LogicalRepRelation * logicalrep_read_rel (StringInfo in)
void logicalrep_write_typ (StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_read_typ (StringInfo in, LogicalRepTyp *ltyp)
void logicalrep_write_stream_start (StringInfo out, TransactionId xid, bool first_segment)
TransactionId logicalrep_read_stream_start (StringInfo in, bool *first_segment)
void logicalrep_write_stream_stop (StringInfo out)
void logicalrep_write_stream_commit (StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
TransactionId logicalrep_read_stream_commit (StringInfo in, LogicalRepCommitData *commit_data)
void logicalrep_write_stream_abort (StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
void logicalrep_read_stream_abort (StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
const char * logicalrep_message_type (LogicalRepMsgType action)
bool logicalrep_should_publish_column (Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)

LOGICALREP_IS_REPLICA_IDENTITY

#define LOGICALREP_IS_REPLICA_IDENTITY 1

Definition at line 26 of file proto.c.

MESSAGE_TRANSACTIONAL

#define MESSAGE_TRANSACTIONAL (1<<0)

Definition at line 28 of file proto.c.

TRUNCATE_CASCADE

#define TRUNCATE_CASCADE (1<<0)

Definition at line 29 of file proto.c.

TRUNCATE_RESTART_SEQS

#define TRUNCATE_RESTART_SEQS (1<<1)

Definition at line 30 of file proto.c.

logicalrep_message_type()

Definition at line 1209 of file proto.c.

1210{

1211 static char err_unknown[20];

1212

1214 {

1216 return "BEGIN";

1218 return "COMMIT";

1220 return "ORIGIN";

1222 return "INSERT";

1224 return "UPDATE";

1226 return "DELETE";

1228 return "TRUNCATE";

1230 return "RELATION";

1232 return "TYPE";

1234 return "MESSAGE";

1236 return "BEGIN PREPARE";

1238 return "PREPARE";

1240 return "COMMIT PREPARED";

1242 return "ROLLBACK PREPARED";

1244 return "STREAM START";

1246 return "STREAM STOP";

1248 return "STREAM COMMIT";

1250 return "STREAM ABORT";

1252 return "STREAM PREPARE";

1253 }

1254

1255

1256

1257

1258

1259

1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);

1261

1262 return err_unknown;

1263}

@ LOGICAL_REP_MSG_TRUNCATE

@ LOGICAL_REP_MSG_STREAM_STOP

@ LOGICAL_REP_MSG_STREAM_PREPARE

@ LOGICAL_REP_MSG_STREAM_ABORT

@ LOGICAL_REP_MSG_BEGIN_PREPARE

@ LOGICAL_REP_MSG_STREAM_START

@ LOGICAL_REP_MSG_PREPARE

@ LOGICAL_REP_MSG_RELATION

@ LOGICAL_REP_MSG_MESSAGE

@ LOGICAL_REP_MSG_ROLLBACK_PREPARED

@ LOGICAL_REP_MSG_COMMIT_PREPARED

@ LOGICAL_REP_MSG_STREAM_COMMIT

References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.

Referenced by apply_error_callback().

logicalrep_read_attrs()

Definition at line 985 of file proto.c.

986{

987 int i;

988 int natts;

989 char **attnames;

990 Oid *atttyps;

992

994 attnames = palloc(natts * sizeof(char *));

995 atttyps = palloc(natts * sizeof(Oid));

996

997

998 for (i = 0; i < natts; i++)

999 {

1001

1002

1006

1007

1009

1010

1012

1013

1015 }

1016

1020 rel->natts = natts;

1021}

Bitmapset * bms_add_member(Bitmapset *a, int x)

char * pstrdup(const char *in)

unsigned int pq_getmsgint(StringInfo msg, int b)

const char * pq_getmsgstring(StringInfo msg)

int pq_getmsgbyte(StringInfo msg)

#define LOGICALREP_IS_REPLICA_IDENTITY

References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().

Referenced by logicalrep_read_rel().

logicalrep_read_begin()

logicalrep_read_begin_prepare()

Definition at line 134 of file proto.c.

135{

136

139 elog(ERROR, "prepare_lsn not set in begin prepare message");

142 elog(ERROR, "end_lsn not set in begin prepare message");

145

146

148}

size_t strlcpy(char *dst, const char *src, size_t siz)

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by apply_handle_begin_prepare().

logicalrep_read_commit()

logicalrep_read_commit_prepared()

Definition at line 267 of file proto.c.

268{

269

271

272 if (flags != 0)

273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

274

275

278 elog(ERROR, "commit_lsn is not set in commit prepared message");

281 elog(ERROR, "end_lsn is not set in commit prepared message");

284

285

287}

References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.

Referenced by apply_handle_commit_prepared().

logicalrep_read_delete()

logicalrep_read_insert()

logicalrep_read_namespace()

static const char * logicalrep_read_namespace ( StringInfo in) static

logicalrep_read_origin()

logicalrep_read_prepare()

logicalrep_read_prepare_common()

Definition at line 199 of file proto.c.

201{

202

204

205 if (flags != 0)

206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);

207

208

211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);

214 elog(ERROR, "end_lsn is not set in %s message", msgtype);

218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);

219

220

222}

#define InvalidTransactionId

References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidTransactionId, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.

Referenced by logicalrep_read_prepare(), and logicalrep_read_stream_prepare().

logicalrep_read_rel()

Definition at line 698 of file proto.c.

699{

701

703

704

707

708

710

711

713

714 return rel;

715}

static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)

static const char * logicalrep_read_namespace(StringInfo in)

References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.

Referenced by apply_handle_relation().

logicalrep_read_rollback_prepared()

Definition at line 325 of file proto.c.

327{

328

330

331 if (flags != 0)

332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);

333

334

337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");

340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");

344

345

347}

XLogRecPtr prepare_end_lsn

XLogRecPtr rollback_end_lsn

TimestampTz rollback_time

References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.

Referenced by apply_handle_rollback_prepared().

logicalrep_read_stream_abort()

logicalrep_read_stream_commit()

logicalrep_read_stream_prepare()

logicalrep_read_stream_start()

logicalrep_read_truncate()

List * logicalrep_read_truncate ( StringInfo in,
bool * cascade,
bool * restart_seqs
)

logicalrep_read_tuple()

Definition at line 861 of file proto.c.

862{

863 int i;

864 int natts;

865

866

868

869

871 tuple->colstatus = (char *) palloc(natts * sizeof(char));

872 tuple->ncols = natts;

873

874

875 for (i = 0; i < natts; i++)

876 {

877 char *buff;

878 char kind;

881

884

885 switch (kind)

886 {

888

889 break;

891

892 break;

896

897

900

901

902

903

904

905

906

907 buff[len] = '\0';

908

910 break;

911 default:

912 elog(ERROR, "unrecognized data representation type '%c'", kind);

913 }

914 }

915}

#define LOGICALREP_COLUMN_UNCHANGED

#define LOGICALREP_COLUMN_NULL

#define LOGICALREP_COLUMN_BINARY

#define LOGICALREP_COLUMN_TEXT

void * palloc0(Size size)

void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)

static void initStringInfoFromString(StringInfo str, char *data, int len)

StringInfoData * colvalues

References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, elog, ERROR, i, initStringInfoFromString(), len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.

Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().

logicalrep_read_typ()

logicalrep_read_update()

Definition at line 487 of file proto.c.

490{

493

494

496

497

500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",

502

503

505 {

507 *has_oldtuple = true;

508

510 }

511 else

512 *has_oldtuple = false;

513

514

516 elog(ERROR, "expected action 'N', got %c",

518

520

521 return relid;

522}

References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().

Referenced by apply_handle_update().

logicalrep_should_publish_column()

Definition at line 1279 of file proto.c.

1281{

1282 if (att->attisdropped)

1283 return false;

1284

1285

1286 if (columns)

1288

1289

1290 if (!att->attgenerated)

1291 return true;

1292

1293

1294

1295

1296

1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)

1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;

1299

1300 return false;

1301}

bool bms_is_member(int x, const Bitmapset *a)

References bms_is_member().

Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().

logicalrep_write_attrs()

static void logicalrep_write_attrs ( StringInfo out, Relation rel, Bitmapset * columns, PublishGencolsType include_gencols_type ) static

Definition at line 921 of file proto.c.

923{

925 int i;

926 uint16 nliveatts = 0;

928 bool replidentfull;

929

931

932

933 for (i = 0; i < desc->natts; i++)

934 {

936

938 include_gencols_type))

939 continue;

940

941 nliveatts++;

942 }

944

945

946 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);

947 if (!replidentfull)

949

950

951 for (i = 0; i < desc->natts; i++)

952 {

955

957 include_gencols_type))

958 continue;

959

960

961 if (replidentfull ||

963 idattrs))

965

967

968

970

971

973

974

976 }

977

979}

void bms_free(Bitmapset *a)

FormData_pg_attribute * Form_pg_attribute

void pq_sendstring(StringInfo buf, const char *str)

static void pq_sendint32(StringInfo buf, uint32 i)

static void pq_sendbyte(StringInfo buf, uint8 byt)

static void pq_sendint16(StringInfo buf, uint16 i)

bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)

#define RelationGetDescr(relation)

Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)

#define FirstLowInvalidHeapAttributeNumber

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

References bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, LOGICALREP_IS_REPLICA_IDENTITY, logicalrep_should_publish_column(), NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIdentityKeyBitmap(), and TupleDescAttr().

Referenced by logicalrep_write_rel().

logicalrep_write_begin()

logicalrep_write_begin_prepare()

Definition at line 116 of file proto.c.

117{

119

120

125

126

128}

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_begin_prepare_txn().

logicalrep_write_commit()

logicalrep_write_commit_prepared()

Definition at line 237 of file proto.c.

239{

241

243

244

245

246

247

249

250

252

253

258

259

261}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_commit_prepared_txn().

logicalrep_write_delete()

Definition at line 528 of file proto.c.

532{

533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

536

538

539

542

543

545

546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

547 pq_sendbyte(out, 'O');

548 else

549 pq_sendbyte(out, 'K');

550

552 include_gencols_type);

553}

static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)

#define RelationGetRelid(relation)

#define TransactionIdIsValid(xid)

References Assert(), LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().

logicalrep_write_insert()

logicalrep_write_message()

Definition at line 640 of file proto.c.

643{

645

647

648

649 if (transactional)

651

652

655

661}

void pq_sendbytes(StringInfo buf, const void *data, int datalen)

static void pq_sendint8(StringInfo buf, uint8 i)

#define MESSAGE_TRANSACTIONAL

References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.

Referenced by pgoutput_message().

logicalrep_write_namespace()

static void logicalrep_write_namespace ( StringInfo out, Oid nspid ) static

logicalrep_write_origin()

logicalrep_write_prepare()

logicalrep_write_prepare_common()

Definition at line 155 of file proto.c.

157{

159

161

162

163

164

165

169

170

172

173

178

179

181}

#define rbtxn_is_prepared(txn)

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, rbtxn_is_prepared, TransactionIdIsValid, type, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by logicalrep_write_prepare(), and logicalrep_write_stream_prepare().

logicalrep_write_rel()

Definition at line 667 of file proto.c.

670{

672

674

675

678

679

681

682

686

687

689

690

692}

static void logicalrep_write_namespace(StringInfo out, Oid nspid)

static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)

#define RelationGetRelationName(relation)

#define RelationGetNamespace(relation)

References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

logicalrep_write_rollback_prepared()

Definition at line 293 of file proto.c.

296{

298

300

301

302

303

304

306

307

309

310

316

317

319}

References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by pgoutput_rollback_prepared_txn().

logicalrep_write_stream_abort()

logicalrep_write_stream_commit()

logicalrep_write_stream_prepare()

logicalrep_write_stream_start()

logicalrep_write_stream_stop()

void logicalrep_write_stream_stop ( StringInfo out )

logicalrep_write_truncate()

logicalrep_write_tuple()

Definition at line 767 of file proto.c.

770{

773 bool *isnull;

774 int i;

775 uint16 nliveatts = 0;

776

778

779 for (i = 0; i < desc->natts; i++)

780 {

782

784 include_gencols_type))

785 continue;

786

787 nliveatts++;

788 }

790

794

795

796 for (i = 0; i < desc->natts; i++)

797 {

801

803 include_gencols_type))

804 continue;

805

806 if (isnull[i])

807 {

809 continue;

810 }

811

813 {

814

815

816

817

818

820 continue;

821 }

822

825 elog(ERROR, "cache lookup failed for type %u", att->atttypid);

827

828

829

830

831 if (binary && OidIsValid(typclass->typsend))

832 {

833 bytea *outputbytes;

835

841 pfree(outputbytes);

842 }

843 else

844 {

845 char *outputstr;

846

850 pfree(outputstr);

851 }

852

854 }

855}

static Datum values[MAXATTR]

#define OidIsValid(objectId)

char * OidOutputFunctionCall(Oid functionId, Datum val)

bytea * OidSendFunctionCall(Oid functionId, Datum val)

#define HeapTupleIsValid(tuple)

static void * GETSTRUCT(const HeapTupleData *tuple)

void pfree(void *pointer)

FormData_pg_type * Form_pg_type

static Datum ObjectIdGetDatum(Oid X)

void pq_sendcountedtext(StringInfo buf, const char *str, int slen)

static void pq_sendint(StringInfo buf, uint32 i, int b)

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache1(int cacheId, Datum key1)

static void slot_getallattrs(TupleTableSlot *slot)

#define VARATT_IS_EXTERNAL_ONDISK(PTR)

References elog, ERROR, GETSTRUCT(), HeapTupleIsValid, i, len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, logicalrep_should_publish_column(), TupleDescData::natts, ObjectIdGetDatum(), OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TupleDescAttr(), values, VARATT_IS_EXTERNAL_ONDISK, VARDATA, VARHDRSZ, and VARSIZE.

Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().

logicalrep_write_typ()

Definition at line 723 of file proto.c.

724{

728

730

731

734

737 elog(ERROR, "cache lookup failed for type %u", basetypoid);

739

740

742

743

746

748}

Oid getBaseType(Oid typid)

References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.

Referenced by send_relation_and_attrs().

logicalrep_write_update()

Definition at line 450 of file proto.c.

454{

456

457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||

458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||

459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);

460

461

464

465

467

468 if (oldslot != NULL)

469 {

470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

471 pq_sendbyte(out, 'O');

472 else

473 pq_sendbyte(out, 'K');

475 include_gencols_type);

476 }

477

478 pq_sendbyte(out, 'N');

480 include_gencols_type);

481}

References Assert(), LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.

Referenced by pgoutput_change().