PostgreSQL Source Code: src/backend/replication/logical/sequencesync.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
53
64#include "utils/fmgroids.h"
72
73#define REMOTE_SEQ_COL_COUNT 10
74
76{
82
84
85
86
87
88
89
90
91
92
93void
95{
97 int nsyncworkers;
98 bool has_pending_sequences;
99 bool started_tx;
100
102
103 if (started_tx)
104 {
107 }
108
109 if (!has_pending_sequences)
110 return;
111
113
114
118 if (sequencesync_worker)
119 {
121 return;
122 }
123
124
125
126
127
130
131
132
133
134
137}
138
139
140
141
142
143
144
145static void
147{
150 {
153
154 if (buf->len > 0)
156
158 }
159}
160
161
162
163
164
165
166
167
168
169
170
171static void
173 List *missing_seqs_idx)
174{
176
177
178 if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
179 return;
180
182
183 if (mismatched_seqs_idx)
184 {
187 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
189 "mismatched or renamed sequences on subscriber (%s)",
191 seqstr->data));
192 }
193
194 if (insuffperm_seqs_idx)
195 {
198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
199 errmsg_plural("insufficient privileges on sequence (%s)",
200 "insufficient privileges on sequences (%s)",
202 seqstr->data));
203 }
204
205 if (missing_seqs_idx)
206 {
209 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 errmsg_plural("missing sequence on publisher (%s)",
211 "missing sequences on publisher (%s)",
213 seqstr->data));
214 }
215
217 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
218 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
220}
221
222
223
224
225
226
227
228
232{
233 bool isnull;
234 int col = 0;
235 Oid remote_typid;
236 int64 remote_start;
237 int64 remote_increment;
238 int64 remote_min;
239 int64 remote_max;
240 bool remote_cycle;
245
248
249
250 *seqinfo = seqinfo_local =
252
255
258
261
264
267
270
273
276
279
280
282
284
286
287
288 if (!*sequence_rel)
290
292
293
295 elog(ERROR, "cache lookup failed for sequence %u",
297
299
300
301 if (local_seq->seqtypid != remote_typid ||
302 local_seq->seqstart != remote_start ||
303 local_seq->seqincrement != remote_increment ||
304 local_seq->seqmin != remote_min ||
305 local_seq->seqmax != remote_max ||
306 local_seq->seqcycle != remote_cycle)
308
309
310 if (strcmp(seqinfo_local->nspname,
314
316 return result;
317}
318
319
320
321
322
325{
330
331
332
333
334
335 if (!run_as_owner)
337
339
341 {
342 if (!run_as_owner)
344
346 }
347
348
349
350
351
352
353
354
355
357
358 if (!run_as_owner)
360
361
362
363
364
367
369}
370
371
372
373
374static void
376{
377 int cur_batch_base_index = 0;
379 List *mismatched_seqs_idx = NIL;
380 List *missing_seqs_idx = NIL;
381 List *insuffperm_seqs_idx = NIL;
385
386#define MAX_SEQUENCES_SYNC_PER_BATCH 100
387
389 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
391
392 while (cur_batch_base_index < n_seqinfos)
393 {
395 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
396 int batch_size = 0;
397 int batch_succeeded_count = 0;
398 int batch_mismatched_count = 0;
399 int batch_skipped_count = 0;
400 int batch_insuffperm_count = 0;
401 int batch_missing_count;
403
406
408
409 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
410 {
411 char *nspname_literal;
412 char *seqname_literal;
413
416
417 if (seqstr->len > 0)
419
422
424 nspname_literal, seqname_literal, idx);
425
427 break;
428 }
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
463 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
464 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
465 " seq.seqmax, seq.seqcycle\n"
466 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
467 "JOIN pg_namespace n ON n.nspname = s.schname\n"
468 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
469 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
470 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
471 seqstr->data);
472
476 errcode(ERRCODE_CONNECTION_FAILURE),
477 errmsg("could not fetch sequence information from the publisher: %s",
478 res->err));
479
482 {
485 int seqidx;
486
488
490 {
493 }
494
496 &seqinfo, &seqidx);
499 sequence_rel->rd_rel->relowner);
500
501 switch (sync_status)
502 {
505 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
508 batch_succeeded_count++;
509 break;
511
512
513
514
515
516
518 mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
519 seqidx);
521 batch_mismatched_count++;
522 break;
524
525
526
527
528
529
531 insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
532 seqidx);
534 batch_insuffperm_count++;
535 break;
538 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
541 batch_skipped_count++;
542 break;
543 }
544
545 if (sequence_rel)
547 }
548
553
554 batch_missing_count = batch_size - (batch_succeeded_count +
555 batch_mismatched_count +
556 batch_insuffperm_count +
557 batch_skipped_count);
558
560 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
563 batch_size, batch_succeeded_count, batch_mismatched_count,
564 batch_insuffperm_count, batch_missing_count, batch_skipped_count);
565
566
568
569 if (batch_missing_count)
570 {
571 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
572 {
575
576
578 missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
579 }
580 }
581
582
583
584
585
586
587 cur_batch_base_index += batch_size;
588 }
589
590
592 missing_seqs_idx);
593}
594
595
596
597
598
599static void
601{
602 char *err;
603 bool must_use_password;
610
612
614
616 Anum_pg_subscription_rel_srsubid,
619
621 Anum_pg_subscription_rel_srsubstate,
624
626 NULL, 2, skey);
628 {
633
635
637
639
640
641 if (!sequence_rel)
642 continue;
643
644
645 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
646 {
648 continue;
649 }
650
651
652
653
654
656
662
664
666 }
667
668
671
673
674
675
676
678 return;
679
680
683
687
688
689
690
693 must_use_password,
697 errcode(ERRCODE_CONNECTION_FAILURE),
698 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
700
702
704}
705
706
707
708
709
710
711
712
713static void
715{
717
719 {
720
722 }
724 {
727 else
728 {
729
730
731
732
733
737
739 }
740 }
742}
743
744
745void
747{
749
751
753
755}
Datum idx(PG_FUNCTION_ARGS)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
void DisableSubscriptionAndExit(void)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
void SetSequence(Oid relid, int64 next, bool iscalled)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc0_object(type)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
volatile sig_atomic_t ConfigReloadPending
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
LogicalRepWorker * MyLogicalRepWorker
int logicalrep_sync_worker_count(Oid subid)
List * lappend(List *list, void *datum)
List * lappend_int(List *list, int datum)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static void * list_nth(const List *list, int n)
#define foreach_int(var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
FormData_pg_sequence * Form_pg_sequence
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
FormData_pg_subscription_rel * Form_pg_subscription_rel
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
static int64 DatumGetInt64(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define REMOTE_SEQ_COL_COUNT
@ COPYSEQ_INSUFFICIENT_PERM
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
#define MAX_SEQUENCES_SYNC_PER_BATCH
void SequenceSyncWorkerMain(Datum main_arg)
static void start_sequence_sync(void)
static void LogicalRepSyncSequences(void)
static void copy_sequences(WalReceiverConn *conn)
static void get_sequences_string(List *seqindexes, StringInfo buf)
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
void ProcessSequencesForSync(void)
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
#define BTEqualStrategyNumber
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
TimestampTz last_seqsync_start_time
Tuplestorestate * tuplestore
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
pg_noreturn void FinishSyncWorker(void)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
@ WORKERTYPE_SEQUENCESYNC
static bool am_sequencesync_worker(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)