git.postgresql.org Git - postgresql.git/commitdiff (original) (raw)

{

rInfo = ExecGetTriggerResultRel(estate, evtshared->ats_relid);

rel = rInfo->ri_RelationDesc;

+ /* Catch calls with insufficient relcache refcounting */

+ Assert(!RelationHasReferenceCountZero(rel));

trigdesc = rInfo->ri_TrigDesc;

finfo = rInfo->ri_TrigFunctions;

instr = rInfo->ri_TrigInstrument;

int remote_attnum;

} SlotErrCallbackArg;

+typedef struct ApplyExecutionData

+{

+ EState *estate; /* executor state, used to track resources */

+ LogicalRepRelMapEntry *targetRel; /* replication target rel */

+ ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */

+ /* These fields are used when the target relation is partitioned: */

+ ModifyTableState *mtstate; /* dummy ModifyTable state */

+ PartitionTupleRouting *proute; /* partition routing info */

+} ApplyExecutionData;

static MemoryContext ApplyMessageContext = NULL;

MemoryContext ApplyContext = NULL;

LogicalRepRelation *remoterel,

TupleTableSlot *remoteslot,

TupleTableSlot **localslot);

-static void apply_handle_tuple_routing(ResultRelInfo *relinfo,

- EState *estate,

+static void apply_handle_tuple_routing(ApplyExecutionData *edata,

TupleTableSlot *remoteslot,

LogicalRepTupleData *newtup,

- LogicalRepRelMapEntry *relmapentry,

CmdType operation);

/*

/*

* Executor state preparation for evaluation of constraint expressions,

- * indexes and triggers.

+ * indexes and triggers for the specified relation.

*

- * This is based on similar code in copy.c

+ * Note that the caller must open and close any indexes to be updated.

*/

-static EState *

-create_estate_for_relation(LogicalRepRelMapEntry *rel)

+static ApplyExecutionData *

+create_edata_for_relation(LogicalRepRelMapEntry *rel)

{

+ ApplyExecutionData *edata;

EState *estate;

ResultRelInfo *resultRelInfo;

RangeTblEntry *rte;

/*

* Input functions may need an active snapshot, as may AFTER triggers

- * invoked during finish_estate. For safety, ensure an active snapshot

+ * invoked during finish_edata. For safety, ensure an active snapshot

* exists throughout all our usage of the executor.

*/

PushActiveSnapshot(GetTransactionSnapshot());

- estate = CreateExecutorState();

+ edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));

+ edata->targetRel = rel;

+ edata->estate = estate = CreateExecutorState();

rte = makeNode(RangeTblEntry);

rte->rtekind = RTE_RELATION;

rte->rellockmode = AccessShareLock;

ExecInitRangeTable(estate, list_make1(rte));

- resultRelInfo = makeNode(ResultRelInfo);

+ edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

+ /*

+ * Use Relation opened by logicalrep_rel_open() instead of opening it

+ * again.

+ */

InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

estate->es_result_relations = resultRelInfo;

/* Prepare to catch AFTER triggers. */

AfterTriggerBeginQuery();

- return estate;

+ /* other fields of edata remain NULL for now */

+ return edata;

}

/*

* Finish any operations related to the executor state created by

- * create_estate_for_relation().

+ * create_edata_for_relation().

*/

static void

-finish_estate(EState *estate)

+finish_edata(ApplyExecutionData *edata)

{

+ EState *estate = edata->estate;

/* Handle any queued AFTER triggers. */

AfterTriggerEndQuery(estate);

- /* Cleanup. */

+ /* Shut down tuple routing, if any was done. */

+ if (edata->proute)

+ ExecCleanupTupleRouting(edata->mtstate, edata->proute);

+ /*

+ * Cleanup. It might seem that we should call ExecCloseResultRelations()

+ * here, but we intentionally don't. It would close the rel we added to

+ * the estate above, which is wrong because we took no corresponding

+ * refcount. We rely on ExecCleanupTupleRouting() to close any other

+ * relations opened during execution.

+ */

ExecResetTupleTable(estate->es_tupleTable, false);

FreeExecutorState(estate);

+ pfree(edata);

PopActiveSnapshot();

}

LogicalRepRelMapEntry *rel;

LogicalRepTupleData newtup;

LogicalRepRelId relid;

+ ApplyExecutionData *edata;

EState *estate;

TupleTableSlot *remoteslot;

MemoryContext oldctx;

}

/* Initialize the executor state. */

- estate = create_estate_for_relation(rel);

+ edata = create_edata_for_relation(rel);

+ estate = edata->estate;

remoteslot = ExecInitExtraTupleSlot(estate,

RelationGetDescr(rel->localrel),

&TTSOpsVirtual);

/* For a partitioned table, insert the tuple into a partition. */

if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

- apply_handle_tuple_routing(estate->es_result_relation_info, estate,

- remoteslot, NULL, rel, CMD_INSERT);

+ apply_handle_tuple_routing(edata,

+ remoteslot, NULL, CMD_INSERT);

else

apply_handle_insert_internal(estate->es_result_relation_info, estate,

remoteslot);

- finish_estate(estate);

+ finish_edata(edata);

logicalrep_rel_close(rel, NoLock);

{

LogicalRepRelMapEntry *rel;

LogicalRepRelId relid;

+ ApplyExecutionData *edata;

EState *estate;

LogicalRepTupleData oldtup;

LogicalRepTupleData newtup;

check_relation_updatable(rel);

/* Initialize the executor state. */

- estate = create_estate_for_relation(rel);

+ edata = create_edata_for_relation(rel);

+ estate = edata->estate;

remoteslot = ExecInitExtraTupleSlot(estate,

RelationGetDescr(rel->localrel),

&TTSOpsVirtual);

/* For a partitioned table, apply update to correct partition. */

if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

- apply_handle_tuple_routing(estate->es_result_relation_info, estate,

- remoteslot, &newtup, rel, CMD_UPDATE);

+ apply_handle_tuple_routing(edata,

+ remoteslot, &newtup, CMD_UPDATE);

else

apply_handle_update_internal(estate->es_result_relation_info, estate,

remoteslot, &newtup, rel);

- finish_estate(estate);

+ finish_edata(edata);

logicalrep_rel_close(rel, NoLock);

LogicalRepRelMapEntry *rel;

LogicalRepTupleData oldtup;

LogicalRepRelId relid;

+ ApplyExecutionData *edata;

EState *estate;

TupleTableSlot *remoteslot;

MemoryContext oldctx;

check_relation_updatable(rel);

/* Initialize the executor state. */

- estate = create_estate_for_relation(rel);

+ edata = create_edata_for_relation(rel);

+ estate = edata->estate;

remoteslot = ExecInitExtraTupleSlot(estate,

RelationGetDescr(rel->localrel),

&TTSOpsVirtual);

/* For a partitioned table, apply delete to correct partition. */

if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

- apply_handle_tuple_routing(estate->es_result_relation_info, estate,

- remoteslot, NULL, rel, CMD_DELETE);

+ apply_handle_tuple_routing(edata,

+ remoteslot, NULL, CMD_DELETE);

else

apply_handle_delete_internal(estate->es_result_relation_info, estate,

remoteslot, &rel->remoterel);

- finish_estate(estate);

+ finish_edata(edata);

logicalrep_rel_close(rel, NoLock);

* This handles insert, update, delete on a partitioned table.

*/

static void

-apply_handle_tuple_routing(ResultRelInfo *relinfo,

- EState *estate,

+apply_handle_tuple_routing(ApplyExecutionData *edata,

TupleTableSlot *remoteslot,

LogicalRepTupleData *newtup,

- LogicalRepRelMapEntry *relmapentry,

CmdType operation)

{

+ EState *estate = edata->estate;

+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;

+ ResultRelInfo *relinfo = edata->targetRelInfo;

Relation parentrel = relinfo->ri_RelationDesc;

- ModifyTableState *mtstate = NULL;

- PartitionTupleRouting *proute = NULL;

+ ModifyTableState *mtstate;

+ PartitionTupleRouting *proute;

ResultRelInfo *partrelinfo;

Relation partrel;

TupleTableSlot *remoteslot_part;

MemoryContext oldctx;

/* ModifyTableState is needed for ExecFindPartition(). */

- mtstate = makeNode(ModifyTableState);

+ edata->mtstate = mtstate = makeNode(ModifyTableState);

mtstate->ps.plan = NULL;

mtstate->ps.state = estate;

mtstate->operation = operation;

mtstate->resultRelInfo = relinfo;

- proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);

+ /* ... as is PartitionTupleRouting. */

+ edata->proute = proute = ExecSetupPartitionTupleRouting(estate, mtstate,

+ parentrel);

/*

* Find the partition to which the "search tuple" belongs.

elog(ERROR, "unrecognized CmdType: %d", (int) operation);

break;

}

-

- ExecCleanupTupleRouting(mtstate, proute);

}

/*