PostgreSQL Source Code: src/backend/executor/nodeGatherMerge.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
24
25
26
27
28
29
30
31#define MAX_TUPLE_STORE 10
32
33
34
35
36
37
38
39
40
42{
45 int readCounter;
48
53 bool nowait, bool *done);
59 bool nowait);
61
62
63
64
65
68{
70 Plan *outerNode;
72
73
75
76
77
78
81 gm_state->ps.state = estate;
83
87
88
89
90
91
92
94
95
96
97
98
100
101
102
103
106
107
108
109
110
111
112
115
116
117
118
119
121 gm_state->tupDesc = tupDesc;
122
123
124
125
128
129
130
131
132
134 {
137 }
138
139
140
141
143 {
144 int i;
145
149
151 {
153
157 sortKey->ssup_attno = node->sortColIdx[i];
158
159
160
161
162
164
166 }
167 }
168
169
171
172 return gm_state;
173}
174
175
176
177
178
179
180
181
184{
188
190
191
192
193
194
196 {
199
200
201
202
203
205 {
207
208
209 if (!node->pei)
211 estate,
215 else
217 node->pei,
219
220
223
225
226
227
228
229
232
233
235 {
237
243 }
244 else
245 {
246
249 }
250 }
251
252
256 }
257
258
259
260
261
264
265
266
267
268
271 return NULL;
272
273
275 return slot;
276
277
278
279
282}
283
284
285
286
287
288
289
290void
292{
295}
296
297
298
299
300
301
302
303void
305{
307
308
309 if (node->pei != NULL)
310 {
312 node->pei = NULL;
313 }
314}
315
316
317
318
319
320
321
322static void
324{
325 if (node->pei != NULL)
327
328
332}
333
334
335
336
337
338
339
340void
342{
345
346
348
349
351
352
355
356
357
358
359
360
361
362
366
367
368
369
370
371
372
373
374
375
376
379}
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394static void
396{
399 int i;
400
401
402
403
404
405
406
407
408
409
412
413
416
417 for (i = 0; i < nreaders; i++)
418 {
419
422
423
427 }
428
429
432 gm_state);
433}
434
435
436
437
438
439
440
441
442static void
444{
445 int nreaders = gm_state->nreaders;
446 bool nowait = true;
447 int i;
448
449
451
452
453 gm_state->gm_slots[0] = NULL;
454
455
456 for (i = 0; i < nreaders; i++)
457 {
458
461
463
465 }
466
467
469
470
471
472
473
474
475
476
477reread:
478 for (i = 0; i <= nreaders; i++)
479 {
481
482
485 {
487 {
488
492 }
493 else
494 {
495
496
497
498
500 }
501 }
502 }
503
504
505 for (i = 1; i <= nreaders; i++)
506 {
509 {
510 nowait = false;
511 goto reread;
512 }
513 }
514
515
517
519}
520
521
522
523
524
525static void
527{
528 int i;
529
531 {
533
536
538 }
539}
540
541
542
543
544
545
548{
549 int i;
550
552 {
553
554
555
556
558 }
559 else
560 {
561
562
563
564
565
566
568
571 else
572 {
573
575 }
576 }
577
579 {
580
582 return NULL;
583 }
584 else
585 {
586
589 }
590}
591
592
593
594
595
596static void
598{
600 int i;
601
602
603 if (reader == 0)
604 return;
605
607
608
611
612
614 {
616
618 reader,
619 true,
620 &tuple_buffer->done);
621 if (!tuple)
622 break;
623 tuple_buffer->tuple[i] = tuple;
625 }
626}
627
628
629
630
631
632
633
634
635static bool
637{
640
641
642
643
644
645 if (reader == 0)
646 {
648 {
652
653
657
659 {
660 gm_state->gm_slots[0] = outerTupleSlot;
661 return true;
662 }
663
665 }
666 return false;
667 }
668
669
671
673 {
674
676 }
677 else if (tuple_buffer->done)
678 {
679
680 return false;
681 }
682 else
683 {
684
686 reader,
687 nowait,
688 &tuple_buffer->done);
689 if (!tup)
690 return false;
691
692
693
694
695
697 }
698
700
701
703 gm_state->gm_slots[reader],
704
705 true);
706
707 return true;
708}
709
710
711
712
715 bool *done)
716{
719
720
722
723
724
725
726
727
728
729
730
731 reader = gm_state->reader[nreader - 1];
733
734
735
736
737
739}
740
741
742
743
744
745
747
748
749
750
753{
757
760 int nkey;
761
764
765 for (nkey = 0; nkey < node->gm_nkeys; nkey++)
766 {
770 datum2;
771 bool isNull1,
772 isNull2;
774
777
779 datum2, isNull2,
780 sortKey);
782 {
785 }
786 }
787 return 0;
788}
void LaunchParallelWorkers(ParallelContext *pcxt)
void binaryheap_build(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
void binaryheap_reset(binaryheap *heap)
bh_node_type binaryheap_first(binaryheap *heap)
bh_node_type binaryheap_remove_first(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
#define binaryheap_empty(h)
Bitmapset * bms_add_member(Bitmapset *a, int x)
#define INVERT_COMPARE_RESULT(var)
void ExecReScan(PlanState *node)
void ExecParallelCleanup(ParallelExecutorInfo *pei)
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
void ExecParallelFinish(ParallelExecutorInfo *pei)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecInitResultTypeTL(PlanState *planstate)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleDesc ExecGetResultType(PlanState *planstate)
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)
#define outerPlanState(node)
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
#define ResetExprContext(econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static int compare(const void *arg1, const void *arg2)
Assert(PointerIsAligned(start, uint64))
MinimalTuple heap_copy_minimal_tuple(MinimalTuple mtup, Size extra)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
struct GMReaderTupleBuffer GMReaderTupleBuffer
static void gather_merge_init(GatherMergeState *gm_state)
static void gather_merge_setup(GatherMergeState *gm_state)
static int32 heap_compare_slots(Datum a, Datum b, void *arg)
void ExecReScanGatherMerge(GatherMergeState *node)
static void gather_merge_clear_tuples(GatherMergeState *gm_state)
void ExecShutdownGatherMerge(GatherMergeState *node)
static void load_tuple_array(GatherMergeState *gm_state, int reader)
GatherMergeState * ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node)
static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done)
void ExecEndGatherMerge(GatherMergeState *node)
static TupleTableSlot * gather_merge_getnext(GatherMergeState *gm_state)
static TupleTableSlot * ExecGatherMerge(PlanState *pstate)
#define castNode(_type_, nodeptr)
bool parallel_leader_participation
static Datum Int32GetDatum(int32 X)
static int32 DatumGetInt32(Datum X)
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
struct dsa_area * es_query_dsa
int es_parallel_workers_to_launch
bool es_use_parallel_mode
int es_parallel_workers_launched
TupleTableSlot * ecxt_outertuple
struct ParallelExecutorInfo * pei
struct TupleQueueReader ** reader
struct GMReaderTupleBuffer * gm_tuple_buffers
TupleTableSlot ** gm_slots
bool need_to_scan_locally
struct binaryheap * gm_heap
struct TupleQueueReader ** reader
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)