PostgreSQL Source Code: src/backend/executor/nodeGatherMerge.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

25

26

27

28

29

30

31

32#define MAX_TUPLE_STORE 10

33

34

35

36

37

38

39

40

41

43{

45 int nTuples;

46 int readCounter;

47 bool done;

49

54 bool nowait, bool *done);

60 bool nowait);

62

63

64

65

66

69{

71 Plan *outerNode;

73

74

76

77

78

79

82 gm_state->ps.state = estate;

84

88

89

90

91

92

93

95

96

97

98

99

101

102

103

104

107

108

109

110

111

112

113

116

117

118

119

120

122 gm_state->tupDesc = tupDesc;

123

124

125

126

129

130

131

132

133

135 {

138 }

139

140

141

142

144 {

145 int i;

146

149

151 {

153

157 sortKey->ssup_attno = node->sortColIdx[i];

158

159

160

161

162

164

166 }

167 }

168

169

171

172 return gm_state;

173}

174

175

176

177

178

179

180

181

184{

188

190

191

192

193

194

196 {

199

200

201

202

203

205 {

207

208

209 if (!node->pei)

211 estate,

215 else

217 node->pei,

219

220

223

225

226

227

228

229

232

233

235 {

237

243 }

244 else

245 {

246

249 }

250 }

251

252

256 }

257

258

259

260

261

264

265

266

267

268

271 return NULL;

272

273

275 return slot;

276

277

278

279

282}

283

284

285

286

287

288

289

290void

292{

295}

296

297

298

299

300

301

302

303void

305{

307

308

309 if (node->pei != NULL)

310 {

312 node->pei = NULL;

313 }

314}

315

316

317

318

319

320

321

322static void

324{

325 if (node->pei != NULL)

327

328

332}

333

334

335

336

337

338

339

340void

342{

345

346

348

349

351

352

355

356

357

358

359

360

361

362

366

367

368

369

370

371

372

373

374

375

376

379}

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394static void

396{

399 int i;

400

401

402

403

404

405

406

407

408

409

412

413

416

417 for (i = 0; i < nreaders; i++)

418 {

419

421

422

426 }

427

428

431 gm_state);

432}

433

434

435

436

437

438

439

440

441static void

443{

444 int nreaders = gm_state->nreaders;

445 bool nowait = true;

446 int i;

447

448

450

451

452 gm_state->gm_slots[0] = NULL;

453

454

455 for (i = 0; i < nreaders; i++)

456 {

457

460

462

464 }

465

466

468

469

470

471

472

473

474

475

476reread:

477 for (i = 0; i <= nreaders; i++)

478 {

480

481

484 {

486 {

487

491 }

492 else

493 {

494

495

496

497

499 }

500 }

501 }

502

503

504 for (i = 1; i <= nreaders; i++)

505 {

508 {

509 nowait = false;

510 goto reread;

511 }

512 }

513

514

516

518}

519

520

521

522

523

524static void

526{

527 int i;

528

530 {

532

535

537 }

538}

539

540

541

542

543

544

547{

548 int i;

549

551 {

552

553

554

555

557 }

558 else

559 {

560

561

562

563

564

565

567

570 else

571 {

572

574 }

575 }

576

578 {

579

581 return NULL;

582 }

583 else

584 {

585

588 }

589}

590

591

592

593

594

595static void

597{

599 int i;

600

601

602 if (reader == 0)

603 return;

604

606

607

610

611

613 {

615

617 reader,

618 true,

619 &tuple_buffer->done);

620 if (!tuple)

621 break;

622 tuple_buffer->tuple[i] = tuple;

624 }

625}

626

627

628

629

630

631

632

633

634static bool

636{

639

640

641

642

643

644 if (reader == 0)

645 {

647 {

651

652

656

658 {

659 gm_state->gm_slots[0] = outerTupleSlot;

660 return true;

661 }

662

664 }

665 return false;

666 }

667

668

670

672 {

673

675 }

676 else if (tuple_buffer->done)

677 {

678

679 return false;

680 }

681 else

682 {

683

685 reader,

686 nowait,

687 &tuple_buffer->done);

688 if (!tup)

689 return false;

690

691

692

693

694

696 }

697

699

700

702 gm_state->gm_slots[reader],

703

704 true);

705

706 return true;

707}

708

709

710

711

714 bool *done)

715{

718

719

721

722

723

724

725

726

727

728

729

730 reader = gm_state->reader[nreader - 1];

732

733

734

735

736

738}

739

740

741

742

743

744

746

747

748

749

752{

756

759 int nkey;

760

763

764 for (nkey = 0; nkey < node->gm_nkeys; nkey++)

765 {

769 datum2;

770 bool isNull1,

771 isNull2;

773

776

778 datum2, isNull2,

779 sortKey);

781 {

784 }

785 }

786 return 0;

787}

void LaunchParallelWorkers(ParallelContext *pcxt)

void binaryheap_build(binaryheap *heap)

void binaryheap_replace_first(binaryheap *heap, bh_node_type d)

void binaryheap_reset(binaryheap *heap)

bh_node_type binaryheap_first(binaryheap *heap)

bh_node_type binaryheap_remove_first(binaryheap *heap)

void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)

binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)

#define binaryheap_empty(h)

Bitmapset * bms_add_member(Bitmapset *a, int x)

#define INVERT_COMPARE_RESULT(var)

void ExecReScan(PlanState *node)

void ExecParallelCleanup(ParallelExecutorInfo *pei)

void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)

void ExecParallelCreateReaders(ParallelExecutorInfo *pei)

ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)

void ExecParallelFinish(ParallelExecutorInfo *pei)

void ExecEndNode(PlanState *node)

PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)

void ExecInitResultTypeTL(PlanState *planstate)

TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)

TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)

const TupleTableSlotOps TTSOpsMinimalTuple

TupleDesc ExecGetResultType(PlanState *planstate)

void ExecAssignExprContext(EState *estate, PlanState *planstate)

void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)

#define outerPlanState(node)

static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)

#define ResetExprContext(econtext)

static TupleTableSlot * ExecProcNode(PlanState *node)

#define palloc0_array(type, count)

static int compare(const void *arg1, const void *arg2)

Assert(PointerIsAligned(start, uint64))

MinimalTuple heap_copy_minimal_tuple(MinimalTuple mtup, Size extra)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

#define CHECK_FOR_INTERRUPTS()

struct GMReaderTupleBuffer GMReaderTupleBuffer

static void gather_merge_init(GatherMergeState *gm_state)

static void gather_merge_setup(GatherMergeState *gm_state)

static int32 heap_compare_slots(Datum a, Datum b, void *arg)

void ExecReScanGatherMerge(GatherMergeState *node)

static void gather_merge_clear_tuples(GatherMergeState *gm_state)

void ExecShutdownGatherMerge(GatherMergeState *node)

static void load_tuple_array(GatherMergeState *gm_state, int reader)

GatherMergeState * ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)

static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)

static void ExecShutdownGatherMergeWorkers(GatherMergeState *node)

static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done)

void ExecEndGatherMerge(GatherMergeState *node)

static TupleTableSlot * gather_merge_getnext(GatherMergeState *gm_state)

static TupleTableSlot * ExecGatherMerge(PlanState *pstate)

#define castNode(_type_, nodeptr)

bool parallel_leader_participation

static Datum Int32GetDatum(int32 X)

static int32 DatumGetInt32(Datum X)

void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)

static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)

struct dsa_area * es_query_dsa

int es_parallel_workers_to_launch

bool es_use_parallel_mode

int es_parallel_workers_launched

TupleTableSlot * ecxt_outertuple

struct ParallelExecutorInfo * pei

struct TupleQueueReader ** reader

struct GMReaderTupleBuffer * gm_tuple_buffers

TupleTableSlot ** gm_slots

bool need_to_scan_locally

struct binaryheap * gm_heap

struct TupleQueueReader ** reader

ExprContext * ps_ExprContext

ProjectionInfo * ps_ProjInfo

ExecProcNodeMtd ExecProcNode

MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)

static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)

static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)