PostgreSQL Source Code: src/backend/executor/nodeHash.c — source file listing (raw)

#include <math.h>
#include <limits.h>

/* ... project includes elided ... */

static void MultiExecPrivateHash(HashState *node);
static void MultiExecParallelHash(HashState *node);
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
                                  Hash *node, int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
                                    TupleTableSlot *slot,
                                    uint32 hashvalue,
                                    int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

static void *dense_alloc(HashJoinTable hashtable, Size size);
static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
                                                size_t size,
                                                dsa_pointer *shared);
static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable,
                                                int bucketno);
static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable,
                                               HashJoinTuple tuple);
static void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                                      HashJoinTuple tuple,
                                      dsa_pointer tuple_shared);
static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
                                          int batchno,
                                          size_t size);

/* ----------------------------------------------------------------
 *		ExecHash
 *
 *		stub for pro forma compliance
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecHash(PlanState *pstate)
{
    elog(ERROR, "Hash node does not support ExecProcNode call convention");
    return NULL;
}

/* ----------------------------------------------------------------
 *		MultiExecHash
 *
 *		build hash table for hashjoin, doing partitioning if more
 *		than one batch is required.
 * ----------------------------------------------------------------
 */
Node *
MultiExecHash(HashState *node)
{
    /* ... start instrumentation if needed ... */

    if (node->parallel_state != NULL)
        MultiExecParallelHash(node);
    else
        MultiExecPrivateHash(node);

    /* ... stop instrumentation ... */

    /*
     * The hash table is not returned to the caller; the parent HashJoin node
     * is expected to fetch it from this node's state.
     */
    return NULL;
}

/* ----------------------------------------------------------------
 *		MultiExecPrivateHash
 *
 *		parallel-oblivious version: build a backend-private hash table
 * ----------------------------------------------------------------
 */
static void
MultiExecPrivateHash(HashState *node)
{
    /* ... fetch the hash table, outer plan and expression context ... */

    for (;;)
    {
        bool        isnull;

        /* ... get the next tuple from the subplan; break at end of input ... */

        /* ... compute the tuple's hash value (may set isnull) ... */

        if (!isnull)
        {
            int         bucketNumber;

            /* ... if the value is an MCV, insert into its skew bucket
             * (bucketNumber); otherwise insert into the main hash table ... */
        }
    }

    /* ... finish: account for total tuples and instrumentation ... */
}

/* ----------------------------------------------------------------
 *		MultiExecParallelHash
 *
 *		parallel-aware version: build a shared hash table together with
 *		the other participating workers
 * ----------------------------------------------------------------
 */
static void
MultiExecParallelHash(HashState *node)
{
    int         i;

    /* ... fetch hashtable and locals; synchronize on the build barrier;
     * while in the hash-inner phase: ... */
    {
        for (;;)
        {
            bool        isnull;

            /* ... get the next tuple from the subplan; break at end of
             * input; compute its hash value (econtext, &isnull) ... */

            if (!isnull)
                /* ... insert into the shared hash table ... */ ;
        }

        /* make the tuples we wrote out visible for every batch */
        for (i = 0; i < hashtable->nbatch; ++i)
            /* ... */ ;

        if (BarrierArriveAndWait(/* ... build barrier ... */,
                                 WAIT_EVENT_HASH_BUILD_HASH_INNER))
        {
            /* ... the elected backend finishes up the build phase ... */
        }
    }

    /* ... adopt the final nbuckets / bucket array from the shared state ... */
}

/* ----------------------------------------------------------------
 *		ExecInitHash
 *
 *		Init routine for Hash node
 * ----------------------------------------------------------------
 */
HashState *
ExecInitHash(Hash *node, EState *estate, int eflags)
{
    HashState  *hashstate;

    /* ... check for unsupported flags; create the state structure ... */
    hashstate->ps.state = estate;
    /* ... set ExecProcNode, create the expression context, initialize the
     * outer subplan, the result tuple slot and the hash expressions ... */

    return hashstate;
}

/* ---------------------------------------------------------------
 *		ExecEndHash
 *
 *		clean up routine for Hash node
 * ----------------------------------------------------------------
 */
void
ExecEndHash(HashState *node)
{
    /* ... shut down the outer subplan ... */
}

/* ----------------------------------------------------------------
 *		ExecHashTableCreate
 *
 *		create an empty hashtable data structure for hashjoin.
 * ----------------------------------------------------------------
 */
HashJoinTable
ExecHashTableCreate(HashState *state)
{
    HashJoinTable hashtable;
    Plan       *outerNode;
    size_t      space_allowed;
    int         nbuckets;
    int         nbatch;
    double      rows;
    int         num_skew_mcvs;
    int         log2_nbuckets;

    /*
     * Get information about the size of the relation to be hashed (it's the
     * "outer" subtree of this node, but the inner relation of the hashjoin).
     * Compute the appropriate size of the hash table.
     */
    /* ... */
    ExecChooseHashTableSize(rows, outerNode->plan_width,
                            /* useskew */ ...,
                            state->parallel_state != NULL,
                            state->parallel_state != NULL ?
                            state->parallel_state->nparticipants - 1 : 0,
                            &space_allowed,
                            &nbuckets, &nbatch, &num_skew_mcvs);

    /* nbuckets must be a power of 2 */
    log2_nbuckets = my_log2(nbuckets);
    Assert(nbuckets == (1 << log2_nbuckets));

    /*
     * Initialize the hash table control block.
     */
    /* ... */
    hashtable->nbuckets = nbuckets;
    /* ... */
    hashtable->nbatch = nbatch;
    /* ... */
    hashtable->chunks = NULL;
    hashtable->area = state->ps.state->es_query_dsa;
    hashtable->batches = NULL;

#ifdef HJDEBUG
    printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
           hashtable, nbatch, nbuckets);
#endif

    /*
     * Create temporary memory contexts in which to keep the hashtable
     * working storage.
     */
    /* ... AllocSetContextCreate(... "HashTableContext" ...),
     *     AllocSetContextCreate(... "HashBatchContext" ...),
     *     AllocSetContextCreate(... "HashSpillContext" ...) ... */

    /* ... remaining per-backend initialization ... */
    {
        /* ... */
    }

    if (state->parallel_state != NULL)
    {
        /* ... attach to the build barrier; the backend that arrives first
         * initializes the shared state (pstate): ... */
        {
            pstate->nbatch = nbatch;
            /* ... allocate the shared batch array and bucket storage ... */
        }
        /* ... */
    }
    else
    {
        /* private hash join */
        if (nbatch > 1)
            /* ... set up batch file arrays and PrepareTempTablespaces() ... */ ;
    }

    return hashtable;
}

/*
 * Compute appropriate size for hashtable given the estimated size of the
 * relation to be hashed (number of rows and average row width).
 *
 * This is exported so that the planner's costsize.c can use it.
 */

/* Target bucket loading (tuples per bucket) */
#define NTUP_PER_BUCKET			1

void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
                        bool try_combined_hash_mem,
                        int parallel_workers,
                        size_t *space_allowed,
                        int *numbuckets,
                        int *numbatches,
                        int *num_skew_mcvs)
{
    int         tupsize;
    double      inner_rel_bytes;
    size_t      hash_table_bytes;
    size_t      bucket_bytes;
    size_t      max_pointers;
    int         nbatch = 1;
    int         nbuckets;
    double      dbuckets;

    /* Force a plausible relation size if no info */
    if (ntuples <= 0.0)
        ntuples = 1000.0;

    /*
     * Estimate tupsize based on footprint of tuple in hashtable... note this
     * does not allow for any palloc overhead.
     */
    tupsize = /* ... header overhead ... */ +
        MAXALIGN(SizeofMinimalTupleHeader) +
        MAXALIGN(tupwidth);
    inner_rel_bytes = ntuples * tupsize;

    /* Compute the in-memory allowance */
    hash_table_bytes = get_hash_memory_limit();

    /*
     * Parallel Hash tries to use the combined hash_mem of all workers to
     * avoid the need to batch.  If that won't work, it falls back to hash_mem
     * per worker and tries to process batches in parallel.
     */
    if (try_combined_hash_mem)
    {
        /* Careful, this could overflow size_t */
        double      newlimit;

        newlimit = (double) hash_table_bytes * (double) (parallel_workers + 1);
        newlimit = Min(newlimit, (double) SIZE_MAX);
        hash_table_bytes = (size_t) newlimit;
    }

    *space_allowed = hash_table_bytes;

    /*
     * If skew optimization is possible, estimate the number of skew buckets
     * that will fit in the memory allowed, and decrement the assumed space
     * available for the main hash table accordingly.
     */
    if (useskew)
    {
        size_t      bytes_per_mcv;
        size_t      skew_mcvs;

        /*
         * Compute number of MCVs we could hold in hash_table_bytes: each one
         * costs a hash tuple plus skew-bucket bookkeeping.
         */
        bytes_per_mcv = tupsize +
            (8 * sizeof(HashSkewBucket *)) +
            sizeof(int) +
            SKEW_BUCKET_OVERHEAD;
        skew_mcvs = hash_table_bytes / bytes_per_mcv;

        /* Scale by SKEW_HASH_MEM_PERCENT, then clamp */
        skew_mcvs *= SKEW_HASH_MEM_PERCENT;
        skew_mcvs /= 100;
        skew_mcvs = Min(skew_mcvs, INT_MAX);

        *num_skew_mcvs = (int) skew_mcvs;

        /* Reduce the space available for the main hash table accordingly */
        if (skew_mcvs > 0)
            hash_table_bytes -= skew_mcvs * bytes_per_mcv;
    }
    else
        *num_skew_mcvs = 0;

    /*
     * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
     * memory is filled, assuming a single batch; but limit the value so that
     * the pointer arrays we'll try to allocate do not exceed
     * hash_table_bytes or MaxAllocSize.
     */
    max_pointers = hash_table_bytes / sizeof(HashJoinTuple);
    max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
    /* If max_pointers isn't a power of 2, must round it down to one */
    max_pointers = pg_prevpower2_size_t(max_pointers);
    /* Also ensure we avoid integer overflow in nbatch and nbuckets */
    max_pointers = Min(max_pointers, INT_MAX / 2 + 1);

    dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
    dbuckets = Min(dbuckets, max_pointers);
    nbuckets = (int) dbuckets;
    /* don't let nbuckets be really small, though ... */
    nbuckets = Max(nbuckets, 1024);
    /* ... and force it to be a power of 2. */
    nbuckets = pg_nextpower2_32(nbuckets);

    /*
     * If there's not enough space to store the projected number of tuples and
     * the required bucket headers, we will need multiple batches.
     */
    bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
    if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
    {
        /* We'll need multiple batches */
        size_t      sbuckets;
        double      dbatch;
        int         minbatch;
        size_t      bucket_size;

        /*
         * If Parallel Hash with combined hash_mem would still need multiple
         * batches, we'll have to fall back to regular hash_mem budget.
         */
        if (try_combined_hash_mem)
        {
            ExecChooseHashTableSize(ntuples, tupwidth, useskew,
                                    false, parallel_workers,
                                    space_allowed,
                                    numbuckets,
                                    numbatches,
                                    num_skew_mcvs);
            return;
        }

        /*
         * Estimate the number of buckets we'll want to have when hash_mem is
         * entirely full.  Each bucket will contain a bucket pointer plus
         * NTUP_PER_BUCKET tuples.
         */
        bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
        if (hash_table_bytes <= bucket_size)
            sbuckets = 1;       /* avoid pg_nextpower2_size_t(0) */
        else
            sbuckets = pg_nextpower2_size_t(hash_table_bytes / bucket_size);
        sbuckets = Min(sbuckets, max_pointers);
        nbuckets = (int) sbuckets;
        bucket_bytes = nbuckets * sizeof(HashJoinTuple);

        /*
         * Buckets are simple pointers to hashjoin tuples, while tupsize
         * includes the pointer, hash code, and MinimalTupleData, so buckets
         * should never dominate the memory budget.
         */
        Assert(bucket_bytes <= hash_table_bytes / 2);

        /* Calculate required number of batches. */
        dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
        dbatch = Min(dbatch, max_pointers);
        minbatch = (int) dbatch;
        nbatch = pg_nextpower2_32(Max(2, minbatch));
    }

    /*
     * Optimize the total amount of memory consumed by the hash node: while
     * more batches shrink the in-memory hash table, each batch file also
     * costs buffer space.  Halve the number of batches (and double the
     * in-memory allowance) as long as that reduces the combined footprint.
     */
    while (nbatch > 0)
    {
        /* how much memory are we using with current nbatch value */
        size_t      current_space = hash_table_bytes + (2 * nbatch * BLCKSZ);

        /* how much memory would we need with a doubled in-memory hash table */
        size_t      new_space = hash_table_bytes * 2 + (nbatch * BLCKSZ);

        /* If the memory usage would not decrease, we found the optimum. */
        if (current_space < new_space)
            break;

        /*
         * It's better to use fewer batches, so halve their number, adjust
         * nbuckets in the opposite direction, and double the allowance.
         */
        nbatch /= 2;
        nbuckets *= 2;

        *space_allowed = (*space_allowed) * 2;
    }

    Assert(nbuckets > 0);

    *numbuckets = nbuckets;
    *numbatches = nbatch;
}
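
The loop above trades batch files for memory: doubling the in-memory allowance halves the number of batches, and that is worthwhile while the combined footprint (work memory plus two BLCKSZ-sized file buffers per batch) does not grow. The standalone sketch below replays that arithmetic outside the executor; choose_batches is an illustrative name, and the 8192-byte block size and the starting numbers are assumptions for the example, not values taken from this file.

#include <stdio.h>
#include <stddef.h>

#define SKETCH_BLCKSZ 8192      /* assumed block size (PostgreSQL's default) */

/*
 * Illustrative only: mimic the "halve nbatch, double the allowance" loop
 * from ExecChooseHashTableSize.
 */
static void
choose_batches(size_t hash_table_bytes, int nbatch, int nbuckets,
               size_t *space_allowed, int *numbatches, int *numbuckets)
{
    *space_allowed = hash_table_bytes;

    while (nbatch > 0)
    {
        /* memory plus two buffered files per batch at the current setting */
        size_t current_space = hash_table_bytes +
            (2 * (size_t) nbatch * SKETCH_BLCKSZ);
        /* the same if we doubled memory and halved the number of batches */
        size_t new_space = hash_table_bytes * 2 +
            ((size_t) nbatch * SKETCH_BLCKSZ);

        if (current_space < new_space)
            break;              /* doubling memory no longer pays off */

        nbatch /= 2;
        nbuckets *= 2;
        *space_allowed = (*space_allowed) * 2;
    }

    *numbatches = nbatch;
    *numbuckets = nbuckets;
}

int
main(void)
{
    size_t allowed;
    int nbatch;
    int nbuckets;

    /* assumed starting point: 4MB of work memory, 1024 batches, 1024 buckets */
    choose_batches(4 * 1024 * 1024, 1024, 1024, &allowed, &nbatch, &nbuckets);
    printf("nbatch=%d nbuckets=%d space_allowed=%zu\n", nbatch, nbuckets, allowed);
    return 0;
}

With those assumed inputs the sketch settles on 256 batches and a 16MB allowance, which is the same direction of adjustment the real code makes: fewer batch files, more memory.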

/* ----------------------------------------------------------------
 *		ExecHashTableDestroy
 *
 *		destroy a hash table
 * ----------------------------------------------------------------
 */
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
    int         i;

    /*
     * Make sure all the temp files are closed.  We skip batch 0, since it
     * can't have any temp files (and the arrays might not even exist if
     * nbatch is only 1).
     */
    if (hashtable->innerBatchFile != NULL)
    {
        for (i = 1; i < hashtable->nbatch; i++)
        {
            /* ... close the inner and outer batch files, if any ... */
        }
    }

    /* Release working memory (batchCxt is a child, so it goes away too) */
    /* ... */

    /* And drop the control block */
    pfree(hashtable);
}

/*
 * ExecHashIncreaseBatchSize
 *		decide whether to grow the in-memory allowance instead of doubling
 *		the number of batches; returns true if the allowance was grown.
 */
static bool
ExecHashIncreaseBatchSize(HashJoinTable hashtable)
{
    /*
     * How much additional memory would doubling nbatch use?  Each batch may
     * require two buffered files, each with a BLCKSZ buffer.
     */
    size_t      batchSpace = (hashtable->nbatch * 2 * BLCKSZ);

    /* ... if doubling the allowance is cheaper than paying for more batch
     * files, enlarge spaceAllowed and report success ... */
    if (/* ... */)
    {
        /* ... */
        return true;
    }

    return false;
}

/*
 * ExecHashIncreaseNumBatches
 *		increase the original number of batches in order to reduce
 *		current memory consumption
 */
static void
ExecHashIncreaseNumBatches(HashJoinTable hashtable)
{
    int         oldnbatch = hashtable->nbatch;
    int         curbatch = hashtable->curbatch;
    int         nbatch;
    long        ninmemory;
    long        nfreed;
    HashMemoryChunk oldchunks;

    /* do nothing if we've decided to shut off growth */
    if (!hashtable->growEnabled)
        return;

    /* safety check to avoid overflow */
    if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
        return;

    /* consider enlarging the in-memory allowance instead */
    if (ExecHashIncreaseBatchSize(hashtable))
        return;

    nbatch = oldnbatch * 2;

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
           hashtable, nbatch, hashtable->spaceUsed);
#endif

    if (hashtable->innerBatchFile == NULL)
    {
        /* we had no file arrays before; create them */
        /* ... allocate innerBatchFile / outerBatchFile arrays and prepare
         * temp tablespaces ... */
    }
    else
    {
        /* enlarge the existing arrays to nbatch entries */
        /* ... */
    }

    hashtable->nbatch = nbatch;

    /*
     * Scan through the existing hash table entries and dump out any that are
     * no longer of the current batch.
     */
    ninmemory = nfreed = 0;

    /* If we need to resize nbuckets, do it while rebatching. */
    if (/* ... the optimal bucket count has grown ... */)
    {
        /* we never decrease the number of buckets */
        /* ... update nbuckets/log2_nbuckets and reallocate the bucket array ... */
    }

    /*
     * We will scan through the chunks directly, so we can reset the buckets
     * now and not have to keep track of which tuples in the buckets have
     * already been processed.  We will free the old chunks as we go.
     */
    /* ... zero the bucket array ... */
    oldchunks = hashtable->chunks;
    hashtable->chunks = NULL;

    /* so, let's scan through the old chunks, and all tuples in each chunk */
    while (oldchunks != NULL)
    {
        HashMemoryChunk nextchunk = oldchunks->next.unshared;

        /* position within the buffer (up to oldchunks->used) */
        size_t      idx = 0;

        /* process all tuples stored in this chunk (and then free it) */
        while (idx < oldchunks->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
            int         hashTupleSize;
            int         bucketno;
            int         batchno;

            /* ... hashTupleSize = header overhead + minimal tuple length ... */

            ninmemory++;
            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            if (batchno == curbatch)
            {
                /* keep tuple in memory - copy it into the new chunk */
                HashJoinTuple copyTuple;

                copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
                memcpy(copyTuple, hashTuple, hashTupleSize);

                /* and add it back to the appropriate bucket */
                /* ... */
            }
            else
            {
                /* dump it out */
                Assert(batchno > curbatch);
                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                      hashTuple->hashvalue,
                                      &hashtable->innerBatchFile[batchno],
                                      hashtable);

                hashtable->spaceUsed -= hashTupleSize;
                nfreed++;
            }

            /* next tuple in this chunk */
            idx += MAXALIGN(hashTupleSize);

            /* allow this loop to be cancellable */
            CHECK_FOR_INTERRUPTS();
        }

        /* we're done with this chunk - free it and proceed to the next one */
        pfree(oldchunks);
        oldchunks = nextchunk;
    }

#ifdef HJDEBUG
    printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
           hashtable, nfreed, ninmemory, hashtable->spaceUsed);
#endif

    /*
     * If we dumped out either all or none of the tuples in the table, disable
     * further expansion of nbatch.  This situation implies that we have
     * enough tuples of identical hashvalues to overflow spaceAllowed.
     * Increasing nbatch will not fix it since there's no way to subdivide the
     * group any more finely.
     */
    if (nfreed == 0 || nfreed == ninmemory)
    {
        hashtable->growEnabled = false;
#ifdef HJDEBUG
        printf("Hashjoin %p: disabling further increase of nbatch\n",
               hashtable);
#endif
    }
}
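
When nbatch doubles, every tuple still in memory is re-examined: a tuple whose recomputed batch number equals the current batch stays, anything else is written to its batch file and freed. If that pass frees nothing (or everything), doubling again cannot help, so further growth is disabled. The toy simulation below replays that accounting with made-up hash values; batch_of is an illustrative stand-in that takes the batch from the high byte of the hash, an assumed simplification of the real bit layout shown later in ExecHashGetBucketAndBatch.

#include <stdio.h>
#include <stdint.h>

/* illustrative stand-in: pick a batch number from the high bits of the hash */
static int
batch_of(uint32_t hashvalue, int nbatch)
{
    if (nbatch > 1)
        return (int) ((hashvalue >> 24) & (uint32_t) (nbatch - 1));
    return 0;
}

int
main(void)
{
    uint32_t hashes[] = {0x12345678, 0x9abcdef0, 0x0badf00d, 0xdeadbeef,
                         0x00c0ffee, 0x8badf00d, 0x42424242, 0xfeedface};
    int ntuples = (int) (sizeof(hashes) / sizeof(hashes[0]));
    int curbatch = 0;
    int nbatch = 2;             /* pretend we just doubled from 1 to 2 */
    long ninmemory = 0;
    long nfreed = 0;

    for (int i = 0; i < ntuples; i++)
    {
        ninmemory++;
        if (batch_of(hashes[i], nbatch) == curbatch)
            continue;           /* tuple stays in the current batch */
        nfreed++;               /* tuple would be dumped to its batch file */
    }

    printf("freed %ld of %ld tuples\n", nfreed, ninmemory);
    if (nfreed == 0 || nfreed == ninmemory)
        printf("doubling again would not help; disable further growth\n");
    return 0;
}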

/*
 * ExecParallelHashIncreaseNumBatches
 *		increase the number of batches of a shared (parallel) hash table,
 *		repartitioning the tuples that have already been loaded
 */
static void
ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;

    /*
     * New participants can show up while we're in the middle of this
     * operation, so every step is coordinated through the grow_batches
     * barrier phase.
     */
    switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
    {
        case PHJ_GROW_BATCHES_ELECT:

            /* Elect one participant to prepare to grow the number of batches. */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_ELECT))
            {
                ParallelHashJoinBatch *old_batch0;
                int         new_nbatch;
                int         i;

                /* ... move the old batch array out of the way ... */

                if (hashtable->nbatch == 1)
                {
                    /*
                     * Going from single-batch to multi-batch: choose an
                     * initial batch count based on each participant's fair
                     * share of memory.
                     */
                    /* ... */
                }
                else
                {
                    /* Just double the number of batches */
                    new_nbatch = hashtable->nbatch * 2;
                }

                /* ... allocate the new generation of batches ... */

                if (/* ... we also need a different bucket count ... */)
                {
                    double      dtuples;
                    double      dbuckets;
                    int         new_nbuckets;

                    /*
                     * Estimate the tuples per batch, assuming we've seen
                     * only about half of them so far.
                     */
                    dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
                    /* ... dbuckets = ceil(dtuples / NTUP_PER_BUCKET) ... */
                    dbuckets = Min(dbuckets, max_buckets);
                    new_nbuckets = (int) dbuckets;
                    new_nbuckets = Max(new_nbuckets, 1024);
                    /* ... allocate the new bucket array ... */
                    for (i = 0; i < new_nbuckets; ++i)
                        /* ... initialize each bucket to InvalidDsaPointer ... */ ;
                    pstate->nbuckets = new_nbuckets;
                }
                else
                {
                    /* Recycle the existing bucket array */
                    /* ... */
                    for (i = 0; i < hashtable->nbuckets; ++i)
                        /* ... */ ;
                }

                /* ... move all chunks to the shared work queue ... */
            }
            /* FALLTHROUGH */

        case PHJ_GROW_BATCHES_REALLOCATE:
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE);
            /* FALLTHROUGH */

        case PHJ_GROW_BATCHES_REPARTITION:
            /* Make sure that we have the current dimensions and buckets. */
            /* ... */
            /* Then repartition and flush this backend's counters. */
            ExecParallelHashRepartitionFirst(hashtable);
            ExecParallelHashRepartitionRest(hashtable);
            ExecParallelHashMergeCounters(hashtable);
            /* Wait for the above to be finished. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
            /* FALLTHROUGH */

        case PHJ_GROW_BATCHES_DECIDE:

            /*
             * Elect one participant to inspect the new batches and decide
             * whether further repartitioning would help.
             */
            if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                     WAIT_EVENT_HASH_GROW_BATCHES_DECIDE))
            {
                bool        space_exhausted = false;
                bool        extreme_skew_detected = false;

                /* Are any of the new generation of batches exhausted? */
                for (int i = 0; i < hashtable->nbatch; ++i)
                {
                    int         parent;

                    /* ... if the batch would not fit in memory ... */
                        space_exhausted = true;

                    /*
                     * ... if a batch received essentially all of its
                     * parent's tuples, repartitioning is evidently not
                     * helping ...
                     */
                    {
                        extreme_skew_detected = true;
                    }
                }

                /* Don't keep growing if it's not helping or we'd overflow. */
                if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
                    pstate->growth = PHJ_GROWTH_DISABLED;
                else if (space_exhausted)
                    pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
                else
                    pstate->growth = PHJ_GROWTH_OK;
            }
            /* FALLTHROUGH */

        case PHJ_GROW_BATCHES_FINISH:
            /* Wait for the above to complete. */
            BarrierArriveAndWait(&pstate->grow_batches_barrier,
                                 WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
    }
}

/*
 * Repartition the tuples currently loaded into memory for inner batch 0
 * because the number of batches has been increased.  Some tuples are
 * retained in memory and some are written out to a later batch.
 */
static void
ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
{
    /* ... pop chunks from the shared work queue; for each chunk: ... */
    {
        size_t      idx = 0;

        /* repartition all tuples stored in this chunk */
        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple;    /* points into the chunk at idx */
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            Assert(batchno < hashtable->nbatch);
            if (batchno == 0)
            {
                /* still belongs in batch 0: copy it into a new chunk
                 * (ExecParallelHashTupleAlloc) and push it onto its bucket */
            }
            else
            {
                size_t      tuple_size;     /* ... header + tuple length ... */

                /* belongs in a later batch: append it to that batch's
                 * shared tuplestore */
            }

            /* ... count the tuple against its batch; advance idx ... */
        }

        /* ... return the old chunk's memory ... */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Help repartition inner batches 1..n.
 */
static void
ExecParallelHashRepartitionRest(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         old_nbatch = pstate->old_nbatch;
    SharedTuplestoreAccessor **old_inner_tuples;
    int         i;

    /* Get our hands on the previous generation of batches. */
    /* ... */
    for (i = 1; i < old_nbatch; ++i)
    {
        /* ... attach to each old inner batch's shared tuplestore ... */
    }

    /* Then process all tuples in those batches. */
    for (i = 1; i < old_nbatch; ++i)
    {
        /* ... for each tuple: recompute bucketno/batchno with
         * ExecHashGetBucketAndBatch() and write it (&hashvalue, tuple) to
         * the new generation's tuplestore ... */
    }

    pfree(old_inner_tuples);
}

/*
 * Transfer this backend's per-batch counters to the shared totals.
 */
static void
ExecParallelHashMergeCounters(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    /* ... take the shared lock ... */
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];

        /* ... add this backend's size/tuple estimates to the shared batch,
         * then reset the local counters ... */
        batch->size = 0;
    }
    /* ... release the lock ... */
}

/*
 * ExecHashIncreaseNumBuckets
 *		increase the original number of buckets in order to reduce
 *		number of tuples per bucket
 */
static void
ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    HashMemoryChunk chunk;

    /* do nothing if not an increase (it's called increase for a reason) */
    if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
        return;

#ifdef HJDEBUG
    printf("Hashjoin %p: increasing nbuckets %d => %d\n",
           hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
#endif

    hashtable->nbuckets = hashtable->nbuckets_optimal;
    hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;

    /*
     * Just reallocate the proper number of buckets - we don't need to walk
     * through them - we can walk the dense-allocated chunks (just like in
     * ExecHashIncreaseNumBatches, but without the copying into new chunks).
     */
    /* ... reallocate and zero the bucket array ... */

    /* scan through all tuples in all chunks to rebuild the hash table */
    for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
    {
        /* process all tuples stored in this chunk */
        size_t      idx = 0;

        while (idx < chunk->used)
        {
            HashJoinTuple hashTuple;    /* points into the chunk at idx */
            int         bucketno;
            int         batchno;

            ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                      &bucketno, &batchno);

            /* add the tuple to the proper bucket */
            /* ... */

            /* advance index past the tuple */
            /* ... */
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }
}

static void
ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    HashMemoryChunk chunk;
    dsa_pointer chunk_s;

    /* ... coordinated via the grow_buckets barrier phase: ... */
    {
        /* elect one participant to reallocate the bucket array */
        if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                                 WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
        {
            size_t      size;

            /* ... double pstate->nbuckets, free the old bucket array,
             * allocate and initialize a new one, and put all chunks onto
             * the shared work queue ... */
        }

        /* wait for the reallocation to finish */
        BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                             WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE);

        /* reinsert all tuples from the work queue into the new buckets */
        /* ... while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))): ... */
        {
            size_t      idx = 0;

            while (idx < chunk->used)
            {
                HashJoinTuple hashTuple;    /* points into the chunk at idx */
                dsa_pointer shared;
                int         bucketno;
                int         batchno;

                ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
                                          &bucketno, &batchno);
                Assert(batchno == 0);

                /* add the tuple to the proper bucket */
                ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                          hashTuple, shared);

                /* advance index past the tuple */
                /* ... */
            }

            /* allow this loop to be cancellable */
            CHECK_FOR_INTERRUPTS();
        }
        BarrierArriveAndWait(&pstate->grow_buckets_barrier,
                             WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT);
    }
}

/* ----------------------------------------------------------------
 *		ExecHashTableInsert
 *
 *		insert a tuple into the hash table depending on the hash value
 *		it may just go to a temp file for later batches
 * ----------------------------------------------------------------
 */
void
ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot,
                    uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    int         bucketno;
    int         batchno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);

    /*
     * decide whether to put the tuple in the hash table or a temp file
     */
    if (batchno == hashtable->curbatch)
    {
        /*
         * put the tuple in hash table
         */
        HashJoinTuple hashTuple;
        int         hashTupleSize;

        /* Create the HashJoinTuple */
        /* ... hashTupleSize = header overhead + tuple->t_len;
         *     hashTuple = dense_alloc(hashtable, hashTupleSize); ... */
        hashTuple->hashvalue = hashvalue;
        /* ... copy the minimal tuple into place ... */

        /*
         * We always reset the tuple-matched flag on insertion.  This is okay
         * even when reloading a tuple from a batch file, since the tuple
         * could not possibly have been matched to an outer tuple before it
         * went into the batch file.
         */
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        /* Push it onto the front of the bucket's list */
        /* ... */

        /*
         * Increase the (optimal) number of buckets if we just exceeded the
         * NTUP_PER_BUCKET threshold, but only when there's still a single
         * batch.
         */
        if (hashtable->nbatch == 1 &&
            /* ... the load factor exceeded NTUP_PER_BUCKET ... */)
        {
            /* ... double nbuckets_optimal, guarding against overflow ... */
        }

        /* Account for space used, and back off if we've used too much */
        hashtable->spaceUsed += hashTupleSize;
        /* ... update spacePeak; if spaceUsed exceeds spaceAllowed, call
         *     ExecHashIncreaseNumBatches() ... */
    }
    else
    {
        /*
         * put the tuple into a temp file for later batches
         */
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno],
                              hashtable);
    }

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * ExecParallelHashTableInsert
 *		insert a tuple into a shared hash table or shared batch tuplestore
 */
void
ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot,
                            uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    dsa_pointer shared;
    int         bucketno;
    int         batchno;

retry:
    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

    if (batchno == 0)
    {
        HashJoinTuple hashTuple;

        /* Try to load it into memory. */
        hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                               /* ... size ... */,
                                               &shared);
        if (hashTuple == NULL)
            goto retry;         /* the batch/bucket layout changed meanwhile */

        /* Store the hash value in the HashJoinTuple header. */
        hashTuple->hashvalue = hashvalue;
        /* ... copy the minimal tuple and clear its match flag ... */

        /* Push it onto the front of the bucket's list */
        ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                                  hashTuple, shared);
    }
    else
    {
        size_t      tuple_size;     /* ... header + tuple length ... */

        Assert(batchno > 0);

        /* Try to preallocate space in the batch if necessary. */
        if (/* ... ExecParallelHashTuplePrealloc() reported a change ... */)
        {
            goto retry;
        }

        /* ... append the tuple to the batch's shared tuplestore ... */
    }

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * Insert a tuple into the current hash table.  Unlike
 * ExecParallelHashTableInsert, this version is not prepared to send the
 * tuple to a different batch.
 */
void
ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
                                        TupleTableSlot *slot,
                                        uint32 hashvalue)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    HashJoinTuple hashTuple;
    dsa_pointer shared;
    int         batchno;
    int         bucketno;

    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
    hashTuple = ExecParallelHashTupleAlloc(hashtable,
                                           /* ... size ... */,
                                           &shared);
    hashTuple->hashvalue = hashvalue;
    /* ... copy the minimal tuple and clear its match flag ... */
    ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
                              hashTuple, shared);

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * ExecHashGetBucketAndBatch
 *		Determine the bucket number and batch number for a hash value
 *
 * Note: on-the-fly increases of nbatch must not change the bucket number for
 * a given hash code (since we don't move tuples to different hash chains),
 * and must only cause the batch number to remain the same or increase.  The
 * bucket number is taken from the low log2(nbuckets) bits of the hash value,
 * and the batch number from a different group of bits (selected by rotating
 * the hash value) masked with (nbatch - 1).  nbuckets and nbatch are both
 * powers of 2.
 */
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                          uint32 hashvalue,
                          int *bucketno,
                          int *batchno)
{
    uint32      nbuckets = (uint32) hashtable->nbuckets;
    uint32      nbatch = (uint32) hashtable->nbatch;

    if (nbatch > 1)
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = pg_rotate_right32(hashvalue,
                                     hashtable->log2_nbuckets) & (nbatch - 1);
    }
    else
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = 0;
    }
}
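
ExecHashGetBucketAndBatch takes the bucket from the low bits of the 32-bit hash and the batch from a rotated copy of the same hash, so all tuples of one bucket land in the same batch. The self-contained sketch below replays that arithmetic outside the executor; rotate_right32 and get_bucket_and_batch are illustrative reimplementations, not the executor's own helpers.

#include <stdio.h>
#include <stdint.h>

/* rotate right by n bits (0 < n < 32), mirroring what pg_rotate_right32 does */
static uint32_t
rotate_right32(uint32_t word, int n)
{
    return (word >> n) | (word << (32 - n));
}

/*
 * Illustrative version: nbuckets and nbatch must be powers of two; the
 * bucket uses the low bits, the batch uses bits above them.
 */
static void
get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets,
                     int nbuckets, int nbatch,
                     int *bucketno, int *batchno)
{
    if (nbatch > 1)
    {
        *bucketno = (int) (hashvalue & (uint32_t) (nbuckets - 1));
        *batchno = (int) (rotate_right32(hashvalue, log2_nbuckets) &
                          (uint32_t) (nbatch - 1));
    }
    else
    {
        *bucketno = (int) (hashvalue & (uint32_t) (nbuckets - 1));
        *batchno = 0;
    }
}

int
main(void)
{
    int bucketno;
    int batchno;

    get_bucket_and_batch(0xdeadbeef, 10, 1024, 4, &bucketno, &batchno);
    printf("bucket=%d batch=%d\n", bucketno, batchno);
    return 0;
}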

/*
 * ExecScanHashBucket
 *		scan a hash bucket for matches to the current outer tuple
 *
 * The current outer tuple must be stored in econtext->ecxt_outertuple.
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
{
    ExprState  *hjclauses = hjstate->hashclauses;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
    uint32      hashvalue = hjstate->hj_CurHashValue;

    /*
     * hj_CurTuple is the address of the tuple last returned from the current
     * bucket, or NULL if it's time to start scanning a new bucket.
     */
    if (hashTuple != NULL)
        hashTuple = hashTuple->next.unshared;
    else
        hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];

    while (hashTuple != NULL)
    {
        if (hashTuple->hashvalue == hashvalue)
        {
            TupleTableSlot *inntuple;

            /* insert hashtable's tuple into exec slot so ExecQual sees it */
            inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                             hjstate->hj_HashTupleSlot,
                                             false);    /* do not pfree */
            econtext->ecxt_innertuple = inntuple;

            if (ExecQualAndReset(hjclauses, econtext))
            {
                hjstate->hj_CurTuple = hashTuple;
                return true;
            }
        }

        hashTuple = hashTuple->next.unshared;
    }

    /*
     * no match
     */
    return false;
}

/*
 * ExecParallelScanHashBucket
 *		scan a hash bucket for matches to the current outer tuple
 *
 * Identical to ExecScanHashBucket, but follows the shared bucket chains of
 * a Parallel Hash table.
 */
bool
ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)
{
    ExprState  *hjclauses = hjstate->hashclauses;
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
    uint32      hashvalue = hjstate->hj_CurHashValue;

    /*
     * hj_CurTuple is the address of the tuple last returned from the current
     * bucket, or NULL if it's time to start scanning a new bucket.
     */
    if (hashTuple != NULL)
        hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
    else
        hashTuple = ExecParallelHashFirstTuple(hashtable,
                                               hjstate->hj_CurBucketNo);

    while (hashTuple != NULL)
    {
        if (hashTuple->hashvalue == hashvalue)
        {
            TupleTableSlot *inntuple;

            /* insert hashtable's tuple into exec slot so ExecQual sees it */
            inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                             hjstate->hj_HashTupleSlot,
                                             false);    /* do not pfree */
            econtext->ecxt_innertuple = inntuple;

            if (ExecQualAndReset(hjclauses, econtext))
            {
                hjstate->hj_CurTuple = hashTuple;
                return true;
            }
        }

        hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
    }

    /*
     * no match
     */
    return false;
}
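
Probing is a plain chained-hash lookup: walk the bucket's linked list, skip entries whose stored hash value differs, and only evaluate the join quals on the survivors. The minimal standalone version below shows that loop with an integer payload standing in for the stored tuple and a trivial parity "qual"; HashEntry, qual_matches and scan_bucket are illustrative names, not the executor's types.

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

typedef struct HashEntry
{
    struct HashEntry *next;
    uint32_t    hashvalue;
    int         payload;        /* stands in for the stored minimal tuple */
} HashEntry;

/* pretend join qual: match payloads with the same parity as the probe key */
static bool
qual_matches(const HashEntry *entry, int probe_key)
{
    return (entry->payload % 2) == (probe_key % 2);
}

/* scan one bucket chain, as ExecScanHashBucket does for the current batch */
static const HashEntry *
scan_bucket(const HashEntry *bucket_head, uint32_t probe_hash, int probe_key)
{
    for (const HashEntry *e = bucket_head; e != NULL; e = e->next)
    {
        if (e->hashvalue != probe_hash)
            continue;           /* cheap filter before running the quals */
        if (qual_matches(e, probe_key))
            return e;
    }
    return NULL;
}

int
main(void)
{
    HashEntry   c = {NULL, 0xabcd0001u, 7};
    HashEntry   b = {&c, 0xabcd0001u, 4};
    HashEntry   a = {&b, 0x12340002u, 3};
    const HashEntry *hit = scan_bucket(&a, 0xabcd0001u, 9);

    printf("match payload = %d\n", hit ? hit->payload : -1);
    return 0;
}

The real function additionally resumes from hj_CurTuple on each call, so repeated calls return every match in the bucket one at a time rather than only the first.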

/*
 * ExecPrepHashTableForUnmatched
 *		set up for a series of ExecScanHashTableForUnmatched calls
 */
void
ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
{
    /*
     * During this scan we use the HashJoinState fields as follows:
     *
     * hj_CurBucketNo: next regular bucket to scan
     * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
     * hj_CurTuple: last tuple returned, or NULL to start next bucket
     */
    hjstate->hj_CurBucketNo = 0;
    hjstate->hj_CurSkewBucketNo = 0;
    hjstate->hj_CurTuple = NULL;
}

/*
 * Decide whether this process is allowed to run the unmatched scan for the
 * current batch of a parallel hash join.  Returns false if another
 * participant is (or will be) doing it, in which case we detach from the
 * batch; returns true if we are the one elected to scan.
 */
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    int         curbatch = hashtable->curbatch;

    /* ... reset the hj_CurBucketNo / hj_CurTuple cursor fields ... */

    /* ... advance the current batch's barrier; if another participant got
     * there first (or the phase has already moved on): ... */
    {
        /* ... detach from the batch and let the winner do the scan ... */
        return false;
    }

    /* ... if we still cannot be the one to scan ... */
    {
        /* ... */
        return false;
    }

    /* we are the elected participant: proceed with the unmatched scan */
    return true;
}

/*
 * ExecScanHashTableForUnmatched
 *		scan the hash table for unmatched inner tuples
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;

    for (;;)
    {
        /*
         * hj_CurTuple is the address of the tuple last returned from the
         * current bucket, or NULL if it's time to start scanning a new
         * bucket.
         */
        if (hashTuple != NULL)
            hashTuple = hashTuple->next.unshared;
        else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
        {
            hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
            hjstate->hj_CurBucketNo++;
        }
        else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
        {
            /* ... continue with the skew buckets ... */
        }
        else
            break;              /* finished all buckets */

        while (hashTuple != NULL)
        {
            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
            {
                TupleTableSlot *inntuple;

                /* insert hashtable's tuple into exec slot */
                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                 hjstate->hj_HashTupleSlot,
                                                 false);    /* do not pfree */
                econtext->ecxt_innertuple = inntuple;

                /*
                 * Reset temp memory each time; although this function doesn't
                 * do any qual eval, the caller will, so keep it parallel to
                 * ExecScanHashBucket.
                 */
                ResetExprContext(econtext);

                hjstate->hj_CurTuple = hashTuple;
                return true;
            }

            hashTuple = hashTuple->next.unshared;
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * no more unmatched tuples
     */
    return false;
}

/*
 * ExecParallelScanHashTableForUnmatched
 *		scan the shared hash table for unmatched inner tuples, in parallel
 */
bool
ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
                                      ExprContext *econtext)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;
    HashJoinTuple hashTuple = hjstate->hj_CurTuple;

    for (;;)
    {
        /*
         * hj_CurTuple is the address of the tuple last returned, or NULL if
         * it's time to move on to the next group of tuples.
         */
        if (hashTuple != NULL)
            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
        else if (/* ... there are more tuples to visit in this batch ... */)
        {
            /* ... advance to the next group of tuples ... */
        }
        else
            break;              /* finished this batch */

        while (hashTuple != NULL)
        {
            if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
            {
                TupleTableSlot *inntuple;

                /* insert hashtable's tuple into exec slot */
                inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                 hjstate->hj_HashTupleSlot,
                                                 false);    /* do not pfree */
                econtext->ecxt_innertuple = inntuple;

                /* reset temp memory, as in the non-parallel case */
                ResetExprContext(econtext);

                hjstate->hj_CurTuple = hashTuple;
                return true;
            }

            hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
        }

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * no more unmatched tuples
     */
    return false;
}

/*
 * ExecHashTableReset
 *		reset hash table header for new batch
 */
void
ExecHashTableReset(HashJoinTable hashtable)
{
    MemoryContext oldcxt;
    int         nbuckets = hashtable->nbuckets;

    /*
     * Release all the hash buckets and tuples acquired in the prior pass, and
     * reinitialize the context for a new pass.
     */
    /* ... reset the batch context and reallocate a zeroed bucket array ... */

    /* Forget the chunks (the memory was freed by the context reset above). */
    hashtable->chunks = NULL;
}

/*
 * ExecHashTableResetMatchFlags
 *		Clear all the HeapTupleHeaderHasMatch flags in the table
 */
void
ExecHashTableResetMatchFlags(HashJoinTable hashtable)
{
    HashJoinTuple tuple;
    int         i;

    /* Reset all flags in the main table ... */
    for (i = 0; i < hashtable->nbuckets; i++)
    {
        for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
             tuple = tuple->next.unshared)
            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
    }

    /* ... and the same for the skew buckets, if any */
    for (i = 0; i < hashtable->nSkewBuckets; i++)
    {
        HashSkewBucket *skewBucket;     /* ... skewBucket[skewBucketNums[i]] ... */

        for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
    }
}

void
ExecReScanHash(HashState *node)
{
    PlanState  *outerPlan = outerPlanState(node);

    /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/*
 * ExecHashBuildSkewHash
 *
 *		Set up for skew optimization if we can identify the most common values
 *		(MCVs) of the outer relation's join key.  We make a skew hash bucket
 *		for the hash value of each MCV, up to the number of slots allowed
 *		based on available memory.
 */
static void
ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable,
                      Hash *node, int mcvsToUse)
{
    HeapTuple   statsTuple;
    AttStatsSlot sslot;

    /* Do nothing if planner didn't identify the outer relation's join key */
    if (!OidIsValid(node->skewTable))
        return;
    /* Also, do nothing if we don't have room for at least one skew bucket */
    if (mcvsToUse <= 0)
        return;

    /*
     * Try to find the MCV statistics for the outer relation's join key.
     */
    /* ... SearchSysCache3(...) on the outer join key's statistics ... */
    if (!HeapTupleIsValid(statsTuple))
        return;

    if (get_attstatsslot(&sslot, statsTuple,
                         STATISTIC_KIND_MCV, InvalidOid,
                         ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
    {
        double      frac;
        int         nbuckets;
        int         i;

        if (mcvsToUse > sslot.nvalues)
            mcvsToUse = sslot.nvalues;

        /*
         * Calculate the expected fraction of outer relation that will
         * participate in the skew optimization.  If this isn't at least
         * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
         */
        frac = 0;
        for (i = 0; i < mcvsToUse; i++)
            frac += sslot.numbers[i];
        if (frac < SKEW_MIN_OUTER_FRACTION)
        {
            free_attstatsslot(&sslot);
            ReleaseSysCache(statsTuple);
            return;
        }

        /*
         * Okay, set up the skew hashtable.
         *
         * skewBucket[] is an open addressing hashtable with a power of 2 size
         * that is greater than the number of MCV values.  (This ensures there
         * will be at least one null entry, so searches will always terminate.)
         */
        nbuckets = pg_nextpower2_32(mcvsToUse + 1);
        /* use two more bits just to help avoid collisions */
        nbuckets <<= 2;

        /* ... allocate skewBucket[] and skewBucketNums[] (mcvsToUse *
         * sizeof(int)), charging SKEW_BUCKET_OVERHEAD and the array sizes
         * against spaceUsed / spaceAllowed ... */

        /*
         * Create a skew bucket for each MCV hash value.
         */
        for (i = 0; i < mcvsToUse; i++)
        {
            uint32      hashvalue;
            int         bucket;

            /* ... hashvalue = hash of sslot.values[i] using the skew hash
             * function ... */

            /*
             * While we have not hit a hole in the hashtable and have not hit
             * the desired bucket, we have collided with some previous hash
             * value, so try the next bucket location.
             */
            bucket = hashvalue & (nbuckets - 1);
            while (hashtable->skewBucket[bucket] != NULL &&
                   hashtable->skewBucket[bucket]->hashvalue != hashvalue)
                bucket = (bucket + 1) & (nbuckets - 1);

            /*
             * If we found an existing bucket with the same hashvalue, leave
             * it alone.  It's okay for two MCVs to share a hashvalue.
             */
            if (hashtable->skewBucket[bucket] != NULL)
                continue;

            /* Okay, create a new skew bucket for this hashvalue. */
            /* ... allocate it, record it in skewBucketNums[], update the
             * counters ... */
        }

        free_attstatsslot(&sslot);
    }

    ReleaseSysCache(statsTuple);
}

/*
 * ExecHashGetSkewBucket
 *
 *		Returns the index of the skew bucket for this hashvalue, or
 *		INVALID_SKEW_BUCKET_NO if the hashvalue is not associated with any
 *		active skew bucket.
 */
int
ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
{
    int         bucket;

    /*
     * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization
     * (in particular, this happens after the initial batch is done).
     */
    if (!hashtable->skewEnabled)
        return INVALID_SKEW_BUCKET_NO;

    /*
     * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
     */
    bucket = hashvalue & (hashtable->skewBucketLen - 1);

    /*
     * While we have not hit a hole in the hashtable and have not hit the
     * desired bucket, we have collided with some other hash value, so try
     * the next bucket location.
     */
    while (hashtable->skewBucket[bucket] != NULL &&
           hashtable->skewBucket[bucket]->hashvalue != hashvalue)
        bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);

    /*
     * Found the desired bucket?
     */
    if (hashtable->skewBucket[bucket] != NULL)
        return bucket;

    /*
     * There must not be any hashtable entry for this hash value.
     */
    return INVALID_SKEW_BUCKET_NO;
}
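
Unlike the main table, the skew buckets form an open-addressed hash table: start at the hash value masked by the power-of-two array length and probe linearly until the matching bucket or an empty slot is reached. The self-contained sketch below shows that probe with a plain array of hash values where 0 stands for "empty"; skew_lookup and the 8-slot table are illustrative simplifications, and the real table always keeps free slots so the probe is guaranteed to terminate.

#include <stdio.h>
#include <stdint.h>

#define SKEW_LEN 8              /* must be a power of two */
#define INVALID_SKEW_BUCKET (-1)

/* linear-probe lookup; slots[i] == 0 means the slot is empty */
static int
skew_lookup(const uint32_t slots[SKEW_LEN], uint32_t hashvalue)
{
    int bucket = (int) (hashvalue & (SKEW_LEN - 1));

    while (slots[bucket] != 0 && slots[bucket] != hashvalue)
        bucket = (bucket + 1) & (SKEW_LEN - 1);

    if (slots[bucket] != 0)
        return bucket;          /* found the bucket for this hash value */
    return INVALID_SKEW_BUCKET; /* empty slot reached: no skew bucket */
}

int
main(void)
{
    uint32_t slots[SKEW_LEN] = {0};

    slots[0xdeadbeefu & (SKEW_LEN - 1)] = 0xdeadbeefu;  /* pretend MCV bucket */
    printf("hit  -> %d\n", skew_lookup(slots, 0xdeadbeefu));
    printf("miss -> %d\n", skew_lookup(slots, 0x12345678u));
    return 0;
}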

/*
 * ExecHashSkewTableInsert
 *
 *		Insert a tuple into the skew hashtable.
 */
static void
ExecHashSkewTableInsert(HashJoinTable hashtable,
                        TupleTableSlot *slot,
                        uint32 hashvalue,
                        int bucketNumber)
{
    bool        shouldFree;
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    HashJoinTuple hashTuple;
    int         hashTupleSize;

    /* Create the HashJoinTuple */
    /* ... allocate hashTupleSize bytes in the batch context ... */
    hashTuple->hashvalue = hashvalue;
    /* ... copy the minimal tuple and clear its match flag ... */

    /* Push it onto the front of the skew bucket's list */
    /* ... */

    /* Account for space used, and back off if we've used too much */
    hashtable->spaceUsed += hashTupleSize;
    /* ... also spaceUsedSkew; while over the skew budget, call
     * ExecHashRemoveNextSkewBucket() ... */

    if (shouldFree)
        heap_free_minimal_tuple(tuple);
}

/*
 * ExecHashRemoveNextSkewBucket
 *
 *		Remove the least valuable skew bucket by pushing its tuples into
 *		the main hash table.
 */
static void
ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
{
    int         bucketToRemove;
    HashSkewBucket *bucket;
    uint32      hashvalue;
    int         bucketno;
    int         batchno;
    HashJoinTuple hashTuple;

    /* Locate the bucket to remove */
    bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
    bucket = hashtable->skewBucket[bucketToRemove];

    /*
     * Calculate which bucket and batch the tuples belong to in the main
     * hashtable.  They all have the same hash value, so it's the same for
     * all of them.
     */
    hashvalue = bucket->hashvalue;
    ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);

    /* Process all tuples in the bucket */
    hashTuple = bucket->tuples;
    while (hashTuple != NULL)
    {
        HashJoinTuple nextHashTuple = hashTuple->next.unshared;
        Size        tupleSize;

        /* ... */
        if (batchno == hashtable->curbatch)
        {
            /* Move the tuple to the main hash table */
            HashJoinTuple copyTuple;

            /* ... copyTuple = dense_alloc(hashtable, tupleSize) ... */
            memcpy(copyTuple, hashTuple, tupleSize);
            pfree(hashTuple);

            /* ... link copyTuple into its main-table bucket and adjust the
             * skew space accounting ... */
        }
        else
        {
            /* Put the tuple into a temp file for later batches */
            ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashvalue,
                                  &hashtable->innerBatchFile[batchno],
                                  hashtable);
            pfree(hashTuple);
            hashtable->spaceUsed -= tupleSize;
            /* ... and spaceUsedSkew as well ... */
        }

        hashTuple = nextHashTuple;

        /* allow this loop to be cancellable */
        CHECK_FOR_INTERRUPTS();
    }

    /*
     * Free the bucket struct itself and reset the hashtable entry to NULL.
     * (We must do this rather than compacting the hashtable, because the
     * open addressing scheme depends on the position of entries.)
     */
    /* ... */
    hashtable->skewBucket[bucketToRemove] = NULL;
    /* ... decrement nSkewBuckets and the skew space accounting ... */

    /*
     * If we have removed all skew buckets, give up on skew optimization:
     * release the bucket array and disable skewEnabled.
     */
    if (hashtable->nSkewBuckets == 0)
    {
        /* ... */
    }
}

/*
 * Reserve space in the DSM segment for instrumentation data.
 */
void
ExecHashEstimate(HashState *node, ParallelContext *pcxt)
{
    size_t      size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ps.instrument || pcxt->nworkers == 0)
        return;

    /* ... size = header + nworkers * sizeof(HashInstrumentation);
     *     shm_toc_estimate_chunk() / shm_toc_estimate_keys() ... */
}

/*
 * Set up a space in the DSM for all workers to record instrumentation data
 * about their hash table.
 */
void
ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
{
    size_t      size;

    /* don't need this if not instrumenting or no workers */
    if (!node->ps.instrument || pcxt->nworkers == 0)
        return;

    /* ... allocate the SharedHashInfo in the toc and remember it in
     * node->shared_info ... */
}

/*
 * Locate the DSM space for hash table instrumentation data that we'll write
 * to at shutdown time.
 */
void
ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
{
    SharedHashInfo *shared_info;

    /* don't need this if not instrumenting */
    if (!node->ps.instrument)
        return;

    /* ... shm_toc_lookup() the SharedHashInfo and point node->hinstrument
     * at this worker's slot ... */
}

/*
 * Collect EXPLAIN stats if needed, saving them into DSM memory if
 * ExecHashInitializeWorker was called, or local storage if not.  In the
 * parallel case, this must be done in ExecShutdownHash() rather than
 * ExecEndHash() because the latter runs after we have detached from the DSM
 * segment.
 */
void
ExecShutdownHash(HashState *node)
{
    /* ... allocate node->hinstrument if needed and accumulate the final
     * statistics from the hash table ... */
}

/*
 * Retrieve instrumentation data from workers before the DSM segment is
 * detached, so that EXPLAIN can access it.
 */
void
ExecHashRetrieveInstrumentation(HashState *node)
{
    SharedHashInfo *shared_info = node->shared_info;
    size_t      size;

    if (shared_info == NULL)
        return;

    /* Replace node->shared_info with a copy in backend-local memory. */
    /* ... compute size from the number of workers ... */
    memcpy(node->shared_info, shared_info, size);
}

/*
 * Accumulate instrumentation data from 'hashtable' into an
 * initially-zeroed HashInstrumentation struct.
 */
void
ExecHashAccumInstrumentation(HashInstrumentation *instrument,
                             HashJoinTable hashtable)
{
    /* ... keep the maximum nbuckets / nbatch / memory figures observed
     * across batches and rescans ... */
}

/*
 * Allocate 'size' bytes from the currently active HashMemoryChunk
 */
static void *
dense_alloc(HashJoinTable hashtable, Size size)
{
    HashMemoryChunk newChunk;
    char       *ptr;

    /* just in case the size is not already aligned properly */
    size = MAXALIGN(size);

    /*
     * If tuple size is larger than threshold, allocate a separate chunk.
     */
    if (size > HASH_CHUNK_THRESHOLD)
    {
        /* allocate new chunk and put it at the beginning of the list */
        /* ... MemoryContextAlloc(..., HASH_CHUNK_HEADER_SIZE + size) ... */
        newChunk->maxlen = size;
        newChunk->used = size;

        /*
         * Add this chunk to the list after the first existing chunk, so that
         * we don't lose the remaining space in the "current" chunk.
         */
        if (hashtable->chunks != NULL)
        {
            newChunk->next.unshared = hashtable->chunks->next.unshared;
            hashtable->chunks->next.unshared = newChunk;
        }
        else
        {
            newChunk->next.unshared = NULL;
            hashtable->chunks = newChunk;
        }

        return HASH_CHUNK_DATA(newChunk);
    }

    /*
     * See if we have enough space for it in the current chunk (if any).  If
     * not, allocate a fresh chunk.
     */
    if ((hashtable->chunks == NULL) ||
        (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
    {
        /* allocate new chunk and put it at the beginning of the list */
        /* ... MemoryContextAlloc() a standard-sized chunk ... */
        newChunk->used = size;

        newChunk->next.unshared = hashtable->chunks;
        hashtable->chunks = newChunk;

        return HASH_CHUNK_DATA(newChunk);
    }

    /* There is enough space in the current chunk, let's add the tuple */
    ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
    hashtable->chunks->used += size;

    /* return pointer to the start of the tuple memory */
    return ptr;
}
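
dense_alloc packs tuples into large chunks instead of palloc'ing each one: it keeps a linked list of chunks and bumps a "used" offset inside the newest chunk, giving oversized requests a chunk of their own. The standalone sketch below shows that bump-allocation pattern with malloc standing in for the memory-context allocation; Chunk, DenseArena, dense_alloc_sketch, the 32KB chunk size and the 8-byte alignment are illustrative assumptions, and the real code's special placement of oversized chunks is omitted.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CHUNK_DATA_SIZE (32 * 1024)     /* assumed chunk payload size */

typedef struct Chunk
{
    struct Chunk *next;
    size_t      maxlen;
    size_t      used;
    char        data[];         /* flexible array member for the payload */
} Chunk;

typedef struct
{
    Chunk      *chunks;         /* newest chunk first */
} DenseArena;

/* bump-allocate from the newest chunk, adding a new chunk when needed */
static void *
dense_alloc_sketch(DenseArena *arena, size_t size)
{
    Chunk      *chunk = arena->chunks;
    void       *ptr;

    size = (size + 7) & ~(size_t) 7;    /* keep 8-byte alignment (assumed) */

    if (chunk == NULL || chunk->maxlen - chunk->used < size)
    {
        size_t payload = size > CHUNK_DATA_SIZE ? size : CHUNK_DATA_SIZE;

        chunk = malloc(sizeof(Chunk) + payload);
        if (chunk == NULL)
            return NULL;
        chunk->maxlen = payload;
        chunk->used = 0;
        chunk->next = arena->chunks;
        arena->chunks = chunk;
    }

    ptr = chunk->data + chunk->used;
    chunk->used += size;
    return ptr;
}

int
main(void)
{
    DenseArena  arena = {NULL};
    char       *a = dense_alloc_sketch(&arena, 100);
    char       *b = dense_alloc_sketch(&arena, 200);

    memcpy(a, "tuple-a", 8);
    memcpy(b, "tuple-b", 8);
    printf("%s %s (chunks used: %zu bytes)\n", a, b, arena.chunks->used);
    return 0;
}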

/*
 * Allocate space for a tuple in shared dense storage.  This is equivalent to
 * dense_alloc but for Parallel Hash using shared memory.
 *
 * While loading a tuple into shared memory, we might run out of memory and
 * decide to repartition, or determine that the load factor is too high and
 * decide to expand the bucket array, or discover that another participant
 * has commanded us to help do that.  Return NULL if the number of buckets or
 * batches has changed, indicating that the caller must retry (considering
 * the possibility that the tuple no longer belongs in the same batch).
 */
static HashJoinTuple
ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
                           dsa_pointer *shared)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    HashJoinTuple result;
    HashMemoryChunk chunk;
    Size        chunk_size;
    int         curbatch = hashtable->curbatch;

    size = MAXALIGN(size);

    /* Fast path: try to allocate from the current chunk. */
    chunk = hashtable->current_chunk;
    if (chunk != NULL &&
        /* ... the request is small enough ... */
        chunk->maxlen - chunk->used >= size)
    {
        /* ... compute the result pointer and its dsa_pointer (*shared) ... */
        chunk->used += size;

        return result;
    }

    /* Slow path: we need a new chunk. */
    /* ... choose chunk_size; oversized tuples get a dedicated chunk ... */

    /* Check if it's time to grow batches or buckets. */
    if (pstate->growth != PHJ_GROWTH_DISABLED)
    {
        Assert(curbatch == 0);

        /* ... record this backend's usage; if the memory budget would be
         * exceeded, trigger ExecParallelHashIncreaseNumBatches() ... */
        {
            /* ... */
            return NULL;        /* caller must recompute batch and retry */
        }

        if (hashtable->nbatch == 1)
        {
            /* ... if the load factor is too high and
             * hashtable->nbuckets < (INT_MAX / 2), trigger
             * ExecParallelHashIncreaseNumBuckets() ... */
            {
                /* ... */
                return NULL;    /* caller must retry */
            }
        }
    }

    /* We are cleared to allocate a new chunk. */
    /* ... dsa_allocate(hashtable->area, chunk_size) ... */
    chunk->used = size;

    /* ... link the chunk into the batch's chunk list (under the lock) and
     * make it the current chunk ... */

    return result;
}

/*
 * One backend needs to set up the shared batch state including tuplestores.
 * Other backends will ensure they have correctly configured accessors by
 * calling ExecParallelHashEnsureBatchAccessors().
 */
static void
ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    /* Allocate space for the shared batch descriptors. */
    pstate->nbatch = nbatch;
    /* ... dsa_allocate0() the batches array ... */

    /* ... allocate this backend's accessor array ... */
    hashtable->nbatch = nbatch;

    /* Initialize each batch. */
    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
        ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(/* ... */, i);

        /* ... initialize the batch's barrier; batch 0 starts out attached
         * because its hash table is allocated up front ... */
        if (i == 0)
        {
            /* ... */
        }

        accessor->shared = shared;
        /* ... reset the per-backend counters and attach to the inner and
         * outer shared tuplestores ... */
    }
}

/*
 * Free the current set of ParallelHashJoinBatchAccessor objects.
 */
static void
ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
{
    int         i;

    for (i = 0; i < hashtable->nbatch; ++i)
    {
        /* ... end access to the inner and outer shared tuplestores ... */
    }
    pfree(hashtable->batches);
    hashtable->batches = NULL;
}

/*
 * Make sure this backend has up-to-date accessors for the current set of
 * batches.
 */
static void
ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    int         i;

    if (hashtable->batches != NULL)
    {
        /* ... nothing to do if we already match pstate->nbatch; otherwise
         * close the stale accessors first ... */
        return;
    }

    /* ... allocate a new accessor array sized to pstate->nbatch ... */

    for (i = 0; i < hashtable->nbatch; ++i)
    {
        ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
        ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(/* ... */, i);

        accessor->shared = shared;
        accessor->done = false;
        /* ... attach to the batch's shared tuplestores ... */
    }
}

/*
 * Allocate an empty shared memory hash table for a given batch.
 */
void
ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
{
    ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
    int         nbuckets = hashtable->parallel_state->nbuckets;
    int         i;

    /* ... dsa_allocate the bucket array ... */
    for (i = 0; i < nbuckets; ++i)
        /* ... dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer) ... */ ;
}

/*
 * Detach from a batch.  If we are last to detach, clean up the batch and
 * remember its hash table statistics for later.
 */
void
ExecHashTableDetachBatch(HashJoinTable hashtable)
{
    if (hashtable->parallel_state != NULL &&
        hashtable->curbatch >= 0)
    {
        int         curbatch = hashtable->curbatch;
        ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
        bool        attached = true;

        /* Make sure any temporary files are closed. */
        /* ... */

        /* ... advance the batch barrier past the unmatched scan; if we turn
         * out to be the last participant attached: ... */
        {
            /* ... free the bucket array and any remaining chunks, and record
             * the batch's peak memory usage for EXPLAIN ... */
        }

        /* Remember that we are not attached to a batch. */
        hashtable->curbatch = -1;
    }
}

/*
 * Detach from all shared resources.  If we are last to detach, clean up.
 */
void
ExecHashTableDetach(HashJoinTable hashtable)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;

    /* ... if there is a parallel state and we still hold resources: ... */
    {
        int         i;

        /* Make sure any temporary files are closed. */
        if (hashtable->batches != NULL)
        {
            for (i = 0; i < hashtable->nbatch; ++i)
            {
                /* ... close inner and outer batch files ... */
            }
        }

        /* If we're last to detach, clean up shared memory. */
        /* ... BarrierDetach(); dsa_free() the shared batch array ... */
        {
            /* ... */
        }
    }
    /* ... forget the parallel state ... */
}

/*
 * Get the first tuple in a given bucket identified by number.
 */
static HashJoinTuple
ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
{
    HashJoinTuple tuple;
    dsa_pointer p;

    Assert(hashtable->parallel_state);
    p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
    tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);

    return tuple;
}

/*
 * Get the next tuple in the same bucket as 'tuple'.
 */
static HashJoinTuple
ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
{
    HashJoinTuple next;

    Assert(hashtable->parallel_state);
    next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);

    return next;
}

/*
 * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
 */
static inline void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                          HashJoinTuple tuple,
                          dsa_pointer tuple_shared)
{
    for (;;)
    {
        tuple->next.shared = dsa_pointer_atomic_read(head);
        if (dsa_pointer_atomic_compare_exchange(head,
                                                &tuple->next.shared,
                                                tuple_shared))
            break;
    }
}

/*
 * Prepare to work on a given batch.
 */
void
ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
{
    hashtable->curbatch = batchno;
    /* ... point buckets.shared, nbuckets/log2_nbuckets and the current
     * chunk at this batch's shared state ... */
}

/*
 * Take the next available chunk from the queue of chunks being worked on in
 * parallel.  Return NULL if there are none left.  Otherwise return a pointer
 * to the chunk, and set *shared to the DSA pointer to the chunk.
 */
static HashMemoryChunk
ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    HashMemoryChunk chunk;

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
    if (DsaPointerIsValid(pstate->chunk_work_queue))
    {
        *shared = pstate->chunk_work_queue;
        chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, *shared);
        /* ... advance chunk_work_queue to the next chunk ... */
    }
    else
        chunk = NULL;
    LWLockRelease(&pstate->lock);

    return chunk;
}

/*
 * Check and reserve space for a tuple that will be written to a later
 * batch's shared tuplestore, possibly commanding all participants to grow
 * the number of batches if the batch's estimated size exceeds its quota.
 * Returns false if the number of batches changed so that the caller must
 * recompute which batch the tuple belongs to.
 */
static bool
ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
{
    ParallelHashJoinState *pstate = hashtable->parallel_state;
    ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];

    Assert(batchno > 0);
    Assert(batchno < hashtable->nbatch);

    LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);

    /* Has another participant already commanded us to help grow? */
    if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES)
    {
        /* ... release the lock, help increase the number of batches, and
         * report failure so the caller re-checks the tuple's batch ... */
        return false;
    }

    if (/* ... this batch's estimated size would exceed its share of the
         * space allowed ... */)
    {
        /*
         * This batch would not fit in memory if loaded later.  Command all
         * participants to help repartition.
         */
        pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
        /* ... release the lock and report failure ... */
        return false;
    }

    /* ... charge the reservation against the batch's estimated size ... */
    LWLockRelease(&pstate->lock);

    return true;
}

/*
 * Get a size_t value for the hash memory limit, by multiplying work_mem by
 * hash_mem_multiplier.  The result is expressed in bytes and clamped to
 * SIZE_MAX.
 */
size_t
get_hash_memory_limit(void)
{
    double      mem_limit;

    /* Do initial calculation in double arithmetic */
    mem_limit = (double) work_mem * hash_mem_multiplier * 1024.0;

    /* Clamp in case it doesn't fit in size_t */
    mem_limit = Min(mem_limit, (double) SIZE_MAX);

    return (size_t) mem_limit;
}
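
The limit is simply work_mem (which is in kilobytes) scaled by hash_mem_multiplier and clamped so the byte count still fits in a size_t. The standalone sketch below replays that computation; the two static variables stand in for the real GUCs, and the values shown (4096 kB work_mem, multiplier 2.0, which are the server defaults) are assumptions for the example.

#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

/* assumed settings standing in for the work_mem / hash_mem_multiplier GUCs */
static int    work_mem_kb = 4096;           /* 4MB */
static double hash_mem_multiplier = 2.0;

static size_t
hash_memory_limit_sketch(void)
{
    double mem_limit;

    /* work_mem is in kilobytes; scale to bytes and apply the multiplier */
    mem_limit = (double) work_mem_kb * hash_mem_multiplier * 1024.0;

    /* clamp so the result is representable as size_t */
    if (mem_limit > (double) SIZE_MAX)
        mem_limit = (double) SIZE_MAX;

    return (size_t) mem_limit;
}

int
main(void)
{
    printf("hash memory limit = %zu bytes\n", hash_memory_limit_sketch());
    return 0;
}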

void PrepareTempTablespaces(void)

bool BarrierArriveAndDetachExceptLast(Barrier *barrier)

bool BarrierArriveAndDetach(Barrier *barrier)

int BarrierAttach(Barrier *barrier)

void BarrierInit(Barrier *barrier, int participants)

int BarrierPhase(Barrier *barrier)

bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)

bool BarrierDetach(Barrier *barrier)

void BufFileClose(BufFile *file)

#define OidIsValid(objectId)

void * dsa_get_address(dsa_area *area, dsa_pointer dp)

void dsa_free(dsa_area *area, dsa_pointer dp)

#define dsa_allocate0(area, size)

#define dsa_pointer_atomic_init

#define dsa_allocate(area, size)

#define dsa_pointer_atomic_write

#define InvalidDsaPointer

#define dsa_pointer_atomic_compare_exchange

#define dsa_pointer_atomic_read

pg_atomic_uint64 dsa_pointer_atomic

#define DsaPointerIsValid(x)

void ExecReScan(PlanState *node)

void ExecEndNode(PlanState *node)

PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)

MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)

TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)

void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)

const TupleTableSlotOps TTSOpsMinimalTuple

void ExecAssignExprContext(EState *estate, PlanState *planstate)

#define outerPlanState(node)

struct HashJoinTupleData * HashJoinTuple

struct HashInstrumentation HashInstrumentation

#define EXEC_FLAG_BACKWARD

#define ResetExprContext(econtext)

static bool ExecQualAndReset(ExprState *state, ExprContext *econtext)

static TupleTableSlot * ExecProcNode(PlanState *node)

static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)

#define palloc_object(type)

#define repalloc_array(pointer, type, count)

#define palloc0_array(type, count)

#define palloc0_object(type)

Datum FunctionCall1Coll(FmgrInfo *flinfo, Oid collation, Datum arg1)

double hash_mem_multiplier

#define PHJ_GROW_BATCHES_REPARTITION

struct HashMemoryChunkData * HashMemoryChunk

#define HASH_CHUNK_DATA(hc)

#define PHJ_GROW_BUCKETS_REINSERT

#define SKEW_MIN_OUTER_FRACTION

#define PHJ_GROW_BUCKETS_ELECT

#define PHJ_GROW_BUCKETS_PHASE(n)

#define PHJ_GROW_BATCHES_ELECT

#define ParallelHashJoinBatchInner(batch)

#define PHJ_BUILD_HASH_INNER

#define NthParallelHashJoinBatch(base, n)

#define HASH_CHUNK_THRESHOLD

#define PHJ_BUILD_HASH_OUTER

#define HJTUPLE_MINTUPLE(hjtup)

#define SKEW_BUCKET_OVERHEAD

#define PHJ_GROW_BATCHES_DECIDE

#define PHJ_GROW_BATCHES_REALLOCATE

#define HASH_CHUNK_HEADER_SIZE

#define PHJ_GROW_BATCHES_FINISH

#define ParallelHashJoinBatchOuter(batch, nparticipants)

#define SKEW_HASH_MEM_PERCENT

#define PHJ_BUILD_ALLOCATE

#define PHJ_GROW_BUCKETS_REALLOCATE

#define PHJ_GROW_BATCHES_PHASE(n)

@ PHJ_GROWTH_NEED_MORE_BUCKETS

@ PHJ_GROWTH_NEED_MORE_BATCHES

#define INVALID_SKEW_BUCKET_NO

#define EstimateParallelHashJoinBatch(hashtable)

void heap_free_minimal_tuple(MinimalTuple mtup)

#define HeapTupleIsValid(tuple)

#define SizeofMinimalTupleHeader

static void HeapTupleHeaderClearMatch(MinimalTupleData *tup)

static bool HeapTupleHeaderHasMatch(const MinimalTupleData *tup)

void InstrStartNode(Instrumentation *instr)

void InstrStopNode(Instrumentation *instr, double nTuples)

void free_attstatsslot(AttStatsSlot *sslot)

bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags)

#define ATTSTATSSLOT_NUMBERS

#define ATTSTATSSLOT_VALUES

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

void * MemoryContextAlloc(MemoryContext context, Size size)

void MemoryContextReset(MemoryContext context)

void * MemoryContextAllocZero(MemoryContext context, Size size)

void pfree(void *pointer)

MemoryContext CurrentMemoryContext

void MemoryContextDelete(MemoryContext context)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define CHECK_FOR_INTERRUPTS()

static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)

static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)

void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)

static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)

void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)

static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)

static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)

void ExecHashTableReset(HashJoinTable hashtable)

static void ExecHashBuildSkewHash(HashState *hashstate, HashJoinTable hashtable, Hash *node, int mcvsToUse)

static HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)

void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)

static bool ExecHashIncreaseBatchSize(HashJoinTable hashtable)

bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)

static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, dsa_pointer *shared)

static void * dense_alloc(HashJoinTable hashtable, Size size)

static void MultiExecParallelHash(HashState *node)

void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)

static void MultiExecPrivateHash(HashState *node)

void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)

static void ExecParallelHashPushTuple(dsa_pointer_atomic *head, HashJoinTuple tuple, dsa_pointer tuple_shared)

Node * MultiExecHash(HashState *node)

HashState * ExecInitHash(Hash *node, EState *estate, int eflags)

void ExecHashTableDetachBatch(HashJoinTable hashtable)

void ExecHashEstimate(HashState *node, ParallelContext *pcxt)

void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, bool try_combined_hash_mem, int parallel_workers, size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs)

void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)

void ExecHashTableDetach(HashJoinTable hashtable)

bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)

void ExecHashTableDestroy(HashJoinTable hashtable)

HashJoinTable ExecHashTableCreate(HashState *state)

int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)

static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)

size_t get_hash_memory_limit(void)

bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)

static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue, int bucketNumber)

static void ExecParallelHashRepartitionRest(HashJoinTable hashtable)

static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)

void ExecHashTableResetMatchFlags(HashJoinTable hashtable)

static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)

static HashJoinTuple ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)

void ExecEndHash(HashState *node)

void ExecShutdownHash(HashState *node)

void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)

static TupleTableSlot * ExecHash(PlanState *pstate)

void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)

static void ExecParallelHashMergeCounters(HashJoinTable hashtable)

void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)

bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)

void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)

static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)

void ExecReScanHash(HashState *node)

bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)

static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)

static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)

void ExecHashRetrieveInstrumentation(HashState *node)

void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

#define repalloc0_array(pointer, type, oldcount, count)

static uint32 pg_nextpower2_32(uint32 num)

static uint32 pg_rotate_right32(uint32 word, int n)

#define pg_nextpower2_size_t

static uint32 pg_prevpower2_32(uint32 num)

#define pg_prevpower2_size_t

static uint32 DatumGetUInt32(Datum X)

static Datum Int16GetDatum(int16 X)

static Datum BoolGetDatum(bool X)

static Datum ObjectIdGetDatum(Oid X)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

TupleTableSlot * ecxt_innertuple

TupleTableSlot * ecxt_outertuple

HashJoinTuple hj_CurTuple

HashJoinTable hj_HashTable

TupleTableSlot * hj_HashTupleSlot

struct HashJoinTupleData ** unshared

ParallelHashJoinBatchAccessor * batches

ParallelHashJoinState * parallel_state

HashMemoryChunk current_chunk

BufFile ** innerBatchFile

int log2_nbuckets_optimal

dsa_pointer_atomic * shared

BufFile ** outerBatchFile

dsa_pointer current_chunk_shared

union HashJoinTableData::@108 buckets

HashSkewBucket ** skewBucket

union HashJoinTupleData::@106 next

struct HashJoinTupleData * unshared

union HashMemoryChunkData::@107 next

struct HashMemoryChunkData * unshared

struct ParallelHashJoinState * parallel_state

SharedHashInfo * shared_info

FmgrInfo * skew_hashfunction

HashInstrumentation * hinstrument

shm_toc_estimator estimator

SharedTuplestoreAccessor * outer_tuples

ParallelHashJoinBatch * shared

SharedTuplestoreAccessor * inner_tuples

Barrier grow_batches_barrier

dsa_pointer chunk_work_queue

Barrier grow_buckets_barrier

ParallelHashGrowth growth

Instrumentation * instrument

ExprContext * ps_ExprContext

ProjectionInfo * ps_ProjInfo

ExecProcNodeMtd ExecProcNode

void ReleaseSysCache(HeapTuple tuple)

HeapTuple SearchSysCache3(int cacheId, Datum key1, Datum key2, Datum key3)