PostgreSQL Source Code: src/backend/storage/aio/method_worker.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

31

49

50

51

52#define IO_WORKER_WAKEUP_FANOUT 2

53

54

56{

63

65{

69

71{

75

76

79

82

83

87

90};

91

92

93

95

96

101

102

103static size_t

105{

106

108

110 sizeof(uint32) * *queue_size;

111}

112

113static size_t

115{

118}

119

120static size_t

122{

123 size_t sz;

124 int queue_size;

125

128

129 return sz;

130}

131

132static void

134{

135 bool found;

136 int queue_size;

137

141 &found);

142 if (!found)

143 {

147 }

148

152 &found);

153 if (!found)

154 {

157 {

160 }

161 }

162}

163

164static int

166{

167 int worker;

168

170 return -1;

171

172

175

176 return worker;

177}

178

179static bool

181{

184

186 new_head = (queue->head + 1) & (queue->size - 1);

187 if (new_head == queue->tail)

188 {

191 return false;

192 }

193

195 queue->head = new_head;

196

197 return true;

198}

199

202{

205

207 if (queue->tail == queue->head)

208 return UINT32_MAX;

209

210 result = queue->ios[queue->tail];

211 queue->tail = (queue->tail + 1) & (queue->size - 1);

212

213 return result;

214}

215

218{

221

224

225 if (tail > head)

227

228 Assert(head >= tail);

229

230 return head - tail;

231}

232

233static bool

235{

236 return

240}

241

242static void

244{

246 int nsync = 0;

248 int worker;

249

251

253 for (int i = 0; i < nios; ++i)

254 {

257 {

258

259

260

261

262 synchronous_ios[nsync++] = ios[i];

263 continue;

264 }

265

267 {

268

270 if (worker >= 0)

272

274 "choosing worker %d",

275 worker);

276 }

277 }

279

282

283

284 if (nsync > 0)

285 {

286 for (int i = 0; i < nsync; ++i)

287 {

289 }

290 }

291}

292

293static int

295{

296 for (int i = 0; i < num_staged_ios; i++)

297 {

299

301 }

302

304

305 return num_staged_ios;

306}

307

308

309

310

311

312static void

314{

318

322}

323

324

325

326

327

328static void

330{

332

333

334

335

336

338

340 {

342 {

346 break;

347 }

348 else

350 }

351

353 elog(ERROR, "couldn't find a free worker slot");

354

358

360}

361

362static void

364{

369

370 if (!ioh)

371 return;

372

375

378 owner_pid = owner_proc->pid;

379

380 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);

381}

382

383void

384IoWorkerMain(const void *startup_data, size_t startup_data_len)

385{

386 sigjmp_buf local_sigjmp_buf;

389 volatile int error_errno = 0;

390 char cmd[128];

391

394

396 pqsignal(SIGINT, die);

397

398

399

400

401

403

408

409

411

414

418

419

420 if (sigsetjmp(local_sigjmp_buf, 1) != 0)

421 {

424

426

427

428

429

430

431

432

433

435

436 if (error_ioh != NULL)

437 {

438

439 Assert(error_errno != 0);

440

441 errno = error_errno;

442

446 }

447

449 }

450

451

453

454 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

455

457 {

460 int nlatches = 0;

461 int nwakeups = 0;

462 int worker;

463

464

467 {

468

469

470

471

472

473

475 }

476 else

477 {

478

480

481

484 for (int i = 0; i < nwakeups; ++i)

485 {

487 break;

489 }

490 }

492

493 for (int i = 0; i < nlatches; ++i)

495

496 if (io_index != UINT32_MAX)

497 {

499

501 error_ioh = ioh;

502 errcallback.arg = ioh;

503

505 "worker %d processing IO",

507

508

509

510

511

512

514

515

516

517

518

519

520

521

522 error_errno = ENOENT;

524

525

526

527

528

530

531 error_errno = 0;

532 error_ioh = NULL;

533

534

535

536

537

538

539

540

541

542#ifdef USE_VALGRIND

543 {

544 struct iovec *iov;

546

547 for (int i = 0; i < iov_length; i++)

549 }

550#endif

551

552

553

554

555

556

557

559

561 errcallback.arg = NULL;

562 }

563 else

564 {

566 WAIT_EVENT_IO_WORKER_MAIN);

568 }

569

571 }

572

575}

576

577bool

579{

581}

void pgaio_io_process_completion(PgAioHandle *ioh, int result)

int pgaio_io_get_id(PgAioHandle *ioh)

void pgaio_io_prepare_submit(PgAioHandle *ioh)

@ PGAIO_HF_REFERENCES_LOCAL

#define pgaio_debug(elevel, msg,...)

#define pgaio_debug_io(elevel, ioh, msg,...)

#define PGAIO_SUBMIT_BATCH_SIZE

void pgaio_io_perform_synchronously(PgAioHandle *ioh)

int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)

void pgaio_io_reopen(PgAioHandle *ioh)

bool pgaio_io_can_reopen(PgAioHandle *ioh)

void AuxiliaryProcessMainCommon(void)

#define FLEXIBLE_ARRAY_MEMBER

void EmitErrorReport(void)

ErrorContextCallback * error_context_stack

sigjmp_buf * PG_exception_stack

Assert(PointerIsAligned(start, uint64))

#define INJECTION_POINT(name, arg)

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

volatile sig_atomic_t ShutdownRequestPending

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void on_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

void LWLockReleaseAll(void)

#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)

static size_t pgaio_worker_control_shmem_size(void)

static uint32 pgaio_worker_submission_queue_depth(void)

struct AioWorkerControl AioWorkerControl

static void pgaio_worker_error_callback(void *arg)

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)

static int pgaio_choose_idle_worker(void)

static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)

#define IO_WORKER_WAKEUP_FANOUT

static size_t pgaio_worker_shmem_size(void)

static size_t pgaio_worker_queue_shmem_size(int *queue_size)

static AioWorkerSubmissionQueue * io_worker_submission_queue

static int io_worker_queue_size

static void pgaio_worker_register(void)

struct AioWorkerSubmissionQueue AioWorkerSubmissionQueue

const IoMethodOps pgaio_worker_ops

static void pgaio_worker_die(int code, Datum arg)

struct AioWorkerSlot AioWorkerSlot

static uint32 pgaio_worker_submission_queue_consume(void)

static void pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])

static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)

bool pgaio_workers_enabled(void)

void IoWorkerMain(const void *startup_data, size_t startup_data_len)

static AioWorkerControl * io_worker_control

static void pgaio_worker_shmem_init(bool first_time)

#define RESUME_INTERRUPTS()

#define START_CRIT_SECTION()

#define CHECK_FOR_INTERRUPTS()

#define HOLD_INTERRUPTS()

#define END_CRIT_SECTION()

BackendType MyBackendType

static int pg_rightmost_one_pos64(uint64 word)

static uint32 pg_nextpower2_32(uint32 num)

#define GetPGProcByNumber(n)

void procsignal_sigusr1_handler(SIGNAL_ARGS)

static void set_ps_display(const char *activity)

Size add_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]

uint32 ios[FLEXIBLE_ARRAY_MEMBER]

struct ErrorContextCallback * previous

void(* callback)(void *arg)

size_t(* shmem_size)(void)

#define WL_EXIT_ON_PM_DEATH

static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]