PostgreSQL Source Code: src/backend/executor/nodeIncrementalSort.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

80

86

87

88

89

90

91

92

93

94

95

96

97

/*
 * INSTRUMENT_SORT_GROUP
 *
 * Record statistics for the sort group that has just been processed by the
 * tuplesort named by groupName (the macro pastes groupName onto
 * "GroupInfo" and "_state", e.g. fullsort -> fullsortGroupInfo /
 * fullsort_state).
 *
 * Nothing is done unless instrumentation is enabled for the node
 * (ss.ps.instrument is non-NULL).  When the node has a shared_info area and
 * is flagged as a worker, the statistics are written into this worker's
 * slot, shared_info->sinfo[ParallelWorkerNumber]; otherwise they are
 * written into the node-local incsort_info.
 *
 * The worker number is used directly as an index into sinfo[], so it must
 * be strictly less than num_workers.  (This assumes sinfo[] holds exactly
 * num_workers entries -- confirm against the SharedIncrementalSortInfo
 * declaration.)
 *
 * Note that "node" is evaluated more than once; pass a side-effect-free
 * expression.
 */
#define INSTRUMENT_SORT_GROUP(node, groupName) \
	do { \
		if ((node)->ss.ps.instrument != NULL) \
		{ \
			if ((node)->shared_info && (node)->am_worker) \
			{ \
				Assert(IsParallelWorker()); \
				Assert(ParallelWorkerNumber < (node)->shared_info->num_workers); \
				instrumentSortedGroup(&(node)->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
									  (node)->groupName##_state); \
			} \
			else \
			{ \
				instrumentSortedGroup(&(node)->incsort_info.groupName##GroupInfo, \
									  (node)->groupName##_state); \
			} \
		} \
	} while (0)

116

117

118

119

120

121

122

123

124

125

126static void

129{

131

133

135

136

138 {

143

144 break;

149

150 break;

151 }

152

153

155}

156

157

158

159

160

161

162

163static void

165{

167

171

172

174 {

175 Oid equalityOp,

176 equalityFunc;

178

180 key->attno = plannode->sort.sortColIdx[i];

181

183 NULL);

185 elog(ERROR, "missing equality operator for ordering operator %u",

186 plannode->sort.sortOperators[i]);

187

188 equalityFunc = get_opcode(equalityOp);

190 elog(ERROR, "missing function for operator %u", equalityOp);

191

192

194

195

198 plannode->sort.collations[i], NULL, NULL);

199 key->fcinfo->args[0].isnull = false;

200 key->fcinfo->args[1].isnull = false;

201 }

202}

203

204

205

206

207

208

209

210

211static bool

213{

214 int nPresortedCols;

215

217

218

219

220

221

222

223

224 for (int i = nPresortedCols - 1; i >= 0; i--)

225 {

227 datumB,

228 result;

229 bool isnullA,

230 isnullB;

233

234 datumA = slot_getattr(pivot, attno, &isnullA);

235 datumB = slot_getattr(tuple, attno, &isnullB);

236

237

238 if (isnullA || isnullB)

239 {

240 if (isnullA == isnullB)

241 continue;

242 else

243 return false;

244 }

245

247

248 key->fcinfo->args[0].value = datumA;

249 key->fcinfo->args[1].value = datumB;

250

251

252 key->fcinfo->isnull = false;

253

255

256

257 if (key->fcinfo->isnull)

258 elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);

259

261 return false;

262 }

263 return true;

264}

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285static void

287{

294

298

299

301 {

304

305

306

307

308

311 &(plannode->sort.sortColIdx[nPresortedCols]),

312 &(plannode->sort.sortOperators[nPresortedCols]),

313 &(plannode->sort.collations[nPresortedCols]),

314 &(plannode->sort.nullsFirst[nPresortedCols]),

316 NULL,

319 }

320 else

321 {

322

324 }

325

326

327

328

329

330

332 {

337 }

338

339

340

341

342

344 {

345

346

347

348

349

351 {

353

355 }

356 else

357 {

361

362

363

364

365

368

370 {

372 }

373 else

374 {

375

376

377

378

379

380

381

382

383

384

385

386

387

389

390

391 break;

392 }

393 }

394 }

395

396

397

398

399

400

401

402 SO1_printf("Moving " INT64_FORMAT " tuples to presorted prefix tuplesort\n", nTuples);

405

407 {

408

409

410

411

412

413

414

415

417 SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");

419

420

421

422

423

424

426 }

427 else

428 {

429

430

431

432

433

434

435 SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);

437

439

441 {

442

443

444

445

446

447

451 }

452

453 SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");

455 }

456}

457

458

459

460

461

462

463

464

465

466

467#define DEFAULT_MIN_GROUP_SIZE 32

468

469

470

471

472

473

474

475

476

477

478

479#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)

480

481

482

483

484

485

486

487

488

489

490

491

492

493

496{

506 int64 nTuples = 0;

507 int64 minGroupSize;

508

510

514

515

516

517

518

519

522 {

523

524

525

529

530

531

532

533

534

535

536

537

540

541

542

543

544

545

546 return slot;

548 {

549

550

551

552

553

554

555

556

557

558

559

560 SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (" INT64_FORMAT ")\n",

563 }

564 else

565 {

566

567

568

569

570

571

572 SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");

574 }

575 }

576

577

578

579

580

582

585

586

588 {

589

590

591

592 if (fullsort_state == NULL)

593 {

594

595

596

597

598

599

600

602

603

604

605

606

607

608

609

612 plannode->sort.sortColIdx,

613 plannode->sort.sortOperators,

614 plannode->sort.collations,

615 plannode->sort.nullsFirst,

617 NULL,

622 }

623 else

624 {

625

627 }

628

629

630

631

632

634 {

636

637

638

639

640

641

642

645

647 }

648 else

650

651

652

653

654

655

656

658 {

660 nTuples++;

661

662

663

664

665

666

667

668

669 if (nTuples != minGroupSize)

671 }

672

673

674

675

676

677

678 for (;;)

679 {

681

682

683

684

685

687 {

688

689

690

691

692

694

697

699

700 SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");

702 break;

703 }

704

705

706 if (nTuples < minGroupSize)

707 {

708

709

710

711

712

713

714

716 nTuples++;

717

718

719

720

721

722 if (nTuples == minGroupSize)

724 }

725 else

726 {

727

728

729

730

731

732

733

734

736 {

737

738

739

740

742 nTuples++;

743 }

744 else

745 {

746

747

748

749

750

751

752

754

756 {

757

758

759

760

761

762

763

768 }

769

770

771

772

773

774

776 nTuples);

778

780

781 SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");

783 break;

784 }

785 }

786

787

788

789

790

791

792

793

794

797 {

798

799

800

801

802

803

804

805

806

808

809

810

811

812

813

814

815

818

820

821

822

823

824

825

826

827

828

829

830

832 {

834

836 nTuples, Min(currentBound, nTuples));

837 nTuples = Min(currentBound, nTuples);

838 }

839

840 SO1_printf("Setting n_fullsort_remaining to " INT64_FORMAT " and calling switchToPresortedPrefixMode()\n",

841 nTuples);

842

843

844

845

846

847

848

850

851

853

854

855

856

857

858

859

860

861

862

863

864

865

866 break;

867 }

868 }

869 }

870

872 {

873

874

875

876

877

878

879

881

882

883

884

885

886

887

888 for (;;)

889 {

891

892

893

894

895

897 {

898

899

900

901

902

904 break;

905 }

906

907

908

909

910

911

912

913

915 {

917 nTuples++;

918 }

919 else

920 {

922 break;

923 }

924 }

925

926

927

928

929

930 SO1_printf("Sorting presorted prefix tuplesort with " INT64_FORMAT " tuples\n", nTuples);

932

934

935 SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");

937

939 {

940

941

942

943

944

945

950 }

951 }

952

953

955

956

957

958

959

964 false, slot, NULL);

965 return slot;

966}

967

968

969

970

971

972

973

974

977{

979

980 SO_printf("ExecInitIncrementalSort: initializing sort node\n");

981

982

983

984

985

986

988

989

992 incrsortstate->ss.ps.state = estate;

994

996 incrsortstate->bounded = false;

1005

1007 {

1012

1019 prefixsortGroupInfo->groupCount = 0;

1025 }

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1043

1044

1045

1046

1048

1049

1050

1051

1052

1055

1056

1057

1058

1059

1066

1067 SO_printf("ExecInitIncrementalSort: sort node initialized\n");

1068

1069 return incrsortstate;

1070}

1071

1072

1073

1074

1075

1076void

1078{

1079 SO_printf("ExecEndIncrementalSort: shutting down sort node\n");

1080

1083

1084

1085

1086

1088 {

1091 }

1093 {

1096 }

1097

1098

1099

1100

1102

1103 SO_printf("ExecEndIncrementalSort: sort node shutdown\n");

1104}

1105

1106void

1108{

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123

1124

1125

1126

1128

1133

1137

1139

1140

1141

1142

1143

1144

1145

1146

1147

1152

1153

1154

1155

1156

1157 if (outerPlan->chgParam == NULL)

1159}

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172void

1174{

1176

1177

1179 return;

1180

1185}

1186

1187

1188

1189

1190

1191

1192

1193void

1195{

1197

1198

1200 return;

1201

1205

1210}

1211

1212

1213

1214

1215

1216

1217

1218void

1220{

1224}

1225

1226

1227

1228

1229

1230

1231

1232void

1234{

1237

1239 return;

1240

1246}

#define OidIsValid(objectId)

void ExecReScan(PlanState *node)

void ExecEndNode(PlanState *node)

PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)

TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

void ExecInitResultTupleSlotTL(PlanState *planstate, const TupleTableSlotOps *tts_ops)

const TupleTableSlotOps TTSOpsMinimalTuple

TupleDesc ExecGetResultType(PlanState *planstate)

void ExecCreateScanSlotFromOuterPlan(EState *estate, ScanState *scanstate, const TupleTableSlotOps *tts_ops)

#define SO2_printf(s, p1, p2)

#define outerPlanState(node)

struct IncrementalSortInfo IncrementalSortInfo

#define EXEC_FLAG_BACKWARD

static TupleTableSlot * ExecProcNode(PlanState *node)

void fmgr_info_cxt(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt)

#define SizeForFunctionCallInfo(nargs)

#define InitFunctionCallInfoData(Fcinfo, Flinfo, Nargs, Collation, Context, Resultinfo)

#define FunctionCallInvoke(fcinfo)

Assert(PointerIsAligned(start, uint64))

Oid get_equality_op_for_ordering_op(Oid opno, bool *reverse)

RegProcedure get_opcode(Oid opno)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

#define CHECK_FOR_INTERRUPTS()

void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)

static void switchToPresortedPrefixMode(PlanState *pstate)

static void instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo, Tuplesortstate *sortState)

void ExecReScanIncrementalSort(IncrementalSortState *node)

void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)

void ExecEndIncrementalSort(IncrementalSortState *node)

static TupleTableSlot * ExecIncrementalSort(PlanState *pstate)

IncrementalSortState * ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)

#define INSTRUMENT_SORT_GROUP(node, groupName)

static bool isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)

static void preparePresortedCols(IncrementalSortState *node)

void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)

#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE

void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)

#define DEFAULT_MIN_GROUP_SIZE

#define castNode(_type_, nodeptr)

static bool DatumGetBool(Datum X)

#define ScanDirectionIsForward(direction)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

ScanDirection es_direction

int64 totalMemorySpaceUsed

IncrementalSortGroupInfo prefixsortGroupInfo

IncrementalSortGroupInfo fullsortGroupInfo

Tuplesortstate * prefixsort_state

TupleTableSlot * group_pivot

TupleTableSlot * transfer_tuple

Tuplesortstate * fullsort_state

SharedIncrementalSortInfo * shared_info

IncrementalSortExecutionStatus execution_status

PresortedKeyData * presorted_keys

IncrementalSortInfo incsort_info

int64 n_fullsort_remaining

shm_toc_estimator estimator

Instrumentation * instrument

TupleTableSlot * ps_ResultTupleSlot

ProjectionInfo * ps_ProjInfo

ExecProcNodeMtd ExecProcNode

TuplesortMethod sortMethod

TuplesortSpaceType spaceType

void tuplesort_performsort(Tuplesortstate *state)

void tuplesort_reset(Tuplesortstate *state)

bool tuplesort_used_bound(Tuplesortstate *state)

void tuplesort_get_stats(Tuplesortstate *state, TuplesortInstrumentation *stats)

void tuplesort_end(Tuplesortstate *state)

void tuplesort_set_bound(Tuplesortstate *state, int64 bound)

#define TUPLESORT_ALLOWBOUNDED

void tuplesort_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)

Tuplesortstate * tuplesort_begin_heap(TupleDesc tupDesc, int nkeys, AttrNumber *attNums, Oid *sortOperators, Oid *sortCollations, bool *nullsFirstFlags, int workMem, SortCoordinate coordinate, int sortopt)

bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward, bool copy, TupleTableSlot *slot, Datum *abbrev)

static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)

static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)

static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)