PostgreSQL Source Code: src/backend/access/transam/xlogprefetcher.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

29

41#include "utils/fmgrprotos.h"

45

46

47

48

49

50#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

51

52

53

54

55

56#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

57

58

59

60

61

62#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

63

64

65

66

67

69

70#ifdef USE_PREFETCH

71#define RecoveryPrefetchEnabled() \

72 (recovery_prefetch != RECOVERY_PREFETCH_OFF && \

73 maintenance_io_concurrency > 0)

74#else

75#define RecoveryPrefetchEnabled() false

76#endif

77

79

80

81

82

83typedef enum

84{

89

90

91

92

93

96

97

98

99

100

101

102

104{

113 struct

114 {

119

120

121

122

123

125{

126

130

131

133

134

137

138

142

143

145

146

148

150

152};

153

154

155

156

157

158

159

161{

167

168

169

170

172{

180

181

184 int io_depth;

186

198

200

204 uintptr_t lrq_private,

206{

209

210 Assert(max_distance >= max_inflight);

211

212 size = max_distance + 1;

216 lrq->size = size;

218 lrq->head = 0;

219 lrq->tail = 0;

222

223 return lrq;

224}

225

226static inline void

228{

230}

231

234{

236}

237

240{

242}

243

244static inline void

246{

247

250 {

253 {

255 return;

259 break;

263 break;

264 }

267 lrq->head = 0;

268 }

269}

270

271static inline void

273{

274

275

276

277

278 while (lrq->tail != lrq->head &&

280 {

283 else

287 lrq->tail = 0;

288 }

291}

292

293size_t

295{

297}

298

299

300

301

302void

304{

312}

313

314void

316{

317 bool found;

318

322 &found);

323

324 if (!found)

325 {

333 }

334}

335

336

337

338

339void

341{

343}

344

345

346

347

348

349

350static inline void

352{

355}

356

357

358

359

360

363{

366

368 prefetcher->reader = reader;

369

375

379

380

382

383 return prefetcher;

384}

385

386

387

388

389void

391{

394 pfree(prefetcher);

395}

396

397

398

399

402{

403 return prefetcher->reader;

404}

405

406

407

408

409void

411{

414 int64 wal_distance;

415

416

417

419 {

420 wal_distance =

423 }

424 else

425 {

426 wal_distance = 0;

427 }

428

429

432

433

437

440}

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

460{

464

465

466

467

468

469 for (;;)

470 {

472

473

474 if (prefetcher->record == NULL)

475 {

476 bool nonblocking;

477

478

479

480

481

482

483

485

486

487 if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)

489

491 if (record == NULL)

492 {

493

494

495

496

497

501

503 }

504

505

506

507

508

509

511 {

514 }

515

516

517 prefetcher->record = record;

519 }

520 else

521 {

522

523 record = prefetcher->record;

524 }

525

526

527

528

529

530 if (replaying_lsn < record->lsn)

531 {

534

535 if (rmid == RM_XLOG_ID)

536 {

539 {

540

541

542

543

544

546

547#ifdef XLOGPREFETCHER_DEBUG_LEVEL

548 elog(XLOGPREFETCHER_DEBUG_LEVEL,

549 "suppressing all readahead until %X/%X is replayed due to possible TLI change",

551#endif

552

553

554 }

555 }

556 else if (rmid == RM_DBASE_ID)

557 {

558

559

560

561

562

564 {

569

570

571

572

573

574

575

576

577

579

580#ifdef XLOGPREFETCHER_DEBUG_LEVEL

581 elog(XLOGPREFETCHER_DEBUG_LEVEL,

582 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",

583 rlocator.dbOid,

585#endif

586 }

587 }

588 else if (rmid == RM_SMGR_ID)

589 {

591 {

594

596 {

597

598

599

600

601

602

603

604

606 record->lsn);

607

608#ifdef XLOGPREFETCHER_DEBUG_LEVEL

609 elog(XLOGPREFETCHER_DEBUG_LEVEL,

610 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",

615#endif

616 }

617 }

619 {

622

623

624

625

626

629 record->lsn);

630

631#ifdef XLOGPREFETCHER_DEBUG_LEVEL

632 elog(XLOGPREFETCHER_DEBUG_LEVEL,

633 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",

639#endif

640 }

641 }

642 }

643

644

645 while (prefetcher->next_block_id <= record->max_block_id)

646 {

651

653 continue;

654

656

657

658

659

660

661

662 *lsn = record->lsn;

663

664

666 {

668 }

669

670

671

672

673

675 {

678 }

679

680

682 {

685 }

686

687

689 {

692 }

693

694

696 {

699 {

700

701

702

703

704

707 }

708 }

713

714

715

716

717

718

720

721

722

723

724

725

726

727

729 {

730#ifdef XLOGPREFETCHER_DEBUG_LEVEL

731 elog(XLOGPREFETCHER_DEBUG_LEVEL,

732 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",

737#endif

739 record->lsn);

742 }

743

744

745

746

747

748

750 {

751#ifdef XLOGPREFETCHER_DEBUG_LEVEL

752 elog(XLOGPREFETCHER_DEBUG_LEVEL,

753 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",

759#endif

761 record->lsn);

764 }

765

766

769 {

770

774 }

776 {

777

781 }

783 {

784

785

786

787

788

789

790

792 "could not prefetch relation %u/%u/%u block %u",

797 }

798 }

799

800

801

802

803

804

805

806

807

808

809

813

814

815 prefetcher->record = NULL;

816 }

818}

819

820

821

822

825{

826#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10

830

832

834 nulls[i] = false;

835

847

848 return (Datum) 0;

849}

850

851

852

853

854

855static inline void

858{

860 bool found;

861

863 if (!found)

864 {

865

866

867

871 }

872 else

873 {

874

875

876

877

878

879

884 }

885}

886

887

888

889

890

891

892

893static inline void

895{

897 {

901

903 break;

904

907 }

908}

909

910

911

912

913static inline bool

916{

917

918

919

920

922 {

924

925

928 {

929#ifdef XLOGPREFETCHER_DEBUG_LEVEL

930 elog(XLOGPREFETCHER_DEBUG_LEVEL,

931 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",

935#endif

936 return true;

937 }

938

939

943 if (filter)

944 {

945#ifdef XLOGPREFETCHER_DEBUG_LEVEL

946 elog(XLOGPREFETCHER_DEBUG_LEVEL,

947 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",

950#endif

951 return true;

952 }

953 }

954

955 return false;

956}

957

958

959

960

961void

963{

964

966

967

969

971

972

974}

975

976

977

978

979

982{

985

986

987

988

989

991 {

994

997

999 {

1003 }

1004 else

1005 {

1006 max_inflight = 1;

1007 max_distance = 1;

1008 }

1009

1011 max_inflight,

1012 (uintptr_t) prefetcher,

1014

1016 }

1017

1018

1019

1020

1021

1023

1024

1025

1026

1027

1028

1030

1031

1032

1033

1034

1036

1037

1038

1039

1040

1042 {

1046 }

1047

1048

1050 if (!record)

1051 return NULL;

1052

1053

1054

1055

1056

1058

1059

1060

1061

1062

1063

1064

1065 if (record == prefetcher->record)

1066 prefetcher->record = NULL;

1067

1068

1069

1070

1071

1074

1076

1077 return &record->header;

1078}

1079

1080bool

1082{

1083#ifndef USE_PREFETCH

1085 {

1086 GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");

1087 return false;

1088 }

1089#endif

1090

1091 return true;

1092}

1093

1094void

1096{

1097

1101}

static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)

static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)

static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)

TimestampTz GetCurrentTimestamp(void)

static Datum values[MAXATTR]

PrefetchBufferResult PrefetchSharedBuffer(SMgrRelation smgr_reln, ForkNumber forkNum, BlockNumber blockNum)

int maintenance_io_concurrency

static bool BufferIsValid(Buffer bufnum)

#define FLEXIBLE_ARRAY_MEMBER

#define XLOG_DBASE_CREATE_FILE_COPY

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

void hash_destroy(HTAB *hashp)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

int errmsg(const char *fmt,...)

Datum Int64GetDatum(int64 X)

void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)

#define GUC_check_errdetail

Assert(PointerIsAligned(start, uint64))

static void dlist_init(dlist_head *head)

static void dlist_delete(dlist_node *node)

#define dlist_tail_element(type, membername, lhead)

static void dlist_push_head(dlist_head *head, dlist_node *node)

static bool dlist_is_empty(const dlist_head *head)

if(TABLE==NULL||TABLE_index==NULL)

void pfree(void *pointer)

void * palloc0(Size size)

#define AmStartupProcess()

#define XLOG_CHECKPOINT_SHUTDOWN

#define XLOG_END_OF_RECOVERY

static rewind_source * source

static Datum Int32GetDatum(int32 X)

#define INVALID_PROC_NUMBER

struct RelFileLocator RelFileLocator

#define RelFileLocatorEquals(locator1, locator2)

#define InvalidRelFileNumber

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum)

SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend)

bool smgrexists(SMgrRelation reln, ForkNumber forknum)

#define XLOG_SMGR_TRUNCATE

struct LsnReadQueue::@17 queue[FLEXIBLE_ARRAY_MEMBER]

Tuplestorestate * setResult

RelFileLocatorBackend smgr_rlocator

pg_atomic_uint64 skip_fpw

pg_atomic_uint64 skip_init

pg_atomic_uint64 reset_time

pg_atomic_uint64 prefetch

pg_atomic_uint64 skip_rep

pg_atomic_uint64 skip_new

XLogRecPtr filter_until_replayed

BlockNumber filter_from_block

XLogRecPtr no_readahead_until

RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE]

LsnReadQueue * streaming_read

DecodedXLogRecord * record

XLogRecPtr next_stats_shm_lsn

BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE]

DecodedXLogRecord * record

DecodedXLogRecord * decode_queue_head

DecodedXLogRecord * decode_queue_tail

void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)

static Datum TimestampTzGetDatum(TimestampTz X)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr

void XLogPrefetchResetStats(void)

static bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno)

void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)

struct LsnReadQueue LsnReadQueue

#define RecoveryPrefetchEnabled()

static void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)

LsnReadQueueNextStatus(* LsnReadQueueNextFun)(uintptr_t lrq_private, XLogRecPtr *lsn)

static void lrq_free(LsnReadQueue *lrq)

struct XLogPrefetchStats XLogPrefetchStats

static void lrq_prefetch(LsnReadQueue *lrq)

static int XLogPrefetchReconfigureCount

Datum pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)

XLogPrefetcher * XLogPrefetcherAllocate(XLogReaderState *reader)

static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)

static uint32 lrq_completed(LsnReadQueue *lrq)

static XLogPrefetchStats * SharedStats

static uint32 lrq_inflight(LsnReadQueue *lrq)

void XLogPrefetchReconfigure(void)

size_t XLogPrefetchShmemSize(void)

#define PG_STAT_GET_RECOVERY_PREFETCH_COLS

XLogRecord * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)

XLogReaderState * XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)

static LsnReadQueue * lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private, LsnReadQueueNextFun next)

void XLogPrefetchShmemInit(void)

void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)

void assign_recovery_prefetch(int new_value, void *extra)

static void XLogPrefetchIncrement(pg_atomic_uint64 *counter)

#define XLOGPREFETCHER_SEQ_WINDOW_SIZE

struct XLogPrefetcherFilter XLogPrefetcherFilter

static void lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)

#define XLOGPREFETCHER_STATS_DISTANCE

static void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator, BlockNumber blockno, XLogRecPtr lsn)

#define XLOGPREFETCHER_DISTANCE_MULTIPLIER

void XLogPrefetcherFree(XLogPrefetcher *prefetcher)

bool check_recovery_prefetch(int *new_value, void **extra, GucSource source)

DecodedXLogRecord * XLogReadAhead(XLogReaderState *state, bool nonblocking)

DecodedXLogRecord * XLogNextRecord(XLogReaderState *state, char **errormsg)

void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)

XLogRecPtr XLogReleasePreviousRecord(XLogReaderState *state)

static bool XLogReaderHasQueuedRecordOrError(XLogReaderState *state)

#define BKPBLOCK_WILL_INIT