PostgreSQL Source Code: src/backend/replication/logical/proto.c File Reference (original) (raw)
Go to the source code of this file.
| Macros | |
|---|---|
| #define | LOGICALREP_IS_REPLICA_IDENTITY 1 |
| #define | MESSAGE_TRANSACTIONAL (1<<0) |
| #define | TRUNCATE_CASCADE (1<<0) |
| #define | TRUNCATE_RESTART_SEQS (1<<1) |
◆ LOGICALREP_IS_REPLICA_IDENTITY
#define LOGICALREP_IS_REPLICA_IDENTITY 1
Definition at line 26 of file proto.c.
◆ MESSAGE_TRANSACTIONAL
#define MESSAGE_TRANSACTIONAL (1<<0)
Definition at line 28 of file proto.c.
◆ TRUNCATE_CASCADE
#define TRUNCATE_CASCADE (1<<0)
Definition at line 29 of file proto.c.
◆ TRUNCATE_RESTART_SEQS
#define TRUNCATE_RESTART_SEQS (1<<1)
Definition at line 30 of file proto.c.
◆ logicalrep_message_type()
Definition at line 1209 of file proto.c.
1210{
1211 static char err_unknown[20];
1212
1214 {
1216 return "BEGIN";
1218 return "COMMIT";
1220 return "ORIGIN";
1222 return "INSERT";
1224 return "UPDATE";
1226 return "DELETE";
1228 return "TRUNCATE";
1230 return "RELATION";
1232 return "TYPE";
1234 return "MESSAGE";
1236 return "BEGIN PREPARE";
1238 return "PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1253 }
1254
1255
1256
1257
1258
1259
1260 snprintf(err_unknown, sizeof(err_unknown), "??? (%d)", action);
1261
1262 return err_unknown;
1263}
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
References generate_unaccent_rules::action, LOGICAL_REP_MSG_BEGIN, LOGICAL_REP_MSG_BEGIN_PREPARE, LOGICAL_REP_MSG_COMMIT, LOGICAL_REP_MSG_COMMIT_PREPARED, LOGICAL_REP_MSG_DELETE, LOGICAL_REP_MSG_INSERT, LOGICAL_REP_MSG_MESSAGE, LOGICAL_REP_MSG_ORIGIN, LOGICAL_REP_MSG_PREPARE, LOGICAL_REP_MSG_RELATION, LOGICAL_REP_MSG_ROLLBACK_PREPARED, LOGICAL_REP_MSG_STREAM_ABORT, LOGICAL_REP_MSG_STREAM_COMMIT, LOGICAL_REP_MSG_STREAM_PREPARE, LOGICAL_REP_MSG_STREAM_START, LOGICAL_REP_MSG_STREAM_STOP, LOGICAL_REP_MSG_TRUNCATE, LOGICAL_REP_MSG_TYPE, LOGICAL_REP_MSG_UPDATE, and snprintf.
Referenced by apply_error_callback().
◆ logicalrep_read_attrs()
Definition at line 985 of file proto.c.
986{
987 int i;
988 int natts;
989 char **attnames;
990 Oid *atttyps;
992
994 attnames = palloc(natts * sizeof(char *));
995 atttyps = palloc(natts * sizeof(Oid));
996
997
998 for (i = 0; i < natts; i++)
999 {
1001
1002
1006
1007
1009
1010
1012
1013
1015 }
1016
1020 rel->natts = natts;
1021}
Bitmapset * bms_add_member(Bitmapset *a, int x)
char * pstrdup(const char *in)
unsigned int pq_getmsgint(StringInfo msg, int b)
const char * pq_getmsgstring(StringInfo msg)
int pq_getmsgbyte(StringInfo msg)
#define LOGICALREP_IS_REPLICA_IDENTITY
References LogicalRepRelation::attkeys, LogicalRepRelation::attnames, LogicalRepRelation::atttyps, bms_add_member(), i, LOGICALREP_IS_REPLICA_IDENTITY, LogicalRepRelation::natts, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), and pstrdup().
Referenced by logicalrep_read_rel().
◆ logicalrep_read_begin()
◆ logicalrep_read_begin_prepare()
Definition at line 134 of file proto.c.
135{
136
139 elog(ERROR, "prepare_lsn not set in begin prepare message");
142 elog(ERROR, "end_lsn not set in begin prepare message");
145
146
148}
size_t strlcpy(char *dst, const char *src, size_t siz)
References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.
Referenced by apply_handle_begin_prepare().
◆ logicalrep_read_commit()
◆ logicalrep_read_commit_prepared()
Definition at line 267 of file proto.c.
268{
269
271
272 if (flags != 0)
273 elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
274
275
278 elog(ERROR, "commit_lsn is not set in commit prepared message");
281 elog(ERROR, "end_lsn is not set in commit prepared message");
284
285
287}
References LogicalRepCommitPreparedTxnData::commit_lsn, LogicalRepCommitPreparedTxnData::commit_time, elog, LogicalRepCommitPreparedTxnData::end_lsn, ERROR, LogicalRepCommitPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), strlcpy(), and LogicalRepCommitPreparedTxnData::xid.
Referenced by apply_handle_commit_prepared().
◆ logicalrep_read_delete()
◆ logicalrep_read_insert()
◆ logicalrep_read_namespace()
`static const char * logicalrep_read_namespace(StringInfo in)`
◆ logicalrep_read_origin()
◆ logicalrep_read_prepare()
◆ logicalrep_read_prepare_common()
Definition at line 199 of file proto.c.
201{
202
204
205 if (flags != 0)
206 elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
207
208
211 elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
214 elog(ERROR, "end_lsn is not set in %s message", msgtype);
218 elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
219
220
222}
#define InvalidTransactionId
References elog, LogicalRepPreparedTxnData::end_lsn, ERROR, LogicalRepPreparedTxnData::gid, InvalidTransactionId, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepPreparedTxnData::prepare_lsn, LogicalRepPreparedTxnData::prepare_time, strlcpy(), and LogicalRepPreparedTxnData::xid.
Referenced by logicalrep_read_prepare(), and logicalrep_read_stream_prepare().
◆ logicalrep_read_rel()
Definition at line 698 of file proto.c.
699{
701
703
704
707
708
710
711
713
714 return rel;
715}
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
static const char * logicalrep_read_namespace(StringInfo in)
References logicalrep_read_attrs(), logicalrep_read_namespace(), LogicalRepRelation::nspname, palloc(), pq_getmsgbyte(), pq_getmsgint(), pq_getmsgstring(), pstrdup(), LogicalRepRelation::relname, LogicalRepRelation::remoteid, and LogicalRepRelation::replident.
Referenced by apply_handle_relation().
◆ logicalrep_read_rollback_prepared()
Definition at line 325 of file proto.c.
327{
328
330
331 if (flags != 0)
332 elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
333
334
337 elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
340 elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
344
345
347}
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
References elog, ERROR, LogicalRepRollbackPreparedTxnData::gid, InvalidXLogRecPtr, pq_getmsgbyte(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgstring(), LogicalRepRollbackPreparedTxnData::prepare_end_lsn, LogicalRepRollbackPreparedTxnData::prepare_time, LogicalRepRollbackPreparedTxnData::rollback_end_lsn, LogicalRepRollbackPreparedTxnData::rollback_time, strlcpy(), and LogicalRepRollbackPreparedTxnData::xid.
Referenced by apply_handle_rollback_prepared().
◆ logicalrep_read_stream_abort()
◆ logicalrep_read_stream_commit()
◆ logicalrep_read_stream_prepare()
◆ logicalrep_read_stream_start()
◆ logicalrep_read_truncate()
`List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)`
◆ logicalrep_read_tuple()
Definition at line 861 of file proto.c.
862{
863 int i;
864 int natts;
865
866
868
869
871 tuple->colstatus = (char *) palloc(natts * sizeof(char));
872 tuple->ncols = natts;
873
874
875 for (i = 0; i < natts; i++)
876 {
877 char *buff;
878 char kind;
881
884
885 switch (kind)
886 {
888
889 break;
891
892 break;
896
897
900
901
902
903
904
905
906
907 buff[len] = '\0';
908
910 break;
911 default:
912 elog(ERROR, "unrecognized data representation type '%c'", kind);
913 }
914 }
915}
#define LOGICALREP_COLUMN_UNCHANGED
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
void * palloc0(Size size)
void pq_copymsgbytes(StringInfo msg, void *buf, int datalen)
static void initStringInfoFromString(StringInfo str, char *data, int len)
StringInfoData * colvalues
References LogicalRepTupleData::colstatus, LogicalRepTupleData::colvalues, elog, ERROR, i, initStringInfoFromString(), len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, LogicalRepTupleData::ncols, palloc(), palloc0(), pq_copymsgbytes(), pq_getmsgbyte(), pq_getmsgint(), and value.
Referenced by logicalrep_read_delete(), logicalrep_read_insert(), and logicalrep_read_update().
◆ logicalrep_read_typ()
◆ logicalrep_read_update()
Definition at line 487 of file proto.c.
490{
493
494
496
497
500 elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
502
503
505 {
507 *has_oldtuple = true;
508
510 }
511 else
512 *has_oldtuple = false;
513
514
516 elog(ERROR, "expected action 'N', got %c",
518
520
521 return relid;
522}
References generate_unaccent_rules::action, elog, ERROR, logicalrep_read_tuple(), pq_getmsgbyte(), and pq_getmsgint().
Referenced by apply_handle_update().
◆ logicalrep_should_publish_column()
Definition at line 1279 of file proto.c.
1281{
1282 if (att->attisdropped)
1283 return false;
1284
1285
1286 if (columns)
1288
1289
1290 if (!att->attgenerated)
1291 return true;
1292
1293
1294
1295
1296
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
1299
1300 return false;
1301}
bool bms_is_member(int x, const Bitmapset *a)
References bms_is_member().
Referenced by logicalrep_write_attrs(), logicalrep_write_tuple(), and send_relation_and_attrs().
◆ logicalrep_write_attrs()
`static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)`
Definition at line 921 of file proto.c.
923{
925 int i;
926 uint16 nliveatts = 0;
928 bool replidentfull;
929
931
932
933 for (i = 0; i < desc->natts; i++)
934 {
936
938 include_gencols_type))
939 continue;
940
941 nliveatts++;
942 }
944
945
946 replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
947 if (!replidentfull)
949
950
951 for (i = 0; i < desc->natts; i++)
952 {
955
957 include_gencols_type))
958 continue;
959
960
961 if (replidentfull ||
963 idattrs))
965
967
968
970
971
973
974
976 }
977
979}
void bms_free(Bitmapset *a)
FormData_pg_attribute * Form_pg_attribute
void pq_sendstring(StringInfo buf, const char *str)
static void pq_sendint32(StringInfo buf, uint32 i)
static void pq_sendbyte(StringInfo buf, uint8 byt)
static void pq_sendint16(StringInfo buf, uint16 i)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define RelationGetDescr(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define FirstLowInvalidHeapAttributeNumber
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
References bms_free(), bms_is_member(), FirstLowInvalidHeapAttributeNumber, i, LOGICALREP_IS_REPLICA_IDENTITY, logicalrep_should_publish_column(), NameStr, TupleDescData::natts, pq_sendbyte(), pq_sendint16(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetDescr, RelationGetIdentityKeyBitmap(), and TupleDescAttr().
Referenced by logicalrep_write_rel().
◆ logicalrep_write_begin()
◆ logicalrep_write_begin_prepare()
Definition at line 116 of file proto.c.
117{
119
120
125
126
128}
References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_BEGIN_PREPARE, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_begin_prepare_txn().
◆ logicalrep_write_commit()
◆ logicalrep_write_commit_prepared()
Definition at line 237 of file proto.c.
239{
241
243
244
245
246
247
249
250
252
253
258
259
261}
References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_COMMIT_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_commit_prepared_txn().
◆ logicalrep_write_delete()
Definition at line 528 of file proto.c.
532{
533 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
536
538
539
542
543
545
546 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
547 pq_sendbyte(out, 'O');
548 else
549 pq_sendbyte(out, 'K');
550
552 include_gencols_type);
553}
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define RelationGetRelid(relation)
#define TransactionIdIsValid(xid)
References Assert(), LOGICAL_REP_MSG_DELETE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.
Referenced by pgoutput_change().
◆ logicalrep_write_insert()
◆ logicalrep_write_message()
Definition at line 640 of file proto.c.
643{
645
647
648
649 if (transactional)
651
652
655
661}
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
static void pq_sendint8(StringInfo buf, uint8 i)
#define MESSAGE_TRANSACTIONAL
References LOGICAL_REP_MSG_MESSAGE, MESSAGE_TRANSACTIONAL, pq_sendbyte(), pq_sendbytes(), pq_sendint32(), pq_sendint64(), pq_sendint8(), pq_sendstring(), and TransactionIdIsValid.
Referenced by pgoutput_message().
◆ logicalrep_write_namespace()
`static void logicalrep_write_namespace(StringInfo out, Oid nspid)`
◆ logicalrep_write_origin()
◆ logicalrep_write_prepare()
◆ logicalrep_write_prepare_common()
Definition at line 155 of file proto.c.
157{
159
161
162
163
164
165
169
170
172
173
178
179
181}
#define rbtxn_is_prepared(txn)
References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::prepare_time, rbtxn_is_prepared, TransactionIdIsValid, type, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by logicalrep_write_prepare(), and logicalrep_write_stream_prepare().
◆ logicalrep_write_rel()
Definition at line 667 of file proto.c.
670{
672
674
675
678
679
681
682
686
687
689
690
692}
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
References LOGICAL_REP_MSG_RELATION, logicalrep_write_attrs(), logicalrep_write_namespace(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RelationGetRelid, relname, and TransactionIdIsValid.
Referenced by send_relation_and_attrs().
◆ logicalrep_write_rollback_prepared()
Definition at line 293 of file proto.c.
296{
298
300
301
302
303
304
306
307
309
310
316
317
319}
References Assert(), ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::gid, LOGICAL_REP_MSG_ROLLBACK_PREPARED, pq_sendbyte(), pq_sendint32(), pq_sendint64(), pq_sendstring(), ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.
Referenced by pgoutput_rollback_prepared_txn().
◆ logicalrep_write_stream_abort()
◆ logicalrep_write_stream_commit()
◆ logicalrep_write_stream_prepare()
◆ logicalrep_write_stream_start()
◆ logicalrep_write_stream_stop()
`void logicalrep_write_stream_stop(StringInfo out)`
◆ logicalrep_write_truncate()
◆ logicalrep_write_tuple()
Definition at line 767 of file proto.c.
770{
773 bool *isnull;
774 int i;
775 uint16 nliveatts = 0;
776
778
779 for (i = 0; i < desc->natts; i++)
780 {
782
784 include_gencols_type))
785 continue;
786
787 nliveatts++;
788 }
790
794
795
796 for (i = 0; i < desc->natts; i++)
797 {
801
803 include_gencols_type))
804 continue;
805
806 if (isnull[i])
807 {
809 continue;
810 }
811
813 {
814
815
816
817
818
820 continue;
821 }
822
825 elog(ERROR, "cache lookup failed for type %u", att->atttypid);
827
828
829
830
831 if (binary && OidIsValid(typclass->typsend))
832 {
833 bytea *outputbytes;
835
841 pfree(outputbytes);
842 }
843 else
844 {
845 char *outputstr;
846
850 pfree(outputstr);
851 }
852
854 }
855}
static Datum values[MAXATTR]
#define OidIsValid(objectId)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void pfree(void *pointer)
FormData_pg_type * Form_pg_type
static Datum ObjectIdGetDatum(Oid X)
void pq_sendcountedtext(StringInfo buf, const char *str, int slen)
static void pq_sendint(StringInfo buf, uint32 i, int b)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
static void slot_getallattrs(TupleTableSlot *slot)
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
References elog, ERROR, GETSTRUCT(), HeapTupleIsValid, i, len, LOGICALREP_COLUMN_BINARY, LOGICALREP_COLUMN_NULL, LOGICALREP_COLUMN_TEXT, LOGICALREP_COLUMN_UNCHANGED, logicalrep_should_publish_column(), TupleDescData::natts, ObjectIdGetDatum(), OidIsValid, OidOutputFunctionCall(), OidSendFunctionCall(), pfree(), pq_sendbyte(), pq_sendbytes(), pq_sendcountedtext(), pq_sendint(), pq_sendint16(), RelationGetDescr, ReleaseSysCache(), SearchSysCache1(), slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TupleDescAttr(), values, VARATT_IS_EXTERNAL_ONDISK, VARDATA, VARHDRSZ, and VARSIZE.
Referenced by logicalrep_write_delete(), logicalrep_write_insert(), and logicalrep_write_update().
◆ logicalrep_write_typ()
Definition at line 723 of file proto.c.
724{
728
730
731
734
737 elog(ERROR, "cache lookup failed for type %u", basetypoid);
739
740
742
743
746
748}
Oid getBaseType(Oid typid)
References elog, ERROR, getBaseType(), GETSTRUCT(), HeapTupleIsValid, LOGICAL_REP_MSG_TYPE, logicalrep_write_namespace(), NameStr, ObjectIdGetDatum(), pq_sendbyte(), pq_sendint32(), pq_sendstring(), ReleaseSysCache(), SearchSysCache1(), and TransactionIdIsValid.
Referenced by send_relation_and_attrs().
◆ logicalrep_write_update()
Definition at line 450 of file proto.c.
454{
456
457 Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
460
461
464
465
467
468 if (oldslot != NULL)
469 {
470 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
471 pq_sendbyte(out, 'O');
472 else
473 pq_sendbyte(out, 'K');
475 include_gencols_type);
476 }
477
478 pq_sendbyte(out, 'N');
480 include_gencols_type);
481}
References Assert(), LOGICAL_REP_MSG_UPDATE, logicalrep_write_tuple(), pq_sendbyte(), pq_sendint32(), RelationData::rd_rel, RelationGetRelid, and TransactionIdIsValid.
Referenced by pgoutput_change().