PostgreSQL Source Code: src/backend/executor/nodeHashjoin.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
164
175
176
177
178
179
180#define HJ_BUILD_HASHTABLE 1
181#define HJ_NEED_NEW_OUTER 2
182#define HJ_SCAN_BUCKET 3
183#define HJ_FILL_OUTER_TUPLE 4
184#define HJ_FILL_INNER_TUPLES 5
185#define HJ_NEED_NEW_BATCH 6
186
187
188#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
189
190#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
191
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
222{
232 int batchno;
234
235
236
237
239 otherqual = node->js.ps.qual;
245
246
247
248
249
251
252
253
254
255 for (;;)
256 {
257
258
259
260
261
262
264
266 {
268
269
270
271
272 Assert(hashtable == NULL);
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
298 {
299
301 }
302 else if (parallel)
303 {
304
305
306
307
308
309
310
311
313 }
317 {
320 {
322 return NULL;
323 }
324 else
326 }
327 else
329
330
331
332
333
334
337
338
339
340
341
342
345
346
347
348
349
350
352 {
353 if (parallel)
354 {
355
356
357
358
360
363 }
364 return NULL;
365 }
366
367
368
369
370
372
373
374
375
376
377
379
380 if (parallel)
381 {
383
389 {
390
391
392
393
394 if (hashtable->nbatch > 1)
397 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
398 }
400 {
401
402
403
404
405
406 return NULL;
407 }
408
409
413
414 continue;
415 }
416 else
418
419
420
422
423
424
425
426 if (parallel)
427 outerTupleSlot =
429 &hashvalue);
430 else
431 outerTupleSlot =
433
435 {
436
438 {
439
440 if (parallel)
441 {
442
443
444
445
446
449 else
451 }
452 else
453 {
456 }
457 }
458 else
460 continue;
461 }
462
465
466
467
468
469
474 hashvalue);
476
477
478
479
480
481 if (batchno != hashtable->curbatch &&
483 {
484 bool shouldFree;
486 &shouldFree);
487
488
489
490
491
492 Assert(parallel_state == NULL);
496 hashtable);
497
498 if (shouldFree)
500
501
502 continue;
503 }
504
505
507
508
509
511
512
513
514
515 if (parallel)
516 {
518 {
519
521 continue;
522 }
523 }
524 else
525 {
527 {
528
530 continue;
531 }
532 }
533
534
535
536
537
540 continue;
541
542
543
544
545
546
547
548
549
550
551
552
553
554 if (joinqual == NULL || ExecQual(joinqual, econtext))
555 {
557
558
559
560
561
562
565
566
568 {
570 continue;
571 }
572
573
574
575
576
577
580
581
582
583
584
585
586
588 continue;
589
590 if (otherqual == NULL || ExecQual(otherqual, econtext))
592 else
594 }
595 else
597 break;
598
600
601
602
603
604
605
607
610 {
611
612
613
614
616
617 if (otherqual == NULL || ExecQual(otherqual, econtext))
619 else
621 }
622 break;
623
625
626
627
628
629
630
631
634 {
635
637 continue;
638 }
639
640
641
642
643
645
646 if (otherqual == NULL || ExecQual(otherqual, econtext))
648 else
650 break;
651
653
654
655
656
657 if (parallel)
658 {
660 return NULL;
661 }
662 else
663 {
665 return NULL;
666 }
668 break;
669
670 default:
671 elog(ERROR, "unrecognized hashjoin state: %d",
673 }
674 }
675}
676
677
678
679
680
681
682
685{
686
687
688
689
691}
692
693
694
695
696
697
698
701{
702
703
704
705
707}
708
709
710
711
712
713
714
717{
719 Plan *outerNode;
720 Hash *hashNode;
722 innerDesc;
724
725
727
728
729
730
734
735
736
737
738
739
742
743
744
745
746
747
749
750
751
752
753
754
755
756
759
764
765
766
767
770
771
772
773
776 ops);
777
778
779
780
783
784
786 {
790 break;
795 break;
800 break;
806 break;
807 default:
808 elog(ERROR, "unrecognized join type: %d",
810 }
811
812
813
814
815
816
817
818
819 {
823 Oid *outer_hashfuncid;
824 Oid *inner_hashfuncid;
825 bool *hash_strict;
827 int nkeys;
828
829
831
832
833
834
835
836
837
838
839
840
842
846
847
848
849
850
852 {
855
857 &outer_hashfuncid[i],
858 &inner_hashfuncid[i]))
860 "could not find hash function for hash operator %u",
861 hashop);
863 }
864
865
866
867
868
869
870
871
872
876 outer_hashfuncid,
879 hash_strict,
881 0,
883
884
888 inner_hashfuncid,
890 hash->hashkeys,
891 hash_strict,
892 &hashstate->ps,
893 0,
895
896
897
898
899
901 {
905 }
906
907
908 pfree(outer_hashfuncid);
909 pfree(inner_hashfuncid);
910 pfree(hash_strict);
911 }
912
913
914
915
922
923
924
925
928
933
937
938 return hjstate;
939}
940
941
942
943
944
945
946
947void
949{
950
951
952
954 {
957 }
958
959
960
961
964}
965
966
967
968
969
970
971
972
973
974
975
976
977
982{
984 int curbatch = hashtable->curbatch;
986
987 if (curbatch == 0)
988 {
989
990
991
992
996 else
998
1000 {
1001 bool isnull;
1002
1003
1004
1005
1007
1009
1011
1013 econtext,
1014 &isnull));
1015
1016 if (!isnull)
1017 {
1018
1020
1021 return slot;
1022 }
1023
1024
1025
1026
1027
1029 }
1030 }
1031 else if (curbatch < hashtable->nbatch)
1032 {
1034
1035
1036
1037
1038
1039 if (file == NULL)
1040 return NULL;
1041
1043 file,
1044 hashvalue,
1047 return slot;
1048 }
1049
1050
1051 return NULL;
1052}
1053
1054
1055
1056
1061{
1063 int curbatch = hashtable->curbatch;
1065
1066
1067
1068
1069
1070
1071 if (curbatch == 0 && hashtable->nbatch == 1)
1072 {
1074
1076 {
1077 bool isnull;
1078
1080
1082
1084
1086 econtext,
1087 &isnull));
1088
1089 if (!isnull)
1090 return slot;
1091
1092
1093
1094
1095
1097 }
1098 }
1099 else if (curbatch < hashtable->nbatch)
1100 {
1102
1104 hashvalue);
1105 if (tuple != NULL)
1106 {
1109 false);
1111 return slot;
1112 }
1113 else
1115 }
1116
1117
1119
1120 return NULL;
1121}
1122
1123
1124
1125
1126
1127
1128
1129static bool
1131{
1133 int nbatch;
1134 int curbatch;
1138
1139 nbatch = hashtable->nbatch;
1140 curbatch = hashtable->curbatch;
1141
1142 if (curbatch > 0)
1143 {
1144
1145
1146
1147
1151 }
1152 else
1153 {
1154
1155
1156
1157
1158
1159
1165 }
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185 curbatch++;
1186 while (curbatch < nbatch &&
1189 {
1192 break;
1195 break;
1198 break;
1201 break;
1202
1203
1210 curbatch++;
1211 }
1212
1213 if (curbatch >= nbatch)
1214 return false;
1215
1216 hashtable->curbatch = curbatch;
1217
1218
1219
1220
1222
1224
1225 if (innerFile != NULL)
1226 {
1227 if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
1230 errmsg("could not rewind hash-join temporary file")));
1231
1233 innerFile,
1234 &hashvalue,
1236 {
1237
1238
1239
1240
1242 }
1243
1244
1245
1246
1247
1250 }
1251
1252
1253
1254
1256 {
1260 errmsg("could not rewind hash-join temporary file")));
1261 }
1262
1263 return true;
1264}
1265
1266
1267
1268
1269
1270static bool
1272{
1274 int start_batchno;
1275 int batchno;
1276
1277
1278
1279
1280
1281
1282 if (hashtable->curbatch >= 0)
1283 {
1286 }
1287
1288
1289
1290
1291
1292
1293 batchno = start_batchno =
1296 do
1297 {
1301
1303 {
1305 Barrier *batch_barrier =
1307
1309 {
1311
1312
1314 WAIT_EVENT_HASH_BATCH_ELECT))
1316
1317
1319
1321 WAIT_EVENT_HASH_BATCH_ALLOCATE);
1322
1323
1325
1330 &hashvalue)))
1331 {
1334 false);
1337 hashvalue);
1338 }
1341 WAIT_EVENT_HASH_BATCH_LOAD);
1342
1343
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1358
1359 return true;
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1375 hashtable->batches[batchno].done = true;
1377 break;
1378
1380
1381
1382
1383
1384
1386 hashtable->batches[batchno].done = true;
1388 break;
1389
1390 default:
1391 elog(ERROR, "unexpected batch phase %d",
1393 }
1394 }
1395 batchno = (batchno + 1) % hashtable->nbatch;
1396 } while (batchno != start_batchno);
1397
1398 return false;
1399}
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413void
1416{
1417 BufFile *file = *fileptr;
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433 if (file == NULL)
1434 {
1436
1438 *fileptr = file;
1439
1441 }
1442
1445}
1446
1447
1448
1449
1450
1451
1452
1453
1459{
1461 size_t nread;
1463
1464
1465
1466
1467
1468
1470
1471
1472
1473
1474
1475
1477 if (nread == 0)
1478 {
1480 return NULL;
1481 }
1482 *hashvalue = header[0];
1484 tuple->t_len = header[1];
1486 (char *) tuple + sizeof(uint32),
1487 header[1] - sizeof(uint32));
1489 return tupleSlot;
1490}
1491
1492
1493void
1495{
1498
1499
1500
1501
1502
1503
1504
1505
1507 {
1510 {
1511
1512
1513
1514
1515
1516
1517
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1531
1532
1534 }
1535 else
1536 {
1537
1539
1541
1542
1549
1551
1555
1556
1557
1558
1559
1560 if (innerPlan->chgParam == NULL)
1562 }
1563 }
1564
1565
1570
1573
1574
1575
1576
1577
1578 if (outerPlan->chgParam == NULL)
1580}
1581
1582void
1584{
1586 {
1587
1588
1589
1590
1591
1594 }
1595}
1596
1597static void
1599{
1605 int i;
1606
1608
1609
1610 for (;;)
1611 {
1612 bool isnull;
1613
1616 break;
1618
1620
1622 econtext,
1623 &isnull));
1624
1625 if (!isnull)
1626 {
1627 int batchno;
1628 int bucketno;
1629 bool shouldFree;
1631
1633 &batchno);
1635 &hashvalue, mintup);
1636
1637 if (shouldFree)
1639 }
1641 }
1642
1643
1644 for (i = 0; i < hashtable->nbatch; ++i)
1646}
1647
1648void
1650{
1653}
1654
1655void
1657{
1658 int plan_node_id = state->js.ps.plan->plan_node_id;
1661
1662
1663
1664
1665
1666 if (pcxt->seg == NULL)
1667 return;
1668
1670
1671
1672
1673
1674
1677
1678
1679
1680
1681
1682
1698
1699
1701
1702
1705}
1706
1707
1708
1709
1710
1711
1712
1713void
1715{
1716 int plan_node_id = state->js.ps.plan->plan_node_id;
1718
1719
1720 if (pcxt->seg == NULL)
1721 return;
1722
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738 if (state->hj_HashTable != NULL)
1739 {
1742 }
1743
1744
1746
1747
1749}
1750
1751void
1754{
1756 int plan_node_id = state->js.ps.plan->plan_node_id;
1759
1760
1762
1763
1766
1768}
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)
int BarrierAttach(Barrier *barrier)
void BarrierInit(Barrier *barrier, int participants)
int BarrierPhase(Barrier *barrier)
bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
bool BarrierDetach(Barrier *barrier)
void BufFileReadExact(BufFile *file, void *ptr, size_t size)
BufFile * BufFileCreateTemp(bool interXact)
void BufFileWrite(BufFile *file, const void *ptr, size_t size)
size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
void BufFileClose(BufFile *file)
#define pg_attribute_always_inline
#define OidIsValid(objectId)
#define InvalidDsaPointer
int errcode_for_file_access(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void ExecReScan(PlanState *node)
ExprState * ExecInitQual(List *qual, PlanState *parent)
ExprState * ExecBuildHash32Expr(TupleDesc desc, const TupleTableSlotOps *ops, const Oid *hashfunc_oids, const List *collations, const List *hash_exprs, const bool *opstrict, PlanState *parent, uint32 init_value, bool keep_nulls)
Node * MultiExecProcNode(PlanState *node)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)
const TupleTableSlotOps TTSOpsVirtual
void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)
TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType, const TupleTableSlotOps *tts_ops)
TupleDesc ExecGetResultType(PlanState *planstate)
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)
const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)
#define InstrCountFiltered1(node, delta)
#define outerPlanState(node)
#define InstrCountFiltered2(node, delta)
#define innerPlanState(node)
#define EXEC_FLAG_BACKWARD
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
#define ResetExprContext(econtext)
static bool ExecQual(ExprState *state, ExprContext *econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
#define palloc_array(type, count)
void fmgr_info(Oid functionId, FmgrInfo *finfo)
Assert(PointerIsAligned(start, uint64))
#define PHJ_BUILD_HASH_OUTER
#define HJTUPLE_MINTUPLE(hjtup)
#define PHJ_BATCH_ALLOCATE
#define INVALID_SKEW_BUCKET_NO
void heap_free_minimal_tuple(MinimalTuple mtup)
MinimalTupleData * MinimalTuple
static void HeapTupleHeaderSetMatch(MinimalTupleData *tup)
static bool HeapTupleHeaderHasMatch(const MinimalTupleData *tup)
bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)
void LWLockInitialize(LWLock *lock, int tranche_id)
@ LWTRANCHE_PARALLEL_HASH_JOIN
void pfree(void *pointer)
void * palloc0(Size size)
#define CHECK_FOR_INTERRUPTS()
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
void ExecHashTableReset(HashJoinTable hashtable)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
HashJoinTable ExecHashTableCreate(HashState *state)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
#define HJ_NEED_NEW_BATCH
void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecEndHashJoin(HashJoinState *node)
#define HJ_FILL_OUTER_TUPLE
static bool ExecHashJoinNewBatch(HashJoinState *hjstate)
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
#define HJ_FILL_INNER(hjstate)
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)
static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)
void ExecShutdownHashJoin(HashJoinState *node)
#define HJ_FILL_INNER_TUPLES
void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)
#define HJ_NEED_NEW_OUTER
static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)
#define HJ_FILL_OUTER(hjstate)
void ExecReScanHashJoin(HashJoinState *node)
static TupleTableSlot * ExecHashJoin(PlanState *pstate)
void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
HashJoinState * ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
#define HJ_BUILD_HASHTABLE
#define castNode(_type_, nodeptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
#define foreach_current_index(var_or_cell)
static uint32 DatumGetUInt32(Datum X)
static unsigned hash(unsigned *uv, int n)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
TupleTableSlot * ecxt_innertuple
TupleTableSlot * ecxt_outertuple
HashJoinTuple hj_CurTuple
TupleTableSlot * hj_NullOuterTupleSlot
TupleTableSlot * hj_OuterTupleSlot
TupleTableSlot * hj_NullInnerTupleSlot
TupleTableSlot * hj_FirstOuterTupleSlot
HashJoinTable hj_HashTable
TupleTableSlot * hj_HashTupleSlot
ParallelHashJoinBatchAccessor * batches
ParallelHashJoinState * parallel_state
BufFile ** innerBatchFile
BufFile ** outerBatchFile
HashSkewBucket ** skewBucket
struct ParallelHashJoinState * parallel_state
FmgrInfo * skew_hashfunction
HashInstrumentation * hinstrument
shm_toc_estimator estimator
SharedTuplestoreAccessor * outer_tuples
ParallelHashJoinBatch * shared
SharedTuplestoreAccessor * inner_tuples
Barrier grow_batches_barrier
dsa_pointer chunk_work_queue
Barrier grow_buckets_barrier
ParallelHashGrowth growth
pg_atomic_uint32 distributor
const TupleTableSlotOps * resultops
Instrumentation * instrument
TupleDesc ps_ResultTupleDesc
ExprContext * ps_ExprContext
TupleTableSlot * ps_ResultTupleSlot
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)