PostgreSQL Source Code: src/backend/replication/logical/sequencesync.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

53

64#include "utils/fmgroids.h"

72

73#define REMOTE_SEQ_COL_COUNT 10

74

76{

82

84

85

86

87

88

89

90

91

92

93void

95{

97 int nsyncworkers;

98 bool has_pending_sequences;

99 bool started_tx;

100

102

103 if (started_tx)

104 {

107 }

108

109 if (!has_pending_sequences)

110 return;

111

113

114

118 if (sequencesync_worker)

119 {

121 return;

122 }

123

124

125

126

127

130

131

132

133

134

137}

138

139

140

141

142

143

144

145static void

147{

150 {

153

154 if (buf->len > 0)

156

158 }

159}

160

161

162

163

164

165

166

167

168

169

170

171static void

173 List *missing_seqs_idx)

174{

176

177

178 if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)

179 return;

180

182

183 if (mismatched_seqs_idx)

184 {

187 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

188 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",

189 "mismatched or renamed sequences on subscriber (%s)",

191 seqstr->data));

192 }

193

194 if (insuffperm_seqs_idx)

195 {

198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

199 errmsg_plural("insufficient privileges on sequence (%s)",

200 "insufficient privileges on sequences (%s)",

202 seqstr->data));

203 }

204

205 if (missing_seqs_idx)

206 {

209 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

210 errmsg_plural("missing sequence on publisher (%s)",

211 "missing sequences on publisher (%s)",

213 seqstr->data));

214 }

215

217 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

218 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",

220}

221

222

223

224

225

226

227

228

232{

233 bool isnull;

234 int col = 0;

235 Oid remote_typid;

236 int64 remote_start;

237 int64 remote_increment;

238 int64 remote_min;

239 int64 remote_max;

240 bool remote_cycle;

245

248

249

250 *seqinfo = seqinfo_local =

252

255

258

261

264

267

270

273

276

279

280

282

284

286

287

288 if (!*sequence_rel)

290

292

293

295 elog(ERROR, "cache lookup failed for sequence %u",

297

299

300

301 if (local_seq->seqtypid != remote_typid ||

302 local_seq->seqstart != remote_start ||

303 local_seq->seqincrement != remote_increment ||

304 local_seq->seqmin != remote_min ||

305 local_seq->seqmax != remote_max ||

306 local_seq->seqcycle != remote_cycle)

308

309

310 if (strcmp(seqinfo_local->nspname,

314

316 return result;

317}

318

319

320

321

322

325{

330

331

332

333

334

335 if (!run_as_owner)

337

339

341 {

342 if (!run_as_owner)

344

346 }

347

348

349

350

351

352

353

354

355

357

358 if (!run_as_owner)

360

361

362

363

364

367

369}

370

371

372

373

374static void

376{

377 int cur_batch_base_index = 0;

379 List *mismatched_seqs_idx = NIL;

380 List *missing_seqs_idx = NIL;

381 List *insuffperm_seqs_idx = NIL;

385

386#define MAX_SEQUENCES_SYNC_PER_BATCH 100

387

389 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",

391

392 while (cur_batch_base_index < n_seqinfos)

393 {

395 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};

396 int batch_size = 0;

397 int batch_succeeded_count = 0;

398 int batch_mismatched_count = 0;

399 int batch_skipped_count = 0;

400 int batch_insuffperm_count = 0;

401 int batch_missing_count;

403

406

408

409 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)

410 {

411 char *nspname_literal;

412 char *seqname_literal;

413

416

417 if (seqstr->len > 0)

419

422

424 nspname_literal, seqname_literal, idx);

425

427 break;

428 }

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

463 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"

464 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"

465 " seq.seqmax, seq.seqcycle\n"

466 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"

467 "JOIN pg_namespace n ON n.nspname = s.schname\n"

468 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"

469 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"

470 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",

471 seqstr->data);

472

476 errcode(ERRCODE_CONNECTION_FAILURE),

477 errmsg("could not fetch sequence information from the publisher: %s",

478 res->err));

479

482 {

485 int seqidx;

486

488

490 {

493 }

494

496 &seqinfo, &seqidx);

499 sequence_rel->rd_rel->relowner);

500

501 switch (sync_status)

502 {

505 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",

508 batch_succeeded_count++;

509 break;

511

512

513

514

515

516

518 mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,

519 seqidx);

521 batch_mismatched_count++;

522 break;

524

525

526

527

528

529

531 insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,

532 seqidx);

534 batch_insuffperm_count++;

535 break;

538 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",

541 batch_skipped_count++;

542 break;

543 }

544

545 if (sequence_rel)

547 }

548

553

554 batch_missing_count = batch_size - (batch_succeeded_count +

555 batch_mismatched_count +

556 batch_insuffperm_count +

557 batch_skipped_count);

558

560 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",

563 batch_size, batch_succeeded_count, batch_mismatched_count,

564 batch_insuffperm_count, batch_missing_count, batch_skipped_count);

565

566

568

569 if (batch_missing_count)

570 {

571 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)

572 {

575

576

578 missing_seqs_idx = lappend_int(missing_seqs_idx, idx);

579 }

580 }

581

582

583

584

585

586

587 cur_batch_base_index += batch_size;

588 }

589

590

592 missing_seqs_idx);

593}

594

595

596

597

598

599static void

601{

602 char *err;

603 bool must_use_password;

610

612

614

616 Anum_pg_subscription_rel_srsubid,

619

621 Anum_pg_subscription_rel_srsubstate,

624

626 NULL, 2, skey);

628 {

633

635

637

639

640

641 if (!sequence_rel)

642 continue;

643

644

645 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)

646 {

648 continue;

649 }

650

651

652

653

654

656

662

664

666 }

667

668

671

673

674

675

676

678 return;

679

680

683

687

688

689

690

693 must_use_password,

697 errcode(ERRCODE_CONNECTION_FAILURE),

698 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",

700

702

704}

705

706

707

708

709

710

711

712

713static void

715{

717

719 {

720

722 }

724 {

727 else

728 {

729

730

731

732

733

737

739 }

740 }

742}

743

744

745void

747{

749

751

753

755}

Datum idx(PG_FUNCTION_ARGS)

AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)

void DisableSubscriptionAndExit(void)

MemoryContext ApplyContext

void SetupApplyOrSyncWorker(int worker_slot)

WalReceiverConn * LogRepWorkerWalRcvConn

Subscription * MySubscription

void SetSequence(Oid relid, int64 next, bool iscalled)

int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

void err(int eval, const char *fmt,...)

TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

const TupleTableSlotOps TTSOpsMinimalTuple

#define palloc0_object(type)

void systable_endscan(SysScanDesc sysscan)

HeapTuple systable_getnext(SysScanDesc sysscan)

SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)

void ProcessConfigFile(GucContext context)

Assert(PointerIsAligned(start, uint64))

#define HeapTupleIsValid(tuple)

static void * GETSTRUCT(const HeapTupleData *tuple)

volatile sig_atomic_t ConfigReloadPending

LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)

LogicalRepWorker * MyLogicalRepWorker

int logicalrep_sync_worker_count(Oid subid)

List * lappend(List *list, void *datum)

List * lappend_int(List *list, int datum)

char * get_namespace_name(Oid nspid)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

char * pstrdup(const char *in)

void pfree(void *pointer)

#define CHECK_FOR_INTERRUPTS()

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

static void * list_nth(const List *list, int n)

#define foreach_int(var, lst)

static XLogRecPtr DatumGetLSN(Datum X)

FormData_pg_sequence * Form_pg_sequence

void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)

FormData_pg_subscription_rel * Form_pg_subscription_rel

static char buf[DEFAULT_XLOG_SEG_SIZE]

long pgstat_report_stat(bool force)

void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)

static bool DatumGetBool(Datum X)

static int64 DatumGetInt64(Datum X)

static Oid DatumGetObjectId(Datum X)

static Datum ObjectIdGetDatum(Oid X)

static int32 DatumGetInt32(Datum X)

static Datum CharGetDatum(char X)

char * quote_literal_cstr(const char *rawstr)

#define RelationGetRelationName(relation)

#define RelationGetNamespace(relation)

void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)

#define REMOTE_SEQ_COL_COUNT

@ COPYSEQ_INSUFFICIENT_PERM

static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)

#define MAX_SEQUENCES_SYNC_PER_BATCH

void SequenceSyncWorkerMain(Datum main_arg)

static void start_sequence_sync(void)

static void LogicalRepSyncSequences(void)

static void copy_sequences(WalReceiverConn *conn)

static void get_sequences_string(List *seqindexes, StringInfo buf)

static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)

void ProcessSequencesForSync(void)

static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)

#define BTEqualStrategyNumber

StringInfo makeStringInfo(void)

void resetStringInfo(StringInfo str)

void appendStringInfo(StringInfo str, const char *fmt,...)

void appendStringInfoString(StringInfo str, const char *s)

void initStringInfo(StringInfo str)

TimestampTz last_seqsync_start_time

Tuplestorestate * tuplestore

void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)

pg_noreturn void FinishSyncWorker(void)

void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache1(int cacheId, Datum key1)

Relation try_table_open(Oid relationId, LOCKMODE lockmode)

void table_close(Relation relation, LOCKMODE lockmode)

Relation table_open(Oid relationId, LOCKMODE lockmode)

bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)

static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)

void SwitchToUntrustedUser(Oid userid, UserContext *context)

void RestoreUserContext(UserContext *context)

#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)

static void walrcv_clear_result(WalRcvExecResult *walres)

#define walrcv_exec(conn, exec, nRetTypes, retTypes)

@ WORKERTYPE_SEQUENCESYNC

static bool am_sequencesync_worker(void)

void StartTransactionCommand(void)

void CommitTransactionCommand(void)

void AbortOutOfAnyTransaction(void)

uint64 GetSystemIdentifier(void)