PostgreSQL Source Code: src/backend/storage/aio/method_worker.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
31
49
50
51
52#define IO_WORKER_WAKEUP_FANOUT 2
53
54
56{
63
65{
69
71{
75
76
79
82
83
87
90};
91
92
93
95
96
101
102
103static size_t
105{
106
108
110 sizeof(uint32) * *queue_size;
111}
112
113static size_t
115{
118}
119
120static size_t
122{
123 size_t sz;
124 int queue_size;
125
128
129 return sz;
130}
131
132static void
134{
135 bool found;
136 int queue_size;
137
141 &found);
142 if (!found)
143 {
147 }
148
152 &found);
153 if (!found)
154 {
157 {
160 }
161 }
162}
163
164static int
166{
167 int worker;
168
170 return -1;
171
172
175
176 return worker;
177}
178
179static bool
181{
184
186 new_head = (queue->head + 1) & (queue->size - 1);
187 if (new_head == queue->tail)
188 {
191 return false;
192 }
193
195 queue->head = new_head;
196
197 return true;
198}
199
202{
205
207 if (queue->tail == queue->head)
208 return UINT32_MAX;
209
210 result = queue->ios[queue->tail];
211 queue->tail = (queue->tail + 1) & (queue->size - 1);
212
213 return result;
214}
215
218{
221
224
225 if (tail > head)
227
228 Assert(head >= tail);
229
230 return head - tail;
231}
232
233static bool
235{
236 return
240}
241
242static void
244{
246 int nsync = 0;
248 int worker;
249
251
253 for (int i = 0; i < nios; ++i)
254 {
257 {
258
259
260
261
262 synchronous_ios[nsync++] = ios[i];
263 continue;
264 }
265
267 {
268
270 if (worker >= 0)
272
274 "choosing worker %d",
275 worker);
276 }
277 }
279
282
283
284 if (nsync > 0)
285 {
286 for (int i = 0; i < nsync; ++i)
287 {
289 }
290 }
291}
292
293static int
295{
296 for (int i = 0; i < num_staged_ios; i++)
297 {
299
301 }
302
304
305 return num_staged_ios;
306}
307
308
309
310
311
312static void
314{
318
322}
323
324
325
326
327
328static void
330{
332
333
334
335
336
338
340 {
342 {
346 break;
347 }
348 else
350 }
351
353 elog(ERROR, "couldn't find a free worker slot");
354
358
360}
361
362static void
364{
369
370 if (!ioh)
371 return;
372
375
378 owner_pid = owner_proc->pid;
379
380 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
381}
382
383void
384IoWorkerMain(const void *startup_data, size_t startup_data_len)
385{
386 sigjmp_buf local_sigjmp_buf;
389 volatile int error_errno = 0;
390 char cmd[128];
391
394
397
398
399
400
401
403
408
409
411
414
418
419
420 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
421 {
424
426
427
428
429
430
431
432
433
435
436 if (error_ioh != NULL)
437 {
438
439 Assert(error_errno != 0);
440
441 errno = error_errno;
442
446 }
447
449 }
450
451
453
454 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
455
457 {
460 int nlatches = 0;
461 int nwakeups = 0;
462 int worker;
463
464
467 {
468
469
470
471
472
473
475 }
476 else
477 {
478
480
481
484 for (int i = 0; i < nwakeups; ++i)
485 {
487 break;
489 }
490 }
492
493 for (int i = 0; i < nlatches; ++i)
495
496 if (io_index != UINT32_MAX)
497 {
499
501 error_ioh = ioh;
502 errcallback.arg = ioh;
503
505 "worker %d processing IO",
507
508
509
510
511
512
514
515
516
517
518
519
520
521
522 error_errno = ENOENT;
524
525
526
527
528
530
531 error_errno = 0;
532 error_ioh = NULL;
533
534
535
536
537
538
539
540
541
542#ifdef USE_VALGRIND
543 {
544 struct iovec *iov;
546
547 for (int i = 0; i < iov_length; i++)
549 }
550#endif
551
552
553
554
555
556
557
559
561 errcallback.arg = NULL;
562 }
563 else
564 {
566 WAIT_EVENT_IO_WORKER_MAIN);
568 }
569
571 }
572
575}
576
577bool
579{
581}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
int pgaio_io_get_id(PgAioHandle *ioh)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
@ PGAIO_HF_REFERENCES_LOCAL
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
void pgaio_io_reopen(PgAioHandle *ioh)
bool pgaio_io_can_reopen(PgAioHandle *ioh)
void AuxiliaryProcessMainCommon(void)
#define FLEXIBLE_ARRAY_MEMBER
void EmitErrorReport(void)
ErrorContextCallback * error_context_stack
sigjmp_buf * PG_exception_stack
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
struct AioWorkerControl AioWorkerControl
static void pgaio_worker_error_callback(void *arg)
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_choose_idle_worker(void)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
static size_t pgaio_worker_shmem_size(void)
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static AioWorkerSubmissionQueue * io_worker_submission_queue
static int io_worker_queue_size
static void pgaio_worker_register(void)
struct AioWorkerSubmissionQueue AioWorkerSubmissionQueue
const IoMethodOps pgaio_worker_ops
static void pgaio_worker_die(int code, Datum arg)
struct AioWorkerSlot AioWorkerSlot
static uint32 pgaio_worker_submission_queue_consume(void)
static void pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static AioWorkerControl * io_worker_control
static void pgaio_worker_shmem_init(bool first_time)
#define RESUME_INTERRUPTS()
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
static int pg_rightmost_one_pos64(uint64 word)
static uint32 pg_nextpower2_32(uint32 num)
#define GetPGProcByNumber(n)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
uint32 ios[FLEXIBLE_ARRAY_MEMBER]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
size_t(* shmem_size)(void)
#define WL_EXIT_ON_PM_DEATH
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]