PostgreSQL Source Code: src/backend/executor/execParallel.c File Reference

Go to the source code of this file.

Data Structures
struct FixedParallelExecutorState
struct SharedExecutorInstrumentation
struct ExecParallelEstimateContext
struct ExecParallelInitializeDSMContext
Macros
#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
#define GetInstrumentationArray(sei)
Typedefs
typedef struct FixedParallelExecutorState FixedParallelExecutorState
typedef struct ExecParallelEstimateContext ExecParallelEstimateContext
typedef struct ExecParallelInitializeDSMContext ExecParallelInitializeDSMContext
Functions
static char * ExecSerializePlan (Plan *plan, EState *estate)
static bool ExecParallelEstimate (PlanState *planstate, ExecParallelEstimateContext *e)
static bool ExecParallelInitializeDSM (PlanState *planstate, ExecParallelInitializeDSMContext *d)
static shm_mq_handle ** ExecParallelSetupTupleQueues (ParallelContext *pcxt, bool reinitialize)
static bool ExecParallelReInitializeDSM (PlanState *planstate, ParallelContext *pcxt)
static bool ExecParallelRetrieveInstrumentation (PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static DestReceiver * ExecParallelGetReceiver (dsm_segment *seg, shm_toc *toc)
static Size EstimateParamExecSpace (EState *estate, Bitmapset *params)
static dsa_pointer SerializeParamExecParams (EState *estate, Bitmapset *params, dsa_area *area)
static void RestoreParamExecParams (char *start_address, EState *estate)
ParallelExecutorInfo * ExecInitParallelPlan (PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
void ExecParallelCreateReaders (ParallelExecutorInfo *pei)
void ExecParallelReinitialize (PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
static void ExecParallelRetrieveJitInstrumentation (PlanState *planstate, SharedJitInstrumentation *shared_jit)
void ExecParallelFinish (ParallelExecutorInfo *pei)
void ExecParallelCleanup (ParallelExecutorInfo *pei)
static QueryDesc * ExecParallelGetQueryDesc (shm_toc *toc, DestReceiver *receiver, int instrument_options)
static bool ExecParallelReportInstrumentation (PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static bool ExecParallelInitializeWorker (PlanState *planstate, ParallelWorkerContext *pwcxt)
void ParallelQueryMain (dsm_segment *seg, shm_toc *toc)

GetInstrumentationArray

#define GetInstrumentationArray(sei)

Value:

(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
 (Instrumentation *) (((char *) sei) + sei->instrument_offset))

#define AssertVariableIsOfTypeMacro(varname, typename)

Definition at line 107 of file execParallel.c.

PARALLEL_KEY_BUFFER_USAGE

#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)

PARALLEL_KEY_DSA

#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)

PARALLEL_KEY_EXECUTOR_FIXED

#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)

PARALLEL_KEY_INSTRUMENTATION

#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)

PARALLEL_KEY_JIT_INSTRUMENTATION

#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)

PARALLEL_KEY_PARAMLISTINFO

#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)

PARALLEL_KEY_PLANNEDSTMT

#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)

PARALLEL_KEY_QUERY_TEXT

#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)

PARALLEL_KEY_TUPLE_QUEUE

#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)

PARALLEL_KEY_WAL_USAGE

#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)

PARALLEL_TUPLE_QUEUE_SIZE

#define PARALLEL_TUPLE_QUEUE_SIZE 65536

ExecParallelEstimateContext

ExecParallelInitializeDSMContext

FixedParallelExecutorState

EstimateParamExecSpace()

Definition at line 319 of file execParallel.c.

320{

321 int paramid;

322 Size sz = sizeof(int);

323

324 paramid = -1;

325 while ((paramid = bms_next_member(params, paramid)) >= 0)

326 {

327 Oid typeOid;

329 bool typByVal;

331

334 paramid);

335

336 sz = add_size(sz, sizeof(int));

337

338

341 else

342 {

343

344 typLen = sizeof(Datum);

345 typByVal = true;

346 }

349 typByVal, typLen));

350 }

351 return sz;

352}

int bms_next_member(const Bitmapset *a, int prevbit)

#define OidIsValid(objectId)

Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen)

void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)

static Oid list_nth_oid(const List *list, int n)

Size add_size(Size s1, Size s2)

PlannedStmt * es_plannedstmt

ParamExecData * es_param_exec_vals

References add_size(), bms_next_member(), datumEstimateSpace(), EState::es_param_exec_vals, EState::es_plannedstmt, get_typlenbyval(), ParamExecData::isnull, list_nth_oid(), OidIsValid, PlannedStmt::paramExecTypes, and ParamExecData::value.

Referenced by SerializeParamExecParams().

ExecInitParallelPlan()

Definition at line 599 of file execParallel.c.

602{

608 char *pstmt_data;

609 char *pstmt_space;

610 char *paramlistinfo_space;

615 int pstmt_len;

616 int paramlistinfo_len;

617 int instrumentation_len = 0;

618 int jit_instrumentation_len = 0;

619 int instrument_offset = 0;

621 char *query_string;

622 int query_len;

623

624

625

626

627

628

629

630

631

632

633

634

636

637

641

642

644

645

647 pei->pcxt = pcxt;

648

649

650

651

652

653

654

655

659

660

664

665

666 pstmt_len = strlen(pstmt_data) + 1;

669

670

674

675

676

677

678

679

680

681

685

686

687

688

692

693

697

698

699

700

701

702 e.pcxt = pcxt;

703 e.nnodes = 0;

705

706

708 {

709 instrumentation_len =

711 sizeof(int) * e.nnodes;

712 instrumentation_len = MAXALIGN(instrumentation_len);

713 instrument_offset = instrumentation_len;

714 instrumentation_len +=

719

720

722 {

723 jit_instrumentation_len =

728 }

729 }

730

731

734

735

736

737

738

739

741

742

744

745

746

747

748

749

750

751

752

753

760

761

763 memcpy(query_string, estate->es_sourceText, query_len + 1);

765

766

768 memcpy(pstmt_space, pstmt_data, pstmt_len);

770

771

775

776

781

782

787

788

790

791

793

794

795

796

797

798

800 {

802 int i;

803

810 for (i = 0; i < nworkers * e.nnodes; ++i)

813 instrumentation);

815

817 {

819 jit_instrumentation_len);

820 jit_instrumentation->num_workers = nworkers;

821 memset(jit_instrumentation->jit_instr, 0,

824 jit_instrumentation);

826 }

827 }

828

829

830

831

832

833

834 if (pcxt->seg != NULL)

835 {

836 char *area_space;

837

842 pcxt->seg);

843

844

845

846

847

848

849

851 {

855 }

856 }

857

858

859

860

861

862

863 d.pcxt = pcxt;

866

867

871

872

873

874

875

876 if (e.nnodes != d.nnodes)

877 elog(ERROR, "inconsistent count of PlanState nodes");

878

879

880 return pei;

881}

void InitializeParallelDSM(ParallelContext *pcxt)

ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)

size_t dsa_minimum_size(void)

#define dsa_create_in_place(place, size, tranch_id, segment)

#define InvalidDsaPointer

#define PARALLEL_KEY_BUFFER_USAGE

#define PARALLEL_KEY_JIT_INSTRUMENTATION

#define PARALLEL_KEY_PARAMLISTINFO

#define PARALLEL_TUPLE_QUEUE_SIZE

static dsa_pointer SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)

#define PARALLEL_KEY_INSTRUMENTATION

static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)

#define PARALLEL_KEY_PLANNEDSTMT

static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)

#define GetInstrumentationArray(sei)

#define PARALLEL_KEY_EXECUTOR_FIXED

static char * ExecSerializePlan(Plan *plan, EState *estate)

#define PARALLEL_KEY_QUERY_TEXT

#define PARALLEL_KEY_WAL_USAGE

static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)

#define GetPerTupleExprContext(estate)

Assert(PointerIsAligned(start, uint64))

void InstrInit(Instrumentation *instr, int instrument_options)

struct JitInstrumentation JitInstrumentation

@ LWTRANCHE_PARALLEL_QUERY_DSA

void * palloc0(Size size)

void ExecSetParamPlanMulti(const Bitmapset *params, ExprContext *econtext)

Size EstimateParamListSpace(ParamListInfo paramLI)

void SerializeParamList(ParamListInfo paramLI, char **start_address)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size mul_size(Size s1, Size s2)

Snapshot GetActiveSnapshot(void)

struct dsa_area * es_query_dsa

ParamListInfo es_param_list_info

const char * es_sourceText

SharedExecutorInstrumentation * instrumentation

shm_toc_estimator estimator

struct SharedJitInstrumentation * jit_instrumentation

BufferUsage * buffer_usage

SharedExecutorInstrumentation * instrumentation

struct TupleQueueReader ** reader

References ParallelExecutorInfo::area, Assert(), bms_is_empty, ParallelExecutorInfo::buffer_usage, CreateParallelContext(), dsa_create_in_place, dsa_minimum_size(), FixedParallelExecutorState::eflags, elog, ERROR, EState::es_instrument, EState::es_jit_flags, EState::es_param_list_info, EState::es_query_dsa, EState::es_snapshot, EState::es_sourceText, EState::es_top_eflags, EstimateParamListSpace(), ParallelContext::estimator, ExecParallelEstimate(), ExecParallelInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSerializePlan(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetActiveSnapshot(), GetInstrumentationArray, GetPerTupleExprContext, i, InitializeParallelDSM(), InstrInit(), SharedExecutorInstrumentation::instrument_offset, SharedExecutorInstrumentation::instrument_options, ExecParallelInitializeDSMContext::instrumentation, ParallelExecutorInfo::instrumentation, InvalidDsaPointer, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, ParallelExecutorInfo::jit_instrumentation, LWTRANCHE_PARALLEL_QUERY_DSA, MAXALIGN, mul_size(), ExecParallelInitializeDSMContext::nnodes, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, SharedJitInstrumentation::num_workers, ParallelContext::nworkers, palloc0(), PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_PARAMLISTINFO, PARALLEL_KEY_PLANNEDSTMT, PARALLEL_KEY_QUERY_TEXT, PARALLEL_KEY_WAL_USAGE, PARALLEL_TUPLE_QUEUE_SIZE, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ExecParallelInitializeDSMContext::pcxt, ParallelExecutorInfo::pcxt, PGJIT_NONE, PlanState::plan, ParallelExecutorInfo::planstate, ParallelExecutorInfo::reader, ParallelContext::seg, SerializeParamExecParams(), SerializeParamList(), shm_toc_allocate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), ParallelContext::toc, 
ParallelExecutorInfo::tqueue, FixedParallelExecutorState::tuples_needed, and ParallelExecutorInfo::wal_usage.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelCleanup()

Definition at line 1209 of file execParallel.c.

1210{

1211

1215

1216

1220

1221

1223 {

1226 }

1227 if (pei->area != NULL)

1228 {

1230 pei->area = NULL;

1231 }

1232 if (pei->pcxt != NULL)

1233 {

1235 pei->pcxt = NULL;

1236 }

1238}

void DestroyParallelContext(ParallelContext *pcxt)

void dsa_detach(dsa_area *area)

void dsa_free(dsa_area *area, dsa_pointer dp)

#define DsaPointerIsValid(x)

static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)

static void ExecParallelRetrieveJitInstrumentation(PlanState *planstate, SharedJitInstrumentation *shared_jit)

void pfree(void *pointer)

References ParallelExecutorInfo::area, DestroyParallelContext(), dsa_detach(), dsa_free(), DsaPointerIsValid, ExecParallelRetrieveInstrumentation(), ExecParallelRetrieveJitInstrumentation(), ParallelExecutorInfo::instrumentation, InvalidDsaPointer, ParallelExecutorInfo::jit_instrumentation, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, pfree(), and ParallelExecutorInfo::planstate.

Referenced by ExecShutdownGather(), and ExecShutdownGatherMerge().

ExecParallelCreateReaders()

Definition at line 890 of file execParallel.c.

891{

893 int i;

894

896

897 if (nworkers > 0)

898 {

901

902 for (i = 0; i < nworkers; i++)

903 {

907 }

908 }

909}

void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)

ParallelWorkerInfo * worker

BackgroundWorkerHandle * bgwhandle

TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)

References Assert(), ParallelWorkerInfo::bgwhandle, CreateTupleQueueReader(), i, ParallelContext::nworkers_launched, palloc(), ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, shm_mq_set_handle(), ParallelExecutorInfo::tqueue, and ParallelContext::worker.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelEstimate()

Definition at line 233 of file execParallel.c.

234{

235 if (planstate == NULL)

236 return false;

237

238

239 e->nnodes++;

240

241 switch (nodeTag(planstate))

242 {

243 case T_SeqScanState:

246 e->pcxt);

247 break;

248 case T_IndexScanState:

249

251 e->pcxt);

252 break;

253 case T_IndexOnlyScanState:

254

256 e->pcxt);

257 break;

258 case T_BitmapIndexScanState:

259

261 e->pcxt);

262 break;

263 case T_ForeignScanState:

266 e->pcxt);

267 break;

268 case T_AppendState:

271 e->pcxt);

272 break;

273 case T_CustomScanState:

276 e->pcxt);

277 break;

278 case T_BitmapHeapScanState:

281 e->pcxt);

282 break;

283 case T_HashJoinState:

286 e->pcxt);

287 break;

288 case T_HashState:

289

291 break;

292 case T_SortState:

293

295 break;

296 case T_IncrementalSortState:

297

299 break;

300 case T_AggState:

301

303 break;

304 case T_MemoizeState:

305

307 break;

308 default:

309 break;

310 }

311

313}

void ExecAggEstimate(AggState *node, ParallelContext *pcxt)

void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapEstimate(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecBitmapIndexScanEstimate(BitmapIndexScanState *node, ParallelContext *pcxt)

void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)

#define planstate_tree_walker(ps, w, c)

void ExecHashEstimate(HashState *node, ParallelContext *pcxt)

void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)

void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)

void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt)

void ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)

void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt)

void ExecSortEstimate(SortState *node, ParallelContext *pcxt)

References ExecAggEstimate(), ExecAppendEstimate(), ExecBitmapHeapEstimate(), ExecBitmapIndexScanEstimate(), ExecCustomScanEstimate(), ExecForeignScanEstimate(), ExecHashEstimate(), ExecHashJoinEstimate(), ExecIncrementalSortEstimate(), ExecIndexOnlyScanEstimate(), ExecIndexScanEstimate(), ExecMemoizeEstimate(), ExecParallelEstimate(), ExecSeqScanEstimate(), ExecSortEstimate(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecInitParallelPlan(), and ExecParallelEstimate().

ExecParallelFinish()

Definition at line 1156 of file execParallel.c.

1157{

1159 int i;

1160

1161

1163 return;

1164

1165

1166

1167

1168

1169 if (pei->tqueue != NULL)

1170 {

1171 for (i = 0; i < nworkers; i++)

1175 }

1176

1177

1178

1179

1180

1181 if (pei->reader != NULL)

1182 {

1183 for (i = 0; i < nworkers; i++)

1187 }

1188

1189

1191

1192

1193

1194

1195

1196 for (i = 0; i < nworkers; i++)

1198

1200}

void WaitForParallelWorkersToFinish(ParallelContext *pcxt)

void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)

void shm_mq_detach(shm_mq_handle *mqh)

void DestroyTupleQueueReader(TupleQueueReader *reader)

References ParallelExecutorInfo::buffer_usage, DestroyTupleQueueReader(), ParallelExecutorInfo::finished, i, InstrAccumParallelQuery(), ParallelContext::nworkers_launched, ParallelExecutorInfo::pcxt, pfree(), ParallelExecutorInfo::reader, shm_mq_detach(), ParallelExecutorInfo::tqueue, WaitForParallelWorkersToFinish(), and ParallelExecutorInfo::wal_usage.

Referenced by ExecShutdownGatherMergeWorkers(), and ExecShutdownGatherWorkers().

ExecParallelGetQueryDesc()

Definition at line 1261 of file execParallel.c.

1263{

1264 char *pstmtspace;

1265 char *paramspace;

1268 char *queryString;

1269

1270

1272

1273

1276

1277

1280

1281

1282

1283

1284

1285

1286

1287

1289 NULL,

1290 queryString,

1292 receiver, paramLI, NULL, instrument_options);

1293}

ParamListInfo RestoreParamList(char **start_address)

QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, CachedPlan *cplan, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)

void * stringToNode(const char *str)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

References CreateQueryDesc(), GetActiveSnapshot(), InvalidSnapshot, PARALLEL_KEY_PARAMLISTINFO, PARALLEL_KEY_PLANNEDSTMT, PARALLEL_KEY_QUERY_TEXT, RestoreParamList(), shm_toc_lookup(), and stringToNode().

Referenced by ParallelQueryMain().

ExecParallelGetReceiver()

Definition at line 1245 of file execParallel.c.

1246{

1247 char *mqspace;

1249

1252 mq = (shm_mq *) mqspace;

1255}

#define PARALLEL_KEY_TUPLE_QUEUE

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)

References CreateTupleQueueDestReceiver(), MyProc, PARALLEL_KEY_TUPLE_QUEUE, PARALLEL_TUPLE_QUEUE_SIZE, ParallelWorkerNumber, shm_mq_attach(), shm_mq_set_sender(), and shm_toc_lookup().

Referenced by ParallelQueryMain().

ExecParallelInitializeDSM()

Definition at line 447 of file execParallel.c.

449{

450 if (planstate == NULL)

451 return false;

452

453

457

458

460

461

462

463

464

465

466

467

468

469

470 switch (nodeTag(planstate))

471 {

472 case T_SeqScanState:

476 break;

477 case T_IndexScanState:

478

480 break;

481 case T_IndexOnlyScanState:

482

485 break;

486 case T_BitmapIndexScanState:

487

489 break;

490 case T_ForeignScanState:

494 break;

495 case T_AppendState:

499 break;

500 case T_CustomScanState:

504 break;

505 case T_BitmapHeapScanState:

509 break;

510 case T_HashJoinState:

514 break;

515 case T_HashState:

516

518 break;

519 case T_SortState:

520

522 break;

523 case T_IncrementalSortState:

524

526 break;

527 case T_AggState:

528

530 break;

531 case T_MemoizeState:

532

534 break;

535 default:

536 break;

537 }

538

540}

void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)

void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node, ParallelContext *pcxt)

void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)

void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)

void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)

void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt)

void ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)

void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt)

void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)

References ExecAggInitializeDSM(), ExecAppendInitializeDSM(), ExecBitmapHeapInitializeDSM(), ExecBitmapIndexScanInitializeDSM(), ExecCustomScanInitializeDSM(), ExecForeignScanInitializeDSM(), ExecHashInitializeDSM(), ExecHashJoinInitializeDSM(), ExecIncrementalSortInitializeDSM(), ExecIndexOnlyScanInitializeDSM(), ExecIndexScanInitializeDSM(), ExecMemoizeInitializeDSM(), ExecParallelInitializeDSM(), ExecSeqScanInitializeDSM(), ExecSortInitializeDSM(), ExecParallelInitializeDSMContext::instrumentation, ExecParallelInitializeDSMContext::nnodes, nodeTag, Plan::parallel_aware, ExecParallelInitializeDSMContext::pcxt, PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, and planstate_tree_walker.

Referenced by ExecInitParallelPlan(), and ExecParallelInitializeDSM().

ExecParallelInitializeWorker()

Definition at line 1341 of file execParallel.c.

1342{

1343 if (planstate == NULL)

1344 return false;

1345

1346 switch (nodeTag(planstate))

1347 {

1348 case T_SeqScanState:

1351 break;

1352 case T_IndexScanState:

1353

1355 break;

1356 case T_IndexOnlyScanState:

1357

1359 pwcxt);

1360 break;

1361 case T_BitmapIndexScanState:

1362

1364 pwcxt);

1365 break;

1366 case T_ForeignScanState:

1369 pwcxt);

1370 break;

1371 case T_AppendState:

1374 break;

1375 case T_CustomScanState:

1378 pwcxt);

1379 break;

1380 case T_BitmapHeapScanState:

1383 pwcxt);

1384 break;

1385 case T_HashJoinState:

1388 pwcxt);

1389 break;

1390 case T_HashState:

1391

1393 break;

1394 case T_SortState:

1395

1397 break;

1398 case T_IncrementalSortState:

1399

1401 pwcxt);

1402 break;

1403 case T_AggState:

1404

1406 break;

1407 case T_MemoizeState:

1408

1410 break;

1411 default:

1412 break;

1413 }

1414

1416 pwcxt);

1417}

static bool ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)

void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)

void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)

void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, ParallelWorkerContext *pwcxt)

void ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node, ParallelWorkerContext *pwcxt)

void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt)

void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt)

void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)

void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)

void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)

void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, ParallelWorkerContext *pwcxt)

void ExecIndexScanInitializeWorker(IndexScanState *node, ParallelWorkerContext *pwcxt)

void ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)

void ExecSeqScanInitializeWorker(SeqScanState *node, ParallelWorkerContext *pwcxt)

void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)

References ExecAggInitializeWorker(), ExecAppendInitializeWorker(), ExecBitmapHeapInitializeWorker(), ExecBitmapIndexScanInitializeWorker(), ExecCustomScanInitializeWorker(), ExecForeignScanInitializeWorker(), ExecHashInitializeWorker(), ExecHashJoinInitializeWorker(), ExecIncrementalSortInitializeWorker(), ExecIndexOnlyScanInitializeWorker(), ExecIndexScanInitializeWorker(), ExecMemoizeInitializeWorker(), ExecParallelInitializeWorker(), ExecSeqScanInitializeWorker(), ExecSortInitializeWorker(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecParallelInitializeWorker(), and ParallelQueryMain().

ExecParallelReinitialize()

Definition at line 916 of file execParallel.c.

919{

922

923

925

926

927

928

929

930

932

937

939

940

942 {

945 }

946

947

949 {

953 }

954

955

959}

void ReinitializeParallelDSM(ParallelContext *pcxt)

static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt)

References ParallelExecutorInfo::area, Assert(), bms_is_empty, dsa_free(), DsaPointerIsValid, EState::es_query_dsa, ExecParallelReInitializeDSM(), ExecParallelSetupTupleQueues(), ExecSetParamPlanMulti(), ParallelExecutorInfo::finished, GetPerTupleExprContext, InvalidDsaPointer, PARALLEL_KEY_EXECUTOR_FIXED, FixedParallelExecutorState::param_exec, ParallelExecutorInfo::param_exec, ParallelExecutorInfo::pcxt, ParallelExecutorInfo::reader, ReinitializeParallelDSM(), SerializeParamExecParams(), shm_toc_lookup(), PlanState::state, ParallelContext::toc, and ParallelExecutorInfo::tqueue.

Referenced by ExecGather(), and ExecGatherMerge().

ExecParallelReInitializeDSM()

Definition at line 965 of file execParallel.c.

967{

968 if (planstate == NULL)

969 return false;

970

971

972

973

974 switch (nodeTag(planstate))

975 {

976 case T_SeqScanState:

979 pcxt);

980 break;

981 case T_IndexScanState:

984 pcxt);

985 break;

986 case T_IndexOnlyScanState:

989 pcxt);

990 break;

991 case T_ForeignScanState:

994 pcxt);

995 break;

996 case T_AppendState:

999 break;

1000 case T_CustomScanState:

1003 pcxt);

1004 break;

1005 case T_BitmapHeapScanState:

1008 pcxt);

1009 break;

1010 case T_HashJoinState:

1013 pcxt);

1014 break;

1015 case T_BitmapIndexScanState:

1016 case T_HashState:

1017 case T_SortState:

1018 case T_IncrementalSortState:

1019 case T_MemoizeState:

1020

1021 break;

1022

1023 default:

1024 break;

1025 }

1026

1028}

void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)

void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)

void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)

void ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)

void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)

void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt)

void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)

References ExecAppendReInitializeDSM(), ExecBitmapHeapReInitializeDSM(), ExecCustomScanReInitializeDSM(), ExecForeignScanReInitializeDSM(), ExecHashJoinReInitializeDSM(), ExecIndexOnlyScanReInitializeDSM(), ExecIndexScanReInitializeDSM(), ExecParallelReInitializeDSM(), ExecSeqScanReInitializeDSM(), nodeTag, Plan::parallel_aware, PlanState::plan, and planstate_tree_walker.

Referenced by ExecParallelReinitialize(), and ExecParallelReInitializeDSM().

ExecParallelReportInstrumentation()

Definition at line 1300 of file execParallel.c.

1302{

1303 int i;

1306

1308

1309

1310

1311

1312

1313

1314

1316 if (instrumentation->plan_node_id[i] == plan_node_id)

1317 break;

1319 elog(ERROR, "plan node %d not found", plan_node_id);

1320

1321

1322

1323

1324

1326 instrument += i * instrumentation->num_workers;

1328 Assert(ParallelWorkerNumber < instrumentation->num_workers);

1330

1332 instrumentation);

1333}

static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)

#define IsParallelWorker()

void InstrEndLoop(Instrumentation *instr)

void InstrAggNode(Instrumentation *dst, Instrumentation *add)

Instrumentation * instrument

References Assert(), elog, ERROR, ExecParallelReportInstrumentation(), GetInstrumentationArray, i, InstrAggNode(), InstrEndLoop(), PlanState::instrument, IsParallelWorker, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, ParallelWorkerNumber, PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, and planstate_tree_walker.

Referenced by ExecParallelReportInstrumentation(), and ParallelQueryMain().

ExecParallelRetrieveInstrumentation()

Definition at line 1035 of file execParallel.c.

1037{

1039 int i;

1040 int n;

1041 int ibytes;

1044

1045

1047 if (instrumentation->plan_node_id[i] == plan_node_id)

1048 break;

1050 elog(ERROR, "plan node %d not found", plan_node_id);

1051

1052

1054 instrument += i * instrumentation->num_workers;

1055 for (n = 0; n < instrumentation->num_workers; ++n)

1057

1058

1059

1060

1061

1062

1063

1064

1070

1073

1074

1075 switch (nodeTag(planstate))

1076 {

1077 case T_IndexScanState:

1079 break;

1080 case T_IndexOnlyScanState:

1082 break;

1083 case T_BitmapIndexScanState:

1085 break;

1086 case T_SortState:

1088 break;

1089 case T_IncrementalSortState:

1091 break;

1092 case T_HashState:

1094 break;

1095 case T_AggState:

1097 break;

1098 case T_MemoizeState:

1100 break;

1101 case T_BitmapHeapScanState:

1103 break;

1104 default:

1105 break;

1106 }

1107

1109 instrumentation);

1110}

void ExecAggRetrieveInstrumentation(AggState *node)

void ExecBitmapHeapRetrieveInstrumentation(BitmapHeapScanState *node)

void ExecBitmapIndexScanRetrieveInstrumentation(BitmapIndexScanState *node)

void ExecHashRetrieveInstrumentation(HashState *node)

void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)

void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node)

void ExecIndexScanRetrieveInstrumentation(IndexScanState *node)

void ExecMemoizeRetrieveInstrumentation(MemoizeState *node)

void ExecSortRetrieveInstrumentation(SortState *node)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

MemoryContext es_query_cxt

WorkerInstrumentation * worker_instrument

Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]

References elog, ERROR, EState::es_query_cxt, ExecAggRetrieveInstrumentation(), ExecBitmapHeapRetrieveInstrumentation(), ExecBitmapIndexScanRetrieveInstrumentation(), ExecHashRetrieveInstrumentation(), ExecIncrementalSortRetrieveInstrumentation(), ExecIndexOnlyScanRetrieveInstrumentation(), ExecIndexScanRetrieveInstrumentation(), ExecMemoizeRetrieveInstrumentation(), ExecParallelRetrieveInstrumentation(), ExecSortRetrieveInstrumentation(), GetInstrumentationArray, i, InstrAggNode(), WorkerInstrumentation::instrument, PlanState::instrument, MemoryContextSwitchTo(), mul_size(), nodeTag, SharedExecutorInstrumentation::num_plan_nodes, SharedExecutorInstrumentation::num_workers, WorkerInstrumentation::num_workers, palloc(), PlanState::plan, SharedExecutorInstrumentation::plan_node_id, Plan::plan_node_id, planstate_tree_walker, PlanState::state, and PlanState::worker_instrument.

Referenced by ExecParallelCleanup(), and ExecParallelRetrieveInstrumentation().

ExecParallelRetrieveJitInstrumentation()

Definition at line 1116 of file execParallel.c.

1118{

1120 int ibytes;

1121

1122 int n;

1123

1124

1125

1126

1127

1132

1133

1134 for (n = 0; n < shared_jit->num_workers; ++n)

1136

1137

1138

1139

1140

1141

1142

1147

1149}

void InstrJitAgg(JitInstrumentation *dst, JitInstrumentation *add)

void * MemoryContextAlloc(MemoryContext context, Size size)

void * MemoryContextAllocZero(MemoryContext context, Size size)

struct JitInstrumentation * es_jit_worker_instr

struct SharedJitInstrumentation * worker_jit_instrument

References EState::es_jit_worker_instr, EState::es_query_cxt, InstrJitAgg(), SharedJitInstrumentation::jit_instr, MemoryContextAlloc(), MemoryContextAllocZero(), mul_size(), SharedJitInstrumentation::num_workers, PlanState::state, and PlanState::worker_jit_instrument.

Referenced by ExecParallelCleanup().

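ExecParallelSetupTupleQueues()

static shm_mq_handle ** ExecParallelSetupTupleQueues ( ParallelContext * pcxt, bool reinitialize ) static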

Definition at line 547 of file execParallel.c.

548{

550 char *tqueuespace;

551 int i;

552

553

555 return NULL;

556

557

560

561

562

563

564

565 if (!reinitialize)

566 tqueuespace =

570 else

572

573

575 {

577

581

584 }

585

586

587 if (!reinitialize)

589

590

591 return responseq;

592}

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

References i, mul_size(), MyProc, ParallelContext::nworkers, palloc(), PARALLEL_KEY_TUPLE_QUEUE, PARALLEL_TUPLE_QUEUE_SIZE, ParallelContext::seg, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_allocate(), shm_toc_insert(), shm_toc_lookup(), and ParallelContext::toc.

Referenced by ExecInitParallelPlan(), and ExecParallelReinitialize().

ExecSerializePlan()

static char * ExecSerializePlan ( Plan * plan, EState * estate ) static

Definition at line 146 of file execParallel.c.

147{

150

151

153

154

155

156

157

158

159

160

161

162

163 foreach(lc, plan->targetlist)

164 {

166

167 tle->resjunk = false;

168 }

169

170

171

172

173

174

192

193

194

195

196

197

198

199

202 {

204

206 subplan = NULL;

208 }

209

213 pstmt->invalItems = NIL;

218

219

221}

uint64 pgstat_get_my_query_id(void)

uint64 pgstat_get_my_plan_id(void)

List * lappend(List *list, void *datum)

char * nodeToString(const void *obj)

#define lfirst_node(type, lc)

List * es_part_prune_infos

Bitmapset * es_unpruned_relids

Bitmapset * rewindPlanIDs

Bitmapset * unprunableRelids

References PlannedStmt::appendRelations, PlannedStmt::canSetTag, CMD_SELECT, PlannedStmt::commandType, copyObject, PlannedStmt::dependsOnRole, EState::es_part_prune_infos, EState::es_plannedstmt, EState::es_range_table, EState::es_rteperminfos, EState::es_unpruned_relids, PlannedStmt::hasModifyingCTE, PlannedStmt::hasReturning, PlannedStmt::invalItems, lappend(), lfirst, lfirst_node, makeNode, NIL, nodeToString(), Plan::parallel_safe, PlannedStmt::parallelModeNeeded, PlannedStmt::paramExecTypes, PlannedStmt::partPruneInfos, PlannedStmt::permInfos, pgstat_get_my_plan_id(), pgstat_get_my_query_id(), plan, PlannedStmt::planId, PlannedStmt::planTree, PlannedStmt::queryId, PlannedStmt::relationOids, PlannedStmt::resultRelations, PlannedStmt::rewindPlanIDs, PlannedStmt::rowMarks, PlannedStmt::rtable, PlannedStmt::stmt_len, PlannedStmt::stmt_location, PlannedStmt::subplans, PlannedStmt::transientPlan, PlannedStmt::unprunableRelids, and PlannedStmt::utilityStmt.

Referenced by ExecInitParallelPlan().

ParallelQueryMain()

Definition at line 1436 of file execParallel.c.

1437{

1445 int instrument_options = 0;

1446 void *area_space;

1449

1450

1452

1453

1456 if (instrumentation != NULL)

1459 true);

1461

1462

1464

1465

1467

1468

1471

1472

1475 elog(ERROR, "ExecutorStart() failed unexpectedly");

1476

1477

1480 {

1481 char *paramexec_space;

1482

1485 }

1486 pwcxt.toc = toc;

1487 pwcxt.seg = seg;

1489

1490

1492

1493

1494

1495

1496

1497

1498

1499

1501

1502

1503

1504

1505

1509

1510

1512

1513

1518

1519

1520 if (instrumentation != NULL)

1522 instrumentation);

1523

1524

1525 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)

1526 {

1527 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);

1530 }

1531

1532

1534

1535

1538 receiver->rDestroy(receiver);

1539}

void pgstat_report_activity(BackendState state, const char *cmd_str)

dsa_area * dsa_attach_in_place(void *place, dsm_segment *segment)

void * dsa_get_address(dsa_area *area, dsa_pointer dp)

bool ExecutorStart(QueryDesc *queryDesc, int eflags)

void ExecutorEnd(QueryDesc *queryDesc)

void ExecutorFinish(QueryDesc *queryDesc)

void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)

static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)

static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)

static void RestoreParamExecParams(char *start_address, EState *estate)

void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)

void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)

void InstrStartParallelQuery(void)

const char * debug_query_string

void FreeQueryDesc(QueryDesc *qdesc)

struct JitContext * es_jit

PlannedStmt * plannedstmt

void(* rDestroy)(DestReceiver *self)

References Assert(), debug_query_string, dsa_attach_in_place(), dsa_detach(), dsa_get_address(), DsaPointerIsValid, FixedParallelExecutorState::eflags, elog, ERROR, EState::es_jit, EState::es_query_dsa, QueryDesc::estate, ExecParallelGetQueryDesc(), ExecParallelGetReceiver(), ExecParallelInitializeWorker(), ExecParallelReportInstrumentation(), ExecSetTupleBound(), ExecutorEnd(), ExecutorFinish(), ExecutorRun(), ExecutorStart(), ForwardScanDirection, FreeQueryDesc(), JitContext::instr, InstrEndParallelQuery(), InstrStartParallelQuery(), SharedExecutorInstrumentation::instrument_options, FixedParallelExecutorState::jit_flags, SharedJitInstrumentation::jit_instr, PlannedStmt::jitFlags, PARALLEL_KEY_BUFFER_USAGE, PARALLEL_KEY_DSA, PARALLEL_KEY_EXECUTOR_FIXED, PARALLEL_KEY_INSTRUMENTATION, PARALLEL_KEY_JIT_INSTRUMENTATION, PARALLEL_KEY_WAL_USAGE, ParallelWorkerNumber, FixedParallelExecutorState::param_exec, pgstat_report_activity(), QueryDesc::plannedstmt, QueryDesc::planstate, _DestReceiver::rDestroy, RestoreParamExecParams(), ParallelWorkerContext::seg, shm_toc_lookup(), QueryDesc::sourceText, PlanState::state, STATE_RUNNING, ParallelWorkerContext::toc, and FixedParallelExecutorState::tuples_needed.

RestoreParamExecParams()

static void RestoreParamExecParams ( char * start_address, EState * estate ) static

Definition at line 418 of file execParallel.c.

419{

420 int nparams;

421 int i;

422 int paramid;

423

424 memcpy(&nparams, start_address, sizeof(int));

425 start_address += sizeof(int);

426

427 for (i = 0; i < nparams; i++)

428 {

430

431

432 memcpy(&paramid, start_address, sizeof(int));

433 start_address += sizeof(int);

435

436

439 }

440}

Datum datumRestore(char **start_address, bool *isnull)

References datumRestore(), EState::es_param_exec_vals, ParamExecData::execPlan, i, ParamExecData::isnull, and ParamExecData::value.

Referenced by ParallelQueryMain().

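SerializeParamExecParams()

static dsa_pointer SerializeParamExecParams ( EState * estate, Bitmapset * params, dsa_area * area ) static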

Definition at line 363 of file execParallel.c.

364{

366 int nparams;

367 int paramid;

370 char *start_address;

371

372

376

377

379 memcpy(start_address, &nparams, sizeof(int));

380 start_address += sizeof(int);

381

382

383 paramid = -1;

384 while ((paramid = bms_next_member(params, paramid)) >= 0)

385 {

386 Oid typeOid;

388 bool typByVal;

389

392 paramid);

393

394

395 memcpy(start_address, &paramid, sizeof(int));

396 start_address += sizeof(int);

397

398

401 else

402 {

403

404 typLen = sizeof(Datum);

405 typByVal = true;

406 }

408 &start_address);

409 }

410

411 return handle;

412}

int bms_num_members(const Bitmapset *a)

void datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, char **start_address)

#define dsa_allocate(area, size)

static Size EstimateParamExecSpace(EState *estate, Bitmapset *params)

References bms_next_member(), bms_num_members(), datumSerialize(), dsa_allocate, dsa_get_address(), EState::es_param_exec_vals, EState::es_plannedstmt, EstimateParamExecSpace(), get_typlenbyval(), ParamExecData::isnull, list_nth_oid(), OidIsValid, PlannedStmt::paramExecTypes, and ParamExecData::value.

Referenced by ExecInitParallelPlan(), and ExecParallelReinitialize().