PostgreSQL Source Code: src/backend/storage/aio/aio.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
40
51#include "utils/wait_event_types.h"
52
53
61
62
63
67#ifdef IOMETHOD_IO_URING_ENABLED
68 {"io_uring", IOMETHOD_IO_URING, false},
69#endif
70 {NULL, 0, false}
71};
72
73
76
77
79
80
82
83
87#ifdef IOMETHOD_IO_URING_ENABLED
88 [IOMETHOD_IO_URING] = &pgaio_uring_ops,
89#endif
90};
91
92
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
160{
162
163 while (true)
164 {
166
167 if (h != NULL)
168 return h;
169
170
171
172
173
175 }
176}
177
178
179
180
181
182
183
186{
188
190 {
193 }
194
196 elog(ERROR, "API violation: Only one IO can be handed out");
197
198
199
200
201
203
205 {
207
209
212
215
216 if (resowner)
218
219 if (ret)
220 {
223 }
224 }
225
227
228 return ioh;
229}
230
231
232
233
234
235
236void
238{
240 {
243
245
246
247
248
249
250
252 }
253 else
254 {
255 elog(ERROR, "release in unexpected state");
256 }
257}
258
259
260
261
262void
264{
266
268
269
270
271
272
274
277
278 switch (ioh->state)
279 {
282 break;
285
287 {
289 if (!on_error)
291 }
292
294 break;
297 if (!on_error)
298 elog(WARNING, "AIO handle was not submitted");
300 break;
305
306 break;
307 }
308
309
310
311
312
315
317}
318
319
320
321
322
323
324
325
326void
328{
330
332}
333
334
335
336
337
338int
340{
344}
345
346
347
348
349
350
353{
355}
356
357
358
359
360
361
362void
364{
369
373}
374
375
376
377
378
379
380
381
382static inline void
384{
385
386
387
388
389
391
393 "updating state to %s",
395
396
397
398
399
401
402 ioh->state = new_state;
403}
404
405static void
407{
410
413}
414
415
416
417
418
419
420void
422{
423 bool needs_synchronous;
424
428
429
430
431
432
433
435
436 ioh->op = op;
438
440
441
443
445
447
448
449
450
451
453
455 "staged (synchronous: %d, in_batch: %d)",
457
458 if (!needs_synchronous)
459 {
462
463
464
465
466
469 }
470 else
471 {
474 }
475
477}
478
479bool
481{
482
483
484
485
486
487
488
489
491 return true;
492
493
496
497 return false;
498}
499
500
501
502
503
504
505
506void
508{
510
512}
513
514
515
516
517
518
519
520
521
522
523
524void
526{
528
530
531 ioh->result = result;
532
534
535 INJECTION_POINT("aio-process-completion-before-shared", ioh);
536
538
540
541
543
544
547}
548
549
550
551
552
553
554
555bool
557{
560
561 return ioh->generation != ref_generation;
562}
563
564
565
566
567
568static void
570{
572 bool am_owner;
573
575
577 return;
578
579 if (am_owner)
580 {
585 {
586 elog(PANIC, "waiting for own IO %d in wrong state: %s",
588 }
589 }
590
591 while (true)
592 {
594 return;
595
597 {
601 break;
602
604
605
606
607
608
609
611 {
613 continue;
614 }
615
616
617
620
621
623
625
627
629 {
632 break;
634 }
635
637 break;
638
641
642
643
644
645
646
647
648 if (am_owner)
650 return;
651 }
652 }
653}
654
655
656
657
658
659
660
661
662
663
664static void
666{
667
670
671
673
674
675
676
677
678
680 {
682
685
687 {
690 }
691 }
692
694 "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
699
700
703
705 {
708 }
709
711
712
713
714
715
716
717
718
719
720
723
724
726
735
736
737
738
739
741
743}
744
745
746
747
748
749
750static void
752{
753 int reclaimed = 0;
754
755 pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",
759
760
761
762
763
764
765
767 {
769
771 {
772
773
774
775
776
778 reclaimed++;
779 }
780 }
781
782 if (reclaimed > 0)
783 return;
784
785
786
787
788
789
792
793
795 return;
796
804
805
806
807
808
809
810
811 {
815
816 switch (ioh->state)
817 {
818
824 elog(ERROR, "shouldn't get here with io:%d in state %d",
826 break;
827
831 "waiting for free io with %d in flight",
833
834
835
836
837
838
839
840
841
842
844 break;
845
847
848
849
850
851
852
853
854
855
857 break;
858 }
859
861 elog(PANIC, "no idle IO after waiting for IO to terminate");
862 return;
863 }
864}
865
866
867
868
869
872{
874
876
878
881
882 Assert(*ref_generation != 0);
883
884 return ioh;
885}
886
887static const char *
889{
890#define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym
891 switch (s)
892 {
901 }
902#undef PGAIO_HS_TOSTR_CASE
903
904 return NULL;
905}
906
907const char *
909{
911}
912
913const char *
915{
916 switch (rs)
917 {
919 return "UNKNOWN";
921 return "OK";
923 return "WARNING";
925 return "PARTIAL";
927 return "ERROR";
928 }
929
930 return NULL;
931}
932
933
934
935
936
937
938
939
940
941
942
943void
945{
947}
948
949
950bool
952{
954}
955
956
957
958
959int
961{
964}
965
966
967
968
969
970void
972{
973 uint64 ref_generation;
975
977
979}
980
981
982
983
984bool
986{
987 uint64 ref_generation;
989 bool am_owner;
991
993
995 return true;
996
998 return true;
999
1001
1004 {
1005
1006
1007
1008
1009
1010 if (am_owner)
1012 return true;
1013 }
1014
1015
1016
1017
1018
1019
1020 return false;
1021}
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060void
1062{
1064 elog(ERROR, "starting batch while batch already in progress");
1066}
1067
1068
1069
1070
1071void
1073{
1075
1078}
1079
1080
1081
1082
1083
1084
1085
1086bool
1088{
1092}
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102void
1104{
1105 int total_submitted = 0;
1106 int did_submit;
1107
1109 return;
1110
1111
1113
1116
1118
1119 total_submitted += did_submit;
1120
1121 Assert(total_submitted == did_submit);
1122
1124
1126 "aio: submitted %d IOs",
1127 total_submitted);
1128}
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144void
1146{
1147
1148
1149
1150
1151
1153 {
1155
1157 }
1158
1159
1160
1161
1163}
1164
1165
1166
1167
1168
1169
1170
1171
1172void
1174{
1175
1176
1177
1178
1179
1180
1181
1182
1184 {
1186 elog(WARNING, "open AIO batch at end of (sub-)transaction");
1187 }
1188
1189
1190
1191
1193}
1194
1195
1196
1197
1198
1199void
1201{
1202
1203
1204
1205
1207 return;
1208
1209
1210
1211
1212
1214 {
1216 "submitting %d IOs before FD %d gets closed",
1219 }
1220
1221
1222
1223
1224
1226 {
1227
1228
1229
1230
1231
1232
1234 {
1238
1240 {
1242
1244
1246 break;
1247 else
1248 ioh = NULL;
1249 }
1250
1251 if (!ioh)
1252 break;
1253
1255 "waiting for IO before FD %d gets closed, %d in-flight IOs",
1257
1258
1260 }
1261 }
1262}
1263
1264
1265
1266
1267void
1269{
1272
1273
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1286 {
1289
1291 "waiting for IO to complete during shutdown, %d in-flight IOs",
1293
1294
1296 }
1297
1299}
1300
1301void
1303{
1306
1308}
1309
1310bool
1312{
1314 {
1315
1316
1317
1318
1319 return true;
1320 }
1321 else if (*newval == 0)
1322 {
1324 return false;
1325 }
1326
1327 return true;
1328}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
bool pgaio_wref_valid(PgAioWaitRef *iow)
int pgaio_io_get_id(PgAioHandle *ioh)
PgAioBackend * pgaio_my_backend
const char * pgaio_result_status_string(PgAioResultStatus rs)
PgAioHandle * pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
void assign_io_method(int newval, void *extra)
static void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
void pgaio_wref_clear(PgAioWaitRef *iow)
bool pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
static void pgaio_io_wait_for_free(void)
#define PGAIO_HS_TOSTR_CASE(sym)
static const char * pgaio_io_state_get_name(PgAioHandleState s)
void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
static void pgaio_io_resowner_register(PgAioHandle *ioh)
static PgAioHandle * pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)
void pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
void pgaio_closing_fd(int fd)
void pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
void pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
bool pgaio_have_staged(void)
const IoMethodOps * pgaio_method_ops
bool pgaio_wref_check_done(PgAioWaitRef *iow)
static const IoMethodOps *const pgaio_method_ops_table[]
static void pgaio_io_reclaim(PgAioHandle *ioh)
ProcNumber pgaio_io_get_owner(PgAioHandle *ioh)
void pgaio_enter_batchmode(void)
void pgaio_submit_staged(void)
const char * pgaio_io_get_state_name(PgAioHandle *ioh)
const struct config_enum_entry io_method_options[]
bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
void pgaio_wref_wait(PgAioWaitRef *iow)
void pgaio_error_cleanup(void)
void pgaio_io_release(PgAioHandle *ioh)
int pgaio_wref_get_id(PgAioWaitRef *iow)
void AtEOXact_Aio(bool is_commit)
void pgaio_shutdown(int code, Datum arg)
bool check_io_max_concurrency(int *newval, void **extra, GucSource source)
static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
void pgaio_exit_batchmode(void)
PgAioHandle * pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
#define DEFAULT_IO_METHOD
void pgaio_io_call_stage(PgAioHandle *ioh)
PgAioResult pgaio_io_call_complete_local(PgAioHandle *ioh)
void pgaio_io_call_complete_shared(PgAioHandle *ioh)
@ PGAIO_HS_COMPLETED_SHARED
@ PGAIO_HS_COMPLETED_LOCAL
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
bool pgaio_io_uses_fd(PgAioHandle *ioh, int fd)
bool pgaio_io_has_target(PgAioHandle *ioh)
#define pg_read_barrier()
#define pg_write_barrier()
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errmsg_internal(const char *fmt,...)
int errdetail_internal(const char *fmt,...)
#define ereport(elevel,...)
volatile uint32 CritSectionCount
#define GUC_check_errdetail
Assert(PointerIsAligned(start, uint64))
#define dclist_container(type, membername, ptr)
#define dclist_head_element(type, membername, lhead)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static bool dclist_is_empty(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static dlist_node * dclist_pop_head_node(dclist_head *head)
static void dclist_push_head(dclist_head *head, dlist_node *node)
#define dlist_container(type, membername, ptr)
#define dclist_foreach(iter, lhead)
#define INJECTION_POINT(name, arg)
const IoMethodOps pgaio_sync_ops
const IoMethodOps pgaio_worker_ops
#define RESUME_INTERRUPTS()
#define INTERRUPTS_CAN_BE_PROCESSED()
#define START_CRIT_SECTION()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
static rewind_source * source
static int fd(const char *x, int i)
ResourceOwner CurrentResourceOwner
void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
bool wait_on_fd_before_close
int(* submit)(uint16 num_staged_ios, PgAioHandle **staged_ios)
void(* wait_one)(PgAioHandle *ioh, uint64 ref_generation)
bool(* needs_synchronous_execution)(PgAioHandle *ioh)
dclist_head in_flight_ios
PgAioHandle * staged_ios[PGAIO_SUBMIT_BATCH_SIZE]
PgAioHandle * handed_out_io
PgAioTargetData target_data
struct ResourceOwnerData * resowner
PgAioResult distilled_result
PgAioReturn * report_return
PgAioTargetData target_data