PostgreSQL Source Code: src/backend/executor/execParallel.c File Reference (original) (raw)

Go to the source code of this file.

Data Structures
struct FixedParallelExecutorState
struct SharedExecutorInstrumentation
struct ExecParallelEstimateContext
struct ExecParallelInitializeDSMContext
Macros
#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
#define GetInstrumentationArray(sei)
Typedefs
typedef struct FixedParallelExecutorState FixedParallelExecutorState
typedef struct ExecParallelEstimateContext ExecParallelEstimateContext
typedef struct ExecParallelInitializeDSMContext ExecParallelInitializeDSMContext
Functions
static char * ExecSerializePlan (Plan *plan, EState *estate)
static bool ExecParallelEstimate (PlanState *planstate, ExecParallelEstimateContext *e)
static bool ExecParallelInitializeDSM (PlanState *planstate, ExecParallelInitializeDSMContext *d)
static shm_mq_handle ** ExecParallelSetupTupleQueues (ParallelContext *pcxt, bool reinitialize)
static bool ExecParallelReInitializeDSM (PlanState *planstate, ParallelContext *pcxt)
static bool ExecParallelRetrieveInstrumentation (PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static DestReceiver * ExecParallelGetReceiver (dsm_segment *seg, shm_toc *toc)
static Size EstimateParamExecSpace (EState *estate, Bitmapset *params)
static dsa_pointer SerializeParamExecParams (EState *estate, Bitmapset *params, dsa_area *area)
static void RestoreParamExecParams (char *start_address, EState *estate)
ParallelExecutorInfo * ExecInitParallelPlan (PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
void ExecParallelCreateReaders (ParallelExecutorInfo *pei)
void ExecParallelReinitialize (PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
static void ExecParallelRetrieveJitInstrumentation (PlanState *planstate, SharedJitInstrumentation *shared_jit)
void ExecParallelFinish (ParallelExecutorInfo *pei)
void ExecParallelCleanup (ParallelExecutorInfo *pei)
static QueryDesc * ExecParallelGetQueryDesc (shm_toc *toc, DestReceiver *receiver, int instrument_options)
static bool ExecParallelReportInstrumentation (PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static bool ExecParallelInitializeWorker (PlanState *planstate, ParallelWorkerContext *pwcxt)
void ParallelQueryMain (dsm_segment *seg, shm_toc *toc)

GetInstrumentationArray

| #define GetInstrumentationArray | ( | | sei | ) | | ------------------------------- | - | | --- | - |

Value:

(Instrumentation *) (((char *) sei) + sei->instrument_offset))

#define AssertVariableIsOfTypeMacro(varname, typename)

Definition at line 108 of file execParallel.c.

PARALLEL_KEY_BUFFER_USAGE

#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)

PARALLEL_KEY_DSA

#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)

PARALLEL_KEY_EXECUTOR_FIXED

#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)

PARALLEL_KEY_INSTRUMENTATION

#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)

PARALLEL_KEY_JIT_INSTRUMENTATION

#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)

PARALLEL_KEY_PARAMLISTINFO

#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)

PARALLEL_KEY_PLANNEDSTMT

#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)

PARALLEL_KEY_QUERY_TEXT

#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)

PARALLEL_KEY_TUPLE_QUEUE

#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)

PARALLEL_KEY_WAL_USAGE

#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)

PARALLEL_TUPLE_QUEUE_SIZE

#define PARALLEL_TUPLE_QUEUE_SIZE 65536

ExecParallelEstimateContext

ExecParallelInitializeDSMContext

FixedParallelExecutorState

EstimateParamExecSpace()

Definition at line 326 of file execParallel.c.

327{

328 int paramid;

329 Size sz = sizeof(int);

330

331 paramid = -1;

332 while ((paramid = bms_next_member(params, paramid)) >= 0)

333 {

334 Oid typeOid;

336 bool typByVal;

338

341 paramid);

342

343 sz = add_size(sz, sizeof(int));

344

345

348 else

349 {

350

351 typLen = sizeof(Datum);

352 typByVal = true;

353 }

356 typByVal, typLen));

357 }

358 return sz;

359}

int bms_next_member(const Bitmapset *a, int prevbit)

#define OidIsValid(objectId)

Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen)

void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)

static Oid list_nth_oid(const List *list, int n)

Size add_size(Size s1, Size s2)

PlannedStmt * es_plannedstmt

ParamExecData * es_param_exec_vals

References add_size(), bms_next_member(), datumEstimateSpace(), EState::es_param_exec_vals, EState::es_plannedstmt, get_typlenbyval(), ParamExecData::isnull, list_nth_oid(), OidIsValid, PlannedStmt::paramExecTypes, and ParamExecData::value.

Referenced by SerializeParamExecParams().

ExecInitParallelPlan()

Definition at line 611 of file execParallel.c.

614{

620 char *pstmt_data;

621 char *pstmt_space;

622 char *paramlistinfo_space;

627 int pstmt_len;

628 int paramlistinfo_len;

629 int instrumentation_len = 0;

630 int jit_instrumentation_len = 0;

631 int instrument_offset = 0;

633 char *query_string;

634 int query_len;

635

636

637

638

639

640

641

642

643

644

645

646

648

649

653

654

656

657

659 pei->pcxt = pcxt;

660

661

662

663

664

665

666

667

671

672

676

677

678 pstmt_len = strlen(pstmt_data) + 1;

681

682

686

687

688

689

690

691

692

693

697

698

699

700

704

705

709

710

711

712

713

714 e.pcxt = pcxt;

715 e.nnodes = 0;

717

718

720 {

721 instrumentation_len =

723 sizeof(int) * e.nnodes;

724 instrumentation_len = MAXALIGN(instrumentation_len);

725 instrument_offset = instrumentation_len;

726 instrumentation_len +=

731

732

734 {

735 jit_instrumentation_len =

740 }

741 }

742

743

746

747

748

749

750

751

753

754

756

757

758

759

760

761

762

763

764

765

772

773

775 memcpy(query_string, estate->es_sourceText, query_len + 1);

777

778

780 memcpy(pstmt_space, pstmt_data, pstmt_len);

782

783

787

788

793

794

799

800

802

803

805

806

807

808

809

810

812 {

814 int i;

815

822 for (i = 0; i < nworkers * e.nnodes; ++i)

825 instrumentation);

827

829 {

831 jit_instrumentation_len);

832 jit_instrumentation->num_workers = nworkers;

833 memset(jit_instrumentation->jit_instr, 0,

836 jit_instrumentation);

838 }

839 }

840

841

842

843

844

845

846 if (pcxt->seg != NULL)

847 {

848 char *area_space;

849

853 LWTRANCHE_PARALLEL_QUERY_DSA,

854 pcxt->seg);

855

856

857

858

859

860

861

863 {

867 }

868 }

869

870

871

872

873

874

875 d.pcxt = pcxt;

878

879

883

884

885

886

887

888 if (e.nnodes != d.nnodes)

889 elog(ERROR, "inconsistent count of PlanState nodes");

890

891

892 return pei;

893}

void InitializeParallelDSM(ParallelContext *pcxt)

ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)

size_t dsa_minimum_size(void)

#define dsa_create_in_place(place, size, tranche_id, segment)

#define InvalidDsaPointer

#define PARALLEL_KEY_BUFFER_USAGE

#define PARALLEL_KEY_JIT_INSTRUMENTATION

#define PARALLEL_KEY_PARAMLISTINFO

#define PARALLEL_TUPLE_QUEUE_SIZE

static dsa_pointer SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)

#define PARALLEL_KEY_INSTRUMENTATION

static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)

#define PARALLEL_KEY_PLANNEDSTMT

static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)

#define GetInstrumentationArray(sei)

#define PARALLEL_KEY_EXECUTOR_FIXED

static char * ExecSerializePlan(Plan *plan, EState *estate)

#define PARALLEL_KEY_QUERY_TEXT

#define PARALLEL_KEY_WAL_USAGE

static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)

#define GetPerTupleExprContext(estate)

#define palloc0_object(type)

Assert(PointerIsAligned(start, uint64))

void InstrInit(Instrumentation *instr, int instrument_options)

struct JitInstrumentation JitInstrumentation

void ExecSetParamPlanMulti(const Bitmapset *params, ExprContext *econtext)

Size EstimateParamListSpace(ParamListInfo paramLI)

void SerializeParamList(ParamListInfo paramLI, char **start_address)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size mul_size(Size s1, Size s2)

Snapshot GetActiveSnapshot(void)

struct dsa_area * es_query_dsa

ParamListInfo es_param_list_info

const char * es_sourceText

SharedExecutorInstrumentation * instrumentation

shm_toc_estimator estimator

struct SharedJitInstrumentation * jit_instrumentation

BufferUsage * buffer_usage

SharedExecutorInstrumentation * instrumentation

struct TupleQueueReader ** reader

References ParallelExecutorInfo::area, Assert(), bms_is_empty, ParallelExecutorInfo::buffer_usage, CreateParallelContext(), dsa_create_in_place, dsa_minimum_size(), FixedParallelExecutorState::eflags, elog, ERROR, EState::es_instrument, EState::es_jit_flags, EState::es_param_list_info, EState::es_query_dsa, EState::es_snapshot, EState::es_sourceText, EState::es_top_eflags, EstimateParamListSpace(), ParallelContext::estimator, ExecParallelEstimate(), ExecParallelInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSerializePlan(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetActiveSnapshot(), GetInstrumentationArray, GetPerTupleExprContext, i, InitializeParallelDSM(), InstrInit(), SharedExecutorInstrumentation::instrument_offset, SharedExecutorInstrumentation::instrument_options, ExecParallelInitializeDSMContext::instrumentation, ParallelExecutorInfo::instrumentation, InvalidDsaPointer, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, ParallelExecutorInfo::jit_instrumentation, MAXALIGN, mul_size(), ExecParallelInitializeDSMContext::nnodes, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, SharedJitInstrumentation::num_workers, ParallelContext::nworkers, palloc0_object, PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_PARAMLISTINFO, PARALLEL_KEY_PLANNEDSTMT, PARALLEL_KEY_QUERY_TEXT, PARALLEL_KEY_WAL_USAGE, PARALLEL_TUPLE_QUEUE_SIZE, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ExecParallelInitializeDSMContext::pcxt, ParallelExecutorInfo::pcxt, PGJIT_NONE, PlanState::plan, ParallelExecutorInfo::planstate, ParallelExecutorInfo::reader, ParallelContext::seg, SerializeParamExecParams(), SerializeParamList(), shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), ParallelContext::toc, ParallelExecutorInfo::tqueue, FixedParallelExecutorState::tuples_needed, and ParallelExecutorInfo::wal_usage.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelCleanup()

Definition at line 1226 of file execParallel.c.

1227{

1228

1232

1233

1237

1238

1240 {

1243 }

1244 if (pei->area != NULL)

1245 {

1247 pei->area = NULL;

1248 }

1249 if (pei->pcxt != NULL)

1250 {

1252 pei->pcxt = NULL;

1253 }

1255}

void DestroyParallelContext(ParallelContext *pcxt)

void dsa_detach(dsa_area *area)

void dsa_free(dsa_area *area, dsa_pointer dp)

#define DsaPointerIsValid(x)

static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)

static void ExecParallelRetrieveJitInstrumentation(PlanState *planstate, SharedJitInstrumentation *shared_jit)

void pfree(void *pointer)

References ParallelExecutorInfo::area, DestroyParallelContext(), dsa_detach(), dsa_free(), DsaPointerIsValid, ExecParallelRetrieveInstrumentation(), ExecParallelRetrieveJitInstrumentation(), ParallelExecutorInfo::instrumentation, InvalidDsaPointer, ParallelExecutorInfo::jit_instrumentation, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, pfree(), and ParallelExecutorInfo::planstate.

Referenced by ExecShutdownGather(), and ExecShutdownGatherMerge().

ExecParallelCreateReaders()

Definition at line 902 of file execParallel.c.

903{

905 int i;

906

908

909 if (nworkers > 0)

910 {

913

914 for (i = 0; i < nworkers; i++)

915 {

919 }

920 }

921}

void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)

ParallelWorkerInfo * worker

BackgroundWorkerHandle * bgwhandle

TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)

References Assert(), ParallelWorkerInfo::bgwhandle, CreateTupleQueueReader(), i, ParallelContext::nworkers_launched, palloc(), ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, shm_mq_set_handle(), ParallelExecutorInfo::tqueue, and ParallelContext::worker.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelEstimate()

Definition at line 235 of file execParallel.c.

236{

237 if (planstate == NULL)

238 return false;

239

240

241 e->nnodes++;

242

243 switch (nodeTag(planstate))

244 {

245 case T_SeqScanState:

248 e->pcxt);

249 break;

250 case T_IndexScanState:

251

253 e->pcxt);

254 break;

255 case T_IndexOnlyScanState:

256

258 e->pcxt);

259 break;

260 case T_BitmapIndexScanState:

261

263 e->pcxt);

264 break;

265 case T_ForeignScanState:

268 e->pcxt);

269 break;

270 case T_TidRangeScanState:

273 e->pcxt);

274 break;

275 case T_AppendState:

278 e->pcxt);

279 break;

280 case T_CustomScanState:

283 e->pcxt);

284 break;

285 case T_BitmapHeapScanState:

288 e->pcxt);

289 break;

290 case T_HashJoinState:

293 e->pcxt);

294 break;

295 case T_HashState:

296

298 break;

299 case T_SortState:

300

302 break;

303 case T_IncrementalSortState:

304

306 break;

307 case T_AggState:

308

310 break;

311 case T_MemoizeState:

312

314 break;

315 default:

316 break;

317 }

318

320}

void ExecAggEstimate(AggState *node, ParallelContext *pcxt)

void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapEstimate(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecBitmapIndexScanEstimate(BitmapIndexScanState *node, ParallelContext *pcxt)

void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)

#define planstate_tree_walker(ps, w, c)

void ExecHashEstimate(HashState *node, ParallelContext *pcxt)

void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)

void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)

void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt)

void ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)

void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt)

void ExecSortEstimate(SortState *node, ParallelContext *pcxt)

void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)

References ExecAggEstimate(), ExecAppendEstimate(), ExecBitmapHeapEstimate(), ExecBitmapIndexScanEstimate(), ExecCustomScanEstimate(), ExecForeignScanEstimate(), ExecHashEstimate(), ExecHashJoinEstimate(), ExecIncrementalSortEstimate(), ExecIndexOnlyScanEstimate(), ExecIndexScanEstimate(), ExecMemoizeEstimate(), ExecParallelEstimate(), ExecSeqScanEstimate(), ExecSortEstimate(), ExecTidRangeScanEstimate(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecInitParallelPlan(), and ExecParallelEstimate().

ExecParallelFinish()

Definition at line 1173 of file execParallel.c.

1174{

1176 int i;

1177

1178

1180 return;

1181

1182

1183

1184

1185

1186 if (pei->tqueue != NULL)

1187 {

1188 for (i = 0; i < nworkers; i++)

1192 }

1193

1194

1195

1196

1197

1198 if (pei->reader != NULL)

1199 {

1200 for (i = 0; i < nworkers; i++)

1204 }

1205

1206

1208

1209

1210

1211

1212

1213 for (i = 0; i < nworkers; i++)

1215

1217}

void WaitForParallelWorkersToFinish(ParallelContext *pcxt)

void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)

void shm_mq_detach(shm_mq_handle *mqh)

void DestroyTupleQueueReader(TupleQueueReader *reader)

References ParallelExecutorInfo::buffer_usage, DestroyTupleQueueReader(), ParallelExecutorInfo::finished, i, InstrAccumParallelQuery(), ParallelContext::nworkers_launched, ParallelExecutorInfo::pcxt, pfree(), ParallelExecutorInfo::reader, shm_mq_detach(), ParallelExecutorInfo::tqueue, WaitForParallelWorkersToFinish(), and ParallelExecutorInfo::wal_usage.

Referenced by ExecShutdownGatherMergeWorkers(), and ExecShutdownGatherWorkers().

ExecParallelGetQueryDesc()

Definition at line 1278 of file execParallel.c.

1280{

1281 char *pstmtspace;

1282 char *paramspace;

1285 char *queryString;

1286

1287

1289

1290

1293

1294

1297

1298

1300 queryString,

1302 receiver, paramLI, NULL, instrument_options);

1303}

ParamListInfo RestoreParamList(char **start_address)

QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)

void * stringToNode(const char *str)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

References CreateQueryDesc(), GetActiveSnapshot(), InvalidSnapshot, PARALLEL_KEY_PARAMLISTINFO, PARALLEL_KEY_PLANNEDSTMT, PARALLEL_KEY_QUERY_TEXT, RestoreParamList(), shm_toc_lookup(), and stringToNode().

Referenced by ParallelQueryMain().

ExecParallelGetReceiver()

Definition at line 1262 of file execParallel.c.

1263{

1264 char *mqspace;

1266

1269 mq = (shm_mq *) mqspace;

1272}

#define PARALLEL_KEY_TUPLE_QUEUE

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)

References CreateTupleQueueDestReceiver(), MyProc, PARALLEL_KEY_TUPLE_QUEUE, PARALLEL_TUPLE_QUEUE_SIZE, ParallelWorkerNumber, shm_mq_attach(), shm_mq_set_sender(), and shm_toc_lookup().

Referenced by ParallelQueryMain().

ExecParallelInitializeDSM()

Definition at line 454 of file execParallel.c.

456{

457 if (planstate == NULL)

458 return false;

459

460

464

465

467

468

469

470

471

472

473

474

475

476

477 switch (nodeTag(planstate))

478 {

479 case T_SeqScanState:

483 break;

484 case T_IndexScanState:

485

487 break;

488 case T_IndexOnlyScanState:

489

492 break;

493 case T_BitmapIndexScanState:

494

496 break;

497 case T_ForeignScanState:

501 break;

502 case T_TidRangeScanState:

506 break;

507 case T_AppendState:

511 break;

512 case T_CustomScanState:

516 break;

517 case T_BitmapHeapScanState:

521 break;

522 case T_HashJoinState:

526 break;

527 case T_HashState:

528

530 break;

531 case T_SortState:

532

534 break;

535 case T_IncrementalSortState:

536

538 break;

539 case T_AggState:

540

542 break;

543 case T_MemoizeState:

544

546 break;

547 default:

548 break;

549 }

550

552}

void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)

void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node, ParallelContext *pcxt)

void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)

void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)

void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)

void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt)

void ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)

void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt)

void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)

void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)

References ExecAggInitializeDSM(), ExecAppendInitializeDSM(), ExecBitmapHeapInitializeDSM(), ExecBitmapIndexScanInitializeDSM(), ExecCustomScanInitializeDSM(), ExecForeignScanInitializeDSM(), ExecHashInitializeDSM(), ExecHashJoinInitializeDSM(), ExecIncrementalSortInitializeDSM(), ExecIndexOnlyScanInitializeDSM(), ExecIndexScanInitializeDSM(), ExecMemoizeInitializeDSM(), ExecParallelInitializeDSM(), ExecSeqScanInitializeDSM(), ExecSortInitializeDSM(), ExecTidRangeScanInitializeDSM(), ExecParallelInitializeDSMContext::instrumentation, ExecParallelInitializeDSMContext::nnodes, nodeTag, Plan::parallel_aware, ExecParallelInitializeDSMContext::pcxt, PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, and planstate_tree_walker.

Referenced by ExecInitParallelPlan(), and ExecParallelInitializeDSM().

ExecParallelInitializeWorker()

Definition at line 1351 of file execParallel.c.

1352{

1353 if (planstate == NULL)

1354 return false;

1355

1356 switch (nodeTag(planstate))

1357 {

1358 case T_SeqScanState:

1361 break;

1362 case T_IndexScanState:

1363

1365 break;

1366 case T_IndexOnlyScanState:

1367

1369 pwcxt);

1370 break;

1371 case T_BitmapIndexScanState:

1372

1374 pwcxt);

1375 break;

1376 case T_ForeignScanState:

1379 pwcxt);

1380 break;

1381 case T_TidRangeScanState:

1384 pwcxt);

1385 break;

1386 case T_AppendState:

1389 break;

1390 case T_CustomScanState:

1393 pwcxt);

1394 break;

1395 case T_BitmapHeapScanState:

1398 pwcxt);

1399 break;

1400 case T_HashJoinState:

1403 pwcxt);

1404 break;

1405 case T_HashState:

1406

1408 break;

1409 case T_SortState:

1410

1412 break;

1413 case T_IncrementalSortState:

1414

1416 pwcxt);

1417 break;

1418 case T_AggState:

1419

1421 break;

1422 case T_MemoizeState:

1423

1425 break;

1426 default:

1427 break;

1428 }

1429

1431 pwcxt);

1432}

static bool ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)

void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)

void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)

void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, ParallelWorkerContext *pwcxt)

void ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node, ParallelWorkerContext *pwcxt)

void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt)

void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt)

void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)

void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)

void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)

void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, ParallelWorkerContext *pwcxt)

void ExecIndexScanInitializeWorker(IndexScanState *node, ParallelWorkerContext *pwcxt)

void ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)

void ExecSeqScanInitializeWorker(SeqScanState *node, ParallelWorkerContext *pwcxt)

void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)

void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt)

References ExecAggInitializeWorker(), ExecAppendInitializeWorker(), ExecBitmapHeapInitializeWorker(), ExecBitmapIndexScanInitializeWorker(), ExecCustomScanInitializeWorker(), ExecForeignScanInitializeWorker(), ExecHashInitializeWorker(), ExecHashJoinInitializeWorker(), ExecIncrementalSortInitializeWorker(), ExecIndexOnlyScanInitializeWorker(), ExecIndexScanInitializeWorker(), ExecMemoizeInitializeWorker(), ExecParallelInitializeWorker(), ExecSeqScanInitializeWorker(), ExecSortInitializeWorker(), ExecTidRangeScanInitializeWorker(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecParallelInitializeWorker(), and ParallelQueryMain().

ExecParallelReinitialize()

Definition at line 928 of file execParallel.c.

931{

934

935

937

938

939

940

941

942

944

949

951

952

954 {

957 }

958

959

961 {

965 }

966

967

971}

void ReinitializeParallelDSM(ParallelContext *pcxt)

static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt)

References ParallelExecutorInfo::area, Assert(), bms_is_empty, dsa_free(), DsaPointerIsValid, EState::es_query_dsa, ExecParallelReInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetPerTupleExprContext, InvalidDsaPointer, PARALLEL_KEY_EXECUTOR_FIXED, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, ReinitializeParallelDSM(), SerializeParamExecParams(), shm_toc_lookup(), PlanState::state, ParallelContext::toc, and ParallelExecutorInfo::tqueue.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelReInitializeDSM()

Definition at line 977 of file execParallel.c.

979{

980 if (planstate == NULL)

981 return false;

982

983

984

985

986 switch (nodeTag(planstate))

987 {

988 case T_SeqScanState:

991 pcxt);

992 break;

993 case T_IndexScanState:

996 pcxt);

997 break;

998 case T_IndexOnlyScanState:

1001 pcxt);

1002 break;

1003 case T_ForeignScanState:

1006 pcxt);

1007 break;

1008 case T_TidRangeScanState:

1011 pcxt);

1012 break;

1013 case T_AppendState:

1016 break;

1017 case T_CustomScanState:

1020 pcxt);

1021 break;

1022 case T_BitmapHeapScanState:

1025 pcxt);

1026 break;

1027 case T_HashJoinState:

1030 pcxt);

1031 break;

1032 case T_BitmapIndexScanState:

1033 case T_HashState:

1034 case T_SortState:

1035 case T_IncrementalSortState:

1036 case T_MemoizeState:

1037

1038 break;

1039

1040 default:

1041 break;

1042 }

1043

1045}

void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)

void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt)

void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)

void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)

References ExecAppendReInitializeDSM(), ExecBitmapHeapReInitializeDSM(), ExecCustomScanReInitializeDSM(), ExecForeignScanReInitializeDSM(), ExecHashJoinReInitializeDSM(), ExecIndexOnlyScanReInitializeDSM(), ExecIndexScanReInitializeDSM(), ExecParallelReInitializeDSM(), ExecSeqScanReInitializeDSM(), ExecTidRangeScanReInitializeDSM(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecParallelReinitialize(), and ExecParallelReInitializeDSM().

ExecParallelReportInstrumentation()

Definition at line 1310 of file execParallel.c.

1312{

1313 int i;

1316

1318

1319

1320

1321

1322

1323

1324

1326 if (instrumentation->plan_node_id[i] == plan_node_id)

1327 break;

1329 elog(ERROR, "plan node %d not found", plan_node_id);

1330

1331

1332

1333

1334

1336 instrument += i * instrumentation->num_workers;

1338 Assert(ParallelWorkerNumber < instrumentation->num_workers);

1340

1342 instrumentation);

1343}

static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)

#define IsParallelWorker()

void InstrEndLoop(Instrumentation *instr)

void InstrAggNode(Instrumentation *dst, Instrumentation *add)

Instrumentation * instrument

References Assert(), elog, ERROR, ExecParallelReportInstrumentation(), GetInstrumentationArray, i, InstrAggNode(), InstrEndLoop(), PlanState::instrument, IsParallelWorker, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, ParallelWorkerNumber, PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, and planstate_tree_walker.

Referenced by ExecParallelReportInstrumentation(), and ParallelQueryMain().

ExecParallelRetrieveInstrumentation()

Definition at line 1052 of file execParallel.c.

1054{

1056 int i;

1057 int n;

1058 int ibytes;

1061

1062

1064 if (instrumentation->plan_node_id[i] == plan_node_id)

1065 break;

1067 elog(ERROR, "plan node %d not found", plan_node_id);

1068

1069

1071 instrument += i * instrumentation->num_workers;

1072 for (n = 0; n < instrumentation->num_workers; ++n)

1074

1075

1076

1077

1078

1079

1080

1081

1087

1090

1091

1092 switch (nodeTag(planstate))

1093 {

1094 case T_IndexScanState:

1096 break;

1097 case T_IndexOnlyScanState:

1099 break;

1100 case T_BitmapIndexScanState:

1102 break;

1103 case T_SortState:

1105 break;

1106 case T_IncrementalSortState:

1108 break;

1109 case T_HashState:

1111 break;

1112 case T_AggState:

1114 break;

1115 case T_MemoizeState:

1117 break;

1118 case T_BitmapHeapScanState:

1120 break;

1121 default:

1122 break;

1123 }

1124

1126 instrumentation);

1127}

void ExecAggRetrieveInstrumentation(AggState *node)

void ExecBitmapHeapRetrieveInstrumentation(BitmapHeapScanState *node)

void ExecBitmapIndexScanRetrieveInstrumentation(BitmapIndexScanState *node)

void ExecHashRetrieveInstrumentation(HashState *node)

void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)

void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node)

void ExecIndexScanRetrieveInstrumentation(IndexScanState *node)

void ExecMemoizeRetrieveInstrumentation(MemoizeState *node)

void ExecSortRetrieveInstrumentation(SortState *node)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

MemoryContext es_query_cxt

WorkerInstrumentation * worker_instrument

Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]

References elog, ERROR, EState::es_query_cxt, ExecAggRetrieveInstrumentation(), ExecBitmapHeapRetrieveInstrumentation(), ExecBitmapIndexScanRetrieveInstrumentation(), ExecHashRetrieveInstrumentation(), ExecIncrementalSortRetrieveInstrumentation(), ExecIndexOnlyScanRetrieveInstrumentation(), ExecIndexScanRetrieveInstrumentation(), ExecMemoizeRetrieveInstrumentation(), ExecParallelRetrieveInstrumentation(), ExecSortRetrieveInstrumentation(), GetInstrumentationArray, i, InstrAggNode(), WorkerInstrumentation::instrument, PlanState::instrument, MemoryContextSwitchTo(), mul_size(), nodeTag, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, WorkerInstrumentation::num_workers, palloc(), PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, planstate_tree_walker, PlanState::state, and PlanState::worker_instrument.

Referenced by ExecParallelCleanup(), and ExecParallelRetrieveInstrumentation().

ExecParallelRetrieveJitInstrumentation()

Definition at line 1133 of file execParallel.c.

1135{

1137 int ibytes;

1138

1139 int n;

1140

1141

1142

1143

1144

1149

1150

1151 for (n = 0; n < shared_jit->num_workers; ++n)

1153

1154

1155

1156

1157

1158

1159

1164

1166}

void InstrJitAgg(JitInstrumentation *dst, JitInstrumentation *add)

void * MemoryContextAlloc(MemoryContext context, Size size)

void * MemoryContextAllocZero(MemoryContext context, Size size)

struct JitInstrumentation * es_jit_worker_instr

struct SharedJitInstrumentation * worker_jit_instrument

References EState::es_jit_worker_instr, EState::es_query_cxt, InstrJitAgg(), SharedJitInstrumentation::jit_instr, MemoryContextAlloc(), MemoryContextAllocZero(), mul_size(), SharedJitInstrumentation::num_workers, PlanState::state, and PlanState::worker_jit_instrument.

Referenced by ExecParallelCleanup().

ExecParallelSetupTupleQueues()

Definition at line 559 of file execParallel.c.

560{

562 char *tqueuespace;

563 int i;

564

565

567 return NULL;

568

569

572

573

574

575

576

577 if (!reinitialize)

578 tqueuespace =

582 else

584

585

587 {

589

593

596 }

597

598

599 if (!reinitialize)

601

602

603 return responseq;

604}

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

References i, mul_size(), MyProc, ParallelContext::nworkers, palloc(), PARALLEL_KEY_TUPLE_QUEUE, PARALLEL_TUPLE_QUEUE_SIZE, ParallelContext::seg, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_allocate(), shm_toc_insert(), shm_toc_lookup(), and ParallelContext::toc.

Referenced by ExecInitParallelPlan(), and ExecParallelReinitialize().

ExecSerializePlan()

static char * ExecSerializePlan ( Plan * plan, EState * estate ) static

Definition at line 147 of file execParallel.c.

148{

151

152

154

155

156

157

158

159

160

161

162

163

164 foreach(lc, plan->targetlist)

165 {

167

168 tle->resjunk = false;

169 }

170

171

172

173

174

175

194

195

196

197

198

199

200

201

204 {

206

208 subplan = NULL;

210 }

211

215 pstmt->invalItems = NIL;

220

221

223}

int64 pgstat_get_my_query_id(void)

int64 pgstat_get_my_plan_id(void)

List * lappend(List *list, void *datum)

char * nodeToString(const void *obj)

#define lfirst_node(type, lc)

List * es_part_prune_infos

Bitmapset * es_unpruned_relids

Bitmapset * rewindPlanIDs

PlannedStmtOrigin planOrigin

Bitmapset * unprunableRelids

References PlannedStmt::appendRelations, PlannedStmt::canSetTag, CMD_SELECT, PlannedStmt::commandType, copyObject, PlannedStmt::dependsOnRole, EState::es_part_prune_infos, EState::es_plannedstmt, EState::es_range_table, EState::es_rteperminfos, EState::es_unpruned_relids, PlannedStmt::hasModifyingCTE, PlannedStmt::hasReturning, PlannedStmt::invalItems, lappend(), lfirst, lfirst_node, makeNode, NIL, nodeToString(), Plan::parallel_safe, PlannedStmt::parallelModeNeeded, PlannedStmt::paramExecTypes, PlannedStmt::partPruneInfos, PlannedStmt::permInfos, pgstat_get_my_plan_id(), pgstat_get_my_query_id(), plan, PLAN_STMT_INTERNAL, PlannedStmt::planId, PlannedStmt::planOrigin, PlannedStmt::planTree, PlannedStmt::queryId, PlannedStmt::relationOids, PlannedStmt::resultRelations, PlannedStmt::rewindPlanIDs, PlannedStmt::rowMarks, PlannedStmt::rtable, PlannedStmt::stmt_len, PlannedStmt::stmt_location, PlannedStmt::subplans, PlannedStmt::transientPlan, PlannedStmt::unprunableRelids, and PlannedStmt::utilityStmt.

Referenced by ExecInitParallelPlan().

ParallelQueryMain()

Definition at line 1451 of file execParallel.c.

1452{

1460 int instrument_options = 0;

1461 void *area_space;

1464

1465

1467

1468

1471 if (instrumentation != NULL)

1474 true);

1476

1477

1479

1480

1482

1483

1486

1487

1490

1491

1494 {

1495 char *paramexec_space;

1496

1499 }

1500 pwcxt.toc = toc;

1501 pwcxt.seg = seg;

1503

1504

1506

1507

1508

1509

1510

1511

1512

1513

1515

1516

1517

1518

1519

1523

1524

1526

1527

1532

1533

1534 if (instrumentation != NULL)

1536 instrumentation);

1537

1538

1539 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)

1540 {

1541 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);

1544 }

1545

1546

1548

1549

1552 receiver->rDestroy(receiver);

1553}

void pgstat_report_activity(BackendState state, const char *cmd_str)

dsa_area * dsa_attach_in_place(void *place, dsm_segment *segment)

void * dsa_get_address(dsa_area *area, dsa_pointer dp)

void ExecutorEnd(QueryDesc *queryDesc)

void ExecutorFinish(QueryDesc *queryDesc)

void ExecutorStart(QueryDesc *queryDesc, int eflags)

void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)

static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)

static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)

static void RestoreParamExecParams(char *start_address, EState *estate)

void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)

void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)

void InstrStartParallelQuery(void)

const char * debug_query_string

void FreeQueryDesc(QueryDesc *qdesc)

struct JitContext * es_jit

PlannedStmt * plannedstmt

void(* rDestroy)(DestReceiver *self)

References Assert(), debug_query_string, dsa_attach_in_place(), dsa_detach(), dsa_get_address(), DsaPointerIsValid, FixedParallelExecutorState::eflags, EState::es_jit, EState::es_query_dsa, QueryDesc::estate, ExecParallelGetQueryDesc(), ExecParallelGetReceiver(), ExecParallelInitializeWorker(), ExecParallelReportInstrumentation(), ExecSetTupleBound(), ExecutorEnd(), ExecutorFinish(), ExecutorRun(), ExecutorStart(), ForwardScanDirection, FreeQueryDesc(), JitContext::instr, InstrEndParallelQuery(), InstrStartParallelQuery(), SharedExecutorInstrumentation::instrument_options, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, PlannedStmt::jitFlags, PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_WAL_USAGE, ParallelWorkerNumber, FixedParallelExecutorState::param_exec, pgstat_report_activity(), QueryDesc::plannedstmt, QueryDesc::planstate, _DestReceiver::rDestroy, RestoreParamExecParams(), ParallelWorkerContext::seg, shm_toc_lookup(), QueryDesc::sourceText, PlanState::state, STATE_RUNNING, ParallelWorkerContext::toc, and FixedParallelExecutorState::tuples_needed.

RestoreParamExecParams()

static void RestoreParamExecParams ( char * start_address, EState * estate ) static

Definition at line 425 of file execParallel.c.

426{

427 int nparams;

428 int i;

429 int paramid;

430

431 memcpy(&nparams, start_address, sizeof(int));

432 start_address += sizeof(int);

433

434 for (i = 0; i < nparams; i++)

435 {

437

438

439 memcpy(&paramid, start_address, sizeof(int));

440 start_address += sizeof(int);

442

443

446 }

447}

Datum datumRestore(char **start_address, bool *isnull)

References datumRestore(), EState::es_param_exec_vals, ParamExecData::execPlan, i, ParamExecData::isnull, and ParamExecData::value.

Referenced by ParallelQueryMain().

SerializeParamExecParams()

Definition at line 370 of file execParallel.c.

371{

373 int nparams;

374 int paramid;

377 char *start_address;

378

379

383

384

386 memcpy(start_address, &nparams, sizeof(int));

387 start_address += sizeof(int);

388

389

390 paramid = -1;

391 while ((paramid = bms_next_member(params, paramid)) >= 0)

392 {

393 Oid typeOid;

395 bool typByVal;

396

399 paramid);

400

401

402 memcpy(start_address, &paramid, sizeof(int));

403 start_address += sizeof(int);

404

405

408 else

409 {

410

411 typLen = sizeof(Datum);

412 typByVal = true;

413 }

415 &start_address);

416 }

417

418 return handle;

419}

int bms_num_members(const Bitmapset *a)

void datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, char **start_address)

#define dsa_allocate(area, size)

static Size EstimateParamExecSpace(EState *estate, Bitmapset *params)

References bms_next_member(), bms_num_members(), datumSerialize(), dsa_allocate, dsa_get_address(), EState::es_param_exec_vals, EState::es_plannedstmt, EstimateParamExecSpace(), get_typlenbyval(), ParamExecData::isnull, list_nth_oid(), OidIsValid, PlannedStmt::paramExecTypes, and ParamExecData::value.

Referenced by ExecInitParallelPlan(), and ExecParallelReinitialize().