PostgreSQL Source Code: src/include/replication/logicalproto.h File Reference
Go to the source code of this file.
Data Structures | |
---|---|
struct | LogicalRepTupleData |
struct | LogicalRepRelation |
struct | LogicalRepTyp |
struct | LogicalRepBeginData |
struct | LogicalRepCommitData |
struct | LogicalRepPreparedTxnData |
struct | LogicalRepCommitPreparedTxnData |
struct | LogicalRepRollbackPreparedTxnData |
struct | LogicalRepStreamAbortData |
Macros | |
---|---|
#define | LOGICALREP_PROTO_MIN_VERSION_NUM 1 |
#define | LOGICALREP_PROTO_VERSION_NUM 1 |
#define | LOGICALREP_PROTO_STREAM_VERSION_NUM 2 |
#define | LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3 |
#define | LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4 |
#define | LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM |
#define | LOGICALREP_COLUMN_NULL 'n' |
#define | LOGICALREP_COLUMN_UNCHANGED 'u' |
#define | LOGICALREP_COLUMN_TEXT 't' |
#define | LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */ |
Typedefs | |
---|---|
typedef enum LogicalRepMsgType | LogicalRepMsgType |
typedef struct LogicalRepTupleData | LogicalRepTupleData |
typedef uint32 | LogicalRepRelId |
typedef struct LogicalRepRelation | LogicalRepRelation |
typedef struct LogicalRepTyp | LogicalRepTyp |
typedef struct LogicalRepBeginData | LogicalRepBeginData |
typedef struct LogicalRepCommitData | LogicalRepCommitData |
typedef struct LogicalRepPreparedTxnData | LogicalRepPreparedTxnData |
typedef struct LogicalRepCommitPreparedTxnData | LogicalRepCommitPreparedTxnData |
typedef struct LogicalRepRollbackPreparedTxnData | LogicalRepRollbackPreparedTxnData |
typedef struct LogicalRepStreamAbortData | LogicalRepStreamAbortData |
Enumerations | |
---|---|
enum | LogicalRepMsgType { LOGICAL_REP_MSG_BEGIN = 'B' , LOGICAL_REP_MSG_COMMIT = 'C' , LOGICAL_REP_MSG_ORIGIN = 'O' , LOGICAL_REP_MSG_INSERT = 'I' , LOGICAL_REP_MSG_UPDATE = 'U' , LOGICAL_REP_MSG_DELETE = 'D' , LOGICAL_REP_MSG_TRUNCATE = 'T' , LOGICAL_REP_MSG_RELATION = 'R' , LOGICAL_REP_MSG_TYPE = 'Y' , LOGICAL_REP_MSG_MESSAGE = 'M' , LOGICAL_REP_MSG_BEGIN_PREPARE = 'b' , LOGICAL_REP_MSG_PREPARE = 'P' , LOGICAL_REP_MSG_COMMIT_PREPARED = 'K' , LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r' , LOGICAL_REP_MSG_STREAM_START = 'S' , LOGICAL_REP_MSG_STREAM_STOP = 'E' , LOGICAL_REP_MSG_STREAM_COMMIT = 'c' , LOGICAL_REP_MSG_STREAM_ABORT = 'A' , LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } |
◆ LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
◆ LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_NULL 'n'
◆ LOGICALREP_COLUMN_TEXT
#define LOGICALREP_COLUMN_TEXT 't'
◆ LOGICALREP_COLUMN_UNCHANGED
#define LOGICALREP_COLUMN_UNCHANGED 'u'
◆ LOGICALREP_PROTO_MAX_VERSION_NUM
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
◆ LOGICALREP_PROTO_MIN_VERSION_NUM
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
◆ LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
◆ LOGICALREP_PROTO_STREAM_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
◆ LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
◆ LOGICALREP_PROTO_VERSION_NUM
#define LOGICALREP_PROTO_VERSION_NUM 1
◆ LogicalRepBeginData
◆ LogicalRepCommitData
◆ LogicalRepCommitPreparedTxnData
◆ LogicalRepMsgType
◆ LogicalRepPreparedTxnData
◆ LogicalRepRelation
◆ LogicalRepRelId
◆ LogicalRepRollbackPreparedTxnData
◆ LogicalRepStreamAbortData
◆ LogicalRepTupleData
◆ LogicalRepTyp
◆ LogicalRepMsgType
Enumerator |
---|
LOGICAL_REP_MSG_BEGIN |
LOGICAL_REP_MSG_COMMIT |
LOGICAL_REP_MSG_ORIGIN |
LOGICAL_REP_MSG_INSERT |
LOGICAL_REP_MSG_UPDATE |
LOGICAL_REP_MSG_DELETE |
LOGICAL_REP_MSG_TRUNCATE |
LOGICAL_REP_MSG_RELATION |
LOGICAL_REP_MSG_TYPE |
LOGICAL_REP_MSG_MESSAGE |
LOGICAL_REP_MSG_BEGIN_PREPARE |
LOGICAL_REP_MSG_PREPARE |
LOGICAL_REP_MSG_COMMIT_PREPARED |
LOGICAL_REP_MSG_ROLLBACK_PREPARED |
LOGICAL_REP_MSG_STREAM_START |
LOGICAL_REP_MSG_STREAM_STOP |
LOGICAL_REP_MSG_STREAM_COMMIT |
LOGICAL_REP_MSG_STREAM_ABORT |
LOGICAL_REP_MSG_STREAM_PREPARE |
Definition at line 57 of file logicalproto.h.
58{
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
◆ logicalrep_message_type()
Definition at line 1209 of file proto.c.
1210{
1211 static char err_unknown[20];
1212
1214 {
1216 return "BEGIN";
1218 return "COMMIT";
1220 return "ORIGIN";
1222 return "INSERT";
1224 return "UPDATE";
1226 return "DELETE";
1228 return "TRUNCATE";
1230 return "RELATION";
1232 return "TYPE";
1234 return "MESSAGE";
1236 return "BEGIN PREPARE";
1238 return "PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1253 }
1254
1255
1256
1257
1258
1259
1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262 return err_unknown;
1263}
References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.
Referenced by apply_error_callback().
◆ logicalrep_read_begin()
◆ logicalrep_read_begin_prepare()
Definition at line 134 of file proto.c.
135{
136
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
142 elog(ERROR, "end_lsn not set in begin prepare message");
145
146
148}
size_t strlcpy(char *dst, const char *src, size_t siz)
const char * pq_getmsgstring(StringInfo msg)
References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.
Referenced by apply_handle_begin_prepare().
◆ logicalrep_read_commit()
◆ logicalrep_read_commit_prepared()
Definition at line 267 of file proto.c.
268{
269
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
281 elog(ERROR, "end_lsn is not set in commit prepared message");
284
285
287}
References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.
Referenced by apply_handle_commit_prepared().
◆ logicalrep_read_delete()
◆ logicalrep_read_insert()
◆ logicalrep_read_origin()
◆ logicalrep_read_prepare()
◆ logicalrep_read_rel()
Definition at line 698 of file proto.c.
699{
701
703
704
707
708
710
711
713
714 return rel;
715}
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
static const char * logicalrep_read_namespace(StringInfo in)
References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.
Referenced by apply_handle_relation().
◆ logicalrep_read_rollback_prepared()
Definition at line 325 of file proto.c.
327{
328
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
344
345
347}
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.
Referenced by apply_handle_rollback_prepared().
◆ logicalrep_read_stream_abort()
◆ logicalrep_read_stream_commit()
◆ logicalrep_read_stream_prepare()
◆ logicalrep_read_stream_start()
◆ logicalrep_read_truncate()
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
◆ logicalrep_read_typ()
◆ logicalrep_read_update()
Definition at line 487 of file proto.c.
490{
493
494
496
497
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
502
503
505 {
507 *has_oldtuple = true;
508
510 }
511 else
512 *has_oldtuple = false;
513
514
516 elog(ERROR, "expected action 'N', got %c",
518
520
521 return relid;
522}
References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().
Referenced by apply_handle_update().
◆ logicalrep_should_publish_column()
Definition at line 1279 of file proto.c.
1281{
1282 if (att->attisdropped)
1283 return false;
1284
1285
1286 if (columns)
1288
1289
1290 if (!att->attgenerated)
1291 return true;
1292
1293
1294
1295
1296
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300 return false;
1301}
bool bms_is_member(int x, const Bitmapset *a)
References bms_is_member().
Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().
◆ logicalrep_write_begin()
Definition at line 49 of file proto.c.
50{
52
53
57}
static void pq_sendint32(StringInfo buf, uint32 i)
static void pq_sendbyte(StringInfo buf, uint8 byt)
static void pq_sendint64(StringInfo buf, uint64 i)
union ReorderBufferTXN::@116 xact_time
References ReorderBufferTXN::commit_time, ReorderBufferTXN::final_lsn, LOGICAL_REP_MSG_BEGIN, pq_sendbyte(), pq_sendint32(), pq_sendint64(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_send_begin().
◆ logicalrep_write_begin_prepare()
Definition at line 116 of file proto.c.
117{
119
120
125
126
128}
void pq_sendstring(StringInfo buf, const char *str)
References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_begin_prepare_txn().
◆ logicalrep_write_commit()
◆ logicalrep_write_commit_prepared()
Definition at line 237 of file proto.c.
239{
241
243
244
245
246
247
249
250
252
253
258
259
261}
References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_commit_prepared_txn().
◆ logicalrep_write_delete()
Definition at line 528 of file proto.c.
532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539
542
543
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O');
548 else
549 pq_sendbyte(out, 'K');
550
552 include_gencols_type);
553}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define RelationGetRelid(relation)
#define TransactionIdIsValid(xid)
References Assert(), LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.
Referenced by pgoutput_change().
◆ logicalrep_write_insert()
◆ logicalrep_write_message()
Definition at line 640 of file proto.c.
643{
645
647
648
649 if (transactional)
651
652
655
661}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
static void pq_sendint8(StringInfo buf, uint8 i)
#define MESSAGE_TRANSACTIONAL
References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.
Referenced by pgoutput_message().
◆ logicalrep_write_origin()
◆ logicalrep_write_prepare()
◆ logicalrep_write_rel()
Definition at line 667 of file proto.c.
670{
672
674
675
678
679
681
682
686
687
689
690
692}
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.
Referenced by send_relation_and_attrs().
◆ logicalrep_write_rollback_prepared()
Definition at line 293 of file proto.c.
296{
298
300
301
302
303
304
306
307
309
310
316
317
319}
References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_rollback_prepared_txn().
◆ logicalrep_write_stream_abort()
◆ logicalrep_write_stream_commit()
◆ logicalrep_write_stream_prepare()
◆ logicalrep_write_stream_start()
◆ logicalrep_write_stream_stop()
void logicalrep_write_stream_stop(StringInfo out)
◆ logicalrep_write_truncate()
◆ logicalrep_write_typ()
Definition at line 723 of file proto.c.
724{
728
730
731
734
737 elog(ERROR, "cache lookup failed for type %u", basetypoid);
739
740
742
743
746
748}
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
Oid getBaseType(Oid typid)
FormData_pg_type * Form_pg_type
static Datum ObjectIdGetDatum(Oid X)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.
Referenced by send_relation_and_attrs().
◆ logicalrep_write_update()
Definition at line 450 of file proto.c.
454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461
464
465
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O');
472 else
473 pq_sendbyte(out, 'K');
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N');
480 include_gencols_type);
481}
References Assert(), LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.
Referenced by pgoutput_change().