PostgreSQL Source Code: src/backend/access/heap/rewriteheap.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
104
106
125
126
127
128
129
131{
139
141
143
145
147
154
155
156
157
158
159
160
161typedef struct
162{
166
167
168
169
170typedef struct
171{
176
178
179typedef struct
180{
184
186
187
188
189
190
192{
199
200
201
202
203
205{
207
210
211
212
214
215
219
220
221
222
223
224
225
226
227
228
229
230
231
232
236{
241
242
243
244
245
247 "Table rewrite",
250
251
253
254 state->rs_old_rel = old_heap;
255 state->rs_new_rel = new_heap;
256 state->rs_buffer = NULL;
257
259 state->rs_oldest_xmin = oldest_xmin;
260 state->rs_freeze_xid = freeze_xid;
261 state->rs_cutoff_multi = cutoff_multi;
262 state->rs_cxt = rw_cxt;
264
265
269
270 state->rs_unresolved_tups =
271 hash_create("Rewrite / Unresolved ctids",
272 128,
273 &hash_ctl,
275
277
278 state->rs_old_new_tid_map =
279 hash_create("Rewrite / Old to new tid map",
280 128,
281 &hash_ctl,
283
285
287
289}
290
291
292
293
294
295
296void
298{
301
302
303
304
305
307
308 while ((unresolved = hash_seq_search(&seq_status)) != NULL)
309 {
312 }
313
314
315 if (state->rs_buffer)
316 {
318 state->rs_buffer = NULL;
319 }
320
322
324
325
327}
328
329
330
331
332
333
334
335
336
337
338
339
340void
343{
347 bool found;
348 bool free_new;
349
351
352
353
354
355
356
357
361
366
367
368
369
370
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
376
377
378
379
380
382
383
384
385
391 {
393
394 memset(&hashkey, 0, sizeof(hashkey));
397
401
402 if (mapping != NULL)
403 {
404
405
406
407
408
410
411
415 }
416 else
417 {
418
419
420
421
423
424 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
427
430
431
432
433
434
436 return;
437 }
438 }
439
440
441
442
443
444
445
446 old_tid = old_tuple->t_self;
447 free_new = false;
448
449 for (;;)
450 {
452
453
455 new_tid = new_tuple->t_self;
456
458
459
460
461
462
463
464
465
466
469 state->rs_oldest_xmin))
470 {
471
472
473
475
476 memset(&hashkey, 0, sizeof(hashkey));
478 hashkey.tid = old_tid;
479
480 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
482
483 if (unresolved != NULL)
484 {
485
486
487
488
489
490 if (free_new)
492 new_tuple = unresolved->tuple;
493 free_new = true;
494 old_tid = unresolved->old_tid;
496
497
498
499
500
504
505
506 continue;
507 }
508 else
509 {
510
511
512
513
515
519
520 mapping->new_tid = new_tid;
521 }
522 }
523
524
525 if (free_new)
527 break;
528 }
529
531}
532
533
534
535
536
537
538
539
540
541
542bool
544{
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
562 bool found;
563
564 memset(&hashkey, 0, sizeof(hashkey));
567
568 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
570
571 if (unresolved != NULL)
572 {
573
578 return true;
579 }
580
581 return false;
582}
583
584
585
586
587
588
589
590
591
592static void
594{
596 Size pageFreeSpace,
597 saveFreeSpace;
601
602
603
604
605
606
607
608
609 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
610 {
611
613 heaptup = tup;
614 }
616 {
618
619
620
621
622
623
625
628 }
629 else
630 heaptup = tup;
631
633
634
635
636
639 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
640 errmsg("row is too big: size %zu, maximum size %zu",
642
643
646
647
649 if (page)
650 {
652
653 if (len + saveFreeSpace > pageFreeSpace)
654 {
655
656
657
658
659
661 state->rs_buffer = NULL;
662 page = NULL;
663 state->rs_blockno++;
664 }
665 }
666
667 if (!page)
668 {
669
673 }
674
675
679 elog(ERROR, "failed to add tuple");
680
681
683
684
685
686
687
689 {
692
695
697 }
698
699
700 if (heaptup != tup)
702}
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758static void
760{
763
764
765
766
767
768
769 state->rs_logical_rewrite =
771
772 if (->rs_logical_rewrite)
773 return;
774
776
777
778
779
780
781
783 {
784 state->rs_logical_rewrite = false;
785 return;
786 }
787
788 state->rs_logical_xmin = logical_xmin;
790 state->rs_num_rewrite_mappings = 0;
791
795
796 state->rs_logical_mappings =
798 128,
799 &hash_ctl,
801}
802
803
804
805
806static void
808{
812
814
815
816 if (state->rs_num_rewrite_mappings == 0)
817 return;
818
819 elog(DEBUG1, "flushing %u logical rewrite mapping entries",
820 state->rs_num_rewrite_mappings);
821
824 {
825 char *waldata;
826 char *waldata_start;
828 Oid dboid;
830 int written;
832
833
834 if (num_mappings == 0)
835 continue;
836
837 if (state->rs_old_rel->rd_rel->relisshared)
839 else
841
848
849
851 waldata_start = waldata = palloc(len);
852
853
854
855
857 {
859
861
862 memcpy(waldata, &pmap->map, sizeof(pmap->map));
863 waldata += sizeof(pmap->map);
864
865
868
869
870 state->rs_num_rewrite_mappings--;
871 }
872
874 Assert(waldata == waldata_start + len);
875
876
877
878
879
881 WAIT_EVENT_LOGICAL_REWRITE_WRITE);
882 if (written != len)
885 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
886 written, len)));
888
892
893
895
896 pfree(waldata_start);
897 }
898 Assert(state->rs_num_rewrite_mappings == 0);
899}
900
901
902
903
904static void
906{
909
910
911 if (->rs_logical_rewrite)
912 return;
913
914
915 if (state->rs_num_rewrite_mappings > 0)
917
918
921 {
922 if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
925 errmsg("could not fsync file \"%s\": %m", src->path)));
927 }
928
929}
930
931
932
933
934static void
937{
940 Oid relid;
941 bool found;
942
944
945
948
949
950
951
952
953 if (!found)
954 {
956 Oid dboid;
957
958 if (state->rs_old_rel->rd_rel->relisshared)
960 else
962
968
970 src->off = 0;
971 memcpy(src->path, path, sizeof(path));
973 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
974 if (src->vfd < 0)
977 errmsg("could not create file \"%s\": %m", path)));
978 }
979
984 state->rs_num_rewrite_mappings++;
985
986
987
988
989
990 if (state->rs_num_rewrite_mappings >= 1000 )
992}
993
994
995
996
997
998static void
1001{
1006 bool do_log_xmin = false;
1007 bool do_log_xmax = false;
1009
1010
1011 if (->rs_logical_rewrite)
1012 return;
1013
1015
1017
1018
1019
1020
1022 do_log_xmin = true;
1023
1025 {
1026
1027
1028
1029
1030 }
1032 {
1033
1034 }
1036 {
1037
1038 do_log_xmax = true;
1039 }
1040
1041
1042 if (!do_log_xmin && !do_log_xmax)
1043 return;
1044
1045
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062 if (do_log_xmin)
1064
1067}
1068
1069
1070
1071
1072void
1074{
1076 int fd;
1080
1082
1088
1090 O_CREAT | O_WRONLY | PG_BINARY);
1091 if (fd < 0)
1094 errmsg("could not create file \"%s\": %m", path)));
1095
1096
1097
1098
1099
1101 if (ftruncate(fd, xlrec->offset) != 0)
1104 errmsg("could not truncate file \"%s\" to %u: %m",
1107
1109
1111
1112
1113 errno = 0;
1116 {
1117
1118 if (errno == 0)
1119 errno = ENOSPC;
1122 errmsg("could not write to file \"%s\": %m", path)));
1123 }
1125
1126
1127
1128
1129
1130
1135 errmsg("could not fsync file \"%s\": %m", path)));
1137
1141 errmsg("could not close file \"%s\": %m", path)));
1142}
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154void
1156{
1159 DIR *mappings_dir;
1160 struct dirent *mapping_de;
1162
1163
1164
1165
1166
1168
1169
1171
1172
1174 cutoff = redo;
1175
1178 {
1179 Oid dboid;
1180 Oid relid;
1185 lo;
1187
1188 if (strcmp(mapping_de->d_name, ".") == 0 ||
1189 strcmp(mapping_de->d_name, "..") == 0)
1190 continue;
1191
1194
1196 continue;
1197
1198
1199 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1200 continue;
1201
1203 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1204 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1205
1206 lsn = ((uint64) hi) << 32 | lo;
1207
1209 {
1210 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1211 if (unlink(path) < 0)
1214 errmsg("could not remove file \"%s\": %m", path)));
1215 }
1216 else
1217 {
1218
1220
1221
1222
1223
1224
1225
1226 if (fd < 0)
1229 errmsg("could not open file \"%s\": %m", path)));
1230
1231
1232
1233
1234
1235
1240 errmsg("could not fsync file \"%s\": %m", path)));
1242
1246 errmsg("could not close file \"%s\": %m", path)));
1247 }
1248 }
1250
1251
1253}
#define RelationGetNumberOfBlocks(reln)
Size PageGetHeapFreeSpace(const PageData *page)
void PageInit(Page page, Size pageSize, Size specialSize)
static Item PageGetItem(const PageData *page, const ItemIdData *itemId)
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
void smgr_bulk_finish(BulkWriteState *bulkstate)
TransactionId MultiXactId
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void * hash_seq_search(HASH_SEQ_STATUS *status)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int FileSync(File file, uint32 wait_event_info)
int CloseTransientFile(int fd)
void FileClose(File file)
void fsync_fname(const char *fname, bool isdir)
int data_sync_elevel(int elevel)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Assert(PointerIsAligned(start, uint64))
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
#define HEAP_INSERT_SKIP_FSM
#define HEAP_INSERT_NO_LOGICAL
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define XLOG_HEAP2_REWRITE
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
#define TOAST_TUPLE_THRESHOLD
HeapTuple heap_copytuple(HeapTuple tuple)
void heap_freetuple(HeapTuple htup)
HeapTupleHeaderData * HeapTupleHeader
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
#define HEAP_XMAX_INVALID
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
#define dclist_container(type, membername, ptr)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dclist_foreach_modify(iter, lhead)
if(TABLE==NULL||TABLE_index==NULL)
bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
static void ItemPointerSetInvalid(ItemPointerData *pointer)
static bool ItemPointerIsValid(const ItemPointerData *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define InvalidOffsetNumber
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int fd(const char *x, int i)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
#define RelationGetRelid(relation)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
#define RelationIsAccessibleInLogicalDecoding(relation)
#define HEAP_DEFAULT_FILLFACTOR
#define PG_LOGICAL_MAPPINGS_DIR
struct RewriteMappingDataEntry RewriteMappingDataEntry
static void raw_heap_insert(RewriteState state, HeapTuple tup)
void end_heap_rewrite(RewriteState state)
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
UnresolvedTupData * UnresolvedTup
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
static void logical_begin_heap_rewrite(RewriteState state)
void CheckPointLogicalRewriteHeap(void)
struct RewriteMappingFile RewriteMappingFile
static void logical_end_heap_rewrite(RewriteState state)
OldToNewMappingData * OldToNewMapping
struct RewriteStateData RewriteStateData
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
#define LOGICAL_REWRITE_FORMAT
struct LogicalRewriteMappingData LogicalRewriteMappingData
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
RelFileLocator old_locator
RelFileLocator new_locator
LogicalRewriteMappingData map
TransactionId rs_freeze_xid
TransactionId rs_oldest_xmin
HTAB * rs_logical_mappings
HTAB * rs_unresolved_tups
uint32 rs_num_rewrite_mappings
TransactionId rs_logical_xmin
BulkWriteState * rs_bulkstate
BulkWriteBuffer rs_buffer
HTAB * rs_old_new_tid_map
MultiXactId rs_cutoff_multi
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsNormal(xid)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
TransactionId GetCurrentTransactionId(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetXLogInsertRecPtr(void)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)