PostgreSQL Source Code: src/backend/executor/nodeHashjoin.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

164

175

176

177

178

179

180#define HJ_BUILD_HASHTABLE 1

181#define HJ_NEED_NEW_OUTER 2

182#define HJ_SCAN_BUCKET 3

183#define HJ_FILL_OUTER_TUPLE 4

184#define HJ_FILL_INNER_TUPLES 5

185#define HJ_NEED_NEW_BATCH 6

186

187

188#define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)

189

190#define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)

191

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

222{

232 int batchno;

234

235

236

237

239 otherqual = node->js.ps.qual;

245

246

247

248

249

251

252

253

254

255 for (;;)

256 {

257

258

259

260

261

262

264

266 {

268

269

270

271

272 Assert(hashtable == NULL);

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

298 {

299

301 }

302 else if (parallel)

303 {

304

305

306

307

308

309

310

311

313 }

317 {

320 {

322 return NULL;

323 }

324 else

326 }

327 else

329

330

331

332

333

334

337

338

339

340

341

342

345

346

347

348

349

350

352 {

353 if (parallel)

354 {

355

356

357

358

360

363 }

364 return NULL;

365 }

366

367

368

369

370

372

373

374

375

376

377

379

380 if (parallel)

381 {

383

389 {

390

391

392

393

394 if (hashtable->nbatch > 1)

397 WAIT_EVENT_HASH_BUILD_HASH_OUTER);

398 }

400 {

401

402

403

404

405

406 return NULL;

407 }

408

409

413

414 continue;

415 }

416 else

418

419

420

422

423

424

425

426 if (parallel)

427 outerTupleSlot =

429 &hashvalue);

430 else

431 outerTupleSlot =

433

435 {

436

438 {

439

440 if (parallel)

441 {

442

443

444

445

446

449 else

451 }

452 else

453 {

456 }

457 }

458 else

460 continue;

461 }

462

465

466

467

468

469

474 hashvalue);

476

477

478

479

480

481 if (batchno != hashtable->curbatch &&

483 {

484 bool shouldFree;

486 &shouldFree);

487

488

489

490

491

492 Assert(parallel_state == NULL);

496 hashtable);

497

498 if (shouldFree)

500

501

502 continue;

503 }

504

505

507

508

509

511

512

513

514

515 if (parallel)

516 {

518 {

519

521 continue;

522 }

523 }

524 else

525 {

527 {

528

530 continue;

531 }

532 }

533

534

535

536

537

540 continue;

541

542

543

544

545

546

547

548

549

550

551

552

553

554 if (joinqual == NULL || ExecQual(joinqual, econtext))

555 {

557

558

559

560

561

562

565

566

568 {

570 continue;

571 }

572

573

574

575

576

577

580

581

582

583

584

585

586

588 continue;

589

590 if (otherqual == NULL || ExecQual(otherqual, econtext))

592 else

594 }

595 else

597 break;

598

600

601

602

603

604

605

607

610 {

611

612

613

614

616

617 if (otherqual == NULL || ExecQual(otherqual, econtext))

619 else

621 }

622 break;

623

625

626

627

628

629

630

631

634 {

635

637 continue;

638 }

639

640

641

642

643

645

646 if (otherqual == NULL || ExecQual(otherqual, econtext))

648 else

650 break;

651

653

654

655

656

657 if (parallel)

658 {

660 return NULL;

661 }

662 else

663 {

665 return NULL;

666 }

668 break;

669

670 default:

671 elog(ERROR, "unrecognized hashjoin state: %d",

673 }

674 }

675}

676

677

678

679

680

681

682

685{

686

687

688

689

691}

692

693

694

695

696

697

698

701{

702

703

704

705

707}

708

709

710

711

712

713

714

717{

719 Plan *outerNode;

720 Hash *hashNode;

722 innerDesc;

724

725

727

728

729

730

734

735

736

737

738

739

742

743

744

745

746

747

749

750

751

752

753

754

755

756

759

764

765

766

767

770

771

772

773

776 ops);

777

778

779

780

783

784

786 {

790 break;

795 break;

800 break;

806 break;

807 default:

808 elog(ERROR, "unrecognized join type: %d",

810 }

811

812

813

814

815

816

817

818

819 {

823 Oid *outer_hashfuncid;

824 Oid *inner_hashfuncid;

825 bool *hash_strict;

827 int nkeys;

828

829

831

832

833

834

835

836

837

838

839

840

842

846

847

848

849

850

852 {

855

857 &outer_hashfuncid[i],

858 &inner_hashfuncid[i]))

860 "could not find hash function for hash operator %u",

861 hashop);

863 }

864

865

866

867

868

869

870

871

872

876 outer_hashfuncid,

879 hash_strict,

880 &hjstate->js.ps,

881 0,

883

884

888 inner_hashfuncid,

890 hash->hashkeys,

891 hash_strict,

892 &hashstate->ps,

893 0,

895

896

897

898

899

901 {

905 }

906

907

908 pfree(outer_hashfuncid);

909 pfree(inner_hashfuncid);

910 pfree(hash_strict);

911 }

912

913

914

915

922

923

924

925

928

933

937

938 return hjstate;

939}

940

941

942

943

944

945

946

947void

949{

950

951

952

954 {

957 }

958

959

960

961

964}

965

966

967

968

969

970

971

972

973

974

975

976

977

982{

984 int curbatch = hashtable->curbatch;

986

987 if (curbatch == 0)

988 {

989

990

991

992

996 else

998

1000 {

1001 bool isnull;

1002

1003

1004

1005

1007

1009

1011

1013 econtext,

1014 &isnull));

1015

1016 if (!isnull)

1017 {

1018

1020

1021 return slot;

1022 }

1023

1024

1025

1026

1027

1029 }

1030 }

1031 else if (curbatch < hashtable->nbatch)

1032 {

1034

1035

1036

1037

1038

1039 if (file == NULL)

1040 return NULL;

1041

1043 file,

1044 hashvalue,

1047 return slot;

1048 }

1049

1050

1051 return NULL;

1052}

1053

1054

1055

1056

1061{

1063 int curbatch = hashtable->curbatch;

1065

1066

1067

1068

1069

1070

1071 if (curbatch == 0 && hashtable->nbatch == 1)

1072 {

1074

1076 {

1077 bool isnull;

1078

1080

1082

1084

1086 econtext,

1087 &isnull));

1088

1089 if (!isnull)

1090 return slot;

1091

1092

1093

1094

1095

1097 }

1098 }

1099 else if (curbatch < hashtable->nbatch)

1100 {

1102

1104 hashvalue);

1105 if (tuple != NULL)

1106 {

1109 false);

1111 return slot;

1112 }

1113 else

1115 }

1116

1117

1119

1120 return NULL;

1121}

1122

1123

1124

1125

1126

1127

1128

1129static bool

1131{

1133 int nbatch;

1134 int curbatch;

1138

1139 nbatch = hashtable->nbatch;

1140 curbatch = hashtable->curbatch;

1141

1142 if (curbatch > 0)

1143 {

1144

1145

1146

1147

1151 }

1152 else

1153 {

1154

1155

1156

1157

1158

1159

1165 }

1166

1167

1168

1169

1170

1171

1172

1173

1174

1175

1176

1177

1178

1179

1180

1181

1182

1183

1184

1185 curbatch++;

1186 while (curbatch < nbatch &&

1189 {

1192 break;

1195 break;

1198 break;

1201 break;

1202

1203

1210 curbatch++;

1211 }

1212

1213 if (curbatch >= nbatch)

1214 return false;

1215

1216 hashtable->curbatch = curbatch;

1217

1218

1219

1220

1222

1224

1225 if (innerFile != NULL)

1226 {

1227 if (BufFileSeek(innerFile, 0, 0, SEEK_SET))

1230 errmsg("could not rewind hash-join temporary file")));

1231

1233 innerFile,

1234 &hashvalue,

1236 {

1237

1238

1239

1240

1242 }

1243

1244

1245

1246

1247

1250 }

1251

1252

1253

1254

1256 {

1260 errmsg("could not rewind hash-join temporary file")));

1261 }

1262

1263 return true;

1264}

1265

1266

1267

1268

1269

1270static bool

1272{

1274 int start_batchno;

1275 int batchno;

1276

1277

1278

1279

1280

1281

1282 if (hashtable->curbatch >= 0)

1283 {

1286 }

1287

1288

1289

1290

1291

1292

1293 batchno = start_batchno =

1296 do

1297 {

1301

1303 {

1305 Barrier *batch_barrier =

1307

1309 {

1311

1312

1314 WAIT_EVENT_HASH_BATCH_ELECT))

1316

1317

1319

1321 WAIT_EVENT_HASH_BATCH_ALLOCATE);

1322

1323

1325

1330 &hashvalue)))

1331 {

1334 false);

1337 hashvalue);

1338 }

1341 WAIT_EVENT_HASH_BATCH_LOAD);

1342

1343

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355

1358

1359 return true;

1361

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1375 hashtable->batches[batchno].done = true;

1377 break;

1378

1380

1381

1382

1383

1384

1386 hashtable->batches[batchno].done = true;

1388 break;

1389

1390 default:

1391 elog(ERROR, "unexpected batch phase %d",

1393 }

1394 }

1395 batchno = (batchno + 1) % hashtable->nbatch;

1396 } while (batchno != start_batchno);

1397

1398 return false;

1399}

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413void

1416{

1417 BufFile *file = *fileptr;

1418

1419

1420

1421

1422

1423

1424

1425

1426

1427

1428

1429

1430

1431

1432

1433 if (file == NULL)

1434 {

1436

1438 *fileptr = file;

1439

1441 }

1442

1445}

1446

1447

1448

1449

1450

1451

1452

1453

1459{

1461 size_t nread;

1463

1464

1465

1466

1467

1468

1470

1471

1472

1473

1474

1475

1477 if (nread == 0)

1478 {

1480 return NULL;

1481 }

1482 *hashvalue = header[0];

1484 tuple->t_len = header[1];

1486 (char *) tuple + sizeof(uint32),

1487 header[1] - sizeof(uint32));

1489 return tupleSlot;

1490}

1491

1492

1493void

1495{

1498

1499

1500

1501

1502

1503

1504

1505

1507 {

1510 {

1511

1512

1513

1514

1515

1516

1517

1520

1521

1522

1523

1524

1525

1526

1527

1528

1529

1531

1532

1534 }

1535 else

1536 {

1537

1539

1541

1542

1549

1551

1555

1556

1557

1558

1559

1560 if (innerPlan->chgParam == NULL)

1562 }

1563 }

1564

1565

1570

1573

1574

1575

1576

1577

1578 if (outerPlan->chgParam == NULL)

1580}

1581

1582void

1584{

1586 {

1587

1588

1589

1590

1591

1594 }

1595}

1596

1597static void

1599{

1605 int i;

1606

1608

1609

1610 for (;;)

1611 {

1612 bool isnull;

1613

1616 break;

1618

1620

1622 econtext,

1623 &isnull));

1624

1625 if (!isnull)

1626 {

1627 int batchno;

1628 int bucketno;

1629 bool shouldFree;

1631

1633 &batchno);

1635 &hashvalue, mintup);

1636

1637 if (shouldFree)

1639 }

1641 }

1642

1643

1644 for (i = 0; i < hashtable->nbatch; ++i)

1646}

1647

1648void

1650{

1653}

1654

1655void

1657{

1658 int plan_node_id = state->js.ps.plan->plan_node_id;

1661

1662

1663

1664

1665

1666 if (pcxt->seg == NULL)

1667 return;

1668

1670

1671

1672

1673

1674

1677

1678

1679

1680

1681

1682

1698

1699

1701

1702

1705}

1706

1707

1708

1709

1710

1711

1712

1713void

1715{

1716 int plan_node_id = state->js.ps.plan->plan_node_id;

1718

1719

1720 if (pcxt->seg == NULL)

1721 return;

1722

1724

1725

1726

1727

1728

1729

1730

1731

1732

1733

1734

1735

1736

1737

1738 if (state->hj_HashTable != NULL)

1739 {

1742 }

1743

1744

1746

1747

1749}

1750

1751void

1754{

1756 int plan_node_id = state->js.ps.plan->plan_node_id;

1759

1760

1762

1763

1766

1768}

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static uint32 pg_atomic_fetch_add_u32(volatile pg_atomic_uint32 *ptr, int32 add_)

int BarrierAttach(Barrier *barrier)

void BarrierInit(Barrier *barrier, int participants)

int BarrierPhase(Barrier *barrier)

bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)

bool BarrierDetach(Barrier *barrier)

void BufFileReadExact(BufFile *file, void *ptr, size_t size)

BufFile * BufFileCreateTemp(bool interXact)

void BufFileWrite(BufFile *file, const void *ptr, size_t size)

size_t BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)

int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)

void BufFileClose(BufFile *file)

#define pg_attribute_always_inline

#define OidIsValid(objectId)

#define InvalidDsaPointer

int errcode_for_file_access(void)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

void ExecReScan(PlanState *node)

ExprState * ExecInitQual(List *qual, PlanState *parent)

ExprState * ExecBuildHash32Expr(TupleDesc desc, const TupleTableSlotOps *ops, const Oid *hashfunc_oids, const List *collations, const List *hash_exprs, const bool *opstrict, PlanState *parent, uint32 init_value, bool keep_nulls)

Node * MultiExecProcNode(PlanState *node)

void ExecEndNode(PlanState *node)

PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)

void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function)

const TupleTableSlotOps TTSOpsVirtual

void ExecForceStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)

MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)

TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)

void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)

TupleTableSlot * ExecInitNullTupleSlot(EState *estate, TupleDesc tupType, const TupleTableSlotOps *tts_ops)

TupleDesc ExecGetResultType(PlanState *planstate)

void ExecAssignExprContext(EState *estate, PlanState *planstate)

void ExecAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc)

const TupleTableSlotOps * ExecGetResultSlotOps(PlanState *planstate, bool *isfixed)

#define InstrCountFiltered1(node, delta)

#define outerPlanState(node)

#define InstrCountFiltered2(node, delta)

#define innerPlanState(node)

#define EXEC_FLAG_BACKWARD

static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)

#define ResetExprContext(econtext)

static bool ExecQual(ExprState *state, ExprContext *econtext)

static TupleTableSlot * ExecProcNode(PlanState *node)

static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)

#define palloc_array(type, count)

void fmgr_info(Oid functionId, FmgrInfo *finfo)

Assert(PointerIsAligned(start, uint64))

#define PHJ_BUILD_HASH_OUTER

#define HJTUPLE_MINTUPLE(hjtup)

#define PHJ_BATCH_ALLOCATE

#define INVALID_SKEW_BUCKET_NO

void heap_free_minimal_tuple(MinimalTuple mtup)

MinimalTupleData * MinimalTuple

static void HeapTupleHeaderSetMatch(MinimalTupleData *tup)

static bool HeapTupleHeaderHasMatch(const MinimalTupleData *tup)

bool get_op_hash_functions(Oid opno, RegProcedure *lhs_procno, RegProcedure *rhs_procno)

void LWLockInitialize(LWLock *lock, int tranche_id)

@ LWTRANCHE_PARALLEL_HASH_JOIN

void pfree(void *pointer)

void * palloc0(Size size)

#define CHECK_FOR_INTERRUPTS()

void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)

void ExecHashTableReset(HashJoinTable hashtable)

bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)

void ExecHashAccumInstrumentation(HashInstrumentation *instrument, HashJoinTable hashtable)

void ExecHashTableDetachBatch(HashJoinTable hashtable)

void ExecPrepHashTableForUnmatched(HashJoinState *hjstate)

void ExecHashTableDetach(HashJoinTable hashtable)

bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)

void ExecHashTableDestroy(HashJoinTable hashtable)

HashJoinTable ExecHashTableCreate(HashState *state)

int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)

bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)

void ExecHashTableResetMatchFlags(HashJoinTable hashtable)

void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)

void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, int *batchno)

void ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)

bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)

void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue)

bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext)

#define HJ_NEED_NEW_BATCH

void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecEndHashJoin(HashJoinState *node)

#define HJ_FILL_OUTER_TUPLE

static bool ExecHashJoinNewBatch(HashJoinState *hjstate)

static TupleTableSlot * ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)

#define HJ_FILL_INNER(hjstate)

static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate)

static pg_attribute_always_inline TupleTableSlot * ExecHashJoinImpl(PlanState *pstate, bool parallel)

static TupleTableSlot * ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot)

void ExecShutdownHashJoin(HashJoinState *node)

#define HJ_FILL_INNER_TUPLES

void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)

static TupleTableSlot * ExecHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue)

void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr, HashJoinTable hashtable)

#define HJ_NEED_NEW_OUTER

static TupleTableSlot * ExecParallelHashJoin(PlanState *pstate)

#define HJ_FILL_OUTER(hjstate)

void ExecReScanHashJoin(HashJoinState *node)

static TupleTableSlot * ExecHashJoin(PlanState *pstate)

void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)

void ExecHashJoinInitializeWorker(HashJoinState *state, ParallelWorkerContext *pwcxt)

static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)

HashJoinState * ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)

#define HJ_BUILD_HASHTABLE

#define castNode(_type_, nodeptr)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

#define foreach_current_index(var_or_cell)

static uint32 DatumGetUInt32(Datum X)

static unsigned hash(unsigned *uv, int n)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

TupleTableSlot * ecxt_innertuple

TupleTableSlot * ecxt_outertuple

HashJoinTuple hj_CurTuple

TupleTableSlot * hj_NullOuterTupleSlot

TupleTableSlot * hj_OuterTupleSlot

TupleTableSlot * hj_NullInnerTupleSlot

TupleTableSlot * hj_FirstOuterTupleSlot

HashJoinTable hj_HashTable

TupleTableSlot * hj_HashTupleSlot

ParallelHashJoinBatchAccessor * batches

ParallelHashJoinState * parallel_state

BufFile ** innerBatchFile

BufFile ** outerBatchFile

HashSkewBucket ** skewBucket

struct ParallelHashJoinState * parallel_state

FmgrInfo * skew_hashfunction

HashInstrumentation * hinstrument

shm_toc_estimator estimator

SharedTuplestoreAccessor * outer_tuples

ParallelHashJoinBatch * shared

SharedTuplestoreAccessor * inner_tuples

Barrier grow_batches_barrier

dsa_pointer chunk_work_queue

Barrier grow_buckets_barrier

ParallelHashGrowth growth

pg_atomic_uint32 distributor

const TupleTableSlotOps * resultops

Instrumentation * instrument

TupleDesc ps_ResultTupleDesc

ExprContext * ps_ExprContext

TupleTableSlot * ps_ResultTupleSlot

ProjectionInfo * ps_ProjInfo

ExecProcNodeMtd ExecProcNode

static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)