PostgreSQL Source Code: src/backend/executor/execReplication.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

23#include "catalog/pg_am_d.h"

36

37

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54static int

57{

58 int index_attoff;

59 int skey_attoff = 0;

60 Datum indclassDatum;

63

65 Anum_pg_index_indclass);

67

68

70 index_attoff++)

71 {

72 Oid operator;

73 Oid optype;

74 Oid opfamily;

76 int table_attno = indkey->values[index_attoff];

78

80 {

81

82

83

84

85 continue;

86 }

87

88

89

90

91

96 optype,

97 eq_strategy);

98

100 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",

101 eq_strategy, optype, optype, opfamily);

102

104

105

107 index_attoff + 1,

108 eq_strategy,

109 regop,

110 searchslot->tts_values[table_attno - 1]);

111

113

114

115 if (searchslot->tts_isnull[table_attno - 1])

117

118 skey_attoff++;

119 }

120

121

122 Assert(skey_attoff > 0);

123

124 return skey_attoff;

125}

126

127

128

129

130

131

132

133static bool

135{

136 bool refetch = false;

137

138 switch (res)

139 {

141 break;

143

147 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));

148 else

151 errmsg("concurrent update, retrying")));

152 refetch = true;

153 break;

155

158 errmsg("concurrent delete, retrying")));

159 refetch = true;

160 break;

162 elog(ERROR, "attempted to lock invisible tuple");

163 break;

164 default:

165 elog(ERROR, "unexpected table_tuple_lock status: %u", res);

166 break;

167 }

168

169 return refetch;

170}

171

172

173

174

175

176

177

178bool

183{

185 int skey_attoff;

190 bool found;

192 bool isIdxSafeToSkipDuplicates;

193

194

196

198

200

201

203

204

205 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);

206

207retry:

208 found = false;

209

210 index_rescan(scan, skey, skey_attoff, NULL, 0);

211

212

214 {

215

216

217

218

219 if (!isIdxSafeToSkipDuplicates)

220 {

221 if (eq == NULL)

223

225 continue;

226 }

227

229

232

233

234

235

236

238 {

240 goto retry;

241 }

242

243

244 found = true;

245 break;

246 }

247

248

249 if (found)

250 {

253

255

257 outslot,

259 lockmode,

261 0 ,

262 &tmfd);

263

265

267 goto retry;

268 }

269

271

272

274

275 return found;

276}

277

278

279

280

281static bool

284{

285 int attrnum;

286

289

292

293

295 {

298

300

301

302

303

304

305 if (att->attisdropped || att->attgenerated)

306 continue;

307

308

309

310

311

313 return false;

314

315

316

317

319 continue;

320

321 typentry = eq[attrnum];

322 if (typentry == NULL)

323 {

328 (errcode(ERRCODE_UNDEFINED_FUNCTION),

329 errmsg("could not identify an equality operator for type %s",

331 eq[attrnum] = typentry;

332 }

333

335 att->attcollation,

338 return false;

339 }

340

341 return true;

342}

343

344

345

346

347

348

349

350

351

352

353

354bool

357{

363 bool found;

365

367

369

370

374

375retry:

376 found = false;

377

379

380

382 {

384 continue;

385

386 found = true;

388

391

392

393

394

395

397 {

399 goto retry;

400 }

401

402

403 break;

404 }

405

406

407 if (found)

408 {

411

413

415 outslot,

417 lockmode,

419 0 ,

420 &tmfd);

421

423

425 goto retry;

426 }

427

430

431 return found;

432}

433

434

435

436

437static void

439{

441 {

444

446 continue;

447

448

449

450

451

453

455 }

456}

457

458

459

460

461

462

463

464

465

466

467static bool

471{

476

477 *conflictslot = NULL;

478

479

480

481

482

484

485retry:

487 &conflictTid, &slot->tts_tid,

489 {

490 if (*conflictslot)

492

493 *conflictslot = NULL;

494 return false;

495 }

496

498

500

502 *conflictslot,

506 0 ,

507 &tmfd);

508

510

512 goto retry;

513

514 return true;

515}

516

517

518

519

520

521static void

525{

526 List *conflicttuples = NIL;

528

529

531 {

534 &conflictslot))

535 {

537

538 conflicttuple->slot = conflictslot;

539 conflicttuple->indexoid = uniqueidx;

540

542 &conflicttuple->origin, &conflicttuple->ts);

543

544 conflicttuples = lappend(conflicttuples, conflicttuple);

545 }

546 }

547

548

549 if (conflicttuples)

552 searchslot, remoteslot, conflicttuples);

553}

554

555

556

557

558

559

560

561void

564{

565 bool skip_tuple = false;

567

568

569 Assert(rel->rd_rel->relkind == RELKIND_RELATION);

570

572

573

576 {

578 skip_tuple = true;

579 }

580

581 if (!skip_tuple)

582 {

583 List *recheckIndexes = NIL;

584 List *conflictindexes;

585 bool conflict = false;

586

587

592

593

596 if (rel->rd_rel->relispartition)

598

599

601

603

606 slot, estate, false,

607 conflictindexes ? true : false,

608 &conflict,

609 conflictindexes, false);

610

611

612

613

614

615

616

617

618

619

620

621

622

623

624

625

626 if (conflict)

628 recheckIndexes, NULL, slot);

629

630

632 recheckIndexes, NULL);

633

634

635

636

637

638

639

641 }

642}

643

644

645

646

647

648

649

650void

654{

655 bool skip_tuple = false;

658

659

660

661

662

663 Assert(rel->rd_rel->relkind == RELKIND_RELATION);

665

667

668

671 {

673 tid, NULL, slot, NULL, NULL))

674 skip_tuple = true;

675 }

676

677 if (!skip_tuple)

678 {

679 List *recheckIndexes = NIL;

681 List *conflictindexes;

682 bool conflict = false;

683

684

689

690

693 if (rel->rd_rel->relispartition)

695

697 &update_indexes);

698

700

703 slot, estate, true,

704 conflictindexes ? true : false,

705 &conflict, conflictindexes,

707

708

709

710

711

712

713 if (conflict)

715 recheckIndexes, searchslot, slot);

716

717

719 NULL, NULL,

720 tid, NULL, slot,

721 recheckIndexes, NULL, false);

722

724 }

725}

726

727

728

729

730

731

732

733void

737{

738 bool skip_tuple = false;

741

743

744

747 {

749 tid, NULL, NULL, NULL, NULL);

750 }

751

752 if (!skip_tuple)

753 {

754

756

757

759 tid, NULL, NULL, false);

760 }

761}

762

763

764

765

766void

768{

770

771

772

773

774

775 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

776 return;

777

778

780 return;

781

782

783

784

785

786

787

788

789

790

791

792

793

794

795

796

797

798

799

800

801

802

803

804

808 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

809 errmsg("cannot update table \"%s\"",

811 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));

814 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

815 errmsg("cannot update table \"%s\"",

817 errdetail("Column list used by the publication does not cover the replica identity.")));

820 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

821 errmsg("cannot update table \"%s\"",

823 errdetail("Replica identity must not contain unpublished generated columns.")));

826 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

827 errmsg("cannot delete from table \"%s\"",

829 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));

832 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

833 errmsg("cannot delete from table \"%s\"",

835 errdetail("Column list used by the publication does not cover the replica identity.")));

838 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

839 errmsg("cannot delete from table \"%s\"",

841 errdetail("Replica identity must not contain unpublished generated columns.")));

842

843

845 return;

846

847

848 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

849 return;

850

851

852

853

854

855

858 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

859 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",

861 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));

864 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

865 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",

867 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));

868}

869

870

871

872

873

874

875

876void

879{

880 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)

882 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

883 errmsg("cannot use relation \"%s.%s\" as logical replication target",

886}

StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)

#define AttributeNumberIsValid(attributeNumber)

#define PG_USED_FOR_ASSERTS_ONLY

#define OidIsValid(objectId)

bool IsCatalogRelation(Relation relation)

void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)

bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)

@ CT_MULTIPLE_UNIQUE_CONFLICTS

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, ItemPointer tupleid, List *arbiterIndexes)

List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)

bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)

void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)

static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)

bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)

void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)

static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)

static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)

void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)

void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)

void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)

static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)

static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)

static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

#define palloc0_object(type)

Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)

char * format_type_be(Oid type_oid)

Assert(PointerIsAligned(start, uint64))

void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)

bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)

IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)

void index_close(Relation relation, LOCKMODE lockmode)

void index_endscan(IndexScanDesc scan)

Relation index_open(Oid relationId, LOCKMODE lockmode)

void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)

static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)

List * lappend(List *list, void *datum)

void list_free(List *list)

bool list_member_oid(const List *list, Oid datum)

void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)

Oid get_opclass_input_type(Oid opclass)

Oid get_opclass_family(Oid opclass)

RegProcedure get_opcode(Oid opno)

Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)

void * palloc0(Size size)

void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)

FormData_pg_attribute * Form_pg_attribute

int errdetail_relkind_not_supported(char relkind)

static int list_length(const List *l)

#define list_make1_oid(x1)

#define foreach_oid(var, lst)

#define ERRCODE_T_R_SERIALIZATION_FAILURE

static bool DatumGetBool(Datum X)

static Pointer DatumGetPointer(Datum X)

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define IndexRelationGetNumberOfKeyAttributes(relation)

void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)

Oid RelationGetReplicaIndex(Relation relation)

void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)

Snapshot GetLatestSnapshot(void)

void PushActiveSnapshot(Snapshot snapshot)

void PopActiveSnapshot(void)

Snapshot GetActiveSnapshot(void)

#define InitDirtySnapshot(snapshotdata)

Oid GetRelationIdentityOrPK(Relation rel)

PublicationActions pubactions

bool cols_valid_for_delete

bool gencols_valid_for_update

bool cols_valid_for_update

bool gencols_valid_for_delete

struct HeapTupleData * rd_indextuple

List * ri_onConflictArbiterIndexes

RelationPtr ri_IndexRelationDescs

TriggerDesc * ri_TrigDesc

IndexInfo ** ri_IndexRelationInfo

bool trig_delete_before_row

bool trig_update_before_row

bool trig_insert_before_row

bool has_generated_stored

TupleDesc tts_tupleDescriptor

int16 values[FLEXIBLE_ARRAY_MEMBER]

Oid values[FLEXIBLE_ARRAY_MEMBER]

Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)

TupleTableSlot * table_slot_create(Relation relation, List **reglist)

void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)

void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)

void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)

static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)

static void table_endscan(TableScanDesc scan)

static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)

static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)

static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)

#define TransactionIdIsValid(xid)

bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)

void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)

bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)

void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)

bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)

void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)

bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

static void slot_getallattrs(TupleTableSlot *slot)

static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)

static void ExecMaterializeSlot(TupleTableSlot *slot)

TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)

#define TYPECACHE_EQ_OPR_FINFO

CommandId GetCurrentCommandId(bool used)