PostgreSQL Source Code: src/backend/access/transam/commit_ts.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

23

33#include "utils/fmgrprotos.h"

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

55{

59

60#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \

61 sizeof(RepOriginId))

62

63#define COMMIT_TS_XACTS_PER_PAGE \

64 (BLCKSZ / SizeOfCommitTimestampEntry)

65

66

67

68

69

70

71static inline int64

73{

75}

76

77#define TransactionIdToCTsEntry(xid) \

78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

79

80

81

82

84

85#define CommitTsCtl (&CommitTsCtlData)

86

87

88

89

90

91

92

93

94

95

96

97

99{

104

106

107

108

110

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140void

144{

145 int i;

148

149

150

151

152

153

154

155

156

158 return;

159

160

161

162

163

164 if (nsubxids > 0)

165 newestXact = subxids[nsubxids - 1];

166 else

167 newestXact = xid;

168

169

170

171

172

173

174

175

176 headxid = xid;

177 i = 0;

178 for (;;)

179 {

181 int j;

182

183 for (j = i; j < nsubxids; j++)

184 {

186 break;

187 }

188

189

191 pageno);

192

193

194 if (j >= nsubxids)

195 break;

196

197

198

199

200

201 headxid = subxids[j];

202 i = j + 1;

203 }

204

205

210

211

215}

216

217

218

219

220

221static void

225{

227 int slotno;

228 int i;

229

231

233

235 for (i = 0; i < nsubxids; i++)

237

238 CommitTsCtl->shared->page_dirty[slotno] = true;

239

241}

242

243

244

245

246

247

248static void

251{

254

256

257 entry.time = ts;

258 entry.nodeid = nodeid;

259

260 memcpy(CommitTsCtl->shared->page_buffer[slotno] +

263}

264

265

266

267

268

269

270

271

272

273bool

276{

279 int slotno;

283

286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

287 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));

289 {

290

291 *ts = 0;

292 if (nodeid)

293 *nodeid = 0;

294 return false;

295 }

296

298

299

302

303

304

305

306

308 {

310 if (nodeid)

312

314 return *ts != 0;

315 }

316

319

322

323

324

325

329 {

330 *ts = 0;

331 if (nodeid)

333 return false;

334 }

335

336

338 memcpy(&entry,

339 CommitTsCtl->shared->page_buffer[slotno] +

342

343 *ts = entry.time;

344 if (nodeid)

345 *nodeid = entry.nodeid;

346

348 return *ts != 0;

349}

350

351

352

353

354

355

356

357

358

361{

363

365

366

369

371 if (ts)

373 if (nodeid)

376

377 return xid;

378}

379

380static void

382{

384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

385 errmsg("could not get commit timestamp data"),

387 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",

388 "track_commit_timestamp") :

389 errhint("Make sure the configuration parameter \"%s\" is set.",

390 "track_commit_timestamp")));

391}

392

393

394

395

398{

401 bool found;

402

404

405 if (!found)

407

409}

410

411

412

413

414

415

416

417

418

421{

426 bool nulls[3];

429

430

432

434 elog(ERROR, "return type must be a row type");

435

437 {

438 memset(nulls, true, sizeof(nulls));

439 }

440 else

441 {

443 nulls[0] = false;

444

446 nulls[1] = false;

447

449 nulls[2] = false;

450 }

451

453

455}

456

457

458

459

460

461

462

465{

470 bool nulls[2];

473 bool found;

474

476

478 elog(ERROR, "return type must be a row type");

479

480 if (!found)

481 {

482 memset(nulls, true, sizeof(nulls));

483 }

484 else

485 {

487 nulls[0] = false;

488

490 nulls[1] = false;

491 }

492

494

496}

497

498

499

500

501

502

503

504

505static int

507{

508

511

513}

514

515

516

517

520{

523}

524

525

526

527

528

529void

531{

532 bool found;

533

534

536 {

537 char buf[32];

538

542

543

544

545

546

547

548

552 }

554

560 false);

562

565 &found);

566

568 {

570

575 }

576 else

578}

579

580

581

582

583bool

585{

587}

588

589

590

591

592

593

594

595void

597{

598

599

600

601

602

603}

604

605

606

607

608

609

610

611

612

613

614static int

616{

617 int slotno;

618

620

621 if (writeXlog)

623

624 return slotno;

625}

626

627

628

629

630

631void

633{

635}

636

637

638

639

640

641void

643{

644

645

646

647

648

649

650

651

652

655 else

657}

658

659

660

661

662

663void

665{

666

667

668

669

670

671

672

673

674

675

676

677

678

679 if (newvalue)

680 {

683 }

686}

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701

702

703

704static void

706{

709

710

713 {

715 return;

716 }

718

721

722

723

724

726

727

728

729

730

731

732

733

734

735

736

737

738

739

742 {

745 }

747

748

750 {

752 int slotno;

753

759 }

760

761

765}

766

767

768

769

770

771

772

773

774

775

776

777static void

779{

780

781

782

783

784

785

786

788

793

796

797

798

799

800

801

802

803

804

805

806

807

808

809

810

812

814}

815

816

817

818

819void

821{

822

823

824

825

826

828}

829

830

831

832

833

834

835

836

837

838

839

840

841void

843{

846

847

848

849

850

851

854 return;

855

856

857

858

859

862 return;

863

865

867

869

870

872

874}

875

876

877

878

879

880

881

882void

884{

885 int64 cutoffPage;

886

887

888

889

890

892

893

895 &cutoffPage))

896 return;

897

898

900

901

903}

904

905

906

907

908void

910{

911

912

913

914

917 {

922 }

923 else

924 {

928 }

930}

931

932

933

934

935void

937{

943}

944

945

946

947

948

949

950

951

952

953

954

955

956

957

958

959

960

961

962

963

964

965

966

967

968

969static bool

971{

974

979

982}

983

984

985

986

987

988static void

990{

994}

995

996

997

998

999static void

1001{

1003

1004 xlrec.pageno = pageno;

1006

1010}

1011

1012

1013

1014

1015void

1017{

1019

1020

1022

1024 {

1026 int slotno;

1028

1029 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));

1030

1033

1037

1039 }

1041 {

1043

1045

1046

1047

1048

1049

1052

1054 }

1055 else

1056 elog(PANIC, "commit_ts_redo: unknown op code %u", info);

1057}

1058

1059

1060

1061

1062int

1064{

1066}

static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)

static Datum values[MAXATTR]

static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)

static void WriteZeroPageXlogRec(int64 pageno)

void StartupCommitTs(void)

static SlruCtlData CommitTsCtlData

Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)

struct CommitTimestampEntry CommitTimestampEntry

struct CommitTimestampShared CommitTimestampShared

Datum pg_last_committed_xact(PG_FUNCTION_ARGS)

TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)

void CommitTsParameterChange(bool newvalue, bool oldvalue)

#define COMMIT_TS_XACTS_PER_PAGE

#define TransactionIdToCTsEntry(xid)

static void DeactivateCommitTs(void)

Size CommitTsShmemSize(void)

bool track_commit_timestamp

void AdvanceOldestCommitTsXid(TransactionId oldestXact)

static CommitTimestampShared * commitTsShared

int committssyncfiletag(const FileTag *ftag, char *path)

void CompleteCommitTsInitialization(void)

bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)

static void ActivateCommitTs(void)

static int64 TransactionIdToCTsPage(TransactionId xid)

void TruncateCommitTs(TransactionId oldestXact)

void commit_ts_redo(XLogReaderState *record)

bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)

static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)

Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)

static int CommitTsShmemBuffers(void)

static void error_commit_ts_disabled(void)

static bool CommitTsPagePrecedes(int64 page1, int64 page2)

#define SizeOfCommitTimestampEntry

void BootStrapCommitTs(void)

void CommitTsShmemInit(void)

void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)

void ExtendCommitTs(TransactionId newestXact)

void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)

static int ZeroCommitTsPage(int64 pageno, bool writeXlog)

static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)

void CheckPointCommitTs(void)

#define COMMIT_TS_ZEROPAGE

#define SizeOfCommitTsTruncate

#define COMMIT_TS_TRUNCATE

#define TIMESTAMP_NOBEGIN(j)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_TRANSACTIONID(n)

#define PG_RETURN_DATUM(x)

TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)

static Datum HeapTupleGetDatum(const HeapTupleData *tuple)

int commit_timestamp_buffers

void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)

Assert(PointerIsAligned(start, uint64))

HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

@ LWTRANCHE_COMMITTS_BUFFER

@ LWTRANCHE_COMMITTS_SLRU

#define InvalidRepOriginId

static rewind_source * source

static Datum TransactionIdGetDatum(TransactionId X)

static Datum ObjectIdGetDatum(Oid X)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)

int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)

void SimpleLruWritePage(SlruCtl ctl, int slotno)

void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)

int SimpleLruAutotuneBuffers(int divisor, int max)

bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)

bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)

bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)

int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)

int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)

int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)

void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)

Size SimpleLruShmemSize(int nslots, int nlsns)

bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)

bool check_slru_buffers(const char *name, int *newval)

static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)

#define SlruPagePrecedesUnitTests(ctl, per_page)

#define SLRU_MAX_ALLOWED_BUFFERS

TransactionId oldestCommitTsXid

TransactionId newestCommitTsXid

FullTransactionId nextXid

bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

static TransactionId ReadNextTransactionId(void)

#define InvalidTransactionId

#define TransactionIdEquals(id1, id2)

#define XidFromFullTransactionId(x)

#define FirstNormalTransactionId

#define TransactionIdIsValid(xid)

#define TransactionIdIsNormal(xid)

static Datum TimestampTzGetDatum(TimestampTz X)

#define PG_RETURN_TIMESTAMPTZ(x)

TransamVariablesData * TransamVariables

bool RecoveryInProgress(void)

XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)

void XLogRegisterData(const void *data, uint32 len)

void XLogBeginInsert(void)

#define XLogRecGetInfo(decoder)

#define XLogRecGetData(decoder)

#define XLogRecHasAnyBlockRefs(decoder)