PostgreSQL Source Code: src/backend/access/transam/commit_ts.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

23

33#include "utils/fmgrprotos.h"

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

55{

59

60#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \

61 sizeof(RepOriginId))

62

63#define COMMIT_TS_XACTS_PER_PAGE \

64 (BLCKSZ / SizeOfCommitTimestampEntry)

65

66

67

68

69

70

71static inline int64

73{

75}

76

77#define TransactionIdToCTsEntry(xid) \

78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

79

80

81

82

84

85#define CommitTsCtl (&CommitTsCtlData)

86

87

88

89

90

91

92

93

94

95

96

97

99{

104

106

107

108

110

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138void

142{

143 int i;

146

147

148

149

150

151

152

153

154

156 return;

157

158

159

160

161

162 if (nsubxids > 0)

163 newestXact = subxids[nsubxids - 1];

164 else

165 newestXact = xid;

166

167

168

169

170

171

172

173

174 headxid = xid;

175 i = 0;

176 for (;;)

177 {

179 int j;

180

181 for (j = i; j < nsubxids; j++)

182 {

184 break;

185 }

186

187

189 pageno);

190

191

192 if (j >= nsubxids)

193 break;

194

195

196

197

198

199 headxid = subxids[j];

200 i = j + 1;

201 }

202

203

208

209

213}

214

215

216

217

218

219static void

223{

225 int slotno;

226 int i;

227

229

231

233 for (i = 0; i < nsubxids; i++)

235

236 CommitTsCtl->shared->page_dirty[slotno] = true;

237

239}

240

241

242

243

244

245

246static void

249{

252

254

255 entry.time = ts;

256 entry.nodeid = nodeid;

257

258 memcpy(CommitTsCtl->shared->page_buffer[slotno] +

261}

262

263

264

265

266

267

268

269

270

271bool

274{

277 int slotno;

281

284 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

285 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));

287 {

288

289 *ts = 0;

290 if (nodeid)

291 *nodeid = 0;

292 return false;

293 }

294

296

297

300

301

302

303

304

306 {

308 if (nodeid)

310

312 return *ts != 0;

313 }

314

317

320

321

322

323

327 {

328 *ts = 0;

329 if (nodeid)

331 return false;

332 }

333

334

336 memcpy(&entry,

337 CommitTsCtl->shared->page_buffer[slotno] +

340

341 *ts = entry.time;

342 if (nodeid)

343 *nodeid = entry.nodeid;

344

346 return *ts != 0;

347}

348

349

350

351

352

353

354

355

356

359{

361

363

364

367

369 if (ts)

371 if (nodeid)

374

375 return xid;

376}

377

378static void

380{

382 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

383 errmsg("could not get commit timestamp data"),

385 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",

386 "track_commit_timestamp") :

387 errhint("Make sure the configuration parameter \"%s\" is set.",

388 "track_commit_timestamp")));

389}

390

391

392

393

396{

399 bool found;

400

402

403 if (!found)

405

407}

408

409

410

411

412

413

414

415

416

419{

424 bool nulls[3];

427

428

430

432 elog(ERROR, "return type must be a row type");

433

435 {

436 memset(nulls, true, sizeof(nulls));

437 }

438 else

439 {

441 nulls[0] = false;

442

444 nulls[1] = false;

445

447 nulls[2] = false;

448 }

449

451

453}

454

455

456

457

458

459

460

463{

468 bool nulls[2];

471 bool found;

472

474

476 elog(ERROR, "return type must be a row type");

477

478 if (!found)

479 {

480 memset(nulls, true, sizeof(nulls));

481 }

482 else

483 {

485 nulls[0] = false;

486

488 nulls[1] = false;

489 }

490

492

494}

495

496

497

498

499

500

501

502

503static int

505{

506

509

511}

512

513

514

515

518{

521}

522

523

524

525

526

527void

529{

530 bool found;

531

532

534 {

535 char buf[32];

536

540

541

542

543

544

545

546

550 }

552

555 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,

556 LWTRANCHE_COMMITTS_SLRU,

558 false);

560

563 &found);

564

566 {

568

573 }

574 else

576}

577

578

579

580

581bool

583{

585}

586

587

588

589

590

591

592

593void

595{

596

597

598

599

600

601}

602

603

604

605

606

607void

609{

611}

612

613

614

615

616

617void

619{

620

621

622

623

624

625

626

627

628

631 else

633}

634

635

636

637

638

639void

641{

642

643

644

645

646

647

648

649

650

651

652

653

654

655 if (newvalue)

656 {

659 }

662}

663

664

665

666

667

668

669

670

671

672

673

674

675

676

677

678

679

680static void

682{

685

686

687

688

689

691 return;

692

693

696 {

698 return;

699 }

701

704

705

706

707

709

710

711

712

713

714

715

716

717

718

719

720

721

722

725 {

728 }

730

731

734

735

739}

740

741

742

743

744

745

746

747

748

749

750

751static void

753{

754

755

756

757

758

759

760

762

767

770

771

772

773

774

775

776

777

778

779

780

781

782

783

784

786

788}

789

790

791

792

793void

795{

796

797

798

799

800

802}

803

804

805

806

807

808

809

810

811

812

813

814

815void

817{

820

821

822

823

824

825

828 return;

829

830

831

832

833

836 return;

837

839

841

843

844

846

847

850

852}

853

854

855

856

857

858

859

860void

862{

863 int64 cutoffPage;

864

865

866

867

868

870

871

873 &cutoffPage))

874 return;

875

876

878

879

881}

882

883

884

885

886void

888{

889

890

891

892

895 {

900 }

901 else

902 {

906 }

908}

909

910

911

912

913void

915{

921}

922

923

924

925

926

927

928

929

930

931

932

933

934

935

936

937

938

939

940

941

942

943

944

945

946

947static bool

949{

952

957

960}

961

962

963

964

965

966static void

968{

970

971 xlrec.pageno = pageno;

973

977}

978

979

980

981

982void

984{

986

987

989

991 {

993

994 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));

996 }

998 {

1000

1002

1003

1004

1005

1006

1009

1011 }

1012 else

1013 elog(PANIC, "commit_ts_redo: unknown op code %u", info);

1014}

1015

1016

1017

1018

1019int

1021{

1023}

static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)

static Datum values[MAXATTR]

static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)

void StartupCommitTs(void)

static SlruCtlData CommitTsCtlData

Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)

struct CommitTimestampEntry CommitTimestampEntry

struct CommitTimestampShared CommitTimestampShared

Datum pg_last_committed_xact(PG_FUNCTION_ARGS)

TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)

void CommitTsParameterChange(bool newvalue, bool oldvalue)

#define COMMIT_TS_XACTS_PER_PAGE

#define TransactionIdToCTsEntry(xid)

static void DeactivateCommitTs(void)

Size CommitTsShmemSize(void)

bool track_commit_timestamp

void AdvanceOldestCommitTsXid(TransactionId oldestXact)

static CommitTimestampShared * commitTsShared

int committssyncfiletag(const FileTag *ftag, char *path)

void CompleteCommitTsInitialization(void)

bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)

static void ActivateCommitTs(void)

static int64 TransactionIdToCTsPage(TransactionId xid)

void TruncateCommitTs(TransactionId oldestXact)

void commit_ts_redo(XLogReaderState *record)

bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)

static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)

Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)

static int CommitTsShmemBuffers(void)

static void error_commit_ts_disabled(void)

static bool CommitTsPagePrecedes(int64 page1, int64 page2)

#define SizeOfCommitTimestampEntry

void BootStrapCommitTs(void)

void CommitTsShmemInit(void)

void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)

void ExtendCommitTs(TransactionId newestXact)

void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)

static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)

void CheckPointCommitTs(void)

#define COMMIT_TS_ZEROPAGE

#define SizeOfCommitTsTruncate

#define COMMIT_TS_TRUNCATE

#define TIMESTAMP_NOBEGIN(j)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_TRANSACTIONID(n)

#define PG_RETURN_DATUM(x)

TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)

static Datum HeapTupleGetDatum(const HeapTupleData *tuple)

int commit_timestamp_buffers

void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)

Assert(PointerIsAligned(start, uint64))

HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

#define IsBootstrapProcessingMode()

#define InvalidRepOriginId

static rewind_source * source

static char buf[DEFAULT_XLOG_SEG_SIZE]

static Datum TransactionIdGetDatum(TransactionId X)

static Datum ObjectIdGetDatum(Oid X)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)

int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)

void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)

int SimpleLruAutotuneBuffers(int divisor, int max)

bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)

bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)

bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)

int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)

int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)

int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)

void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno)

void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)

Size SimpleLruShmemSize(int nslots, int nlsns)

bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)

bool check_slru_buffers(const char *name, int *newval)

static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)

#define SlruPagePrecedesUnitTests(ctl, per_page)

#define SLRU_MAX_ALLOWED_BUFFERS

TransactionId oldestCommitTsXid

TransactionId newestCommitTsXid

FullTransactionId nextXid

static TransactionId ReadNextTransactionId(void)

#define InvalidTransactionId

#define TransactionIdEquals(id1, id2)

#define XidFromFullTransactionId(x)

#define FirstNormalTransactionId

#define TransactionIdIsValid(xid)

#define TransactionIdIsNormal(xid)

static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

static Datum TimestampTzGetDatum(TimestampTz X)

#define PG_RETURN_TIMESTAMPTZ(x)

TransamVariablesData * TransamVariables

bool RecoveryInProgress(void)

XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)

XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)

void XLogRegisterData(const void *data, uint32 len)

void XLogBeginInsert(void)

#define XLogRecGetInfo(decoder)

#define XLogRecGetData(decoder)

#define XLogRecHasAnyBlockRefs(decoder)