PostgreSQL Source Code: src/backend/executor/nodeGatherMerge.c
/*-------------------------------------------------------------------------
 *
 * nodeGatherMerge.c
 *		Scan a plan in multiple workers, and do order-preserving merge.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGatherMerge.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/executor.h"
#include "executor/execParallel.h"
#include "executor/nodeGatherMerge.h"
#include "executor/tqueue.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"

/*
 * When we read tuples from workers, it's a good idea to read several at once
 * for efficiency when possible: this minimizes context-switching overhead.
 * But reading too many at a time wastes memory without improving
 * performance.  We'll read up to MAX_TUPLE_STORE tuples (in addition to the
 * first one).
 */
#define MAX_TUPLE_STORE 10

/*
 * Pending-tuple array for each worker.  This holds additional tuples that
 * we were able to fetch from the worker, but can't process yet.  In
 * addition, this struct holds the "done" flag indicating the worker is
 * known to have no more tuples.
 */
typedef struct GMReaderTupleBuffer
{
	MinimalTuple *tuple;		/* array of length MAX_TUPLE_STORE */
	int			nTuples;		/* number of tuples currently stored */
	int			readCounter;	/* index of next tuple to extract */
	bool		done;			/* true when no more tuples to read */
} GMReaderTupleBuffer;

static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
									  bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
static void gather_merge_clear_tuples(GatherMergeState *gm_state);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
								  bool nowait);
static void load_tuple_array(GatherMergeState *gm_state, int reader);

/* ----------------------------------------------------------------
 *		ExecInitGatherMerge
 * ----------------------------------------------------------------
 */
GatherMergeState *
ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
{
	GatherMergeState *gm_state;
	Plan	   *outerNode;
	TupleDesc	tupDesc;

	/* Gather merge node doesn't have innerPlan node. */
	Assert(innerPlan(node) == NULL);

	/*
	 * create state structure
	 */
	gm_state = makeNode(GatherMergeState);
	gm_state->ps.plan = (Plan *) node;
	gm_state->ps.state = estate;
	gm_state->ps.ExecProcNode = ExecGatherMerge;

	gm_state->initialized = false;
	gm_state->gm_initialized = false;
	gm_state->tuples_needed = -1;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &gm_state->ps);

	/*
	 * GatherMerge doesn't support checking a qual (it's always more
	 * efficient to do it in the child node).
	 */
	Assert(!node->plan.qual);

	/*
	 * now initialize outer plan
	 */
	outerNode = outerPlan(node);
	outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);

	/*
	 * Leader may access ExecProcNode result directly (if
	 * need_to_scan_locally), or from workers via tuple queue.  So we can't
	 * trivially rely on the slot type being fixed for expressions evaluated
	 * within this node.
	 */
	gm_state->ps.outeropsset = true;
	gm_state->ps.outeropsfixed = false;

	/*
	 * Store the tuple descriptor into gather merge state, so we can use it
	 * while initializing the gather merge slots.
	 */
	tupDesc = ExecGetResultType(outerPlanState(gm_state));
	gm_state->tupDesc = tupDesc;

	/*
	 * Initialize result type and projection.
	 */
	ExecInitResultTypeTL(&gm_state->ps);
	ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);

	/*
	 * Without projections result slot type is not trivially known, see
	 * comment above.
	 */
	if (gm_state->ps.ps_ProjInfo == NULL)
	{
		gm_state->ps.resultopsset = true;
		gm_state->ps.resultopsfixed = false;
	}

	/*
	 * initialize sort-key information
	 */
	if (node->numCols)
	{
		int			i;

		gm_state->gm_nkeys = node->numCols;
		gm_state->gm_sortkeys =
			palloc0(sizeof(SortSupportData) * node->numCols);

		for (i = 0; i < node->numCols; i++)
		{
			SortSupport sortKey = gm_state->gm_sortkeys + i;

			sortKey->ssup_cxt = CurrentMemoryContext;
			sortKey->ssup_collation = node->collations[i];
			sortKey->ssup_nulls_first = node->nullsFirst[i];
			sortKey->ssup_attno = node->sortColIdx[i];

			/*
			 * We don't perform abbreviated key conversion here, for the same
			 * reasons that it isn't used in MergeAppend
			 */
			sortKey->abbreviate = false;

			PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
		}
	}

	/* Now allocate the workspace for gather merge */
	gather_merge_setup(gm_state);

	return gm_state;
}

/* ----------------------------------------------------------------
 *		ExecGatherMerge(node)
 *
 *		Scans the relation via multiple workers and returns
 *		the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
	GatherMergeState *node = castNode(GatherMergeState, pstate);
	TupleTableSlot *slot;
	ExprContext *econtext;

	CHECK_FOR_INTERRUPTS();

	/*
	 * As with Gather, we don't launch workers until this node is actually
	 * executed.
	 */
	if (!node->initialized)
	{
		EState	   *estate = node->ps.state;
		GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

		/*
		 * Sometimes we might have to run without parallelism; but if
		 * parallel mode is active then we can try to fire up some workers.
		 */
		if (gm->num_workers > 0 && estate->es_use_parallel_mode)
		{
			ParallelContext *pcxt;

			/* Initialize, or re-initialize, shared state needed by workers. */
			if (!node->pei)
				node->pei = ExecInitParallelPlan(outerPlanState(node),
												 estate,
												 gm->initParam,
												 gm->num_workers,
												 node->tuples_needed);
			else
				ExecParallelReinitialize(outerPlanState(node),
										 node->pei,
										 gm->initParam);

			/* Try to launch workers. */
			pcxt = node->pei->pcxt;
			LaunchParallelWorkers(pcxt);
			/* We save # workers launched for the benefit of EXPLAIN */
			node->nworkers_launched = pcxt->nworkers_launched;

			/* Count workers originally wanted and actually launched. */
			estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
			estate->es_parallel_workers_launched += pcxt->nworkers_launched;

			/* Set up tuple queue readers to read the results. */
			if (pcxt->nworkers_launched > 0)
			{
				ExecParallelCreateReaders(node->pei);
				/* Make a working array showing the active readers */
				node->nreaders = pcxt->nworkers_launched;
				node->reader = (TupleQueueReader **)
					palloc(node->nreaders * sizeof(TupleQueueReader *));
				memcpy(node->reader, node->pei->reader,
					   node->nreaders * sizeof(TupleQueueReader *));
			}
			else
			{
				/* No workers?	Then never mind. */
				node->nreaders = 0;
				node->reader = NULL;
			}
		}

		/* allow leader to participate if enabled or no choice */
		if (parallel_leader_participation || node->nreaders == 0)
			node->need_to_scan_locally = true;
		node->initialized = true;
	}

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	econtext = node->ps.ps_ExprContext;
	ResetExprContext(econtext);

	/*
	 * Get next tuple, either from one of our workers, or by running the plan
	 * ourselves.
	 */
	slot = gather_merge_getnext(node);
	if (TupIsNull(slot))
		return NULL;

	/* If no projection is required, we're done. */
	if (node->ps.ps_ProjInfo == NULL)
		return slot;

	/*
	 * Form the result tuple using ExecProject(), and return it.
	 */
	econtext->ecxt_outertuple = slot;
	return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGatherMerge
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGatherMerge(GatherMergeState *node)
{
	ExecEndNode(outerPlanState(node));	/* let children clean up first */
	ExecShutdownGatherMerge(node);
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMerge
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGatherMerge(GatherMergeState *node)
{
	ExecShutdownGatherMergeWorkers(node);

	/* Now destroy the parallel context. */
	if (node->pei != NULL)
	{
		ExecParallelCleanup(node->pei);
		node->pei = NULL;
	}
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMergeWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);

	/* Flush local copy of reader array */
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecReScanGatherMerge
 *
 *		Prepare to re-scan the result of a GatherMerge.
 * ----------------------------------------------------------------
 */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
	GatherMerge *gm = (GatherMerge *) node->ps.plan;
	PlanState  *outerPlan = outerPlanState(node);

	/* Make sure any existing workers are gracefully shut down */
	ExecShutdownGatherMergeWorkers(node);

	/* Free any unused tuples, so we don't leak memory across rescans */
	gather_merge_clear_tuples(node);

	/* Mark node so that shared state will be rebuilt at next call */
	node->initialized = false;
	node->gm_initialized = false;

	/*
	 * Set child node's chgParam to tell it that the next scan might deliver
	 * a different set of rows within the leader process.  (The overall
	 * rowset shouldn't change, but the leader process's subset might; hence
	 * nodes between here and the parallel table scan node mustn't optimize
	 * on the assumption of an unchanging rowset.)
	 */
	if (gm->rescan_param >= 0)
		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
											 gm->rescan_param);

	/*
	 * If chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.  Note: because this does nothing if we have a
	 * rescan_param, it's currently guaranteed that parallel-aware child
	 * nodes will not see a ReScan call until after they get a
	 * ReInitializeDSM call.  That ordering might not be something to rely
	 * on, though.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}

/*
 * Set up the data structures that we'll need for Gather Merge.
 *
 * We allocate these once on the basis of gm->num_workers, which is an
 * upper bound for the number of workers we'll actually have.  During
 * a rescan, we reset the structures to empty.  This approach simplifies
 * not leaking memory across rescans.
 *
 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
 * are for workers.  The values placed into gm_heap correspond to indexes
 * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
 * 0 to n-1; it has no entry for the leader.
 */
static void
gather_merge_setup(GatherMergeState *gm_state)
{
	GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
	int			nreaders = gm->num_workers;
	int			i;

	/*
	 * Allocate gm_slots for the number of workers + one more slot for
	 * leader.  Slot 0 is always for the leader.  Leader always calls
	 * ExecProcNode() to read the tuple, and then stores it directly into its
	 * gm_slots entry.  For other slots, code below will call
	 * ExecInitExtraTupleSlot() to create a slot for the worker's results.
	 * Note that during any single scan, we might have fewer than num_workers
	 * available workers, in which case the extra array entries go unused.
	 */
	gm_state->gm_slots = palloc0_array(TupleTableSlot *, nreaders + 1);

	/* Allocate the tuple slot and tuple array for each worker */
	gm_state->gm_tuple_buffers = palloc0_array(GMReaderTupleBuffer, nreaders);

	for (i = 0; i < nreaders; i++)
	{
		/* Allocate the tuple array with length MAX_TUPLE_STORE */
		gm_state->gm_tuple_buffers[i].tuple =
			palloc0_array(MinimalTuple, MAX_TUPLE_STORE);

		/* Initialize tuple slot for worker */
		gm_state->gm_slots[i + 1] =
			ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
								   &TTSOpsMinimalTuple);
	}

	/* Allocate the resources for the merge */
	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
											heap_compare_slots,
											gm_state);
}

/*
 * Initialize the Gather Merge.
 *
 * Reset data structures to ensure they're empty.  Then pull at least one
 * tuple from leader + each worker (or set its "done" indicator), and set up
 * the heap.
 */
static void
gather_merge_init(GatherMergeState *gm_state)
{
	int			nreaders = gm_state->nreaders;
	bool		nowait = true;
	int			i;

	/* Assert that gather_merge_setup made enough space */
	Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

	/* Reset leader's tuple slot to empty */
	gm_state->gm_slots[0] = NULL;

	/* Reset the tuple slot and tuple array for each worker */
	for (i = 0; i < nreaders; i++)
	{
		/* Reset tuple array to empty */
		gm_state->gm_tuple_buffers[i].nTuples = 0;
		gm_state->gm_tuple_buffers[i].readCounter = 0;
		/* Reset done flag to not-done */
		gm_state->gm_tuple_buffers[i].done = false;
		/* Ensure output slot is empty */
		ExecClearTuple(gm_state->gm_slots[i + 1]);
	}

	/* Reset binary heap to empty */
	binaryheap_reset(gm_state->gm_heap);

	/*
	 * First, try to read a tuple from each worker (including leader) in
	 * nowait mode.  After this, if not all workers were able to produce a
	 * tuple (or a "done" indication), then re-read from remaining workers in
	 * wait mode.
	 */
reread:
	for (i = 0; i <= nreaders; i++)
	{
		CHECK_FOR_INTERRUPTS();

		/* skip this source if already known done */
		if ((i == 0 && !gm_state->need_to_scan_locally) ||
			(i > 0 && gm_state->gm_tuple_buffers[i - 1].done))
			continue;
		if (TupIsNull(gm_state->gm_slots[i]))
		{
			/* Don't have a tuple yet, try to get one */
			if (gather_merge_readnext(gm_state, i, nowait))
				binaryheap_add_unordered(gm_state->gm_heap,
										 Int32GetDatum(i));
		}
		else
		{
			/*
			 * We already got at least one tuple from this worker, but might
			 * as well see if it has any more ready by now.
			 */
			load_tuple_array(gm_state, i);
		}
	}

	/* need not recheck leader, since nowait doesn't matter for it */
	for (i = 1; i <= nreaders; i++)
	{
		if (!gm_state->gm_tuple_buffers[i - 1].done &&
			TupIsNull(gm_state->gm_slots[i]))
		{
			nowait = false;
			goto reread;
		}
	}

	/* Now heapify the heap. */
	binaryheap_build(gm_state->gm_heap);

	gm_state->gm_initialized = true;
}

/*
 * Clear out the tuple table slot, and any unused pending tuples,
 * for each gather merge input.
 */
static void
gather_merge_clear_tuples(GatherMergeState *gm_state)
{
	int			i;

	for (i = 0; i < gm_state->nreaders; i++)
	{
		GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];

		while (tuple_buffer->readCounter < tuple_buffer->nTuples)
			pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);

		ExecClearTuple(gm_state->gm_slots[i + 1]);
	}
}

/*
 * Read the next tuple for gather merge.
 *
 * Fetch the sorted tuple out of the heap.
 */
static TupleTableSlot *
gather_merge_getnext(GatherMergeState *gm_state)
{
	int			i;

	if (!gm_state->gm_initialized)
	{
		/*
		 * First time through: pull the first tuple from each participant,
		 * and set up the heap.
		 */
		gather_merge_init(gm_state);
	}
	else
	{
		/*
		 * Otherwise, pull the next tuple from whichever participant we
		 * returned from last time, and reinsert that participant's index
		 * into the heap, because it might now compare differently against
		 * the other elements of the heap.
		 */
		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));

		if (gather_merge_readnext(gm_state, i, false))
			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
		else
		{
			/* reader exhausted, remove it from heap */
			(void) binaryheap_remove_first(gm_state->gm_heap);
		}
	}

	if (binaryheap_empty(gm_state->gm_heap))
	{
		/* All the queues are exhausted, and so is the heap */
		gather_merge_clear_tuples(gm_state);
		return NULL;
	}
	else
	{
		/* Return next tuple from whichever participant has the leading one */
		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
		return gm_state->gm_slots[i];
	}
}

/*
 * Read tuple(s) for given reader in nowait mode, and load into its tuple
 * array, until we have MAX_TUPLE_STORE of them or would have to block.
 */
static void
load_tuple_array(GatherMergeState *gm_state, int reader)
{
	GMReaderTupleBuffer *tuple_buffer;
	int			i;

	/* Don't do anything if this is the leader. */
	if (reader == 0)
		return;

	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

	/* If there's nothing in the array, reset the counters to zero. */
	if (tuple_buffer->nTuples == tuple_buffer->readCounter)
		tuple_buffer->nTuples = tuple_buffer->readCounter = 0;

	/* Try to fill additional slots in the array. */
	for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
	{
		MinimalTuple tuple;

		tuple = gm_readnext_tuple(gm_state,
								  reader,
								  true,
								  &tuple_buffer->done);
		if (!tuple)
			break;
		tuple_buffer->tuple[i] = tuple;
		tuple_buffer->nTuples++;
	}
}

/*
 * Store the next tuple for a given reader into the appropriate slot.
 *
 * Returns true if successful, false if not (either reader is exhausted,
 * or we didn't want to wait for a tuple).  Sets done flag if reader
 * is found to be exhausted.
 */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
	GMReaderTupleBuffer *tuple_buffer;
	MinimalTuple tup;

	/*
	 * If we're being asked to generate a tuple from the leader, then we just
	 * call ExecProcNode as normal to produce one.
	 */
	if (reader == 0)
	{
		if (gm_state->need_to_scan_locally)
		{
			PlanState  *outerPlan = outerPlanState(gm_state);
			TupleTableSlot *outerTupleSlot;
			EState	   *estate = gm_state->ps.state;

			/* Install our DSA area while executing the plan. */
			estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
			outerTupleSlot = ExecProcNode(outerPlan);
			estate->es_query_dsa = NULL;

			if (!TupIsNull(outerTupleSlot))
			{
				gm_state->gm_slots[0] = outerTupleSlot;
				return true;
			}
			/* need_to_scan_locally serves as "done" flag for leader */
			gm_state->need_to_scan_locally = false;
		}
		return false;
	}

	/* Otherwise, check the state of the relevant tuple buffer. */
	tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

	if (tuple_buffer->nTuples > tuple_buffer->readCounter)
	{
		/* Return any tuple previously read that is still buffered. */
		tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
	}
	else if (tuple_buffer->done)
	{
		/* Reader is known to be exhausted. */
		return false;
	}
	else
	{
		/* Read and buffer next tuple. */
		tup = gm_readnext_tuple(gm_state,
								reader,
								nowait,
								&tuple_buffer->done);
		if (!tup)
			return false;

		/*
		 * Attempt to read more tuples in nowait mode and store them in the
		 * pending-tuple array for the reader.
		 */
		load_tuple_array(gm_state, reader);
	}

	Assert(tup);

	/* Build the TupleTableSlot for the given tuple */
	ExecStoreMinimalTuple(tup,	/* tuple to store */
						  gm_state->gm_slots[reader],	/* slot in which to
														 * store the tuple */
						  true);	/* pfree tuple when done with it */

	return true;
}

/*
 * Attempt to read a tuple from given worker.
 */
static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
				  bool *done)
{
	TupleQueueReader *reader;
	MinimalTuple tup;

	/* Check for async events, particularly messages from workers. */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Attempt to read a tuple.
	 *
	 * Note that TupleQueueReaderNext will just return NULL for a worker
	 * which fails to initialize.  We'll treat that worker as having produced
	 * no tuples; WaitForParallelWorkersToFinish will error out when we get
	 * there.
	 */
	reader = gm_state->reader[nreader - 1];
	tup = TupleQueueReaderNext(reader, nowait, done);

	/*
	 * Since we'll be buffering these across multiple calls, we need to make
	 * a copy.
	 */
	return tup ? heap_copy_minimal_tuple(tup, 0) : NULL;
}

/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/*
 * Compare the tuples in the two given slots.
 */
static int32
heap_compare_slots(Datum a, Datum b, void *arg)
{
	GatherMergeState *node = (GatherMergeState *) arg;
	SlotNumber	slot1 = DatumGetInt32(a);
	SlotNumber	slot2 = DatumGetInt32(b);

	TupleTableSlot *s1 = node->gm_slots[slot1];
	TupleTableSlot *s2 = node->gm_slots[slot2];
	int			nkey;

	Assert(!TupIsNull(s1));
	Assert(!TupIsNull(s2));

	for (nkey = 0; nkey < node->gm_nkeys; nkey++)
	{
		SortSupport sortKey = node->gm_sortkeys + nkey;
		AttrNumber	attno = sortKey->ssup_attno;
		Datum		datum1,
					datum2;
		bool		isNull1,
					isNull2;
		int			compare;

		datum1 = slot_getattr(s1, attno, &isNull1);
		datum2 = slot_getattr(s2, attno, &isNull2);

		compare = ApplySortComparator(datum1, isNull1,
									  datum2, isNull2,
									  sortKey);
		if (compare != 0)
		{
			INVERT_COMPARE_RESULT(compare);
			return compare;
		}
	}
	return 0;
}
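
The core technique above is a classic k-way merge: gm_slots[0] holds the leader's current tuple, gm_slots[1..n] hold the workers' current tuples, and the binary heap orders slot indexes by the sort keys so each gather_merge_getnext() call can return the leading tuple and refill only that one source. Below is a minimal standalone sketch of the same pattern, merging plain pre-sorted int arrays in place of tuple queues. It is illustrative only and uses none of the PostgreSQL APIs above; Stream, current, and sift_down are made-up names for this sketch.

#include <stdio.h>

#define NSTREAMS 3

/* One pre-sorted input, standing in for a worker's tuple queue. */
typedef struct Stream
{
	const int  *vals;
	int			len;
	int			pos;			/* next unread element */
} Stream;

static Stream streams[NSTREAMS];
static int	heap[NSTREAMS];		/* stream indexes, min-heap on current value */
static int	heap_size;

/* Current (front) value of stream s; caller ensures it isn't exhausted. */
static int
current(int s)
{
	return streams[s].vals[streams[s].pos];
}

/* Re-establish the heap property downward from position i. */
static void
sift_down(int i)
{
	for (;;)
	{
		int			smallest = i;
		int			l = 2 * i + 1;
		int			r = 2 * i + 2;

		if (l < heap_size && current(heap[l]) < current(heap[smallest]))
			smallest = l;
		if (r < heap_size && current(heap[r]) < current(heap[smallest]))
			smallest = r;
		if (smallest == i)
			return;

		int			tmp = heap[i];

		heap[i] = heap[smallest];
		heap[smallest] = tmp;
		i = smallest;
	}
}

int
main(void)
{
	static const int a[] = {1, 4, 9};
	static const int b[] = {2, 3, 8};
	static const int c[] = {5, 6, 7};

	streams[0] = (Stream) {a, 3, 0};
	streams[1] = (Stream) {b, 3, 0};
	streams[2] = (Stream) {c, 3, 0};

	/* Like gather_merge_init(): add each non-empty source, then heapify. */
	for (int s = 0; s < NSTREAMS; s++)
		if (streams[s].pos < streams[s].len)
			heap[heap_size++] = s;
	for (int i = heap_size / 2 - 1; i >= 0; i--)
		sift_down(i);

	/* Like gather_merge_getnext(): emit the root, advance it, re-sift. */
	while (heap_size > 0)
	{
		int			s = heap[0];

		printf("%d\n", current(s));
		if (++streams[s].pos < streams[s].len)
			sift_down(0);		/* cf. binaryheap_replace_first */
		else
		{
			heap[0] = heap[--heap_size];	/* cf. binaryheap_remove_first */
			sift_down(0);
		}
	}
	return 0;
}

Compiled with any C99 compiler, this prints 1 through 9 in order; the replace-or-remove step at the bottom mirrors the binaryheap_replace_first()/binaryheap_remove_first() choice in gather_merge_getnext().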