PostgreSQL Source Code: src/backend/access/transam/xlogprefetcher.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
29
41#include "utils/fmgrprotos.h"
45
46
47
48
49
50#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
51
52
53
54
55
56#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
57
58
59
60
61
62#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
63
64
65
66
67
69
70#ifdef USE_PREFETCH
71#define RecoveryPrefetchEnabled() \
72 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73 maintenance_io_concurrency > 0)
74#else
75#define RecoveryPrefetchEnabled() false
76#endif
77
79
80
81
82
83typedef enum
84{
89
90
91
92
93
96
97
98
99
100
101
102
104{
113 struct
114 {
119
120
121
122
123
125{
126
130
131
133
134
137
138
142
143
145
146
148
150
152};
153
154
155
156
157
158
159
161{
167
168
169
170
172{
180
181
186
198
200
204 uintptr_t lrq_private,
206{
209
210 Assert(max_distance >= max_inflight);
211
212 size = max_distance + 1;
216 lrq->size = size;
218 lrq->head = 0;
219 lrq->tail = 0;
222
223 return lrq;
224}
225
226static inline void
228{
230}
231
234{
236}
237
240{
242}
243
244static inline void
246{
247
250 {
253 {
255 return;
259 break;
263 break;
264 }
267 lrq->head = 0;
268 }
269}
270
271static inline void
273{
274
275
276
277
278 while (lrq->tail != lrq->head &&
280 {
283 else
287 lrq->tail = 0;
288 }
291}
292
293size_t
295{
297}
298
299
300
301
302void
304{
312}
313
314void
316{
317 bool found;
318
322 &found);
323
324 if (!found)
325 {
333 }
334}
335
336
337
338
339void
341{
343}
344
345
346
347
348
349
350static inline void
352{
355}
356
357
358
359
360
363{
366
368 prefetcher->reader = reader;
369
375
379
380
382
383 return prefetcher;
384}
385
386
387
388
389void
391{
394 pfree(prefetcher);
395}
396
397
398
399
402{
403 return prefetcher->reader;
404}
405
406
407
408
409void
411{
414 int64 wal_distance;
415
416
417
419 {
420 wal_distance =
423 }
424 else
425 {
426 wal_distance = 0;
427 }
428
429
432
433
437
440}
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
460{
464
465
466
467
468
469 for (;;)
470 {
472
473
474 if (prefetcher->record == NULL)
475 {
476 bool nonblocking;
477
478
479
480
481
482
483
485
486
487 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
489
491 if (record == NULL)
492 {
493
494
495
496
497
501
503 }
504
505
506
507
508
509
511 {
514 }
515
516
517 prefetcher->record = record;
519 }
520 else
521 {
522
523 record = prefetcher->record;
524 }
525
526
527
528
529
530 if (replaying_lsn < record->lsn)
531 {
534
535 if (rmid == RM_XLOG_ID)
536 {
539 {
540
541
542
543
544
546
547#ifdef XLOGPREFETCHER_DEBUG_LEVEL
548 elog(XLOGPREFETCHER_DEBUG_LEVEL,
549 "suppressing all readahead until %X/%X is replayed due to possible TLI change",
551#endif
552
553
554 }
555 }
556 else if (rmid == RM_DBASE_ID)
557 {
558
559
560
561
562
564 {
569
570
571
572
573
574
575
576
577
579
580#ifdef XLOGPREFETCHER_DEBUG_LEVEL
581 elog(XLOGPREFETCHER_DEBUG_LEVEL,
582 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
583 rlocator.dbOid,
585#endif
586 }
587 }
588 else if (rmid == RM_SMGR_ID)
589 {
591 {
594
596 {
597
598
599
600
601
602
603
604
606 record->lsn);
607
608#ifdef XLOGPREFETCHER_DEBUG_LEVEL
609 elog(XLOGPREFETCHER_DEBUG_LEVEL,
610 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
615#endif
616 }
617 }
619 {
622
623
624
625
626
629 record->lsn);
630
631#ifdef XLOGPREFETCHER_DEBUG_LEVEL
632 elog(XLOGPREFETCHER_DEBUG_LEVEL,
633 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
639#endif
640 }
641 }
642 }
643
644
645 while (prefetcher->next_block_id <= record->max_block_id)
646 {
651
653 continue;
654
656
657
658
659
660
661
662 *lsn = record->lsn;
663
664
666 {
668 }
669
670
671
672
673
675 {
678 }
679
680
682 {
685 }
686
687
689 {
692 }
693
694
696 {
699 {
700
701
702
703
704
707 }
708 }
713
714
715
716
717
718
720
721
722
723
724
725
726
727
729 {
730#ifdef XLOGPREFETCHER_DEBUG_LEVEL
731 elog(XLOGPREFETCHER_DEBUG_LEVEL,
732 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
737#endif
739 record->lsn);
742 }
743
744
745
746
747
748
750 {
751#ifdef XLOGPREFETCHER_DEBUG_LEVEL
752 elog(XLOGPREFETCHER_DEBUG_LEVEL,
753 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
759#endif
761 record->lsn);
764 }
765
766
769 {
770
774 }
776 {
777
781 }
783 {
784
785
786
787
788
789
790
792 "could not prefetch relation %u/%u/%u block %u",
797 }
798 }
799
800
801
802
803
804
805
806
807
808
809
813
814
815 prefetcher->record = NULL;
816 }
818}
819
820
821
822
825{
826#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
830
832
834 nulls[i] = false;
835
847
848 return (Datum) 0;
849}
850
851
852
853
854
855static inline void
858{
860 bool found;
861
863 if (!found)
864 {
865
866
867
871 }
872 else
873 {
874
875
876
877
878
879
884 }
885}
886
887
888
889
890
891
892
893static inline void
895{
897 {
901
903 break;
904
907 }
908}
909
910
911
912
913static inline bool
916{
917
918
919
920
922 {
924
925
928 {
929#ifdef XLOGPREFETCHER_DEBUG_LEVEL
930 elog(XLOGPREFETCHER_DEBUG_LEVEL,
931 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
935#endif
936 return true;
937 }
938
939
943 if (filter)
944 {
945#ifdef XLOGPREFETCHER_DEBUG_LEVEL
946 elog(XLOGPREFETCHER_DEBUG_LEVEL,
947 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
950#endif
951 return true;
952 }
953 }
954
955 return false;
956}
957
958
959
960
961void
963{
964
966
967
969
971
972
974}
975
976
977
978
979
982{
985
986
987
988
989
991 {
994
997
999 {
1003 }
1004 else
1005 {
1006 max_inflight = 1;
1007 max_distance = 1;
1008 }
1009
1011 max_inflight,
1012 (uintptr_t) prefetcher,
1014
1016 }
1017
1018
1019
1020
1021
1023
1024
1025
1026
1027
1028
1030
1031
1032
1033
1034
1036
1037
1038
1039
1040
1042 {
1046 }
1047
1048
1050 if (!record)
1051 return NULL;
1052
1053
1054
1055
1056
1058
1059
1060
1061
1062
1063
1064
1065 if (record == prefetcher->record)
1066 prefetcher->record = NULL;
1067
1068
1069
1070
1071
1074
1076
1077 return &record->header;
1078}
1079
1080bool
1082{
1083#ifndef USE_PREFETCH
1085 {
1086 GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
1087 return false;
1088 }
1089#endif
1090
1091 return true;
1092}
1093
1094void
1096{
1097
1101}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
TimestampTz GetCurrentTimestamp(void)
static Datum values[MAXATTR]
PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)
int maintenance_io_concurrency
static bool BufferIsValid(Buffer bufnum)
#define FLEXIBLE_ARRAY_MEMBER
#define XLOG_DBASE_CREATE_FILE_COPY
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errmsg(const char *fmt,...)
Datum Int64GetDatum(int64 X)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
#define GUC_check_errdetail
Assert(PointerIsAligned(start, uint64))
static void dlist_init(dlist_head *head)
static void dlist_delete(dlist_node *node)
#define dlist_tail_element(type, membername, lhead)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
if(TABLE==NULL||TABLE_index==NULL)
void pfree(void *pointer)
void * palloc0(Size size)
#define AmStartupProcess()
#define XLOG_CHECKPOINT_SHUTDOWN
#define XLOG_END_OF_RECOVERY
static rewind_source * source
static Datum Int32GetDatum(int32 X)
#define INVALID_PROC_NUMBER
struct RelFileLocator RelFileLocator
#define RelFileLocatorEquals(locator1, locator2)
#define InvalidRelFileNumber
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)
SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)
bool smgrexists(SMgrRelation reln, ForkNumber forknum)
#define XLOG_SMGR_TRUNCATE
struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]
Tuplestorestate * setResult
RelFileLocatorBackend smgr_rlocator
pg_atomic_uint64 skip_fpw
pg_atomic_uint64 skip_init
pg_atomic_uint64 reset_time
pg_atomic_uint64 prefetch
pg_atomic_uint64 skip_rep
pg_atomic_uint64 skip_new
XLogRecPtr filter_until_replayed
BlockNumber filter_from_block
XLogRecPtr no_readahead_until
RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
LsnReadQueue * streaming_read
DecodedXLogRecord * record
XLogRecPtr next_stats_shm_lsn
BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]
DecodedXLogRecord * record
DecodedXLogRecord * decode_queue_head
DecodedXLogRecord * decode_queue_tail
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
void XLogPrefetchResetStats(void)
static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)
void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
struct LsnReadQueue LsnReadQueue
#define RecoveryPrefetchEnabled()
static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)
static void lrq_free(LsnReadQueue *lrq)
struct XLogPrefetchStats XLogPrefetchStats
static void lrq_prefetch(LsnReadQueue *lrq)
static int XLogPrefetchReconfigureCount
Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
static uint32 lrq_completed(LsnReadQueue *lrq)
static XLogPrefetchStats * SharedStats
static uint32 lrq_inflight(LsnReadQueue *lrq)
void XLogPrefetchReconfigure(void)
size_t XLogPrefetchShmemSize(void)
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS
XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)
void XLogPrefetchShmemInit(void)
void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
void assign_recovery_prefetch(int new_value, void *extra)
static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE
struct XLogPrefetcherFilter XLogPrefetcherFilter
static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
#define XLOGPREFETCHER_STATS_DISTANCE
static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER
void XLogPrefetcherFree(XLogPrefetcher *prefetcher)
bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)
DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)
DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)
static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
#define BKPBLOCK_WILL_INIT