PostgreSQL Source Code: src/backend/executor/nodeGatherMerge.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

24

25

26

27

28

29

30

31#define MAX_TUPLE_STORE 10

32

33

34

35

36

37

38

39

40

42{

44 int nTuples;

45 int readCounter;

46 bool done;

48

53 bool nowait, bool *done);

59 bool nowait);

61

62

63

64

65

68{

70 Plan *outerNode;

72

73

75

76

77

78

81 gm_state->ps.state = estate;

83

87

88

89

90

91

92

94

95

96

97

98

100

101

102

103

106

107

108

109

110

111

112

115

116

117

118

119

121 gm_state->tupDesc = tupDesc;

122

123

124

125

128

129

130

131

132

134 {

137 }

138

139

140

141

143 {

144 int i;

145

149

151 {

153

157 sortKey->ssup_attno = node->sortColIdx[i];

158

159

160

161

162

164

166 }

167 }

168

169

171

172 return gm_state;

173}

174

175

176

177

178

179

180

181

184{

188

190

191

192

193

194

196 {

199

200

201

202

203

205 {

207

208

209 if (!node->pei)

211 estate,

215 else

217 node->pei,

219

220

223

225

226

227

228

229

232

233

235 {

237

243 }

244 else

245 {

246

249 }

250 }

251

252

256 }

257

258

259

260

261

264

265

266

267

268

271 return NULL;

272

273

275 return slot;

276

277

278

279

282}

283

284

285

286

287

288

289

290void

292{

295}

296

297

298

299

300

301

302

303void

305{

307

308

309 if (node->pei != NULL)

310 {

312 node->pei = NULL;

313 }

314}

315

316

317

318

319

320

321

322static void

324{

325 if (node->pei != NULL)

327

328

332}

333

334

335

336

337

338

339

340void

342{

345

346

348

349

351

352

355

356

357

358

359

360

361

362

366

367

368

369

370

371

372

373

374

375

376

379}

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394static void

396{

399 int i;

400

401

402

403

404

405

406

407

408

409

412

413

416

417 for (i = 0; i < nreaders; i++)

418 {

419

422

423

427 }

428

429

432 gm_state);

433}

434

435

436

437

438

439

440

441

442static void

444{

445 int nreaders = gm_state->nreaders;

446 bool nowait = true;

447 int i;

448

449

451

452

453 gm_state->gm_slots[0] = NULL;

454

455

456 for (i = 0; i < nreaders; i++)

457 {

458

461

463

465 }

466

467

469

470

471

472

473

474

475

476

477reread:

478 for (i = 0; i <= nreaders; i++)

479 {

481

482

485 {

487 {

488

492 }

493 else

494 {

495

496

497

498

500 }

501 }

502 }

503

504

505 for (i = 1; i <= nreaders; i++)

506 {

509 {

510 nowait = false;

511 goto reread;

512 }

513 }

514

515

517

519}

520

521

522

523

524

525static void

527{

528 int i;

529

531 {

533

536

538 }

539}

540

541

542

543

544

545

548{

549 int i;

550

552 {

553

554

555

556

558 }

559 else

560 {

561

562

563

564

565

566

568

571 else

572 {

573

575 }

576 }

577

579 {

580

582 return NULL;

583 }

584 else

585 {

586

589 }

590}

591

592

593

594

595

596static void

598{

600 int i;

601

602

603 if (reader == 0)

604 return;

605

607

608

611

612

614 {

616

618 reader,

619 true,

620 &tuple_buffer->done);

621 if (!tuple)

622 break;

623 tuple_buffer->tuple[i] = tuple;

625 }

626}

627

628

629

630

631

632

633

634

635static bool

637{

640

641

642

643

644

645 if (reader == 0)

646 {

648 {

652

653

657

659 {

660 gm_state->gm_slots[0] = outerTupleSlot;

661 return true;

662 }

663

665 }

666 return false;

667 }

668

669

671

673 {

674

676 }

677 else if (tuple_buffer->done)

678 {

679

680 return false;

681 }

682 else

683 {

684

686 reader,

687 nowait,

688 &tuple_buffer->done);

689 if (!tup)

690 return false;

691

692

693

694

695

697 }

698

700

701

703 gm_state->gm_slots[reader],

704

705 true);

706

707 return true;

708}

709

710

711

712

715 bool *done)

716{

719

720

722

723

724

725

726

727

728

729

730

731 reader = gm_state->reader[nreader - 1];

733

734

735

736

737

739}

740

741

742

743

744

745

747

748

749

750

753{

757

760 int nkey;

761

764

765 for (nkey = 0; nkey < node->gm_nkeys; nkey++)

766 {

770 datum2;

771 bool isNull1,

772 isNull2;

774

777

779 datum2, isNull2,

780 sortKey);

782 {

785 }

786 }

787 return 0;

788}

void LaunchParallelWorkers(ParallelContext *pcxt)

void binaryheap_build(binaryheap *heap)

void binaryheap_replace_first(binaryheap *heap, bh_node_type d)

void binaryheap_reset(binaryheap *heap)

bh_node_type binaryheap_first(binaryheap *heap)

bh_node_type binaryheap_remove_first(binaryheap *heap)

void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)

binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)

#define binaryheap_empty(h)

Bitmapset * bms_add_member(Bitmapset *a, int x)

#define INVERT_COMPARE_RESULT(var)

void ExecReScan(PlanState *node)

void ExecParallelCleanup(ParallelExecutorInfo *pei)

void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)

void ExecParallelCreateReaders(ParallelExecutorInfo *pei)

ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)

void ExecParallelFinish(ParallelExecutorInfo *pei)

void ExecEndNode(PlanState *node)

PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)

void ExecInitResultTypeTL(PlanState *planstate)

TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)

TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)

const TupleTableSlotOps TTSOpsMinimalTuple

TupleDesc ExecGetResultType(PlanState *planstate)

void ExecAssignExprContext(EState *estate, PlanState *planstate)

void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)

#define outerPlanState(node)

static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)

#define ResetExprContext(econtext)

static TupleTableSlot * ExecProcNode(PlanState *node)

static int compare(const void *arg1, const void *arg2)

Assert(PointerIsAligned(start, uint64))

MinimalTuple heap_copy_minimal_tuple(MinimalTuple mtup, Size extra)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

#define CHECK_FOR_INTERRUPTS()

struct GMReaderTupleBuffer GMReaderTupleBuffer

static void gather_merge_init(GatherMergeState *gm_state)

static void gather_merge_setup(GatherMergeState *gm_state)

static int32 heap_compare_slots(Datum a, Datum b, void *arg)

void ExecReScanGatherMerge(GatherMergeState *node)

static void gather_merge_clear_tuples(GatherMergeState *gm_state)

void ExecShutdownGatherMerge(GatherMergeState *node)

static void load_tuple_array(GatherMergeState *gm_state, int reader)

GatherMergeState * ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)

static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)

static void ExecShutdownGatherMergeWorkers(GatherMergeState *node)

static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done)

void ExecEndGatherMerge(GatherMergeState *node)

static TupleTableSlot * gather_merge_getnext(GatherMergeState *gm_state)

static TupleTableSlot * ExecGatherMerge(PlanState *pstate)

#define castNode(_type_, nodeptr)

bool parallel_leader_participation

static Datum Int32GetDatum(int32 X)

static int32 DatumGetInt32(Datum X)

void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)

static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)

struct dsa_area * es_query_dsa

int es_parallel_workers_to_launch

bool es_use_parallel_mode

int es_parallel_workers_launched

TupleTableSlot * ecxt_outertuple

struct ParallelExecutorInfo * pei

struct TupleQueueReader ** reader

struct GMReaderTupleBuffer * gm_tuple_buffers

TupleTableSlot ** gm_slots

bool need_to_scan_locally

struct binaryheap * gm_heap

struct TupleQueueReader ** reader

ExprContext * ps_ExprContext

ProjectionInfo * ps_ProjInfo

ExecProcNodeMtd ExecProcNode

MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)

static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)

static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)