PostgreSQL Source Code: src/backend/access/transam/commit_ts.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
23
33#include "utils/fmgrprotos.h"
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
55{
59
60#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61 sizeof(RepOriginId))
62
63#define COMMIT_TS_XACTS_PER_PAGE \
64 (BLCKSZ / SizeOfCommitTimestampEntry)
65
66
67
68
69
70
71static inline int64
73{
75}
76
77#define TransactionIdToCTsEntry(xid) \
78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79
80
81
82
84
85#define CommitTsCtl (&CommitTsCtlData)
86
87
88
89
90
91
92
93
94
95
96
97
99{
104
106
107
108
110
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138void
142{
143 int i;
146
147
148
149
150
151
152
153
154
156 return;
157
158
159
160
161
162 if (nsubxids > 0)
163 newestXact = subxids[nsubxids - 1];
164 else
165 newestXact = xid;
166
167
168
169
170
171
172
173
174 headxid = xid;
175 i = 0;
176 for (;;)
177 {
179 int j;
180
181 for (j = i; j < nsubxids; j++)
182 {
184 break;
185 }
186
187
189 pageno);
190
191
192 if (j >= nsubxids)
193 break;
194
195
196
197
198
199 headxid = subxids[j];
201 }
202
203
208
209
213}
214
215
216
217
218
219static void
223{
225 int slotno;
226 int i;
227
229
231
233 for (i = 0; i < nsubxids; i++)
235
236 CommitTsCtl->shared->page_dirty[slotno] = true;
237
239}
240
241
242
243
244
245
246static void
249{
252
254
255 entry.time = ts;
256 entry.nodeid = nodeid;
257
258 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261}
262
263
264
265
266
267
268
269
270
271bool
274{
277 int slotno;
281
284 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
285 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
287 {
288
289 *ts = 0;
290 if (nodeid)
291 *nodeid = 0;
292 return false;
293 }
294
296
297
300
301
302
303
304
306 {
308 if (nodeid)
310
312 return *ts != 0;
313 }
314
317
320
321
322
323
327 {
328 *ts = 0;
329 if (nodeid)
331 return false;
332 }
333
334
336 memcpy(&entry,
337 CommitTsCtl->shared->page_buffer[slotno] +
340
341 *ts = entry.time;
342 if (nodeid)
343 *nodeid = entry.nodeid;
344
346 return *ts != 0;
347}
348
349
350
351
352
353
354
355
356
359{
361
363
364
367
369 if (ts)
371 if (nodeid)
374
375 return xid;
376}
377
378static void
380{
382 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
383 errmsg("could not get commit timestamp data"),
385 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
386 "track_commit_timestamp") :
387 errhint("Make sure the configuration parameter \"%s\" is set.",
388 "track_commit_timestamp")));
389}
390
391
392
393
396{
399 bool found;
400
402
403 if (!found)
405
407}
408
409
410
411
412
413
414
415
416
419{
424 bool nulls[3];
427
428
430
432 elog(ERROR, "return type must be a row type");
433
435 {
436 memset(nulls, true, sizeof(nulls));
437 }
438 else
439 {
441 nulls[0] = false;
442
444 nulls[1] = false;
445
447 nulls[2] = false;
448 }
449
451
453}
454
455
456
457
458
459
460
463{
468 bool nulls[2];
471 bool found;
472
474
476 elog(ERROR, "return type must be a row type");
477
478 if (!found)
479 {
480 memset(nulls, true, sizeof(nulls));
481 }
482 else
483 {
485 nulls[0] = false;
486
488 nulls[1] = false;
489 }
490
492
494}
495
496
497
498
499
500
501
502
503static int
505{
506
509
511}
512
513
514
515
518{
521}
522
523
524
525
526
527void
529{
530 bool found;
531
532
534 {
535 char buf[32];
536
540
541
542
543
544
545
546
550 }
552
555 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
556 LWTRANCHE_COMMITTS_SLRU,
558 false);
560
563 &found);
564
566 {
568
573 }
574 else
576}
577
578
579
580
581bool
583{
585}
586
587
588
589
590
591
592
593void
595{
596
597
598
599
600
601}
602
603
604
605
606
607void
609{
611}
612
613
614
615
616
617void
619{
620
621
622
623
624
625
626
627
628
631 else
633}
634
635
636
637
638
639void
641{
642
643
644
645
646
647
648
649
650
651
652
653
654
655 if (newvalue)
656 {
659 }
662}
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680static void
682{
685
686
687
688
689
691 return;
692
693
696 {
698 return;
699 }
701
704
705
706
707
709
710
711
712
713
714
715
716
717
718
719
720
721
722
725 {
728 }
730
731
734
735
739}
740
741
742
743
744
745
746
747
748
749
750
751static void
753{
754
755
756
757
758
759
760
762
767
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
786
788}
789
790
791
792
793void
795{
796
797
798
799
800
802}
803
804
805
806
807
808
809
810
811
812
813
814
815void
817{
820
821
822
823
824
825
828 return;
829
830
831
832
833
836 return;
837
839
841
843
844
846
847
850
852}
853
854
855
856
857
858
859
860void
862{
863 int64 cutoffPage;
864
865
866
867
868
870
871
873 &cutoffPage))
874 return;
875
876
878
879
881}
882
883
884
885
886void
888{
889
890
891
892
895 {
900 }
901 else
902 {
906 }
908}
909
910
911
912
913void
915{
921}
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947static bool
949{
952
957
960}
961
962
963
964
965
966static void
968{
970
971 xlrec.pageno = pageno;
973
977}
978
979
980
981
982void
984{
986
987
989
991 {
993
994 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
996 }
998 {
1000
1002
1003
1004
1005
1006
1009
1011 }
1012 else
1013 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1014}
1015
1016
1017
1018
1019int
1021{
1023}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static Datum values[MAXATTR]
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
void StartupCommitTs(void)
static SlruCtlData CommitTsCtlData
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
void CommitTsParameterChange(bool newvalue, bool oldvalue)
#define COMMIT_TS_XACTS_PER_PAGE
#define TransactionIdToCTsEntry(xid)
static void DeactivateCommitTs(void)
Size CommitTsShmemSize(void)
bool track_commit_timestamp
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
static CommitTimestampShared * commitTsShared
int committssyncfiletag(const FileTag *ftag, char *path)
void CompleteCommitTsInitialization(void)
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
static void ActivateCommitTs(void)
static int64 TransactionIdToCTsPage(TransactionId xid)
void TruncateCommitTs(TransactionId oldestXact)
void commit_ts_redo(XLogReaderState *record)
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
static int CommitTsShmemBuffers(void)
static void error_commit_ts_disabled(void)
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
#define SizeOfCommitTimestampEntry
void BootStrapCommitTs(void)
void CommitTsShmemInit(void)
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
void ExtendCommitTs(TransactionId newestXact)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
void CheckPointCommitTs(void)
#define COMMIT_TS_ZEROPAGE
#define SizeOfCommitTsTruncate
#define COMMIT_TS_TRUNCATE
#define TIMESTAMP_NOBEGIN(j)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TRANSACTIONID(n)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
int commit_timestamp_buffers
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
#define IsBootstrapProcessingMode()
#define InvalidRepOriginId
static rewind_source * source
static char buf[DEFAULT_XLOG_SEG_SIZE]
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
int SimpleLruAutotuneBuffers(int divisor, int max)
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno)
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Size SimpleLruShmemSize(int nslots, int nlsns)
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
bool check_slru_buffers(const char *name, int *newval)
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
#define SlruPagePrecedesUnitTests(ctl, per_page)
#define SLRU_MAX_ALLOWED_BUFFERS
TransactionId oldestCommitTsXid
TransactionId newestCommitTsXid
FullTransactionId nextXid
static TransactionId ReadNextTransactionId(void)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define FirstNormalTransactionId
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static Datum TimestampTzGetDatum(TimestampTz X)
#define PG_RETURN_TIMESTAMPTZ(x)
TransamVariablesData * TransamVariables
bool RecoveryInProgress(void)
XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)
#define XLogRecHasAnyBlockRefs(decoder)