PostgreSQL Source Code: src/backend/storage/aio/aio.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

40

51#include "utils/wait_event_types.h"

52

53

61

62

63

67#ifdef IOMETHOD_IO_URING_ENABLED

68 {"io_uring", IOMETHOD_IO_URING, false},

69#endif

70 {NULL, 0, false}

71};

72

73

76

77

79

80

82

83

87#ifdef IOMETHOD_IO_URING_ENABLED

88 [IOMETHOD_IO_URING] = &pgaio_uring_ops,

89#endif

90};

91

92

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

160{

162

163 while (true)

164 {

166

167 if (h != NULL)

168 return h;

169

170

171

172

173

175 }

176}

177

178

179

180

181

182

183

186{

188

190 {

193 }

194

196 elog(ERROR, "API violation: Only one IO can be handed out");

197

198

199

200

201

203

205 {

207

209

212

215

216 if (resowner)

218

219 if (ret)

220 {

223 }

224 }

225

227

228 return ioh;

229}

230

231

232

233

234

235

236void

238{

240 {

243

245

246

247

248

249

250

252 }

253 else

254 {

255 elog(ERROR, "release in unexpected state");

256 }

257}

258

259

260

261

262void

264{

266

268

269

270

271

272

274

277

278 switch (ioh->state)

279 {

282 break;

285

287 {

289 if (!on_error)

291 }

292

294 break;

297 if (!on_error)

298 elog(WARNING, "AIO handle was not submitted");

300 break;

305

306 break;

307 }

308

309

310

311

312

315

317}

318

319

320

321

322

323

324

325

326void

328{

330

332}

333

334

335

336

337

338int

340{

344}

345

346

347

348

349

350

353{

355}

356

357

358

359

360

361

362void

364{

369

373}

374

375

376

377

378

379

380

381

382static inline void

384{

385

386

387

388

389

391

393 "updating state to %s",

395

396

397

398

399

401

402 ioh->state = new_state;

403}

404

405static void

407{

410

413}

414

415

416

417

418

419

420void

422{

423 bool needs_synchronous;

424

428

429

430

431

432

433

435

436 ioh->op = op;

438

440

441

443

445

447

448

449

450

451

453

455 "staged (synchronous: %d, in_batch: %d)",

457

458 if (!needs_synchronous)

459 {

462

463

464

465

466

469 }

470 else

471 {

474 }

475

477}

478

479bool

481{

482

483

484

485

486

487

488

489

491 return true;

492

493

496

497 return false;

498}

499

500

501

502

503

504

505

506void

508{

510

512}

513

514

515

516

517

518

519

520

521

522

523

524void

526{

528

530

531 ioh->result = result;

532

534

535 INJECTION_POINT("aio-process-completion-before-shared", ioh);

536

538

540

541

543

544

547}

548

549

550

551

552

553

554

555bool

557{

560

561 return ioh->generation != ref_generation;

562}

563

564

565

566

567

568static void

570{

572 bool am_owner;

573

575

577 return;

578

579 if (am_owner)

580 {

585 {

586 elog(PANIC, "waiting for own IO %d in wrong state: %s",

588 }

589 }

590

591 while (true)

592 {

594 return;

595

597 {

601 break;

602

604

605

606

607

608

609

611 {

613 continue;

614 }

615

616

617

620

621

623

625

627

629 {

632 break;

634 }

635

637 break;

638

641

642

643

644

645

646

647

648 if (am_owner)

650 return;

651 }

652 }

653}

654

655

656

657

658

659

660

661

662

663

664static void

666{

667

670

671

673

674

675

676

677

678

680 {

682

685

687 {

690 }

691 }

692

694 "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",

699

700

703

705 {

708 }

709

711

712

713

714

715

716

717

718

719

720

723

724

726

735

736

737

738

739

741

743}

744

745

746

747

748

749

750static void

752{

753 int reclaimed = 0;

754

755 pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",

759

760

761

762

763

764

765

767 {

769

771 {

772

773

774

775

776

778 reclaimed++;

779 }

780 }

781

782 if (reclaimed > 0)

783 return;

784

785

786

787

788

789

792

793

795 return;

796

804

805

806

807

808

809

810

811 {

815

816 switch (ioh->state)

817 {

818

824 elog(ERROR, "shouldn't get here with io:%d in state %d",

826 break;

827

831 "waiting for free io with %d in flight",

833

834

835

836

837

838

839

840

841

842

844 break;

845

847

848

849

850

851

852

853

854

855

857 break;

858 }

859

861 elog(PANIC, "no idle IO after waiting for IO to terminate");

862 return;

863 }

864}

865

866

867

868

869

872{

874

876

878

881

882 Assert(*ref_generation != 0);

883

884 return ioh;

885}

886

887static const char *

889{

890#define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym

891 switch (s)

892 {

901 }

902#undef PGAIO_HS_TOSTR_CASE

903

904 return NULL;

905}

906

907const char *

909{

911}

912

913const char *

915{

916 switch (rs)

917 {

919 return "UNKNOWN";

921 return "OK";

923 return "WARNING";

925 return "PARTIAL";

927 return "ERROR";

928 }

929

930 return NULL;

931}

932

933

934

935

936

937

938

939

940

941

942

943void

945{

947}

948

949

950bool

952{

954}

955

956

957

958

959int

961{

964}

965

966

967

968

969

970void

972{

973 uint64 ref_generation;

975

977

979}

980

981

982

983

984bool

986{

987 uint64 ref_generation;

989 bool am_owner;

991

993

995 return true;

996

998 return true;

999

1001

1004 {

1005

1006

1007

1008

1009

1010 if (am_owner)

1012 return true;

1013 }

1014

1015

1016

1017

1018

1019

1020 return false;

1021}

1022

1023

1024

1025

1026

1027

1028

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060void

1062{

1064 elog(ERROR, "starting batch while batch already in progress");

1066}

1067

1068

1069

1070

1071void

1073{

1075

1078}

1079

1080

1081

1082

1083

1084

1085

1086bool

1088{

1092}

1093

1094

1095

1096

1097

1098

1099

1100

1101

1102void

1104{

1105 int total_submitted = 0;

1106 int did_submit;

1107

1109 return;

1110

1111

1113

1116

1118

1119 total_submitted += did_submit;

1120

1121 Assert(total_submitted == did_submit);

1122

1124

1126 "aio: submitted %d IOs",

1127 total_submitted);

1128}

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144void

1146{

1147

1148

1149

1150

1151

1153 {

1155

1157 }

1158

1159

1160

1161

1163}

1164

1165

1166

1167

1168

1169

1170

1171

1172void

1174{

1175

1176

1177

1178

1179

1180

1181

1182

1184 {

1186 elog(WARNING, "open AIO batch at end of (sub-)transaction");

1187 }

1188

1189

1190

1191

1193}

1194

1195

1196

1197

1198

1199void

1201{

1202

1203

1204

1205

1207 return;

1208

1209

1210

1211

1212

1214 {

1216 "submitting %d IOs before FD %d gets closed",

1219 }

1220

1221

1222

1223

1224

1226 {

1227

1228

1229

1230

1231

1232

1234 {

1238

1240 {

1242

1244

1246 break;

1247 else

1248 ioh = NULL;

1249 }

1250

1251 if (!ioh)

1252 break;

1253

1255 "waiting for IO before FD %d gets closed, %d in-flight IOs",

1257

1258

1260 }

1261 }

1262}

1263

1264

1265

1266

1267void

1269{

1272

1273

1275

1276

1277

1278

1279

1280

1281

1282

1283

1284

1286 {

1289

1291 "waiting for IO to complete during shutdown, %d in-flight IOs",

1293

1294

1296 }

1297

1299}

1300

1301void

1303{

1306

1308}

1309

1310bool

1312{

1314 {

1315

1316

1317

1318

1319 return true;

1320 }

1321 else if (*newval == 0)

1322 {

1324 return false;

1325 }

1326

1327 return true;

1328}

void pgaio_io_process_completion(PgAioHandle *ioh, int result)

bool pgaio_wref_valid(PgAioWaitRef *iow)

int pgaio_io_get_id(PgAioHandle *ioh)

PgAioBackend * pgaio_my_backend

const char * pgaio_result_status_string(PgAioResultStatus rs)

PgAioHandle * pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)

void assign_io_method(int newval, void *extra)

static void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)

void pgaio_wref_clear(PgAioWaitRef *iow)

bool pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)

static void pgaio_io_wait_for_free(void)

#define PGAIO_HS_TOSTR_CASE(sym)

static const char * pgaio_io_state_get_name(PgAioHandleState s)

void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)

static void pgaio_io_resowner_register(PgAioHandle *ioh)

static PgAioHandle * pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)

void pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)

void pgaio_closing_fd(int fd)

void pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)

void pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)

bool pgaio_have_staged(void)

const IoMethodOps * pgaio_method_ops

bool pgaio_wref_check_done(PgAioWaitRef *iow)

static const IoMethodOps *const pgaio_method_ops_table[]

static void pgaio_io_reclaim(PgAioHandle *ioh)

ProcNumber pgaio_io_get_owner(PgAioHandle *ioh)

void pgaio_enter_batchmode(void)

void pgaio_submit_staged(void)

const char * pgaio_io_get_state_name(PgAioHandle *ioh)

const struct config_enum_entry io_method_options[]

bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)

void pgaio_io_prepare_submit(PgAioHandle *ioh)

void pgaio_wref_wait(PgAioWaitRef *iow)

void pgaio_error_cleanup(void)

void pgaio_io_release(PgAioHandle *ioh)

int pgaio_wref_get_id(PgAioWaitRef *iow)

void AtEOXact_Aio(bool is_commit)

void pgaio_shutdown(int code, Datum arg)

bool check_io_max_concurrency(int *newval, void **extra, GucSource source)

static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)

void pgaio_exit_batchmode(void)

PgAioHandle * pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)

#define DEFAULT_IO_METHOD

void pgaio_io_call_stage(PgAioHandle *ioh)

PgAioResult pgaio_io_call_complete_local(PgAioHandle *ioh)

void pgaio_io_call_complete_shared(PgAioHandle *ioh)

@ PGAIO_HS_COMPLETED_SHARED

@ PGAIO_HS_COMPLETED_LOCAL

#define pgaio_debug(elevel, msg,...)

#define pgaio_debug_io(elevel, ioh, msg,...)

#define PGAIO_SUBMIT_BATCH_SIZE

void pgaio_io_perform_synchronously(PgAioHandle *ioh)

bool pgaio_io_uses_fd(PgAioHandle *ioh, int fd)

bool pgaio_io_has_target(PgAioHandle *ioh)

#define pg_read_barrier()

#define pg_write_barrier()

bool ConditionVariableCancelSleep(void)

void ConditionVariableBroadcast(ConditionVariable *cv)

void ConditionVariablePrepareToSleep(ConditionVariable *cv)

void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)

int errmsg_internal(const char *fmt,...)

int errdetail_internal(const char *fmt,...)

#define ereport(elevel,...)

volatile uint32 CritSectionCount

#define GUC_check_errdetail

Assert(PointerIsAligned(start, uint64))

#define dclist_container(type, membername, ptr)

#define dclist_head_element(type, membername, lhead)

static void dclist_push_tail(dclist_head *head, dlist_node *node)

static uint32 dclist_count(const dclist_head *head)

static bool dclist_is_empty(const dclist_head *head)

static void dclist_delete_from(dclist_head *head, dlist_node *node)

static dlist_node * dclist_pop_head_node(dclist_head *head)

static void dclist_push_head(dclist_head *head, dlist_node *node)

#define dlist_container(type, membername, ptr)

#define dclist_foreach(iter, lhead)

#define INJECTION_POINT(name, arg)

const IoMethodOps pgaio_sync_ops

const IoMethodOps pgaio_worker_ops

#define RESUME_INTERRUPTS()

#define INTERRUPTS_CAN_BE_PROCESSED()

#define START_CRIT_SECTION()

#define HOLD_INTERRUPTS()

#define END_CRIT_SECTION()

static rewind_source * source

static int fd(const char *x, int i)

ResourceOwner CurrentResourceOwner

void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)

void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)

bool wait_on_fd_before_close

int(* submit)(uint16 num_staged_ios, PgAioHandle **staged_ios)

void(* wait_one)(PgAioHandle *ioh, uint64 ref_generation)

bool(* needs_synchronous_execution)(PgAioHandle *ioh)

dclist_head in_flight_ios

PgAioHandle * staged_ios[PGAIO_SUBMIT_BATCH_SIZE]

PgAioHandle * handed_out_io

PgAioTargetData target_data

struct ResourceOwnerData * resowner

PgAioResult distilled_result

PgAioReturn * report_return

PgAioTargetData target_data