PostgreSQL Source Code: src/backend/executor/execReplication.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

25#include "catalog/pg_am_d.h"

38

39

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56static int

59{

60 int index_attoff;

61 int skey_attoff = 0;

62 Datum indclassDatum;

65

67 Anum_pg_index_indclass);

69

70

72 index_attoff++)

73 {

74 Oid operator;

75 Oid optype;

76 Oid opfamily;

78 int table_attno = indkey->values[index_attoff];

80

82 {

83

84

85

86

87 continue;

88 }

89

90

91

92

93

98 optype,

99 eq_strategy);

100

102 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",

103 eq_strategy, optype, optype, opfamily);

104

106

107

109 index_attoff + 1,

110 eq_strategy,

111 regop,

112 searchslot->tts_values[table_attno - 1]);

113

115

116

117 if (searchslot->tts_isnull[table_attno - 1])

119

120 skey_attoff++;

121 }

122

123

124 Assert(skey_attoff > 0);

125

126 return skey_attoff;

127}

128

129

130

131

132

133

134

135static bool

137{

138 bool refetch = false;

139

140 switch (res)

141 {

143 break;

145

149 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));

150 else

153 errmsg("concurrent update, retrying")));

154 refetch = true;

155 break;

157

160 errmsg("concurrent delete, retrying")));

161 refetch = true;

162 break;

164 elog(ERROR, "attempted to lock invisible tuple");

165 break;

166 default:

167 elog(ERROR, "unexpected table_tuple_lock status: %u", res);

168 break;

169 }

170

171 return refetch;

172}

173

174

175

176

177

178

179

180bool

185{

187 int skey_attoff;

192 bool found;

194 bool isIdxSafeToSkipDuplicates;

195

196

198

200

202

203

205

206

207 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);

208

209retry:

210 found = false;

211

212 index_rescan(scan, skey, skey_attoff, NULL, 0);

213

214

216 {

217

218

219

220

221 if (!isIdxSafeToSkipDuplicates)

222 {

223 if (eq == NULL)

225

226 if (tuples\_equal(outslot, searchslot, eq, NULL))

227 continue;

228 }

229

231

234

235

236

237

238

240 {

242 goto retry;

243 }

244

245

246 found = true;

247 break;

248 }

249

250

251 if (found)

252 {

255

257

259 outslot,

261 lockmode,

263 0 ,

264 &tmfd);

265

267

269 goto retry;

270 }

271

273

274

276

277 return found;

278}

279

280

281

282

283

284

285

286static bool

289{

290 int attrnum;

291

294

297

298

300 {

303

305

306

307

308

309

310 if (att->attisdropped || att->attgenerated)

311 continue;

312

313

314

315

316 if (columns &&

318 columns))

319 continue;

320

321

322

323

324

326 return false;

327

328

329

330

332 continue;

333

334 typentry = eq[attrnum];

335 if (typentry == NULL)

336 {

341 (errcode(ERRCODE_UNDEFINED_FUNCTION),

342 errmsg("could not identify an equality operator for type %s",

344 eq[attrnum] = typentry;

345 }

346

348 att->attcollation,

351 return false;

352 }

353

354 return true;

355}

356

357

358

359

360

361

362

363

364

365

366

367bool

370{

376 bool found;

378

380

382

383

387

388retry:

389 found = false;

390

392

393

395 {

396 if (tuples\_equal(scanslot, searchslot, eq, NULL))

397 continue;

398

399 found = true;

401

404

405

406

407

408

410 {

412 goto retry;

413 }

414

415

416 break;

417 }

418

419

420 if (found)

421 {

424

426

428 outslot,

430 lockmode,

432 0 ,

433 &tmfd);

434

436

438 goto retry;

439 }

440

443

444 return found;

445}

446

447

448

449

450static void

452{

454 {

457

459 continue;

460

461

462

463

464

466

468 }

469}

470

471

472

473

474

475

476

477static void

483{

487 bool recently_dead = false;

491

493

496

498

499

500

501

502

503

504

505

507 recently_dead = true;

508

510

511 if (!recently_dead)

512 return;

513

516 return;

517

518

521 {

522 *delete_xid = xmax;

523 *delete_time = localts;

524 *delete_origin = localorigin;

525 }

526}

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560bool

566{

572

574

577 *delete_time = 0;

578

579

580

581

582

583

584

585

586

587

590

591

592 if (!indexbitmap)

595

597

598

599

600

601

602

603

606

608

609

611 {

612 if (tuples\_equal(scanslot, searchslot, eq, indexbitmap))

613 continue;

614

616 delete_time, delete_origin);

617 }

618

621

622 return *delete_time != 0;

623}

624

625

626

627

628

629bool

636{

639 int skey_attoff;

643 bool isIdxSafeToSkipDuplicates;

645

648

650 *delete_time = 0;

652

654

656

658

659

661

662

663

664

665

666

667

669

670 index_rescan(scan, skey, skey_attoff, NULL, 0);

671

672

674 {

675

676

677

678

679 if (!isIdxSafeToSkipDuplicates)

680 {

681 if (eq == NULL)

683

684 if (tuples\_equal(scanslot, searchslot, eq, NULL))

685 continue;

686 }

687

689 delete_time, delete_origin);

690 }

691

693

695

697

698 return *delete_time != 0;

699}

700

701

702

703

704

705

706

707

708

709

710static bool

714{

719

720 *conflictslot = NULL;

721

722

723

724

725

727

728retry:

730 &conflictTid, &slot->tts_tid,

732 {

733 if (*conflictslot)

735

736 *conflictslot = NULL;

737 return false;

738 }

739

741

743

745 *conflictslot,

749 0 ,

750 &tmfd);

751

753

755 goto retry;

756

757 return true;

758}

759

760

761

762

763

764static void

768{

769 List *conflicttuples = NIL;

771

772

774 {

777 &conflictslot))

778 {

780

781 conflicttuple->slot = conflictslot;

782 conflicttuple->indexoid = uniqueidx;

783

785 &conflicttuple->origin, &conflicttuple->ts);

786

787 conflicttuples = lappend(conflicttuples, conflicttuple);

788 }

789 }

790

791

792 if (conflicttuples)

795 searchslot, remoteslot, conflicttuples);

796}

797

798

799

800

801

802

803

804void

807{

808 bool skip_tuple = false;

810

811

812 Assert(rel->rd_rel->relkind == RELKIND_RELATION);

813

815

816

819 {

821 skip_tuple = true;

822 }

823

824 if (!skip_tuple)

825 {

826 List *recheckIndexes = NIL;

827 List *conflictindexes;

828 bool conflict = false;

829

830

835

836

839 if (rel->rd_rel->relispartition)

841

842

844

846

849 slot, estate, false,

850 conflictindexes ? true : false,

851 &conflict,

852 conflictindexes, false);

853

854

855

856

857

858

859

860

861

862

863

864

865

866

867

868

869 if (conflict)

871 recheckIndexes, NULL, slot);

872

873

875 recheckIndexes, NULL);

876

877

878

879

880

881

882

884 }

885}

886

887

888

889

890

891

892

893void

897{

898 bool skip_tuple = false;

901

902

903

904

905

906 Assert(rel->rd_rel->relkind == RELKIND_RELATION);

908

910

911

914 {

916 tid, NULL, slot, NULL, NULL, false))

917 skip_tuple = true;

918 }

919

920 if (!skip_tuple)

921 {

922 List *recheckIndexes = NIL;

924 List *conflictindexes;

925 bool conflict = false;

926

927

932

933

936 if (rel->rd_rel->relispartition)

938

940 &update_indexes);

941

943

946 slot, estate, true,

947 conflictindexes ? true : false,

948 &conflict, conflictindexes,

950

951

952

953

954

955

956 if (conflict)

958 recheckIndexes, searchslot, slot);

959

960

962 NULL, NULL,

963 tid, NULL, slot,

964 recheckIndexes, NULL, false);

965

967 }

968}

969

970

971

972

973

974

975

976void

980{

981 bool skip_tuple = false;

984

986

987

990 {

992 tid, NULL, NULL, NULL, NULL, false);

993 }

994

995 if (!skip_tuple)

996 {

997

999

1000

1002 tid, NULL, NULL, false);

1003 }

1004}

1005

1006

1007

1008

1009void

1011{

1013

1014

1015

1016

1017

1018 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

1019 return;

1020

1021

1023 return;

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1051 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1052 errmsg("cannot update table \"%s\"",

1054 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));

1057 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1058 errmsg("cannot update table \"%s\"",

1060 errdetail("Column list used by the publication does not cover the replica identity.")));

1063 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1064 errmsg("cannot update table \"%s\"",

1066 errdetail("Replica identity must not contain unpublished generated columns.")));

1069 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1070 errmsg("cannot delete from table \"%s\"",

1072 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));

1075 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1076 errmsg("cannot delete from table \"%s\"",

1078 errdetail("Column list used by the publication does not cover the replica identity.")));

1081 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

1082 errmsg("cannot delete from table \"%s\"",

1084 errdetail("Replica identity must not contain unpublished generated columns.")));

1085

1086

1088 return;

1089

1090

1091 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)

1092 return;

1093

1094

1095

1096

1097

1098

1101 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1102 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",

1104 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));

1107 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1108 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",

1110 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));

1111}

1112

1113

1114

1115

1116

1117

1118

1119

1120void

1122 const char *nspname, const char *relname)

1123{

1124 if (localrelkind != RELKIND_RELATION &&

1125 localrelkind != RELKIND_PARTITIONED_TABLE &&

1126 localrelkind != RELKIND_SEQUENCE)

1128 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

1129 errmsg("cannot use relation \"%s.%s\" as logical replication target",

1132

1133

1134

1135

1136

1137

1138 if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||

1139 (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))

1141 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1142

1143 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",

1145 remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",

1146 localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));

1147}

StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)

#define AttributeNumberIsValid(attributeNumber)

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

bool bms_is_member(int x, const Bitmapset *a)

void LockBuffer(Buffer buffer, BufferLockMode mode)

#define PG_USED_FOR_ASSERTS_ONLY

#define OidIsValid(objectId)

bool IsCatalogRelation(Relation relation)

bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)

void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)

bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)

@ CT_MULTIPLE_UNIQUE_CONFLICTS

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)

bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, const ItemPointerData *tupleid, List *arbiterIndexes)

bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)

void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)

static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)

void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)

static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns)

bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)

void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)

bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)

static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)

void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)

void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)

void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)

bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, RepOriginId *delete_origin, TimestampTz *delete_time)

static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)

static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)

static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)

static void update_most_recent_deletion_info(TupleTableSlot *scanslot, TransactionId oldestxmin, TransactionId *delete_xid, TimestampTz *delete_time, RepOriginId *delete_origin)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)

#define palloc0_array(type, count)

#define palloc0_object(type)

Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)

char * format_type_be(Oid type_oid)

Assert(PointerIsAligned(start, uint64))

@ HEAPTUPLE_RECENTLY_DEAD

HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)

static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)

void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)

bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)

IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)

void index_close(Relation relation, LOCKMODE lockmode)

void index_endscan(IndexScanDesc scan)

Relation index_open(Oid relationId, LOCKMODE lockmode)

void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)

static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)

List * lappend(List *list, void *datum)

void list_free(List *list)

bool list_member_oid(const List *list, Oid datum)

void XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid, XLTW_Oper oper)

Oid get_opclass_input_type(Oid opclass)

Oid get_opclass_family(Oid opclass)

RegProcedure get_opcode(Oid opno)

Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)

void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)

#define InvalidRepOriginId

FormData_pg_attribute * Form_pg_attribute

int errdetail_relkind_not_supported(char relkind)

static int list_length(const List *l)

#define list_make1_oid(x1)

#define foreach_oid(var, lst)

static char buf[DEFAULT_XLOG_SEG_SIZE]

#define ERRCODE_T_R_SERIALIZATION_FAILURE

static bool DatumGetBool(Datum X)

static Pointer DatumGetPointer(Datum X)

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define IndexRelationGetNumberOfKeyAttributes(relation)

void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)

Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)

Oid RelationGetReplicaIndex(Relation relation)

@ INDEX_ATTR_BITMAP_PRIMARY_KEY

@ INDEX_ATTR_BITMAP_IDENTITY_KEY

void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)

Snapshot GetLatestSnapshot(void)

void PushActiveSnapshot(Snapshot snapshot)

void PopActiveSnapshot(void)

Snapshot GetActiveSnapshot(void)

#define InitDirtySnapshot(snapshotdata)

Oid GetRelationIdentityOrPK(Relation rel)

PublicationActions pubactions

bool cols_valid_for_delete

bool gencols_valid_for_update

bool cols_valid_for_update

bool gencols_valid_for_delete

struct HeapTupleData * rd_indextuple

List * ri_onConflictArbiterIndexes

RelationPtr ri_IndexRelationDescs

TriggerDesc * ri_TrigDesc

IndexInfo ** ri_IndexRelationInfo

bool trig_delete_before_row

bool trig_update_before_row

bool trig_insert_before_row

bool has_generated_stored

TupleDesc tts_tupleDescriptor

int16 values[FLEXIBLE_ARRAY_MEMBER]

Oid values[FLEXIBLE_ARRAY_MEMBER]

#define FirstLowInvalidHeapAttributeNumber

Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)

TupleTableSlot * table_slot_create(Relation relation, List **reglist)

void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)

void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)

void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)

static void table_endscan(TableScanDesc scan)

static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)

static void table_rescan(TableScanDesc scan, ScanKeyData *key)

static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)

static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, ScanKeyData *key)

#define InvalidTransactionId

#define TransactionIdIsValid(xid)

bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_update)

void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)

bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)

void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)

bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_delete)

void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)

bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

static void slot_getallattrs(TupleTableSlot *slot)

static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)

static void ExecMaterializeSlot(TupleTableSlot *slot)

TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)

#define TYPECACHE_EQ_OPR_FINFO

CommandId GetCurrentCommandId(bool used)