PostgreSQL Source Code: src/backend/executor/execParallel.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
25
52
53
54
55
56
57
58#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
59#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
60#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
61#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
62#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
63#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
64#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
65#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
66#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
68
69#define PARALLEL_TUPLE_QUEUE_SIZE 65536
70
71
72
73
75{
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
99{
105
106};
107#define GetInstrumentationArray(sei) \
108 (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
110
111
113{
117
118
120{
125
126
133 bool reinitialize);
138
139
141
142
143
144
145static char *
147{
150
151
153
154
155
156
157
158
159
160
161
162
163 foreach(lc, plan->targetlist)
164 {
166
167 tle->resjunk = false;
168 }
169
170
171
172
173
174
192
193
194
195
196
197
198
199
202 {
204
206 subplan = NULL;
208 }
209
213 pstmt->invalItems = NIL;
218
219
221}
222
223
224
225
226
227
228
229
230
231
232static bool
234{
235 if (planstate == NULL)
236 return false;
237
238
239 e->nnodes++;
240
241 switch (nodeTag(planstate))
242 {
243 case T_SeqScanState:
246 e->pcxt);
247 break;
248 case T_IndexScanState:
249
251 e->pcxt);
252 break;
253 case T_IndexOnlyScanState:
254
256 e->pcxt);
257 break;
258 case T_BitmapIndexScanState:
259
261 e->pcxt);
262 break;
263 case T_ForeignScanState:
264 if (planstate->plan->parallel_aware)
266 e->pcxt);
267 break;
268 case T_AppendState:
269 if (planstate->plan->parallel_aware)
271 e->pcxt);
272 break;
273 case T_CustomScanState:
274 if (planstate->plan->parallel_aware)
276 e->pcxt);
277 break;
278 case T_BitmapHeapScanState:
279 if (planstate->plan->parallel_aware)
281 e->pcxt);
282 break;
283 case T_HashJoinState:
284 if (planstate->plan->parallel_aware)
286 e->pcxt);
287 break;
288 case T_HashState:
289
291 break;
292 case T_SortState:
293
295 break;
296 case T_IncrementalSortState:
297
299 break;
300 case T_AggState:
301
303 break;
304 case T_MemoizeState:
305
307 break;
308 default:
309 break;
310 }
311
313}
314
315
316
317
320{
321 int paramid;
322 Size sz = sizeof(int);
323
324 paramid = -1;
325 while ((paramid = bms_next_member(params, paramid)) >= 0)
326 {
327 Oid typeOid;
329 bool typByVal;
331
334 paramid);
335
336 sz = add_size(sz, sizeof(int));
337
338
341 else
342 {
343
344 typLen = sizeof(Datum);
345 typByVal = true;
346 }
349 typByVal, typLen));
350 }
351 return sz;
352}
353
354
355
356
357
358
359
360
361
364{
366 int nparams;
367 int paramid;
370 char *start_address;
371
372
376
377
379 memcpy(start_address, &nparams, sizeof(int));
380 start_address += sizeof(int);
381
382
383 paramid = -1;
384 while ((paramid = bms_next_member(params, paramid)) >= 0)
385 {
386 Oid typeOid;
388 bool typByVal;
389
392 paramid);
393
394
395 memcpy(start_address, &paramid, sizeof(int));
396 start_address += sizeof(int);
397
398
401 else
402 {
403
404 typLen = sizeof(Datum);
405 typByVal = true;
406 }
408 &start_address);
409 }
410
411 return handle;
412}
413
414
415
416
417static void
419{
420 int nparams;
421 int i;
422 int paramid;
423
424 memcpy(&nparams, start_address, sizeof(int));
425 start_address += sizeof(int);
426
427 for (i = 0; i < nparams; i++)
428 {
430
431
432 memcpy(&paramid, start_address, sizeof(int));
433 start_address += sizeof(int);
435
436
439 }
440}
441
442
443
444
445
446static bool
449{
450 if (planstate == NULL)
451 return false;
452
453
457
458
460
461
462
463
464
465
466
467
468
469
470 switch (nodeTag(planstate))
471 {
472 case T_SeqScanState:
476 break;
477 case T_IndexScanState:
478
480 break;
481 case T_IndexOnlyScanState:
482
485 break;
486 case T_BitmapIndexScanState:
487
489 break;
490 case T_ForeignScanState:
491 if (planstate->plan->parallel_aware)
494 break;
495 case T_AppendState:
496 if (planstate->plan->parallel_aware)
499 break;
500 case T_CustomScanState:
501 if (planstate->plan->parallel_aware)
504 break;
505 case T_BitmapHeapScanState:
506 if (planstate->plan->parallel_aware)
509 break;
510 case T_HashJoinState:
511 if (planstate->plan->parallel_aware)
514 break;
515 case T_HashState:
516
518 break;
519 case T_SortState:
520
522 break;
523 case T_IncrementalSortState:
524
526 break;
527 case T_AggState:
528
530 break;
531 case T_MemoizeState:
532
534 break;
535 default:
536 break;
537 }
538
540}
541
542
543
544
545
548{
550 char *tqueuespace;
551 int i;
552
553
555 return NULL;
556
557
560
561
562
563
564
565 if (!reinitialize)
566 tqueuespace =
570 else
572
573
575 {
577
581
584 }
585
586
587 if (!reinitialize)
589
590
591 return responseq;
592}
593
594
595
596
597
600 Bitmapset *sendParams, int nworkers,
601 int64 tuples_needed)
602{
608 char *pstmt_data;
609 char *pstmt_space;
610 char *paramlistinfo_space;
615 int pstmt_len;
616 int paramlistinfo_len;
617 int instrumentation_len = 0;
618 int jit_instrumentation_len = 0;
619 int instrument_offset = 0;
621 char *query_string;
622 int query_len;
623
624
625
626
627
628
629
630
631
632
633
634
636
637
641
642
644
645
647 pei->pcxt = pcxt;
648
649
650
651
652
653
654
655
659
660
664
665
666 pstmt_len = strlen(pstmt_data) + 1;
669
670
674
675
676
677
678
679
680
681
685
686
687
688
692
693
697
698
699
700
701
702 e.pcxt = pcxt;
703 e.nnodes = 0;
705
706
708 {
709 instrumentation_len =
711 sizeof(int) * e.nnodes;
712 instrumentation_len = MAXALIGN(instrumentation_len);
713 instrument_offset = instrumentation_len;
714 instrumentation_len +=
719
720
722 {
723 jit_instrumentation_len =
728 }
729 }
730
731
734
735
736
737
738
739
741
742
744
745
746
747
748
749
750
751
752
753
760
761
763 memcpy(query_string, estate->es_sourceText, query_len + 1);
765
766
768 memcpy(pstmt_space, pstmt_data, pstmt_len);
770
771
775
776
781
782
787
788
790
791
793
794
795
796
797
798
800 {
802 int i;
803
810 for (i = 0; i < nworkers * e.nnodes; ++i)
813 instrumentation);
815
817 {
819 jit_instrumentation_len);
820 jit_instrumentation->num_workers = nworkers;
821 memset(jit_instrumentation->jit_instr, 0,
824 jit_instrumentation);
826 }
827 }
828
829
830
831
832
833
834 if (pcxt->seg != NULL)
835 {
836 char *area_space;
837
842 pcxt->seg);
843
844
845
846
847
848
849
851 {
855 }
856 }
857
858
859
860
861
862
863 d.pcxt = pcxt;
866
867
871
872
873
874
875
877 elog(ERROR, "inconsistent count of PlanState nodes");
878
879
880 return pei;
881}
882
883
884
885
886
887
888
889void
891{
893 int i;
894
896
897 if (nworkers > 0)
898 {
901
902 for (i = 0; i < nworkers; i++)
903 {
907 }
908 }
909}
910
911
912
913
914
915void
919{
922
923
925
926
927
928
929
930
932
937
939
940
942 {
945 }
946
947
949 {
953 }
954
955
959}
960
961
962
963
964static bool
967{
968 if (planstate == NULL)
969 return false;
970
971
972
973
974 switch (nodeTag(planstate))
975 {
976 case T_SeqScanState:
979 pcxt);
980 break;
981 case T_IndexScanState:
982 if (planstate->plan->parallel_aware)
984 pcxt);
985 break;
986 case T_IndexOnlyScanState:
987 if (planstate->plan->parallel_aware)
989 pcxt);
990 break;
991 case T_ForeignScanState:
992 if (planstate->plan->parallel_aware)
994 pcxt);
995 break;
996 case T_AppendState:
997 if (planstate->plan->parallel_aware)
999 break;
1000 case T_CustomScanState:
1001 if (planstate->plan->parallel_aware)
1003 pcxt);
1004 break;
1005 case T_BitmapHeapScanState:
1006 if (planstate->plan->parallel_aware)
1008 pcxt);
1009 break;
1010 case T_HashJoinState:
1011 if (planstate->plan->parallel_aware)
1013 pcxt);
1014 break;
1015 case T_BitmapIndexScanState:
1016 case T_HashState:
1017 case T_SortState:
1018 case T_IncrementalSortState:
1019 case T_MemoizeState:
1020
1021 break;
1022
1023 default:
1024 break;
1025 }
1026
1028}
1029
1030
1031
1032
1033
1034static bool
1037{
1039 int i;
1040 int n;
1041 int ibytes;
1044
1045
1047 if (instrumentation->plan_node_id[i] == plan_node_id)
1048 break;
1050 elog(ERROR, "plan node %d not found", plan_node_id);
1051
1052
1054 instrument += i * instrumentation->num_workers;
1055 for (n = 0; n < instrumentation->num_workers; ++n)
1057
1058
1059
1060
1061
1062
1063
1064
1070
1073
1074
1075 switch (nodeTag(planstate))
1076 {
1077 case T_IndexScanState:
1079 break;
1080 case T_IndexOnlyScanState:
1082 break;
1083 case T_BitmapIndexScanState:
1085 break;
1086 case T_SortState:
1088 break;
1089 case T_IncrementalSortState:
1091 break;
1092 case T_HashState:
1094 break;
1095 case T_AggState:
1097 break;
1098 case T_MemoizeState:
1100 break;
1101 case T_BitmapHeapScanState:
1103 break;
1104 default:
1105 break;
1106 }
1107
1109 instrumentation);
1110}
1111
1112
1113
1114
1115static void
1118{
1120 int ibytes;
1121
1122 int n;
1123
1124
1125
1126
1127
1132
1133
1134 for (n = 0; n < shared_jit->num_workers; ++n)
1136
1137
1138
1139
1140
1141
1142
1147
1149}
1150
1151
1152
1153
1154
1155void
1157{
1159 int i;
1160
1161
1163 return;
1164
1165
1166
1167
1168
1169 if (pei->tqueue != NULL)
1170 {
1171 for (i = 0; i < nworkers; i++)
1175 }
1176
1177
1178
1179
1180
1181 if (pei->reader != NULL)
1182 {
1183 for (i = 0; i < nworkers; i++)
1187 }
1188
1189
1191
1192
1193
1194
1195
1196 for (i = 0; i < nworkers; i++)
1198
1200}
1201
1202
1203
1204
1205
1206
1207
1208void
1210{
1211
1215
1216
1220
1221
1223 {
1226 }
1227 if (pei->area != NULL)
1228 {
1230 pei->area = NULL;
1231 }
1232 if (pei->pcxt != NULL)
1233 {
1235 pei->pcxt = NULL;
1236 }
1238}
1239
1240
1241
1242
1243
1246{
1247 char *mqspace;
1249
1252 mq = (shm_mq *) mqspace;
1255}
1256
1257
1258
1259
1262 int instrument_options)
1263{
1264 char *pstmtspace;
1265 char *paramspace;
1268 char *queryString;
1269
1270
1272
1273
1276
1277
1280
1281
1282
1283
1284
1285
1286
1287
1289 NULL,
1290 queryString,
1292 receiver, paramLI, NULL, instrument_options);
1293}
1294
1295
1296
1297
1298
1299static bool
1302{
1303 int i;
1306
1308
1309
1310
1311
1312
1313
1314
1316 if (instrumentation->plan_node_id[i] == plan_node_id)
1317 break;
1319 elog(ERROR, "plan node %d not found", plan_node_id);
1320
1321
1322
1323
1324
1326 instrument += i * instrumentation->num_workers;
1328 Assert(ParallelWorkerNumber < instrumentation->num_workers);
1330
1332 instrumentation);
1333}
1334
1335
1336
1337
1338
1339
1340static bool
1342{
1343 if (planstate == NULL)
1344 return false;
1345
1346 switch (nodeTag(planstate))
1347 {
1348 case T_SeqScanState:
1351 break;
1352 case T_IndexScanState:
1353
1355 break;
1356 case T_IndexOnlyScanState:
1357
1359 pwcxt);
1360 break;
1361 case T_BitmapIndexScanState:
1362
1364 pwcxt);
1365 break;
1366 case T_ForeignScanState:
1367 if (planstate->plan->parallel_aware)
1369 pwcxt);
1370 break;
1371 case T_AppendState:
1372 if (planstate->plan->parallel_aware)
1374 break;
1375 case T_CustomScanState:
1376 if (planstate->plan->parallel_aware)
1378 pwcxt);
1379 break;
1380 case T_BitmapHeapScanState:
1381 if (planstate->plan->parallel_aware)
1383 pwcxt);
1384 break;
1385 case T_HashJoinState:
1386 if (planstate->plan->parallel_aware)
1388 pwcxt);
1389 break;
1390 case T_HashState:
1391
1393 break;
1394 case T_SortState:
1395
1397 break;
1398 case T_IncrementalSortState:
1399
1401 pwcxt);
1402 break;
1403 case T_AggState:
1404
1406 break;
1407 case T_MemoizeState:
1408
1410 break;
1411 default:
1412 break;
1413 }
1414
1416 pwcxt);
1417}
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435void
1437{
1445 int instrument_options = 0;
1446 void *area_space;
1449
1450
1452
1453
1456 if (instrumentation != NULL)
1459 true);
1461
1462
1464
1465
1467
1468
1471
1472
1475 elog(ERROR, "ExecutorStart() failed unexpectedly");
1476
1477
1480 {
1481 char *paramexec_space;
1482
1485 }
1486 pwcxt.toc = toc;
1487 pwcxt.seg = seg;
1489
1490
1492
1493
1494
1495
1496
1497
1498
1499
1501
1502
1503
1504
1505
1509
1510
1512
1513
1518
1519
1520 if (instrumentation != NULL)
1522 instrumentation);
1523
1524
1525 if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1526 {
1527 Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1530 }
1531
1532
1534
1535
1538 receiver->rDestroy(receiver);
1539}
void InitializeParallelDSM(ParallelContext *pcxt)
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
uint64 pgstat_get_my_query_id(void)
uint64 pgstat_get_my_plan_id(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
int bms_next_member(const Bitmapset *a, int prevbit)
int bms_num_members(const Bitmapset *a)
#define FLEXIBLE_ARRAY_MEMBER
#define OidIsValid(objectId)
Datum datumRestore(char **start_address, bool *isnull)
void datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, char **start_address)
Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen)
dsa_area * dsa_attach_in_place(void *place, dsm_segment *segment)
void * dsa_get_address(dsa_area *area, dsa_pointer dp)
void dsa_detach(dsa_area *area)
void dsa_free(dsa_area *area, dsa_pointer dp)
size_t dsa_minimum_size(void)
#define dsa_create_in_place(place, size, tranch_id, segment)
#define dsa_allocate(area, size)
#define InvalidDsaPointer
#define DsaPointerIsValid(x)
bool ExecutorStart(QueryDesc *queryDesc, int eflags)
void ExecutorEnd(QueryDesc *queryDesc)
void ExecutorFinish(QueryDesc *queryDesc)
void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
#define PARALLEL_KEY_BUFFER_USAGE
static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt)
#define PARALLEL_KEY_JIT_INSTRUMENTATION
struct ExecParallelEstimateContext ExecParallelEstimateContext
#define PARALLEL_KEY_PARAMLISTINFO
#define PARALLEL_TUPLE_QUEUE_SIZE
static QueryDesc * ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options)
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
static dsa_pointer SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
void ExecParallelCleanup(ParallelExecutorInfo *pei)
struct ExecParallelInitializeDSMContext ExecParallelInitializeDSMContext
#define PARALLEL_KEY_INSTRUMENTATION
static DestReceiver * ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
static shm_mq_handle ** ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
#define PARALLEL_KEY_PLANNEDSTMT
static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
#define GetInstrumentationArray(sei)
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
static bool ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
#define PARALLEL_KEY_TUPLE_QUEUE
#define PARALLEL_KEY_EXECUTOR_FIXED
static char * ExecSerializePlan(Plan *plan, EState *estate)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
struct FixedParallelExecutorState FixedParallelExecutorState
#define PARALLEL_KEY_QUERY_TEXT
static Size EstimateParamExecSpace(EState *estate, Bitmapset *params)
void ExecParallelFinish(ParallelExecutorInfo *pei)
static bool ExecParallelReportInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation)
#define PARALLEL_KEY_WAL_USAGE
static void ExecParallelRetrieveJitInstrumentation(PlanState *planstate, SharedJitInstrumentation *shared_jit)
static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)
static void RestoreParamExecParams(char *start_address, EState *estate)
void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
#define GetPerTupleExprContext(estate)
Assert(PointerIsAligned(start, uint64))
#define IsParallelWorker()
void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrEndLoop(Instrumentation *instr)
void InstrAggNode(Instrumentation *dst, Instrumentation *add)
void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
void InstrStartParallelQuery(void)
void InstrInit(Instrumentation *instr, int instrument_options)
void InstrJitAgg(JitInstrumentation *dst, JitInstrumentation *add)
struct JitInstrumentation JitInstrumentation
List * lappend(List *list, void *datum)
void get_typlenbyval(Oid typid, int16 *typlen, bool *typbyval)
@ LWTRANCHE_PARALLEL_QUERY_DSA
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
void ExecAggEstimate(AggState *node, ParallelContext *pcxt)
void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
void ExecAggRetrieveInstrumentation(AggState *node)
void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt)
void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt)
void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, ParallelWorkerContext *pwcxt)
void ExecBitmapHeapEstimate(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapHeapRetrieveInstrumentation(BitmapHeapScanState *node)
void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanEstimate(BitmapIndexScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanInitializeDSM(BitmapIndexScanState *node, ParallelContext *pcxt)
void ExecBitmapIndexScanRetrieveInstrumentation(BitmapIndexScanState *node)
void ExecBitmapIndexScanInitializeWorker(BitmapIndexScanState *node, ParallelWorkerContext *pwcxt)
void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt)
void ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt)
#define planstate_tree_walker(ps, w, c)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node)
void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, ParallelWorkerContext *pwcxt)
void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, ParallelContext *pcxt)
void ExecIndexScanRetrieveInstrumentation(IndexScanState *node)
void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt)
void ExecIndexScanInitializeWorker(IndexScanState *node, ParallelWorkerContext *pwcxt)
void ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt)
void ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt)
void ExecMemoizeRetrieveInstrumentation(MemoizeState *node)
void ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt)
void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
void ExecSeqScanInitializeWorker(SeqScanState *node, ParallelWorkerContext *pwcxt)
void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt)
void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt)
void ExecSortEstimate(SortState *node, ParallelContext *pcxt)
void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
void ExecSortRetrieveInstrumentation(SortState *node)
void ExecSetParamPlanMulti(const Bitmapset *params, ExprContext *econtext)
char * nodeToString(const void *obj)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Size EstimateParamListSpace(ParamListInfo paramLI)
void SerializeParamList(ParamListInfo paramLI, char **start_address)
ParamListInfo RestoreParamList(char **start_address)
#define lfirst_node(type, lc)
static Oid list_nth_oid(const List *list, int n)
const char * debug_query_string
void FreeQueryDesc(QueryDesc *qdesc)
QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, CachedPlan *cplan, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)
void * stringToNode(const char *str)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
Snapshot GetActiveSnapshot(void)
List * es_part_prune_infos
struct dsa_area * es_query_dsa
struct JitContext * es_jit
PlannedStmt * es_plannedstmt
struct JitInstrumentation * es_jit_worker_instr
ParamExecData * es_param_exec_vals
Bitmapset * es_unpruned_relids
ParamListInfo es_param_list_info
MemoryContext es_query_cxt
const char * es_sourceText
SharedExecutorInstrumentation * instrumentation
shm_toc_estimator estimator
ParallelWorkerInfo * worker
struct SharedJitInstrumentation * jit_instrumentation
BufferUsage * buffer_usage
SharedExecutorInstrumentation * instrumentation
struct TupleQueueReader ** reader
BackgroundWorkerHandle * bgwhandle
struct SharedJitInstrumentation * worker_jit_instrument
Instrumentation * instrument
WorkerInstrumentation * worker_instrument
Bitmapset * rewindPlanIDs
Bitmapset * unprunableRelids
PlannedStmt * plannedstmt
Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]
void(* rDestroy)(DestReceiver *self)
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
void DestroyTupleQueueReader(TupleQueueReader *reader)