PostgreSQL Source Code: src/backend/access/transam/commit_ts.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
23
33#include "utils/fmgrprotos.h"
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
55{
59
60#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61 sizeof(RepOriginId))
62
63#define COMMIT_TS_XACTS_PER_PAGE \
64 (BLCKSZ / SizeOfCommitTimestampEntry)
65
66
67
68
69
70
71static inline int64
73{
75}
76
77#define TransactionIdToCTsEntry(xid) \
78 ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79
80
81
82
84
85#define CommitTsCtl (&CommitTsCtlData)
86
87
88
89
90
91
92
93
94
95
96
97
99{
104
106
107
108
110
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140void
144{
145 int i;
148
149
150
151
152
153
154
155
156
158 return;
159
160
161
162
163
164 if (nsubxids > 0)
165 newestXact = subxids[nsubxids - 1];
166 else
167 newestXact = xid;
168
169
170
171
172
173
174
175
176 headxid = xid;
177 i = 0;
178 for (;;)
179 {
181 int j;
182
183 for (j = i; j < nsubxids; j++)
184 {
186 break;
187 }
188
189
191 pageno);
192
193
194 if (j >= nsubxids)
195 break;
196
197
198
199
200
201 headxid = subxids[j];
203 }
204
205
210
211
215}
216
217
218
219
220
221static void
225{
227 int slotno;
228 int i;
229
231
233
235 for (i = 0; i < nsubxids; i++)
237
238 CommitTsCtl->shared->page_dirty[slotno] = true;
239
241}
242
243
244
245
246
247
248static void
251{
254
256
257 entry.time = ts;
258 entry.nodeid = nodeid;
259
260 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
263}
264
265
266
267
268
269
270
271
272
273bool
276{
279 int slotno;
283
286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
289 {
290
291 *ts = 0;
292 if (nodeid)
293 *nodeid = 0;
294 return false;
295 }
296
298
299
302
303
304
305
306
308 {
310 if (nodeid)
312
314 return *ts != 0;
315 }
316
319
322
323
324
325
329 {
330 *ts = 0;
331 if (nodeid)
333 return false;
334 }
335
336
338 memcpy(&entry,
339 CommitTsCtl->shared->page_buffer[slotno] +
342
343 *ts = entry.time;
344 if (nodeid)
345 *nodeid = entry.nodeid;
346
348 return *ts != 0;
349}
350
351
352
353
354
355
356
357
358
361{
363
365
366
369
371 if (ts)
373 if (nodeid)
376
377 return xid;
378}
379
380static void
382{
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg("could not get commit timestamp data"),
387 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388 "track_commit_timestamp") :
389 errhint("Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
391}
392
393
394
395
398{
401 bool found;
402
404
405 if (!found)
407
409}
410
411
412
413
414
415
416
417
418
421{
426 bool nulls[3];
429
430
432
434 elog(ERROR, "return type must be a row type");
435
437 {
438 memset(nulls, true, sizeof(nulls));
439 }
440 else
441 {
443 nulls[0] = false;
444
446 nulls[1] = false;
447
449 nulls[2] = false;
450 }
451
453
455}
456
457
458
459
460
461
462
465{
470 bool nulls[2];
473 bool found;
474
476
478 elog(ERROR, "return type must be a row type");
479
480 if (!found)
481 {
482 memset(nulls, true, sizeof(nulls));
483 }
484 else
485 {
487 nulls[0] = false;
488
490 nulls[1] = false;
491 }
492
494
496}
497
498
499
500
501
502
503
504
505static int
507{
508
511
513}
514
515
516
517
520{
523}
524
525
526
527
528
529void
531{
532 bool found;
533
534
536 {
537 char buf[32];
538
542
543
544
545
546
547
548
552 }
554
560 false);
562
565 &found);
566
568 {
570
575 }
576 else
578}
579
580
581
582
583bool
585{
587}
588
589
590
591
592
593
594
595void
597{
598
599
600
601
602
603}
604
605
606
607
608
609
610
611
612
613
614static int
616{
617 int slotno;
618
620
621 if (writeXlog)
623
624 return slotno;
625}
626
627
628
629
630
631void
633{
635}
636
637
638
639
640
641void
643{
644
645
646
647
648
649
650
651
652
655 else
657}
658
659
660
661
662
663void
665{
666
667
668
669
670
671
672
673
674
675
676
677
678
679 if (newvalue)
680 {
683 }
686}
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704static void
706{
709
710
713 {
715 return;
716 }
718
721
722
723
724
726
727
728
729
730
731
732
733
734
735
736
737
738
739
742 {
745 }
747
748
750 {
752 int slotno;
753
759 }
760
761
765}
766
767
768
769
770
771
772
773
774
775
776
777static void
779{
780
781
782
783
784
785
786
788
793
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
812
814}
815
816
817
818
819void
821{
822
823
824
825
826
828}
829
830
831
832
833
834
835
836
837
838
839
840
841void
843{
846
847
848
849
850
851
854 return;
855
856
857
858
859
862 return;
863
865
867
869
870
872
874}
875
876
877
878
879
880
881
882void
884{
885 int64 cutoffPage;
886
887
888
889
890
892
893
895 &cutoffPage))
896 return;
897
898
900
901
903}
904
905
906
907
908void
910{
911
912
913
914
917 {
922 }
923 else
924 {
928 }
930}
931
932
933
934
935void
937{
943}
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969static bool
971{
974
979
982}
983
984
985
986
987
988static void
990{
994}
995
996
997
998
999static void
1001{
1003
1004 xlrec.pageno = pageno;
1006
1010}
1011
1012
1013
1014
1015void
1017{
1019
1020
1022
1024 {
1026 int slotno;
1028
1029 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030
1033
1037
1039 }
1041 {
1043
1045
1046
1047
1048
1049
1052
1054 }
1055 else
1056 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057}
1058
1059
1060
1061
1062int
1064{
1066}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static Datum values[MAXATTR]
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz ts, RepOriginId nodeid, int64 pageno)
static void WriteZeroPageXlogRec(int64 pageno)
void StartupCommitTs(void)
static SlruCtlData CommitTsCtlData
Datum pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
struct CommitTimestampEntry CommitTimestampEntry
struct CommitTimestampShared CommitTimestampShared
Datum pg_last_committed_xact(PG_FUNCTION_ARGS)
TransactionId GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
void CommitTsParameterChange(bool newvalue, bool oldvalue)
#define COMMIT_TS_XACTS_PER_PAGE
#define TransactionIdToCTsEntry(xid)
static void DeactivateCommitTs(void)
Size CommitTsShmemSize(void)
bool track_commit_timestamp
void AdvanceOldestCommitTsXid(TransactionId oldestXact)
static CommitTimestampShared * commitTsShared
int committssyncfiletag(const FileTag *ftag, char *path)
void CompleteCommitTsInitialization(void)
bool check_commit_ts_buffers(int *newval, void **extra, GucSource source)
static void ActivateCommitTs(void)
static int64 TransactionIdToCTsPage(TransactionId xid)
void TruncateCommitTs(TransactionId oldestXact)
void commit_ts_redo(XLogReaderState *record)
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, RepOriginId nodeid, int slotno)
Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
static int CommitTsShmemBuffers(void)
static void error_commit_ts_disabled(void)
static bool CommitTsPagePrecedes(int64 page1, int64 page2)
#define SizeOfCommitTimestampEntry
void BootStrapCommitTs(void)
void CommitTsShmemInit(void)
void SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
void ExtendCommitTs(TransactionId newestXact)
void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, TransactionId *subxids, TimestampTz timestamp, RepOriginId nodeid)
static int ZeroCommitTsPage(int64 pageno, bool writeXlog)
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
void CheckPointCommitTs(void)
#define COMMIT_TS_ZEROPAGE
#define SizeOfCommitTsTruncate
#define COMMIT_TS_TRUNCATE
#define TIMESTAMP_NOBEGIN(j)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TRANSACTIONID(n)
#define PG_RETURN_DATUM(x)
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
int commit_timestamp_buffers
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
@ LWTRANCHE_COMMITTS_BUFFER
@ LWTRANCHE_COMMITTS_SLRU
#define InvalidRepOriginId
static rewind_source * source
static Datum TransactionIdGetDatum(TransactionId X)
static Datum ObjectIdGetDatum(Oid X)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
void SimpleLruWritePage(SlruCtl ctl, int slotno)
void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
int SimpleLruAutotuneBuffers(int divisor, int max)
bool SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
int SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
Size SimpleLruShmemSize(int nslots, int nlsns)
bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage, void *data)
bool check_slru_buffers(const char *name, int *newval)
static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
#define SlruPagePrecedesUnitTests(ctl, per_page)
#define SLRU_MAX_ALLOWED_BUFFERS
TransactionId oldestCommitTsXid
TransactionId newestCommitTsXid
FullTransactionId nextXid
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static TransactionId ReadNextTransactionId(void)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define XidFromFullTransactionId(x)
#define FirstNormalTransactionId
#define TransactionIdIsValid(xid)
#define TransactionIdIsNormal(xid)
static Datum TimestampTzGetDatum(TimestampTz X)
#define PG_RETURN_TIMESTAMPTZ(x)
TransamVariablesData * TransamVariables
bool RecoveryInProgress(void)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)
#define XLogRecHasAnyBlockRefs(decoder)