PostgreSQL Source Code: src/backend/executor/nodeHash.c
#include <math.h>
#include <limits.h>
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
                                  Hash *node, int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                                    TupleTableSlot *slot,
                                    uint32 hashvalue,
                                    int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

static void *dense_alloc(HashJoinTable hashtable, Size size);
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
                                                size_t size,
                                                dsa_pointer *shared);
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable,
                                                int bucketno);
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable,
                                               HashJoinTuple tuple);
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
                                          int batchno,
                                          size_t size);
/* ----------------------------------------------------------------
 *      ExecHash
 *
 *      stub for pro forma compliance
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecHash(PlanState *pstate)
{
    elog(ERROR, "Hash node does not support ExecProcNode call convention");
    return NULL;
}
/* ----------------------------------------------------------------
 *      MultiExecHash
 *
 *      build hash table for hashjoin, doing partitioning if more
 *      than one batch is required.
 * ----------------------------------------------------------------
 */
Node *
MultiExecHash(HashState *node)
{
    /* must provide our own instrumentation support */
    if (node->ps.instrument)
        InstrStartNode(node->ps.instrument);

    if (node->parallel_state != NULL)
        MultiExecParallelHash(node);
    else
        MultiExecPrivateHash(node);

    /* must provide our own instrumentation support */
    if (node->ps.instrument)
        InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);

    /*
     * We do not return the hash table directly because it's not a subtype of
     * Node, and so would violate the MultiExecProcNode API.  Instead, our
     * parent Hashjoin node is expected to know how to fish it out of our
     * node state.
     */
    return NULL;
}
/* ----------------------------------------------------------------
 *      MultiExecPrivateHash
 *
 *      parallel-oblivious version: build a backend-private hash table
 * ----------------------------------------------------------------
 */
static void
MultiExecPrivateHash(HashState *node)
{
    PlanState  *outerNode = outerPlanState(node);
    HashJoinTable hashtable = node->hashtable;
    ExprContext *econtext = node->ps.ps_ExprContext;
    TupleTableSlot *slot;

    /*
     * Get all tuples from the node below the Hash node and insert into the
     * hash table (or temp files).
     */
    for (;;)
    {
        bool        isnull;
        Datum       hashdatum;

        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;
        /* We have to compute the hash value */
        econtext->ecxt_outertuple = slot;

        ResetExprContext(econtext);

        hashdatum = ExecEvalExprSwitchContext(node->hash_expr, econtext,
                                              &isnull);

        if (!isnull)
        {
            uint32      hashvalue = DatumGetUInt32(hashdatum);
            int         bucketNumber;

            bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
            if (bucketNumber != INVALID_SKEW_BUCKET_NO)
            {
                /* It's a skew tuple, so put it into that hash table */
                ExecHashSkewTableInsert(hashtable, slot, hashvalue,
                                        bucketNumber);
            }
            else
            {
                /* Not subject to skew optimization, so insert normally */
                ExecHashTableInsert(hashtable, slot, hashvalue);
            }
            hashtable->totalTuples += 1;
        }
    }

    /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
    if (hashtable->nbuckets != hashtable->nbuckets_optimal)
        ExecHashIncreaseNumBuckets(hashtable);

    hashtable->partialTuples = hashtable->totalTuples;
}
/* ----------------------------------------------------------------
 *      MultiExecParallelHash
 *
 *      parallel-aware version: build a shared hash table cooperatively
 *      with any other participants attached to the same build barrier
 * ----------------------------------------------------------------
 */
static void
MultiExecParallelHash(HashState *node)
{
    ParallelHashJoinState *pstate = node->parallel_state;
    PlanState  *outerNode = outerPlanState(node);
    HashJoinTable hashtable = node->hashtable;
    ExprContext *econtext = node->ps.ps_ExprContext;
    TupleTableSlot *slot;
    uint32      hashvalue;
    int         i;

    /* ... synchronize with the build barrier, then load the inner side ... */
    for (;;)
    {
        bool        isnull;

        slot = ExecProcNode(outerNode);
        if (TupIsNull(slot))
            break;
        econtext->ecxt_outertuple = slot;
        hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(node->hash_expr,
                                                             econtext,
                                                             &isnull));
        if (!isnull)
            ExecParallelHashTableInsert(hashtable, slot, hashvalue);
        hashtable->partialTuples++;
    }

    /* Make sure that any tuples we wrote to disk are visible. */
    for (i = 0; i < hashtable->nbatch; ++i)
        sts_end_write(hashtable->batches[i].inner_tuples);

    /*
     * The participant elected by the barrier performs the serial finish-up
     * work for this phase before everyone moves on to probing.
     */
    if (BarrierArriveAndWait(&pstate->build_barrier,
                             WAIT_EVENT_HASH_BUILD_HASH_INNER))
    {
        /* ... */
    }
}
/* ----------------------------------------------------------------
 *      ExecInitHash
 *
 *      Init routine for Hash node
 * ----------------------------------------------------------------
 */
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
    HashState  *hashstate;

    /* check for unsupported flags */
    Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

    /*
     * create state structure
     */
    hashstate = makeNode(HashState);
    hashstate->ps.plan = (Plan *) node;
    hashstate->ps.state = estate;
    hashstate->ps.ExecProcNode = ExecHash;
    hashstate->hashtable = NULL;

    /*
     * Miscellaneous initialization: create expression context for node
     */
    ExecAssignExprContext(estate, &hashstate->ps);

    /*
     * initialize child nodes
     */
    outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);

    /*
     * initialize our result slot and type. No need to build projection
     * because this node doesn't do projections.
     */
    ExecInitResultTupleSlotTL(&hashstate->ps, &TTSOpsMinimalTuple);
    hashstate->ps.ps_ProjInfo = NULL;

    /* ... initialize the hash expression and skew-hash support state ... */

    return hashstate;
}

/* ---------------------------------------------------------------
 *      ExecEndHash
 *
 *      clean up routine for Hash node
 * ----------------------------------------------------------------
 */
void
ExecEndHash(HashState *node)
{
    PlanState  *outerPlan;

    /*
     * shut down the subplan
     */
    outerPlan = outerPlanState(node);
    ExecEndNode(outerPlan);
}
437
438
439
440
441
442
443
444
447{
450 Plan *outerNode;
451 size_t space_allowed;
452 int nbuckets;
453 int nbatch;
454 double rows;
455 int num_skew_mcvs;
456 int log2_nbuckets;
458
459
460
461
462
463
466
467
468
469
470
471
473
476 state->parallel_state != NULL,
477 state->parallel_state != NULL ?
478 state->parallel_state->nparticipants - 1 : 0,
479 &space_allowed,
480 &nbuckets, &nbatch, &num_skew_mcvs);
481
482
483 log2_nbuckets = my_log2(nbuckets);
484 Assert(nbuckets == (1 << log2_nbuckets));
485
486
487
488
489
490
491
492
494 hashtable->nbuckets = nbuckets;
505 hashtable->nbatch = nbatch;
521 hashtable->chunks = NULL;
524 hashtable->area = state->ps.state->es_query_dsa;
525 hashtable->batches = NULL;
526
527#ifdef HJDEBUG
528 printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
529 hashtable, nbatch, nbuckets);
530#endif
531
532
533
534
535
537 "HashTableContext",
539
541 "HashBatchContext",
543
545 "HashSpillContext",
547
548
549
551
553 {
555
556
557
558
559
561
564
566
567
568
570 }
571
573
575 {
578
579
580
581
582
583
584
585
586
589
590
591
592
593
594
595
596
599 {
600 pstate->nbatch = nbatch;
603
604
606
607
608
609
610
613 }
614
615
616
617
618
619
620
621 }
622 else
623 {
624
625
626
627
629
631
632
633
634
635
636
637 if (nbatch > 1)
639
641 }
642
643 return hashtable;
644}
/*
 * Compute appropriate size for hashtable given the estimated size of the
 * relation to be hashed (number of rows and average row width).
 *
 * This is exported so that the planner's costsize.c can use it.
 */

/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET         1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        bool try_combined_hash_mem,
                        int parallel_workers,
                        size_t *space_allowed,
                        int *numbuckets,
                        int *numbatches,
                        int *num_skew_mcvs)
{
    int         tupsize;
    double      inner_rel_bytes;
    size_t      hash_table_bytes;
    size_t      bucket_bytes;
    size_t      max_pointers;
    int         nbatch = 1;
    int         nbuckets;
    double      dbuckets;

    /* Force a plausible relation size if no info */
    if (ntuples <= 0.0)
        ntuples = 1000.0;

    /*
     * Estimate tupsize based on footprint of tuple in hashtable... note this
     * does not allow for any palloc overhead.  The manipulations of spaceUsed
     * don't count palloc overhead either.
     */
    tupsize = HJTUPLE_OVERHEAD +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);
    inner_rel_bytes = ntuples * tupsize;

    /*
     * Compute in-memory hashtable size limit from GUCs.
     */
    hash_table_bytes = get_hash_memory_limit();

    /*
     * Parallel Hash tries to use the combined hash_mem of all workers to
     * avoid the need to batch.  If that won't work, it falls back to hash_mem
     * per worker and tries to process batches in parallel.
     */
    if (try_combined_hash_mem)
    {
        /* Careful, this could overflow size_t */
        double      newlimit;

        newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
        newlimit = Min(newlimit, (double) SIZE_MAX);
        hash_table_bytes = (size_t) newlimit;
    }

    *space_allowed = hash_table_bytes;

    /*
     * If skew optimization is possible, estimate the number of skew buckets
     * that will fit in the memory allowed, and decrement the assumed space
     * available for the main hash table accordingly.
     */
    if (useskew)
    {
        size_t      bytes_per_mcv;
        size_t      skew_mcvs;

        /*----------
         * Compute number of MCVs we could hold in hash_table_bytes
         *
         * Divisor is:
         * size of a hash tuple +
         * worst-case size of skewBucket[] per MCV +
         * size of skewBucketNums[] entry +
         * size of skew bucket struct itself
         *----------
         */
        bytes_per_mcv = tupsize +
            (8 * sizeof(HashSkewBucket *)) +
            sizeof(int) +
            SKEW_BUCKET_OVERHEAD;
        skew_mcvs = hash_table_bytes / bytes_per_mcv;

        /*
         * Now scale by SKEW_HASH_MEM_PERCENT (doing it in this order avoids
         * overflow and zero-divisor problems).
         */
        skew_mcvs = (skew_mcvs * SKEW_HASH_MEM_PERCENT) / 100;

        /* Now clamp to integer range */
        skew_mcvs = Min(skew_mcvs, INT_MAX);

        *num_skew_mcvs = (int) skew_mcvs;

        /* Reduce hash_table_bytes by the amount needed for the skew table */
        if (skew_mcvs > 0)
            hash_table_bytes -= skew_mcvs * bytes_per_mcv;
    }
    else
        *num_skew_mcvs = 0;

    /*
     * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
     * memory is filled, assuming a single batch; but limit the value so that
     * the pointer arrays we'll try to allocate do not exceed hash_table_bytes
     * nor MaxAllocSize.
     */
    max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
    max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
    /* If max_pointers isn't a power of 2, must round it down to one */
    max_pointers = pg_prevpower2_size_t(max_pointers);

    /* Also ensure we avoid integer overflow in nbatch and nbuckets */
    max_pointers = Min(max_pointers, INT_MAX / 2 + 1);

    dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
    dbuckets = Min(dbuckets, max_pointers);
    nbuckets = (int) dbuckets;
    /* don't let nbuckets be really small, though ... */
    nbuckets = Max(nbuckets, 1024);
    /* ... and force it to be a power of 2. */
    nbuckets = pg_nextpower2_32(nbuckets);

    /*
     * If there's not enough space to store the projected number of tuples and
     * the required bucket headers, we will need multiple batches.
     */
    bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
    if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
    {
        /* We'll need multiple batches */
        size_t      sbuckets;
        double      dbatch;
        int         minbatch;
        size_t      bucket_size;

        /*
         * If Parallel Hash with combined hash_mem would still need multiple
         * batches, we'll have to fall back to the regular hash_mem budget.
         */
        if (try_combined_hash_mem)
        {
            ExecChooseHashTableSize(ntuples, tupwidth, useskew,
                                    false, parallel_workers,
                                    space_allowed,
                                    numbuckets,
                                    numbatches,
                                    num_skew_mcvs);
            return;
        }

        /*
         * Estimate the number of buckets we'll want when hash_mem is entirely
         * full.  Each bucket will contain a bucket pointer plus
         * NTUP_PER_BUCKET tuples, whose projected size already includes
         * overhead for the hash code, pointer to the next tuple, etc.
         */
        bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
        if (hash_table_bytes <= bucket_size)
            sbuckets = 1;       /* avoid pg_nextpower2_size_t(0) */
        else
            sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
        sbuckets = Min(sbuckets, max_pointers);
        nbuckets = (int) sbuckets;
        nbuckets = pg_nextpower2_32(nbuckets);
        bucket_bytes = nbuckets * sizeof(HashJoinTuple);

        /*
         * Buckets are simple pointers to hashjoin tuples, while tupsize
         * includes the pointer, hash code, and MinimalTupleData.  So buckets
         * should never really exceed 25% of hash_mem (even for
         * NTUP_PER_BUCKET=1).
         */
        Assert(bucket_bytes <= hash_table_bytes / 2);

        /* Calculate required number of batches. */
        dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
        dbatch = Min(dbatch, max_pointers);
        minbatch = (int) dbatch;
        nbatch = pg_nextpower2_32(Max(2, minbatch));
    }

    /*
     * Optimize the total amount of memory consumed by the hash node.  Each
     * batch file needs a BLCKSZ buffer for its inner and outer side, so a
     * large nbatch can cost more memory in file buffers than the in-memory
     * hash table saves.  Halve nbatch (and double the in-memory hash table
     * allowance) for as long as that lowers the total.
     */
    while (nbatch > 0)
    {
        /* how much memory are we using with current nbatch value */
        size_t      current_space = hash_table_bytes + (2 * nbatch * BLCKSZ);

        /* how much memory would we use with half the batches? */
        size_t      new_space = hash_table_bytes * 2 + (nbatch * BLCKSZ);

        /* If the memory usage would not decrease, we found the optimum. */
        if (current_space < new_space)
            break;

        /*
         * It's better to use fewer batches, so do that, adjust nbuckets in
         * the opposite direction, and double the memory allowance.
         */
        nbatch /= 2;
        nbuckets *= 2;

        *space_allowed = (*space_allowed) * 2;
    }

    Assert(nbuckets > 0);
    Assert(nbatch > 0);

    *numbuckets = nbuckets;
    *numbatches = nbatch;
}
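/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): how the
 * sizing logic above plays out for an assumed inner relation of one million
 * 48-byte-wide tuples under an assumed 64MB memory limit.  All literal
 * values below are example assumptions, not planner output.
 */
static void
hash_sizing_example(void)
{
    double      ntuples = 1000000.0;            /* assumed row count */
    int         tupwidth = 48;                  /* assumed row width */
    size_t      hash_table_bytes = 64UL * 1024 * 1024;  /* assumed limit */
    int         tupsize;
    double      inner_rel_bytes;
    int         nbuckets;
    int         nbatch;

    /* per-tuple footprint: header overhead plus the (aligned) user data */
    tupsize = HJTUPLE_OVERHEAD +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);
    inner_rel_bytes = ntuples * tupsize;

    /* one bucket header per NTUP_PER_BUCKET tuples, rounded up to 2^N */
    nbuckets = (int) pg_nextpower2_32((uint32) ceil(ntuples / NTUP_PER_BUCKET));

    /* if everything fits, a single batch suffices; otherwise split it up */
    if (inner_rel_bytes + nbuckets * sizeof(HashJoinTuple) <= hash_table_bytes)
        nbatch = 1;
    else
        nbatch = (int) pg_nextpower2_32((uint32)
                                        ceil(inner_rel_bytes / hash_table_bytes));

    elog(DEBUG1, "sizing example: nbuckets=%d nbatch=%d", nbuckets, nbatch);
}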
/* ----------------------------------------------------------------
 *      ExecHashTableDestroy
 *
 *      destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
    int         i;

    /*
     * Make sure all the temp files are closed.  We skip batch 0, since it
     * can't have any temp files (and the arrays might not even exist if
     * nbatch is only 1).  Parallel hash joins don't use these files.
     */
    if (hashtable->innerBatchFile != NULL)
    {
        for (i = 1; i < hashtable->nbatch; i++)
        {
            if (hashtable->innerBatchFile[i])
                BufFileClose(hashtable->innerBatchFile[i]);
            if (hashtable->outerBatchFile[i])
                BufFileClose(hashtable->outerBatchFile[i]);
        }
    }

    /* Release working memory (batchCxt is a child, so it goes away too) */
    MemoryContextDelete(hashtable->hashCxt);

    /* And drop the control block */
    pfree(hashtable);
}

/*
 * Consider adjusting the allowed hash table size, depending on the number
 * of batches, to minimize the overall memory usage (for both the hashtable
 * and batch files).
 *
 * We're adjusting the size of the hash table, not the (optimal) number of
 * buckets.  We can't change that once we start building the hash table in
 * ExecHashTableInsert, but we can adjust the memory limit.
 *
 * Returns true if we chose to increase the batch size (and thus we don't
 * need to add batches), and false if we should increase nbatch.
 */
static bool
ExecHashIncreaseBatchSize(HashJoinTable hashtable)
{
    /*
     * How much additional memory would doubling nbatch use?  Each batch may
     * require two buffered files (inner/outer), with a BLCKSZ buffer.
     */
    size_t      batchSpace = (hashtable->nbatch * 2 * BLCKSZ);

    /*
     * Compare the new space needed for doubling nbatch and for enlarging the
     * in-memory hash table.  If doubling the hash table needs less memory,
     * just do that.  Otherwise, continue with doubling the nbatch.
     */
    if (hashtable->spaceAllowed <= batchSpace)
    {
        hashtable->spaceAllowed *= 2;
        return true;
    }

    return false;
}
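/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): the
 * trade-off checked by ExecHashIncreaseBatchSize above.  Each batch needs an
 * inner and an outer BufFile, and each buffered file costs roughly one
 * BLCKSZ buffer, so doubling nbatch adds about nbatch * 2 * BLCKSZ bytes of
 * buffer space.  If that is at least as large as the current in-memory
 * allowance, doubling the in-memory table is the cheaper choice.
 */
static bool
prefer_bigger_hash_table(size_t space_allowed, int nbatch)
{
    /* extra buffer memory that doubling the number of batches would cost */
    size_t      batch_buffer_growth = (size_t) nbatch * 2 * BLCKSZ;

    /* doubling the hash table costs space_allowed extra bytes instead */
    return space_allowed <= batch_buffer_growth;
}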
/*
 * ExecHashIncreaseNumBatches
 *      increase the original number of batches in order to reduce
 *      current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
    int         oldnbatch = hashtable->nbatch;
    int         curbatch = hashtable->curbatch;
    int         nbatch;
    long        ninmemory;
    long        nfreed;
    HashMemoryChunk oldchunks;

    /* do nothing if we've decided to shut off growth */
    if (!hashtable->growEnabled)
        return;

    /* safety check to avoid overflow */
    if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
        return;

    /* consider increasing size of the in-memory hash table instead */
    if (ExecHashIncreaseBatchSize(hashtable))
        return;

    nbatch = oldnbatch * 2;
    Assert(nbatch > 1);

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
           hashtable, nbatch, hashtable->spaceUsed);
#endif

    if (hashtable->innerBatchFile == NULL)
    {
        MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);

        /* we had no file arrays before */
        hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
        hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);

        MemoryContextSwitchTo(oldcxt);

        /* time to establish the temp tablespaces, too */
        PrepareTempTablespaces();
    }
    else
    {
        /* enlarge arrays and zero out added entries */
        hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile,
                                                    BufFile *, oldnbatch, nbatch);
        hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile,
                                                    BufFile *, oldnbatch, nbatch);
    }

    hashtable->nbatch = nbatch;

    /*
     * Scan through the existing hash table entries and dump out any that are
     * no longer of the current batch.
     */
    ninmemory = nfreed = 0;

    /* If we know we need to resize nbuckets, we can do it while rebatching. */
    if (hashtable->nbuckets_optimal != hashtable->nbuckets)
    {
        /* we never decrease the number of buckets */
        Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);

        hashtable->nbuckets = hashtable->nbuckets_optimal;
        hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

        hashtable->buckets.unshared =
            repalloc_array(hashtable->buckets.unshared,
                           HashJoinTuple, hashtable->nbuckets);
    }

    /*
     * We will scan through the chunks directly, so we can reset the buckets
     * now and not have to keep track of which tuples in the buckets have
     * already been processed.  We will free the old chunks as we go.
     */
    memset(hashtable->buckets.unshared, 0,
           sizeof(HashJoinTuple) * hashtable->nbuckets);
    oldchunks = hashtable->chunks;
    hashtable->chunks = NULL;

    /* so, let's scan through the old chunks, and all tuples in each chunk */
    while (oldchunks != NULL)
    {
        HashMemoryChunk nextchunk = oldchunks->next.unshared;

        /* position within the buffer (up to oldchunks->used) */
        size_t      idx = 0;

        /* process all tuples stored in this chunk (and then free it) */
        while (idx < oldchunks->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
            int         hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
            int         bucketno;
            int         batchno;

            ninmemory++;
            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            if (batchno == curbatch)
            {
                /* keep tuple in memory - copy it into the new chunk */
                HashJoinTuple copyTuple;

                copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
                memcpy(copyTuple, hashTuple, hashTupleSize);

                /* and add it back to the appropriate bucket */
                copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
                hashtable->buckets.unshared[bucketno] = copyTuple;
            }
            else
            {
                /* dump it out */
                Assert(batchno > curbatch);
                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                      hashTuple->hashvalue,
                                      &hashtable->innerBatchFile[batchno],
                                      hashtable);

                hashtable->spaceUsed -= hashTupleSize;
                nfreed++;
            }

            /* next tuple in this chunk */
            idx += MAXALIGN(hashTupleSize);

            /* allow this loop to be cancellable */
            CHECK_FOR_INTERRUPTS();
        }

        /* we're done with this chunk - free it and proceed to the next one */
        pfree(oldchunks);
        oldchunks = nextchunk;
    }

#ifdef HJDEBUG
    printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
           hashtable, nfreed, ninmemory, hashtable->spaceUsed);
#endif

    /*
     * If we dumped out either all or none of the tuples in the table, disable
     * further expansion of nbatch.  This situation implies that we have
     * enough tuples of identical hashvalues to overflow spaceAllowed.
     */
    if (nfreed == 0 || nfreed == ninmemory)
    {
        hashtable->growEnabled = false;
#ifdef HJDEBUG
        printf("Hashjoin %p: disabling further increase of nbatch\n",
               hashtable);
#endif
    }
}
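/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): the
 * "give up growing" heuristic applied at the end of
 * ExecHashIncreaseNumBatches above.  If a repartition pass moved either none
 * or all of the in-memory tuples, the tuples share so few distinct hash
 * values that doubling nbatch again cannot reduce memory use, so further
 * growth is disabled.
 */
static bool
repartition_made_progress(long nfreed, long ninmemory)
{
    /* progress means some, but not all, tuples migrated to later batches */
    return nfreed != 0 && nfreed != ninmemory;
}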
/*
 * ExecParallelHashIncreaseNumBatches
 *      Parallel version of ExecHashIncreaseNumBatches; all participants
 *      cooperate to repartition into a larger number of batches, coordinated
 *      by grow_batches_barrier.
 */
static void
ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;

    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
     * up while we're in the middle of this operation, so we switch on barrier
     * phase here.
     */
    switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
    {
        case PHJ_GROW_BATCHES_ELECT:

            /*
             * Elect one participant to prepare to grow the number of batches.
             * This involves reallocating or resetting the buckets of batch 0
             * in preparation for all participants to begin repartitioning the
             * tuples.
             */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
            {
                ParallelHashJoinBatch *old_batch0 = hashtable->batches[0].shared;
                dsa_pointer_atomic *buckets;
                int         new_nbatch;
                int         i;

                /* Move the old batch out of the way. */
                pstate->old_batches = pstate->batches;
                pstate->old_nbatch = hashtable->nbatch;
                pstate->batches = InvalidDsaPointer;

                /* Free this backend's old accessors. */
                ExecParallelHashCloseBatchAccessors(hashtable);

                /* Figure out how many batches to use. */
                if (hashtable->nbatch == 1)
                {
                    /*
                     * We are going from single-batch to multi-batch: switch
                     * from the combined memory budget to the regular hash_mem
                     * budget, and start with two batches per participant,
                     * rounded up to a power of two.
                     */
                    pstate->space_allowed = get_hash_memory_limit();
                    new_nbatch = pg_nextpower2_32(pstate->nparticipants * 2);
                }
                else
                {
                    /* We were already multi-batched.  Double the batches. */
                    new_nbatch = hashtable->nbatch * 2;
                }

                /* Allocate new larger generation of batches. */
                ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);

                /* Replace or recycle batch 0's bucket array. */
                if (pstate->old_nbatch == 1)
                {
                    double      dtuples;
                    double      dbuckets;
                    int         new_nbuckets;
                    uint32      max_buckets;

                    /*
                     * Estimate the tuples per batch, assuming we've seen only
                     * about half of them so far, and size a smaller bucket
                     * array accordingly (clamped to MaxAllocSize).
                     */
                    dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
                    max_buckets = MaxAllocSize / sizeof(dsa_pointer_atomic);
                    dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
                    dbuckets = Min(dbuckets, max_buckets);
                    new_nbuckets = (int) dbuckets;
                    new_nbuckets = Max(new_nbuckets, 1024);
                    new_nbuckets = pg_nextpower2_32(new_nbuckets);
                    dsa_free(hashtable->area, old_batch0->buckets);
                    hashtable->batches[0].shared->buckets =
                        dsa_allocate(hashtable->area,
                                     sizeof(dsa_pointer_atomic) * new_nbuckets);
                    buckets = (dsa_pointer_atomic *)
                        dsa_get_address(hashtable->area,
                                        hashtable->batches[0].shared->buckets);
                    for (i = 0; i < new_nbuckets; ++i)
                        dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
                    pstate->nbuckets = new_nbuckets;
                }
                else
                {
                    /* Recycle the existing bucket array. */
                    hashtable->batches[0].shared->buckets = old_batch0->buckets;
                    buckets = (dsa_pointer_atomic *)
                        dsa_get_address(hashtable->area, old_batch0->buckets);
                    for (i = 0; i < hashtable->nbuckets; ++i)
                        dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
                }

                /* Move all chunks to the work queue for parallel processing. */
                pstate->chunk_work_queue = old_batch0->chunks;

                /* Disable further growth temporarily while we're growing. */
                pstate->growth = PHJ_GROWTH_DISABLED;
            }
            else
            {
                /* All other participants just flush their tuples to disk. */
                ExecParallelHashCloseBatchAccessors(hashtable);
            }
            /* Fall through. */

        case PHJ_GROW_BATCHES_REALLOCATE:
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
            /* Fall through. */

        case PHJ_GROW_BATCHES_REPARTITION:
            /* Make sure that we have the current dimensions and buckets. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
            /* Then partition, flush counters. */
            ExecParallelHashRepartitionFirst(hashtable);
            ExecParallelHashRepartitionRest(hashtable);
            ExecParallelHashMergeCounters(hashtable);
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
            /* Fall through. */

        case PHJ_GROW_BATCHES_DECIDE:

            /*
             * Elect one participant to clean up and decide whether further
             * repartitioning is needed, or should be disabled because it's
             * not helping.
             */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
            {
                ParallelHashJoinBatch *old_batches;
                bool        space_exhausted = false;
                bool        extreme_skew_detected = false;

                /* Make sure that we have the current dimensions and buckets. */
                ExecParallelHashEnsureBatchAccessors(hashtable);
                ExecParallelHashTableSetCurrentBatch(hashtable, 0);

                old_batches = dsa_get_address(hashtable->area, pstate->old_batches);

                /* Are any of the new generation of batches exhausted? */
                for (int i = 0; i < hashtable->nbatch; ++i)
                {
                    ParallelHashJoinBatch *batch = hashtable->batches[i].shared;
                    ParallelHashJoinBatch *old_batch;
                    int         parent;

                    if (batch->space_exhausted ||
                        batch->estimated_size > pstate->space_allowed)
                        space_exhausted = true;

                    parent = i % pstate->old_nbatch;
                    old_batch = NthParallelHashJoinBatch(old_batches, parent);
                    if (old_batch->space_exhausted ||
                        batch->estimated_size > pstate->space_allowed)
                    {
                        /*
                         * Did this batch receive ALL of the tuples from its
                         * parent batch?  That would indicate that further
                         * repartitioning isn't going to help (the hash values
                         * are probably all the same).
                         */
                        if (batch->ntuples == old_batch->old_ntuples)
                            extreme_skew_detected = true;
                    }
                }

                /* Don't keep growing if it's not helping or we'd overflow. */
                if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
                    pstate->growth = PHJ_GROWTH_DISABLED;
                else if (space_exhausted)
                    pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
                else
                    pstate->growth = PHJ_GROWTH_OK;

                /* Free the old batches in shared memory. */
                dsa_free(hashtable->area, pstate->old_batches);
                pstate->old_batches = InvalidDsaPointer;
            }
            /* Fall through. */

        case PHJ_GROW_BATCHES_FINISH:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
    }
}

/*
 * Repartition the tuples currently loaded into memory for inner batch 0
 * because the number of batches has been increased.
 */
static void
ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
{
    dsa_pointer chunk_shared;
    HashMemoryChunk chunk;

    Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);

    while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
    {
        size_t      idx = 0;

        /* Repartition all tuples in this chunk. */
        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
            HashJoinTuple copyTuple;
            dsa_pointer shared;
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            Assert(batchno < hashtable->nbatch);
            if (batchno == 0)
            {
                /* It still belongs in batch 0.  Copy to a new chunk. */
                copyTuple =
                    ExecParallelHashTupleAlloc(hashtable,
                                               HJTUPLE_OVERHEAD + tuple->t_len,
                                               &shared);
                copyTuple->hashvalue = hashTuple->hashvalue;
                memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
                ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                          copyTuple, shared);
            }
            else
            {
                size_t      tuple_size =
                    MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

                /* It belongs in a later batch. */
                hashtable->batches[batchno].estimated_size += tuple_size;
                sts_puttuple(hashtable->batches[batchno].inner_tuples,
                             &hashTuple->hashvalue, tuple);
            }

            /* Count this tuple. */
            ++hashtable->batches[0].old_ntuples;
            ++hashtable->batches[batchno].ntuples;

            idx += MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
        }

        /* Free this chunk. */
        dsa_free(hashtable->area, chunk_shared);

        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Help repartition inner batches 1..n from the previous generation.
 */
static void
ExecParallelHashRepartitionRest(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         old_nbatch = pstate->old_nbatch;
    SharedTuplestoreAccessor **old_inner_tuples;
    int         i;

    /* Get our hands on the previous generation of batches. */
    old_inner_tuples = palloc0_array(SharedTuplestoreAccessor *, old_nbatch);
    for (i = 1; i < old_nbatch; ++i)
    {
        /* ... attach to the old batch's shared inner tuplestore ... */
    }

    /* Join in the effort to repartition them. */
    for (i = 1; i < old_nbatch; ++i)
    {
        MinimalTuple tuple;
        uint32      hashvalue;

        /* Scan one partition from the previous generation. */
        sts_begin_parallel_scan(old_inner_tuples[i]);
        while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
        {
            size_t      tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
            int         bucketno;
            int         batchno;

            /* Decide which partition it goes to in the new generation. */
            ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
                                      &batchno);

            hashtable->batches[batchno].estimated_size += tuple_size;
            ++hashtable->batches[batchno].ntuples;
            ++hashtable->batches[i].old_ntuples;

            /* Store the tuple in its new batch. */
            sts_puttuple(hashtable->batches[batchno].inner_tuples,
                         &hashvalue, tuple);

            CHECK_FOR_INTERRUPTS();
        }
        sts_end_parallel_scan(old_inner_tuples[i]);
    }

    pfree(old_inner_tuples);
}

/*
 * Transfer the backend-local per-batch counters to the shared totals.
 */
static void
ExecParallelHashMergeCounters(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
    pstate->total_tuples = 0;
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];

        batch->shared->size += batch->size;
        batch->shared->estimated_size += batch->estimated_size;
        batch->shared->ntuples += batch->ntuples;
        batch->shared->old_ntuples += batch->old_ntuples;
        batch->size = 0;
        batch->estimated_size = 0;
        batch->ntuples = 0;
        batch->old_ntuples = 0;
        pstate->total_tuples += batch->shared->ntuples;
    }
    LWLockRelease(&pstate->lock);
}
/*
 * ExecHashIncreaseNumBuckets
 *      increase the original number of buckets in order to reduce
 *      number of tuples per bucket
 */
static void
ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    HashMemoryChunk chunk;

    /* do nothing if not an increase (it's called increase for a reason) */
    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
        return;

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbuckets %d => %d\n",
           hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
#endif

    hashtable->nbuckets = hashtable->nbuckets_optimal;
    hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

    Assert(hashtable->nbuckets > 1);
    Assert(hashtable->nbuckets <= (INT_MAX / 2));
    Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));

    /*
     * Just reallocate the proper number of buckets - we don't need to walk
     * through them - we can walk the dense-allocated chunks (just like in
     * ExecHashIncreaseNumBatches, but without all the copying into new
     * chunks)
     */
    hashtable->buckets.unshared =
        repalloc_array(hashtable->buckets.unshared,
                       HashJoinTuple, hashtable->nbuckets);

    memset(hashtable->buckets.unshared, 0,
           hashtable->nbuckets * sizeof(HashJoinTuple));

    /* scan through all tuples in all chunks to rebuild the hash table */
    for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
    {
        /* process all tuples stored in this chunk */
        size_t      idx = 0;

        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            /* add the tuple to the proper bucket */
            hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
            hashtable->buckets.unshared[bucketno] = hashTuple;

            /* advance index past the tuple */
            idx += MAXALIGN(HJTUPLE_OVERHEAD +
                            HJTUPLE_MINTUPLE(hashTuple)->t_len);
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Parallel version of ExecHashIncreaseNumBuckets.
 */
static void
ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    HashMemoryChunk chunk;
    dsa_pointer chunk_s;
    int         i;

    Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);

    /*
     * It's unlikely, but we need to be prepared for new participants to show
     * up while we're in the middle of this operation, so we switch on barrier
     * phase here.
     */
    switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
    {
        case PHJ_GROW_BUCKETS_ELECT:
            /* Elect one participant to prepare to increase nbuckets. */
            if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                     WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
            {
                size_t      size;
                dsa_pointer_atomic *buckets;

                /* Double the size of the bucket array. */
                pstate->nbuckets *= 2;
                size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
                dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
                hashtable->batches[0].shared->buckets =
                    dsa_allocate(hashtable->area, size);
                buckets = (dsa_pointer_atomic *)
                    dsa_get_address(hashtable->area,
                                    hashtable->batches[0].shared->buckets);
                for (i = 0; i < pstate->nbuckets; ++i)
                    dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);

                /* Put all the chunks back on the work queue. */
                pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
                hashtable->batches[0].shared->chunks = InvalidDsaPointer;
            }
            /* Fall through. */

        case PHJ_GROW_BUCKETS_REALLOCATE:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                 WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);
            /* Fall through. */

        case PHJ_GROW_BUCKETS_REINSERT:
            /* Reinsert all tuples from the chunks on the work queue. */
            ExecParallelHashEnsureBatchAccessors(hashtable);
            ExecParallelHashTableSetCurrentBatch(hashtable, 0);
            while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
            {
                size_t      idx = 0;

                while (idx < chunk->used)
                {
                    HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + idx);
                    dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
                    int         bucketno;
                    int         batchno;

                    ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                              &bucketno, &batchno);
                    Assert(batchno == 0);

                    /* add the tuple to the proper bucket */
                    ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                              hashTuple, shared);

                    /* advance index past the tuple */
                    idx += MAXALIGN(HJTUPLE_OVERHEAD +
                                    HJTUPLE_MINTUPLE(hashTuple)->t_len);
                }

                /* allow this loop to be cancellable */
                CHECK_FOR_INTERRUPTS();
            }
            BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                 WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
    }
}
/*
 * ExecHashTableInsert
 *      insert a tuple into the hash table depending on the hash value
 *      it may just go to a temp file for later batches
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
                    TupleTableSlot *slot,
                    uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    int         bucketno;
    int         batchno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);

    /*
     * decide whether to put the tuple in the hash table or a temp file
     */
    if (batchno == hashtable->curbatch)
    {
        /*
         * put the tuple in hash table
         */
        HashJoinTuple hashTuple;
        int         hashTupleSize;
        double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);

        /* Create the HashJoinTuple */
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
        hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);

        hashTuple->hashvalue = hashvalue;
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

        /*
         * We always reset the tuple-matched flag on insertion.  This is okay
         * even when reloading a tuple from a batch file, since the tuple
         * could not possibly have been matched to an outer tuple before it
         * went into the batch file.
         */
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        /* Push it onto the front of the bucket's list */
        hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
        hashtable->buckets.unshared[bucketno] = hashTuple;

        /*
         * Increase the (optimal) number of buckets if we just exceeded the
         * NTUP_PER_BUCKET threshold, but only when there's still a single
         * batch.
         */
        if (hashtable->nbatch == 1 &&
            ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
        {
            /* Guard against integer overflow and alloc size overflow */
            if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
                hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
            {
                hashtable->nbuckets_optimal *= 2;
                hashtable->log2_nbuckets_optimal += 1;
            }
        }

        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;
        if (hashtable->spaceUsed +
            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
            > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);
    }
    else
    {
        /*
         * put the tuple into a temp file for later batches
         */
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno],
                              hashtable);
    }

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}
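/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): the
 * load-factor check performed during single-batch inserts above.  Once the
 * average chain length would exceed NTUP_PER_BUCKET, the "optimal" bucket
 * count is doubled; the bucket array itself is only rebuilt later, in
 * ExecHashIncreaseNumBuckets().  The INT_MAX / 2 guard is a simplification
 * of the overflow checks used above.
 */
static void
maybe_double_optimal_buckets(double ntuples, int *nbuckets_optimal,
                             int *log2_nbuckets_optimal)
{
    while (ntuples > (double) *nbuckets_optimal * NTUP_PER_BUCKET &&
           *nbuckets_optimal < INT_MAX / 2)
    {
        *nbuckets_optimal *= 2;
        *log2_nbuckets_optimal += 1;
    }
}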
/*
 * ExecParallelHashTableInsert
 *      insert a tuple into a shared hash table or shared batch tuplestore
 */
void
ExecParallelHashTableInsert(HashJoinTable hashtable,
                            TupleTableSlot *slot,
                            uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    dsa_pointer shared;
    int         bucketno;
    int         batchno;

retry:
    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

    if (batchno == 0)
    {
        HashJoinTuple hashTuple;

        /* Try to load it into memory. */
        Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
               PHJ_BUILD_HASH_INNER);
        hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                               HJTUPLE_OVERHEAD + tuple->t_len,
                                               &shared);
        if (hashTuple == NULL)
            goto retry;

        /* Store the hash value in the HashJoinTuple header. */
        hashTuple->hashvalue = hashvalue;
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        /* Push it onto the front of the bucket's list */
        ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                  hashTuple, shared);
    }
    else
    {
        size_t      tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

        Assert(batchno > 0);

        /* Try to preallocate space in the batch if necessary. */
        if (hashtable->batches[batchno].preallocated < tuple_size)
        {
            if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
                goto retry;
        }

        Assert(hashtable->batches[batchno].preallocated >= tuple_size);
        hashtable->batches[batchno].preallocated -= tuple_size;
        sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
                     tuple);
    }
    ++hashtable->batches[batchno].ntuples;

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * Insert a tuple into the current hash table.  Unlike
 * ExecParallelHashTableInsert, this version is not prepared to send the tuple
 * to a different batch or to run out of memory, and should only be called
 * with tuples that are known to belong to the current batch (having been
 * partitioned earlier).
 */
void
ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
                                        TupleTableSlot *slot,
                                        uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    HashJoinTuple hashTuple;
    dsa_pointer shared;
    int         batchno;
    int         bucketno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
    Assert(batchno == hashtable->curbatch);
    hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                           HJTUPLE_OVERHEAD + tuple->t_len,
                                           &shared);
    hashTuple->hashvalue = hashvalue;
    memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
    HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
    ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                              hashTuple, shared);

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}
/*
 * ExecHashGetBucketAndBatch
 *      Determine the bucket number and batch number for a hash value
 *
 * Note: on-the-fly increases of nbatch must not change the bucket number
 * for a given hash code (since we don't move tuples to different hash
 * chains), and must only cause the batch number to remain the same or
 * increase.  Our algorithm is
 *      bucketno = hashvalue MOD nbuckets
 *      batchno = ROR(hashvalue, log2_nbuckets) MOD nbatch
 * where nbuckets and nbatch are both expected to be powers of 2, so we can
 * do the computations by shifting and masking.
 *
 * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
 * bucket count growth.  Once we start batching, the value is fixed and does
 * not change over the course of the join (making it possible to compute the
 * batch number the way we do here).
 *
 * nbatch is always a power of 2; we increase it only by doubling it.  This
 * effectively adds one more bit to the top of the batchno.  In very large
 * joins, we might run out of bits to add, so we do this by rotating the hash
 * value.  This causes batchno to steal bits from bucketno when the number of
 * virtual buckets exceeds 2^32.  It's better to have longer bucket chains
 * than to lose the ability to divide batches.
 */
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                          uint32 hashvalue,
                          int *bucketno,
                          int *batchno)
{
    uint32      nbuckets = (uint32) hashtable->nbuckets;
    uint32      nbatch = (uint32) hashtable->nbatch;

    if (nbatch > 1)
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = pg_rotate_right32(hashvalue,
                                     hashtable->log2_nbuckets) & (nbatch - 1);
    }
    else
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = 0;
    }
}
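/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): with an
 * assumed nbuckets = 1024 and nbatch = 8, the bucket number comes from the
 * low 10 bits of the hash value and the batch number from the next 3 bits
 * (rotated down past the bucket bits), so doubling nbatch later never
 * changes which bucket a kept tuple lands in.
 */
static void
bucket_batch_example(uint32 hashvalue)
{
    int         nbuckets = 1024;        /* assumed, must be a power of 2 */
    int         log2_nbuckets = 10;
    int         nbatch = 8;             /* assumed, must be a power of 2 */
    int         bucketno = hashvalue & (nbuckets - 1);
    int         batchno = pg_rotate_right32(hashvalue, log2_nbuckets) & (nbatch - 1);

    elog(DEBUG1, "hash %u -> bucket %d, batch %d", hashvalue, bucketno, batchno);
}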
/*
 * ExecScanHashBucket
 *      scan a hash bucket for matches to the current outer tuple
 *
 * The current outer tuple must be stored in econtext->ecxt_outertuple.
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashBucket(HashJoinState *hjstate,
                   ExprContext *econtext)
{
    ExprState  *hjclauses = hjstate->hashclauses;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
    uint32      hashvalue = hjstate->hj_CurHashValue;

    /*
     * hj_CurTuple is the address of the tuple last returned from the current
     * bucket, or NULL if it's time to start scanning a new bucket.
     *
     * If the tuple hashed to a skew bucket then scan the skew bucket
     * otherwise scan the standard hashtable bucket.
     */
    if (hashTuple != NULL)
        hashTuple = hashTuple->next.unshared;
    else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
        hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
    else
        hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];

    while (hashTuple != NULL)
    {
        if (hashTuple->hashvalue == hashvalue)
        {
            TupleTableSlot *inntuple;

            /* insert hashtable's tuple into exec slot so ExecQual sees it */
            inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                             hjstate->hj_HashTupleSlot,
                                             false);    /* do not pfree */
            econtext->ecxt_innertuple = inntuple;

            if (ExecQualAndReset(hjclauses, econtext))
            {
                hjstate->hj_CurTuple = hashTuple;
                return true;
            }
        }

        hashTuple = hashTuple->next.unshared;
    }

    /*
     * no match
     */
    return false;
}

/*
 * ExecParallelScanHashBucket
 *      scan a hash bucket of a shared hash table for matches to the current
 *      outer tuple
 */
bool
ExecParallelScanHashBucket(HashJoinState *hjstate,
                           ExprContext *econtext)
{
    ExprState  *hjclauses = hjstate->hashclauses;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
    uint32      hashvalue = hjstate->hj_CurHashValue;

    /*
     * hj_CurTuple is the address of the tuple last returned from the current
     * bucket, or NULL if it's time to start scanning a new bucket.
     */
    if (hashTuple != NULL)
        hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
    else
        hashTuple = ExecParallelHashFirstTuple(hashtable,
                                               hjstate->hj_CurBucketNo);

    while (hashTuple != NULL)
    {
        if (hashTuple->hashvalue == hashvalue)
        {
            TupleTableSlot *inntuple;

            /* insert hashtable's tuple into exec slot so ExecQual sees it */
            inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                             hjstate->hj_HashTupleSlot,
                                             false);    /* do not pfree */
            econtext->ecxt_innertuple = inntuple;

            if (ExecQualAndReset(hjclauses, econtext))
            {
                hjstate->hj_CurTuple = hashTuple;
                return true;
            }
        }

        hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
    }

    /*
     * no match
     */
    return false;
}
/*
 * ExecPrepHashTableForUnmatched
 *      set up for a series of ExecScanHashTableForUnmatched calls
 */
void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
    /*----------
     * During this scan we use the HashJoinState fields as follows:
     *
     * hj_CurBucketNo: next regular bucket to scan
     * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
     * hj_CurTuple: last tuple returned, or NULL to start next bucket
     *----------
     */
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = 0;
    hjstate->hj_CurTuple = NULL;
}

/*
 * ExecParallelPrepHashTableForUnmatched
 *      set up for a series of ExecParallelScanHashTableForUnmatched calls
 *      return true if this worker is elected to do the unmatched inner scan
 */
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;
    ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;

    /*
     * It would not be deadlock-free to wait on the batch barrier, because it
     * is in PHJ_BATCH_PROBE phase, and processes attached to it have already
     * emitted tuples.  Therefore, we'll hold a wait-free election: only one
     * process can continue to the next phase, and all other processes must
     * detach from this batch.  They can still go on to work on other batches,
     * if there are any.
     */
    if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
    {
        /* This process considers the batch to be done. */
        hashtable->batches[hashtable->curbatch].done = true;

        /* Make sure any temporary files are closed. */
        sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
        sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

        hashtable->curbatch = -1;
        return false;
    }

    /*
     * Now we are alone with this batch.  Has another process decided to give
     * up early and command all processes to skip the unmatched scan?
     */
    if (batch->skip_unmatched)
    {
        hashtable->batches[hashtable->curbatch].done = true;
        ExecHashTableDetachBatch(hashtable);
        return false;
    }

    /* Now prepare the process local state, just as for non-parallel join. */
    ExecPrepHashTableForUnmatched(hjstate);

    return true;
}
/*
 * ExecScanHashTableForUnmatched
 *      scan the hash table for unmatched inner tuples
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;

    for (;;)
    {
        /*
         * hj_CurTuple is the address of the tuple last returned from the
         * current bucket, or NULL if it's time to start scanning a new
         * bucket.
         */
        if (hashTuple != NULL)
            hashTuple = hashTuple->next.unshared;
        else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
        {
            hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
            hjstate->hj_CurBucketNo++;
        }
        else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
        {
            int         j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];

            hashTuple = hashtable->skewBucket[j]->tuples;
            hjstate->hj_CurSkewBucketNo++;
        }
        else
            break;              /* finished all buckets */

        while (hashTuple != NULL)
        {
            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
            {
                TupleTableSlot *inntuple;

                /* insert hashtable's tuple into exec slot */
                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                 hjstate->hj_HashTupleSlot,
                                                 false);    /* do not pfree */
                econtext->ecxt_innertuple = inntuple;

                /*
                 * Reset temp memory each time; although this function doesn't
                 * do any qual eval, the caller will, so let's keep it
                 * parallel to ExecScanHashBucket.
                 */
                ResetExprContext(econtext);

                hjstate->hj_CurTuple = hashTuple;
                return true;
            }

            hashTuple = hashTuple->next.unshared;
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * no more unmatched tuples
     */
    return false;
}

/*
 * ExecParallelScanHashTableForUnmatched
 *      scan the shared hash table for unmatched inner tuples, in the
 *      single worker elected for this batch's unmatched scan
 */
bool
ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
                                      ExprContext *econtext)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;

    for (;;)
    {
        /*
         * hj_CurTuple is the address of the tuple last returned from the
         * current bucket, or NULL if it's time to start scanning a new
         * bucket.
         */
        if (hashTuple != NULL)
            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
        else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
            hashTuple = ExecParallelHashFirstTuple(hashtable,
                                                   hjstate->hj_CurBucketNo++);
        else
            break;              /* finished all buckets */

        while (hashTuple != NULL)
        {
            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
            {
                TupleTableSlot *inntuple;

                /* insert hashtable's tuple into exec slot */
                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                 hjstate->hj_HashTupleSlot,
                                                 false);    /* do not pfree */
                econtext->ecxt_innertuple = inntuple;

                /*
                 * Reset temp memory each time; although this function doesn't
                 * do any qual eval, the caller will, so let's keep it
                 * parallel to ExecScanHashBucket.
                 */
                ResetExprContext(econtext);

                hjstate->hj_CurTuple = hashTuple;
                return true;
            }

            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * no more unmatched tuples
     */
    return false;
}
/*
 * ExecHashTableReset
 *      reset hash table header for new batch
 */
void
ExecHashTableReset(HashJoinTable hashtable)
{
    MemoryContext oldcxt;
    int         nbuckets = hashtable->nbuckets;

    /*
     * Release all the hash buckets and tuples acquired in the prior pass, and
     * reinitialize the context for a new pass.
     */
    MemoryContextReset(hashtable->batchCxt);
    oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

    /* Reallocate and reinitialize the hash bucket headers. */
    hashtable->buckets.unshared = palloc0_array(HashJoinTuple, nbuckets);

    hashtable->spaceUsed = 0;

    MemoryContextSwitchTo(oldcxt);

    /* Forget the chunks (the memory was freed by the context reset above). */
    hashtable->chunks = NULL;
}

/*
 * ExecHashTableResetMatchFlags
 *      Clear all the HeapTupleHeaderHasMatch flags in the table
 */
void
ExecHashTableResetMatchFlags(HashJoinTable hashtable)
{
    HashJoinTuple tuple;
    int         i;

    /* Reset all flags in the main table ... */
    for (i = 0; i < hashtable->nbuckets; i++)
    {
        for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
             tuple = tuple->next.unshared)
            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
    }

    /* ... and the same for the skew buckets, if any */
    for (i = 0; i < hashtable->nSkewBuckets; i++)
    {
        int         j = hashtable->skewBucketNums[i];
        HashSkewBucket *skewBucket = hashtable->skewBucket[j];

        for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
    }
}

void
ExecReScanHash(HashState *node)
{
    PlanState  *outerPlan = outerPlanState(node);

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}
/*
 * ExecHashBuildSkewHash
 *
 *      Set up for skew optimization if we can identify the most common values
 *      (MCVs) of the outer relation's join key.  We make a skew hash bucket
 *      for the hash value of each MCV, up to the number of slots allowed
 *      based on available memory.
 */
static void
ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
                      Hash *node, int mcvsToUse)
{
    HeapTuple   statsTuple;
    AttStatsSlot sslot;

    /* Do nothing if planner didn't identify the outer relation's join key */
    if (!OidIsValid(node->skewTable))
        return;
    /* Also, do nothing if we don't have room for at least one skew bucket */
    if (mcvsToUse <= 0)
        return;

    /*
     * Try to find the MCV statistics for the outer relation's join key.
     */
    statsTuple = SearchSysCache3(STATRELATTINH,
                                 ObjectIdGetDatum(node->skewTable),
                                 Int16GetDatum(node->skewColumn),
                                 BoolGetDatum(node->skewInherit));
    if (!HeapTupleIsValid(statsTuple))
        return;

    if (get_attstatsslot(&sslot, statsTuple,
                         STATISTIC_KIND_MCV, InvalidOid,
                         ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
    {
        double      frac;
        int         nbuckets;
        int         i;

        if (mcvsToUse > sslot.nvalues)
            mcvsToUse = sslot.nvalues;

        /*
         * Calculate the expected fraction of the outer relation that will
         * participate in the skew optimization.  If this isn't at least
         * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
         */
        frac = 0;
        for (i = 0; i < mcvsToUse; i++)
            frac += sslot.numbers[i];
        if (frac < SKEW_MIN_OUTER_FRACTION)
        {
            free_attstatsslot(&sslot);
            ReleaseSysCache(statsTuple);
            return;
        }

        /*
         * Okay, set up the skew hashtable.
         *
         * skewBucket[] is an open-addressing hashtable with a power of 2 size
         * that is greater than the number of MCV values, which ensures there
         * will be at least one null entry, so searches always terminate.
         */
        nbuckets = pg_nextpower2_32(mcvsToUse + 1);

        /* use two more bits just to help avoid collisions */
        nbuckets <<= 2;

        hashtable->skewEnabled = true;
        hashtable->skewBucketLen = nbuckets;

        /*
         * We allocate the bucket memory in the hashtable's batch context.  It
         * is only needed during the first batch, and this ensures it will be
         * automatically removed once the first batch is done.
         */
        hashtable->skewBucket = (HashSkewBucket **)
            MemoryContextAllocZero(hashtable->batchCxt,
                                   nbuckets * sizeof(HashSkewBucket *));
        hashtable->skewBucketNums = (int *)
            MemoryContextAllocZero(hashtable->batchCxt,
                                   mcvsToUse * sizeof(int));

        hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
            + mcvsToUse * sizeof(int);
        hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
            + mcvsToUse * sizeof(int);
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;

        /*
         * Create a skew bucket for each MCV hash value.
         *
         * Note: it is very important that we create the buckets in order of
         * decreasing MCV frequency.  If we have to remove some buckets, they
         * must be removed in reverse order of creation (see notes in
         * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
         * be removed first.
         */
        for (i = 0; i < mcvsToUse; i++)
        {
            uint32      hashvalue;
            int         bucket;

            hashvalue = DatumGetUInt32(FunctionCall1Coll(hashstate->skew_hashfunction,
                                                         hashstate->skew_collation,
                                                         sslot.values[i]));

            /*
             * While we have not hit a hole in the hashtable and have not hit
             * the desired bucket, we have collided with some previous hash
             * value, so try the next bucket location.  NB: this code must
             * match ExecHashGetSkewBucket.
             */
            bucket = hashvalue & (nbuckets - 1);
            while (hashtable->skewBucket[bucket] != NULL &&
                   hashtable->skewBucket[bucket]->hashvalue != hashvalue)
                bucket = (bucket + 1) & (nbuckets - 1);

            /*
             * If we found an existing bucket with the same hashvalue, leave
             * it alone.  It's okay for two MCVs to share a hashvalue.
             */
            if (hashtable->skewBucket[bucket] != NULL)
                continue;

            /* Okay, create a new skew bucket for this hashvalue. */
            hashtable->skewBucket[bucket] = (HashSkewBucket *)
                MemoryContextAlloc(hashtable->batchCxt,
                                   sizeof(HashSkewBucket));
            hashtable->skewBucket[bucket]->hashvalue = hashvalue;
            hashtable->skewBucket[bucket]->tuples = NULL;
            hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
            hashtable->nSkewBuckets++;
            hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
            hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
            if (hashtable->spaceUsed > hashtable->spacePeak)
                hashtable->spacePeak = hashtable->spaceUsed;
        }

        free_attstatsslot(&sslot);
    }

    ReleaseSysCache(statsTuple);
}
/*
 * ExecHashGetSkewBucket
 *
 *      Returns the index of the skew bucket for this hashvalue, or
 *      INVALID_SKEW_BUCKET_NO if the hashvalue is not associated with any
 *      active skew bucket.
 */
int
ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
{
    int         bucket;

    /*
     * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
     * particular, this happens after the initial batch is done).
     */
    if (!hashtable->skewEnabled)
        return INVALID_SKEW_BUCKET_NO;

    /*
     * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
     */
    bucket = hashvalue & (hashtable->skewBucketLen - 1);

    /*
     * While we have not hit a hole in the hashtable and have not hit the
     * desired bucket, we have collided with some other hash value, so try the
     * next bucket location.
     */
    while (hashtable->skewBucket[bucket] != NULL &&
           hashtable->skewBucket[bucket]->hashvalue != hashvalue)
        bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);

    /*
     * Found the desired bucket?
     */
    if (hashtable->skewBucket[bucket] != NULL)
        return bucket;

    /*
     * There must not be any hashtable entry for this hash value.
     */
    return INVALID_SKEW_BUCKET_NO;
}
/*
 * ExecHashSkewTableInsert
 *
 *      Insert a tuple into the skew hashtable.
 *
 * This should generally match up with the current-batch case in
 * ExecHashTableInsert.
 */
static void
ExecHashSkewTableInsert(HashJoinTable hashtable,
                        TupleTableSlot *slot,
                        uint32 hashvalue,
                        int bucketNumber)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    HashJoinTuple hashTuple;
    int         hashTupleSize;

    /* Create the HashJoinTuple */
    hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
    hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
                                                   hashTupleSize);
    hashTuple->hashvalue = hashvalue;
    memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
    HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

    /* Push it onto the front of the skew bucket's list */
    hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
    hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
    Assert(hashTuple != hashTuple->next.unshared);

    /* Account for space used, and back off if we've used too much */
    hashtable->spaceUsed += hashTupleSize;
    hashtable->spaceUsedSkew += hashTupleSize;
    if (hashtable->spaceUsed > hashtable->spacePeak)
        hashtable->spacePeak = hashtable->spaceUsed;
    while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
        ExecHashRemoveNextSkewBucket(hashtable);

    /* Check we are not over the total spaceAllowed, either */
    if (hashtable->spaceUsed > hashtable->spaceAllowed)
        ExecHashIncreaseNumBatches(hashtable);

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}
/*
 * ExecHashRemoveNextSkewBucket
 *
 *      Remove the least valuable skew bucket by pushing its tuples into
 *      the main hash table.
 */
static void
ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
{
    int         bucketToRemove;
    HashSkewBucket *bucket;
    uint32      hashvalue;
    int         bucketno;
    int         batchno;
    HashJoinTuple hashTuple;

    /* Locate the bucket to remove */
    bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
    bucket = hashtable->skewBucket[bucketToRemove];

    /*
     * Calculate which bucket and batch the tuples belong to in the main
     * hashtable.  They all have the same hash value, so it's the same for all
     * of them.  Also note that it's not possible for nbatch to increase while
     * we are processing the tuples.
     */
    hashvalue = bucket->hashvalue;
    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

    /* Process all tuples in the bucket */
    hashTuple = bucket->tuples;
    while (hashTuple != NULL)
    {
        HashJoinTuple nextHashTuple = hashTuple->next.unshared;
        MinimalTuple tuple;
        Size        tupleSize;

        /*
         * This code must agree with ExecHashTableInsert.  We do not use
         * ExecHashTableInsert directly as ExecHashTableInsert expects a
         * TupleTableSlot while we already have HashJoinTuples.
         */
        tuple = HJTUPLE_MINTUPLE(hashTuple);
        tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;

        /* Decide whether to put the tuple in the hash table or a temp file */
        if (batchno == hashtable->curbatch)
        {
            /* Move the tuple to the main hash table */
            HashJoinTuple copyTuple;

            /*
             * We must copy the tuple into the dense storage, else it will not
             * be found by, e.g., ExecHashIncreaseNumBatches.
             */
            copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize);
            memcpy(copyTuple, hashTuple, tupleSize);
            pfree(hashTuple);

            copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
            hashtable->buckets.unshared[bucketno] = copyTuple;

            /* We have reduced skew space, but overall space doesn't change */
            hashtable->spaceUsedSkew -= tupleSize;
        }
        else
        {
            /* Put the tuple into a temp file for later batches */
            Assert(batchno > hashtable->curbatch);
            ExecHashJoinSaveTuple(tuple, hashvalue,
                                  &hashtable->innerBatchFile[batchno],
                                  hashtable);
            pfree(hashTuple);
            hashtable->spaceUsed -= tupleSize;
            hashtable->spaceUsedSkew -= tupleSize;
        }

        hashTuple = nextHashTuple;

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * Free the bucket struct itself and reset the hashtable entry to NULL.
     *
     * NOTE: this is not nearly as simple as it looks on the surface, because
     * of the possibility of collisions in the hashtable.  Suppose that hash
     * values A and B collide at a particular hashtable entry, and that A was
     * entered first so B gets shifted to a different table entry.  If we were
     * to remove A first then ExecHashGetSkewBucket would mistakenly start
     * reporting that B is not in the hashtable, because it would hit the NULL
     * before finding B.  However, we always remove entries in the reverse
     * order of creation, so this failure cannot happen.
     */
    hashtable->skewBucket[bucketToRemove] = NULL;
    hashtable->nSkewBuckets--;
    pfree(bucket);
    hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
    hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;

    /*
     * If we have removed all skew buckets then give up on skew optimization.
     * Release the bucket array and skewBucketNums to provide a little more
     * space in memory.
     */
    if (hashtable->nSkewBuckets == 0)
    {
        hashtable->skewEnabled = false;
        pfree(hashtable->skewBucket);
        pfree(hashtable->skewBucketNums);
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->spaceUsed -= hashtable->spaceUsedSkew;
        hashtable->spaceUsedSkew = 0;
    }
}
/*
 * Reserve space in the DSM segment for instrumentation data.
 */
void
ExecHashEstimate(HashState *node, ParallelContext *pcxt)
{
    size_t      size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ps.instrument || pcxt->nworkers == 0)
        return;

    size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
    size = add_size(size, offsetof(SharedHashInfo, hinstrument));
    shm_toc_estimate_chunk(&pcxt->estimator, size);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/*
 * Set up a space in the DSM for all workers to record instrumentation data
 * about their hash table.
 */
void
ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
{
    size_t      size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ps.instrument || pcxt->nworkers == 0)
        return;

    size = offsetof(SharedHashInfo, hinstrument) +
        pcxt->nworkers * sizeof(HashInstrumentation);
    node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);

    /* Each per-worker area must start out as zeroes. */
    memset(node->shared_info, 0, size);

    node->shared_info->num_workers = pcxt->nworkers;
    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
                   node->shared_info);
}

/*
 * Locate the DSM space for hash table instrumentation data that we'll write
 * to at shutdown time.
 */
void
ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
{
    SharedHashInfo *shared_info;

    /* don't need this if not instrumenting */
    if (!node->ps.instrument)
        return;

    /*
     * Find our entry in the shared area, and set up a pointer to it so that
     * we'll accumulate stats there when shutting down or rebuilding the hash
     * table.
     */
    shared_info = (SharedHashInfo *)
        shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
    node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
}

/*
 * Collect EXPLAIN stats if needed, saving them into DSM memory if
 * ExecHashInitializeWorker was called, or local storage if not.  In the
 * parallel case, this must be done in ExecShutdownHash() rather than
 * ExecEndHash() because the latter runs after we've detached from the DSM
 * segment.
 */
void
ExecShutdownHash(HashState *node)
{
    /* Allocate save space if EXPLAIN'ing and we didn't do so already */
    if (node->ps.instrument && !node->hinstrument)
        node->hinstrument = palloc0_object(HashInstrumentation);
    /* Now accumulate data for the current (final) hash table */
    if (node->hinstrument && node->hashtable)
        ExecHashAccumInstrumentation(node->hinstrument, node->hashtable);
}

/*
 * Retrieve instrumentation data from workers before the DSM segment is
 * detached, so that EXPLAIN can access it.
 */
void
ExecHashRetrieveInstrumentation(HashState *node)
{
    SharedHashInfo *shared_info = node->shared_info;
    size_t      size;

    if (shared_info == NULL)
        return;

    /* Replace node->shared_info with a copy in backend-local memory. */
    size = offsetof(SharedHashInfo, hinstrument) +
        shared_info->num_workers * sizeof(HashInstrumentation);
    node->shared_info = palloc(size);
    memcpy(node->shared_info, shared_info, size);
}

/*
 * Accumulate instrumentation data from 'hashtable' into an
 * initially-zeroed 'instrument' struct.
 *
 * This is used to merge information across successive hash table instances
 * within a single plan node.  We take the maximum values of each interesting
 * number.
 */
void
ExecHashAccumInstrumentation(HashInstrumentation *instrument,
                             HashJoinTable hashtable)
{
    instrument->nbuckets = Max(instrument->nbuckets,
                               hashtable->nbuckets);
    instrument->nbuckets_original = Max(instrument->nbuckets_original,
                                        hashtable->nbuckets_original);
    instrument->nbatch = Max(instrument->nbatch,
                             hashtable->nbatch);
    instrument->nbatch_original = Max(instrument->nbatch_original,
                                      hashtable->nbatch_original);
    instrument->space_peak = Max(instrument->space_peak,
                                 hashtable->spacePeak);
}
/*
 * Allocate 'size' bytes from the currently active HashMemoryChunk
 */
static void *
dense_alloc(HashJoinTable hashtable, Size size)
{
    HashMemoryChunk newChunk;
    char       *ptr;

    /* just in case the size is not already aligned properly */
    size = MAXALIGN(size);

    /*
     * If tuple size is larger than threshold, allocate a separate chunk.
     */
    if (size > HASH_CHUNK_THRESHOLD)
    {
        /* allocate new chunk and put it at the beginning of the list */
        newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
                                                        HASH_CHUNK_HEADER_SIZE + size);
        newChunk->maxlen = size;
        newChunk->used = size;
        newChunk->ntuples = 1;

        /*
         * Add this chunk to the list after the first existing chunk, so that
         * we don't lose the remaining space in the "current" chunk.
         */
        if (hashtable->chunks != NULL)
        {
            newChunk->next = hashtable->chunks->next;
            hashtable->chunks->next.unshared = newChunk;
        }
        else
        {
            newChunk->next.unshared = hashtable->chunks;
            hashtable->chunks = newChunk;
        }

        return HASH_CHUNK_DATA(newChunk);
    }

    /*
     * See if we have enough space for it in the current chunk (if any).  If
     * not, allocate a fresh chunk.
     */
    if ((hashtable->chunks == NULL) ||
        (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
    {
        /* allocate new chunk and put it at the beginning of the list */
        newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
                                                        HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);

        newChunk->maxlen = HASH_CHUNK_SIZE;
        newChunk->used = size;
        newChunk->ntuples = 1;

        newChunk->next.unshared = hashtable->chunks;
        hashtable->chunks = newChunk;

        return HASH_CHUNK_DATA(newChunk);
    }

    /* There is enough space in the current chunk, let's add the tuple */
    ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
    hashtable->chunks->used += size;
    hashtable->chunks->ntuples += 1;

    /* return pointer to the start of the tuple memory */
    return ptr;
}
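/*
 * Illustrative sketch (hypothetical types and names, not from nodeHash.c):
 * the bump-allocation pattern used by dense_alloc above.  Tuples are packed
 * one after another into large chunks, so per-tuple palloc overhead is
 * avoided and the chunks can later be walked sequentially when the table is
 * repartitioned.  Assumes the requested size never exceeds the chunk size.
 */
typedef struct example_chunk
{
    struct example_chunk *next;
    size_t      maxlen;
    size_t      used;
    char        data[FLEXIBLE_ARRAY_MEMBER];
} example_chunk;

static void *
example_bump_alloc(example_chunk **chunks, size_t size, size_t chunk_size)
{
    example_chunk *chunk = *chunks;

    size = MAXALIGN(size);
    Assert(size <= chunk_size);

    if (chunk == NULL || chunk->maxlen - chunk->used < size)
    {
        /* start a new chunk and push it onto the front of the list */
        chunk = palloc(offsetof(example_chunk, data) + chunk_size);
        chunk->maxlen = chunk_size;
        chunk->used = 0;
        chunk->next = *chunks;
        *chunks = chunk;
    }

    /* carve the allocation out of the current chunk */
    chunk->used += size;
    return chunk->data + chunk->used - size;
}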
/*
 * Allocate space for a tuple in shared dense storage.  This is equivalent to
 * dense_alloc but for Parallel Hash using shared memory.
 *
 * While loading a tuple into shared memory, we might run out of memory and
 * decide to repartition, or determine that the load factor is too high and
 * decide to expand the bucket array, or discover that another participant has
 * commanded us to help do that.  Return NULL if the number of buckets or
 * batches has changed, indicating that the caller must retry (considering the
 * possibility that the tuple no longer belongs in the same batch).
 */
static HashJoinTuple
ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
                           dsa_pointer *shared)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    dsa_pointer chunk_shared;
    HashMemoryChunk chunk;
    Size        chunk_size;
    HashJoinTuple result;
    int         curbatch = hashtable->curbatch;

    size = MAXALIGN(size);

    /*
     * Fast path: if there is enough space in this backend's current chunk,
     * then we can allocate without any locking.
     */
    chunk = hashtable->current_chunk;
    if (chunk != NULL &&
        size <= HASH_CHUNK_THRESHOLD &&
        chunk->maxlen - chunk->used >= size)
    {
        chunk_shared = hashtable->current_chunk_shared;
        Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
        *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
        result = (HashJoinTuple) (HASH_CHUNK_DATA(chunk) + chunk->used);
        chunk->used += size;

        Assert(chunk->used <= chunk->maxlen);
        Assert(result == dsa_get_address(hashtable->area, *shared));

        return result;
    }

    /* Slow path: try to allocate a new chunk. */
    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);

    /*
     * Check if we need to help increase the number of buckets or batches.
     */
    if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
        pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
    {
        ParallelHashGrowth growth = pstate->growth;

        hashtable->current_chunk = NULL;
        LWLockRelease(&pstate->lock);

        /* Another participant asked us to help grow. */
        if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
            ExecParallelHashIncreaseNumBatches(hashtable);
        else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
            ExecParallelHashIncreaseNumBuckets(hashtable);

        /* The caller must retry. */
        return NULL;
    }

    /* Oversized tuples get their own chunk. */
    if (size > HASH_CHUNK_THRESHOLD)
        chunk_size = size + HASH_CHUNK_HEADER_SIZE;
    else
        chunk_size = HASH_CHUNK_SIZE;

    /* Check if it's time to grow batches or buckets. */
    if (pstate->growth != PHJ_GROWTH_DISABLED)
    {
        Assert(curbatch == 0);
        Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER);

        /*
         * Check if our space limit would be exceeded, to force increasing
         * batches.
         */
        if (hashtable->batches[0].at_least_one_chunk &&
            hashtable->batches[0].shared->size +
            chunk_size > pstate->space_allowed)
        {
            pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
            hashtable->batches[0].shared->space_exhausted = true;
            LWLockRelease(&pstate->lock);

            return NULL;
        }

        /*
         * Check if our load factor limit would be exceeded, to force
         * increasing buckets.
         */
        if (hashtable->nbatch == 1)
        {
            hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
            hashtable->batches[0].ntuples = 0;
            /* Guard against integer overflow and alloc size overflow */
            if (hashtable->batches[0].shared->ntuples + 1 >
                hashtable->nbuckets * NTUP_PER_BUCKET &&
                hashtable->nbuckets < (INT_MAX / 2) &&
                hashtable->nbuckets * 2 <=
                MaxAllocSize / sizeof(dsa_pointer_atomic))
            {
                pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
                LWLockRelease(&pstate->lock);

                return NULL;
            }
        }
    }

    /* We are cleared to allocate a new chunk. */
    chunk_shared = dsa_allocate(hashtable->area, chunk_size);
    hashtable->batches[curbatch].shared->size += chunk_size;
    hashtable->batches[curbatch].at_least_one_chunk = true;

    /* Set up the chunk. */
    chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
    *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
    chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
    chunk->used = size;

    /*
     * Push it onto the list of chunks, so that it can be found if we need to
     * increase the number of buckets or batches (batch 0) and later for
     * freeing the memory (all batches).
     */
    chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
    hashtable->batches[curbatch].shared->chunks = chunk_shared;

    if (size <= HASH_CHUNK_THRESHOLD)
    {
        /*
         * Make this the current chunk so that we can use the fast path to
         * fill the rest of it up in future calls.
         */
        hashtable->current_chunk = chunk;
        hashtable->current_chunk_shared = chunk_shared;
    }
    LWLockRelease(&pstate->lock);

    Assert(HASH_CHUNK_DATA(chunk) == dsa_get_address(hashtable->area, *shared));
    result = (HashJoinTuple) HASH_CHUNK_DATA(chunk);

    return result;
}
/*
 * Build the local state necessary to access each batch's shared
 * ParallelHashJoinBatch object, creating a new generation of 'nbatch'
 * batches.
 */
static void
ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    ParallelHashJoinBatch *batches;
    MemoryContext oldcxt;
    int         i;

    Assert(hashtable->batches == NULL);

    /* Allocate space for all the batches' shared state. */
    pstate->batches =
        dsa_allocate0(hashtable->area,
                      EstimateParallelHashJoinBatch(hashtable) * nbatch);
    pstate->nbatch = nbatch;
    batches = dsa_get_address(hashtable->area, pstate->batches);

    /*
     * Use hash join spill memory context to allocate accessors, including
     * buffers for the temporary files.
     */
    oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);

    /* Allocate this backend's accessor array. */
    hashtable->nbatch = nbatch;
    hashtable->batches =
        palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);

    /* Set up the shared state and the backend-local accessors. */
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
        ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);

        /*
         * All members of shared were zero-initialized.  We just need to set
         * up the Barrier.
         */
        BarrierInit(&shared->batch_barrier, 0);
        if (i == 0)
        {
            /* Batch 0 doesn't need to be loaded from disk. */
            BarrierAttach(&shared->batch_barrier);
            /* ... advance batch 0's barrier straight to the probe phase ... */
            BarrierDetach(&shared->batch_barrier);
        }

        /* Initialize accessor state.  All members were zero-initialized. */
        accessor->shared = shared;

        /* ... create the shared inner and outer tuplestores for this batch ... */
    }

    MemoryContextSwitchTo(oldcxt);
}

/*
 * Free the current set of ParallelHashJoinBatchAccessor objects.
 */
static void
ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
{
    int         i;

    for (i = 0; i < hashtable->nbatch; ++i)
    {
        /* Make sure no files are left open. */
        sts_end_write(hashtable->batches[i].inner_tuples);
        sts_end_write(hashtable->batches[i].outer_tuples);
        sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
        sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
    }
    pfree(hashtable->batches);
    hashtable->batches = NULL;
}

/*
 * Make sure this backend has up-to-date accessors for the current set of
 * batches.
 */
static void
ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    ParallelHashJoinBatch *batches;
    MemoryContext oldcxt;
    int         i;

    if (hashtable->batches != NULL)
    {
        Assert(hashtable->nbatch == pstate->nbatch);
        return;
    }

    /*
     * We should never see a state where the batch-tracking array has already
     * been freed, because we should have given up sooner if we joined too
     * late.
     */
    Assert(DsaPointerIsValid(pstate->batches));

    /*
     * Use hash join spill memory context to allocate accessors, including
     * buffers for the temporary files.
     */
    oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);

    /* Allocate this backend's accessor array. */
    hashtable->nbatch = pstate->nbatch;
    hashtable->batches =
        palloc0_array(ParallelHashJoinBatchAccessor, hashtable->nbatch);

    /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
    batches = (ParallelHashJoinBatch *)
        dsa_get_address(hashtable->area, pstate->batches);

    /* Set up the accessor array and attach to the tuplestores. */
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
        ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);

        accessor->shared = shared;
        accessor->preallocated = 0;
        accessor->done = false;
        /* ... attach this backend to the batch's shared tuplestores ... */
    }

    MemoryContextSwitchTo(oldcxt);
}
/*
 * Allocate an empty shared memory hash table for a given batch.
 */
void
ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
{
    ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
    dsa_pointer_atomic *buckets;
    int         nbuckets = hashtable->parallel_state->nbuckets;
    int         i;

    batch->buckets =
        dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
    buckets = (dsa_pointer_atomic *)
        dsa_get_address(hashtable->area, batch->buckets);
    for (i = 0; i < nbuckets; ++i)
        dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
}

/*
 * If we are currently attached to a shared hash join batch, detach.  If we
 * are last to detach, clean up.
 */
void
ExecHashTableDetachBatch(HashJoinTable hashtable)
{
    if (hashtable->parallel_state != NULL &&
        hashtable->curbatch >= 0)
    {
        int         curbatch = hashtable->curbatch;
        ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
        bool        attached = true;

        /* Make sure any temporary files are closed. */
        sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
        sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

        /*
         * If we're abandoning the probe phase early without having reached
         * the end of it, no process can safely perform the unmatched scan for
         * a full/right join, because the match bits are incomplete.  Record
         * that so the scan is skipped in all backends.
         */
        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
            !hashtable->batches[curbatch].outer_eof)
            batch->skip_unmatched = true;

        /*
         * Step through the remaining phases wait-free; the last process to
         * detach is chosen to free the batch's shared memory.
         */
        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
            attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
        if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
        {
            /* Free shared chunks and buckets. */
            while (DsaPointerIsValid(batch->chunks))
            {
                HashMemoryChunk chunk =
                    dsa_get_address(hashtable->area, batch->chunks);
                dsa_pointer next = chunk->next.shared;

                dsa_free(hashtable->area, batch->chunks);
                batch->chunks = next;
            }
            if (DsaPointerIsValid(batch->buckets))
            {
                dsa_free(hashtable->area, batch->buckets);
                batch->buckets = InvalidDsaPointer;
            }
        }

        /*
         * Track the largest batch we've been attached to.  Though each
         * backend might see a different subset of batches, explain.c will
         * scan the results from all backends to find the largest value.
         */
        hashtable->spacePeak =
            Max(hashtable->spacePeak,
                batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);

        /* Remember that we are not attached to a batch. */
        hashtable->curbatch = -1;
    }
}

/*
 * Detach from all shared resources.  If we are last to detach, clean up.
 */
void
ExecHashTableDetach(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;

    if (pstate)
    {
        int         i;

        /* Make sure any temporary files are closed. */
        if (hashtable->batches)
        {
            for (i = 0; i < hashtable->nbatch; ++i)
            {
                sts_end_write(hashtable->batches[i].inner_tuples);
                sts_end_write(hashtable->batches[i].outer_tuples);
                sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
                sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
            }
        }

        /* If we're last to detach, clean up shared memory. */
        if (BarrierArriveAndDetach(&pstate->build_barrier))
        {
            if (DsaPointerIsValid(pstate->batches))
            {
                dsa_free(hashtable->area, pstate->batches);
                pstate->batches = InvalidDsaPointer;
            }
        }
    }
    hashtable->parallel_state = NULL;
}
/*
 * Get the first tuple in a given bucket identified by number.
 */
static inline HashJoinTuple
ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
{
    HashJoinTuple tuple;
    dsa_pointer p;

    Assert(hashtable->parallel_state);
    p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
    tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);

    return tuple;
}

/*
 * Get the next tuple in the same bucket as 'tuple'.
 */
static inline HashJoinTuple
ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
{
    HashJoinTuple next;

    Assert(hashtable->parallel_state);
    next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);

    return next;
}
/*
 * Insert a tuple at the front of a chain of tuples in shared memory
 * atomically.
 */
static inline void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                          HashJoinTuple tuple,
                          dsa_pointer tuple_shared)
{
    for (;;)
    {
        tuple->next.shared = dsa_pointer_atomic_read(head);
        if (dsa_pointer_atomic_compare_exchange(head,
                                                &tuple->next.shared,
                                                tuple_shared))
            break;
    }
}
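/*
 * Illustrative sketch (not part of nodeHash.c): the lock-free insertion
 * pattern used by ExecParallelHashPushTuple above, restated with ordinary
 * C11 atomics and a hypothetical node type instead of DSA pointers.  Each
 * worker reads the current bucket head, points its new node at it, and
 * publishes the node with a compare-and-swap, retrying on conflict.
 */
#include <stdatomic.h>

typedef struct example_node
{
    struct example_node *next;
} example_node;

static void
example_lockfree_push(_Atomic(example_node *) *head, example_node *node)
{
    example_node *old_head = atomic_load(head);

    do
    {
        /* link the new node in front of the currently observed head */
        node->next = old_head;
    } while (!atomic_compare_exchange_weak(head, &old_head, node));
}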
/*
 * Prepare to work on a given batch.
 */
void
ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
{
    Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);

    hashtable->curbatch = batchno;
    hashtable->buckets.shared = (dsa_pointer_atomic *)
        dsa_get_address(hashtable->area,
                        hashtable->batches[batchno].shared->buckets);
    hashtable->nbuckets = hashtable->parallel_state->nbuckets;
    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
    hashtable->current_chunk = NULL;
    hashtable->current_chunk_shared = InvalidDsaPointer;
    hashtable->batches[batchno].at_least_one_chunk = false;
}

/*
 * Take the next available chunk from the queue of chunks being worked on in
 * parallel.  Return NULL if there are none left.  Otherwise return a pointer
 * to the chunk, and set *shared to the DSA pointer to the chunk.
 */
static HashMemoryChunk
ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    HashMemoryChunk chunk;

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
    if (DsaPointerIsValid(pstate->chunk_work_queue))
    {
        *shared = pstate->chunk_work_queue;
        chunk = (HashMemoryChunk)
            dsa_get_address(hashtable->area, *shared);
        pstate->chunk_work_queue = chunk->next.shared;
    }
    else
        chunk = NULL;
    LWLockRelease(&pstate->lock);

    return chunk;
}
/*
 * Check ahead of time that there will be enough space to store the tuple in
 * the given batch's shared tuplestore, commanding a repartition if the batch
 * would exceed the space budget.  Returns false if the caller must retry
 * (because the number of batches may have changed in the meantime).
 */
static bool
ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
    size_t      want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);

    Assert(batchno > 0);
    Assert(batchno < hashtable->nbatch);
    Assert(size == MAXALIGN(size));

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);

    /* Has another participant commanded us to help grow? */
    if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
        pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
    {
        ParallelHashGrowth growth = pstate->growth;

        LWLockRelease(&pstate->lock);
        if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
            ExecParallelHashIncreaseNumBatches(hashtable);
        else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
            ExecParallelHashIncreaseNumBuckets(hashtable);

        return false;
    }

    if (pstate->growth != PHJ_GROWTH_DISABLED &&
        batch->at_least_one_chunk &&
        (batch->shared->estimated_size + want + HASH_CHUNK_HEADER_SIZE
         > pstate->space_allowed))
    {
        /*
         * We have determined that this batch would exceed the space budget if
         * loaded into memory.  Command all participants to help repartition.
         */
        batch->shared->space_exhausted = true;
        pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
        LWLockRelease(&pstate->lock);

        return false;
    }

    batch->at_least_one_chunk = true;
    batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
    batch->preallocated = want;
    LWLockRelease(&pstate->lock);

    return true;
}
/*
 * Get a size_t value for the hash memory limit, rounding and clamping as
 * needed.  (The underlying GUCs are expressed in kB.)
 */
size_t
get_hash_memory_limit(void)
{
    double      mem_limit;

    /* Do initial calculation in double arithmetic */
    mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;

    /* Clamp in case it doesn't fit in size_t */
    mem_limit = Min(mem_limit, (double) SIZE_MAX);

    return (size_t) mem_limit;
}
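/*
 * Illustrative sketch (hypothetical helper, not part of nodeHash.c): what
 * get_hash_memory_limit returns for assumed settings of work_mem = 4096 kB
 * and hash_mem_multiplier = 2.0, i.e. 8MB expressed in bytes.
 */
static size_t
example_hash_memory_limit(void)
{
    int         work_mem_kb = 4096;     /* assumed work_mem, in kB */
    double      multiplier = 2.0;       /* assumed hash_mem_multiplier */
    double      mem_limit = (double) work_mem_kb * multiplier * 1024.0;

    mem_limit = Min(mem_limit, (double) SIZE_MAX);
    return (size_t) mem_limit;          /* 8388608 bytes for these values */
}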
/* Functions defined in or referenced by nodeHash.c: */
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
void ExecHashTableReset(HashJoinTable hashtable)
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
static bool ExecHashIncreaseBatchSize(HashJoinTable hashtable)
bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)
static void * dense_alloc(HashJoinTable hashtable, Size size)
static void MultiExecParallelHash(HashState *node)
void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)
static void MultiExecPrivateHash(HashState *node)
void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)
Node * MultiExecHash(HashState *node)
HashState * ExecInitHash(Hash *node, EState *estate, int eflags)
void ExecHashTableDetachBatch(HashJoinTable hashtable)
void ExecHashEstimate(HashState *node, ParallelContext *pcxt)
void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)
void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecHashTableDetach(HashJoinTable hashtable)
bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
void ExecHashTableDestroy(HashJoinTable hashtable)
HashJoinTable ExecHashTableCreate(HashState *state)
int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
size_t get_hash_memory_limit(void)
bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)
static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)
static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
void ExecHashTableResetMatchFlags(HashJoinTable hashtable)
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
void ExecEndHash(HashState *node)
void ExecShutdownHash(HashState *node)
void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static TupleTableSlot * ExecHash(PlanState *pstate)
void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)
static void ExecParallelHashMergeCounters(HashJoinTable hashtable)
void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)
static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
void ExecReScanHash(HashState *node)
bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
void ExecHashRetrieveInstrumentation(HashState *node)
void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)