PostgreSQL Source Code: src/backend/access/heap/rewriteheap.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
104
106
125
126
127
128
129
131{
139
141
143
145
147
154
155
156
157
158
159
160
161typedef struct
162{
166
167
168
169
170typedef struct
171{
176
178
179typedef struct
180{
184
186
187
188
189
190
192{
199
200
201
202
203
205{
207
210
211
212
214
215
219
220
221
222
223
224
225
226
227
228
229
230
231
232
236{
241
242
243
244
245
247 "Table rewrite",
250
251
253
254 state->rs_old_rel = old_heap;
255 state->rs_new_rel = new_heap;
256 state->rs_buffer = NULL;
257
259 state->rs_oldest_xmin = oldest_xmin;
260 state->rs_freeze_xid = freeze_xid;
261 state->rs_cutoff_multi = cutoff_multi;
262 state->rs_cxt = rw_cxt;
264
265
269
270 state->rs_unresolved_tups =
271 hash_create("Rewrite / Unresolved ctids",
272 128,
273 &hash_ctl,
275
277
278 state->rs_old_new_tid_map =
279 hash_create("Rewrite / Old to new tid map",
280 128,
281 &hash_ctl,
283
285
287
289}
290
291
292
293
294
295
296void
298{
301
302
303
304
305
307
308 while ((unresolved = hash_seq_search(&seq_status)) != NULL)
309 {
312 }
313
314
315 if (state->rs_buffer)
316 {
318 state->rs_buffer = NULL;
319 }
320
322
324
325
327}
328
329
330
331
332
333
334
335
336
337
338
339
340void
343{
347 bool found;
348 bool free_new;
349
351
352
353
354
355
356
357
361
366
367
368
369
370
372 state->rs_old_rel->rd_rel->relfrozenxid,
373 state->rs_old_rel->rd_rel->relminmxid,
374 state->rs_freeze_xid,
375 state->rs_cutoff_multi);
376
377
378
379
380
382
383
384
385
391 {
393
394 memset(&hashkey, 0, sizeof(hashkey));
397
401
402 if (mapping != NULL)
403 {
404
405
406
407
408
410
411
415 }
416 else
417 {
418
419
420
421
423
424 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
427
430
431
432
433
434
436 return;
437 }
438 }
439
440
441
442
443
444
445
446 old_tid = old_tuple->t_self;
447 free_new = false;
448
449 for (;;)
450 {
452
453
455 new_tid = new_tuple->t_self;
456
458
459
460
461
462
463
464
465
466
469 state->rs_oldest_xmin))
470 {
471
472
473
475
476 memset(&hashkey, 0, sizeof(hashkey));
478 hashkey.tid = old_tid;
479
480 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
482
483 if (unresolved != NULL)
484 {
485
486
487
488
489
490 if (free_new)
492 new_tuple = unresolved->tuple;
493 free_new = true;
494 old_tid = unresolved->old_tid;
496
497
498
499
500
504
505
506 continue;
507 }
508 else
509 {
510
511
512
513
515
519
520 mapping->new_tid = new_tid;
521 }
522 }
523
524
525 if (free_new)
527 break;
528 }
529
531}
532
533
534
535
536
537
538
539
540
541
542bool
544{
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
562 bool found;
563
564 memset(&hashkey, 0, sizeof(hashkey));
567
568 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
570
571 if (unresolved != NULL)
572 {
573
578 return true;
579 }
580
581 return false;
582}
583
584
585
586
587
588
589
590
591
592static void
594{
596 Size pageFreeSpace,
597 saveFreeSpace;
601
602
603
604
605
606
607
608
609 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
610 {
611
613 heaptup = tup;
614 }
616 {
618
619
620
621
622
623
625
628 }
629 else
630 heaptup = tup;
631
633
634
635
636
639 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
640 errmsg("row is too big: size %zu, maximum size %zu",
642
643
646
647
649 if (page)
650 {
652
653 if (len + saveFreeSpace > pageFreeSpace)
654 {
655
656
657
658
659
661 state->rs_buffer = NULL;
662 page = NULL;
663 state->rs_blockno++;
664 }
665 }
666
667 if (!page)
668 {
669
673 }
674
675
678 elog(ERROR, "failed to add tuple");
679
680
682
683
684
685
686
688 {
691
694
696 }
697
698
699 if (heaptup != tup)
701}
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757static void
759{
762
763
764
765
766
767
768 state->rs_logical_rewrite =
770
771 if (->rs_logical_rewrite)
772 return;
773
775
776
777
778
779
780
782 {
783 state->rs_logical_rewrite = false;
784 return;
785 }
786
787 state->rs_logical_xmin = logical_xmin;
789 state->rs_num_rewrite_mappings = 0;
790
794
795 state->rs_logical_mappings =
797 128,
798 &hash_ctl,
800}
801
802
803
804
805static void
807{
811
813
814
815 if (state->rs_num_rewrite_mappings == 0)
816 return;
817
818 elog(DEBUG1, "flushing %u logical rewrite mapping entries",
819 state->rs_num_rewrite_mappings);
820
823 {
824 char *waldata;
825 char *waldata_start;
827 Oid dboid;
829 int written;
831
832
833 if (num_mappings == 0)
834 continue;
835
836 if (state->rs_old_rel->rd_rel->relisshared)
838 else
840
847
848
850 waldata_start = waldata = palloc(len);
851
852
853
854
856 {
858
860
861 memcpy(waldata, &pmap->map, sizeof(pmap->map));
862 waldata += sizeof(pmap->map);
863
864
867
868
869 state->rs_num_rewrite_mappings--;
870 }
871
873 Assert(waldata == waldata_start + len);
874
875
876
877
878
880 WAIT_EVENT_LOGICAL_REWRITE_WRITE);
881 if (written != len)
884 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
885 written, len)));
887
891
892
894
895 pfree(waldata_start);
896 }
897 Assert(state->rs_num_rewrite_mappings == 0);
898}
899
900
901
902
903static void
905{
908
909
910 if (->rs_logical_rewrite)
911 return;
912
913
914 if (state->rs_num_rewrite_mappings > 0)
916
917
920 {
921 if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)
924 errmsg("could not fsync file \"%s\": %m", src->path)));
926 }
927
928}
929
930
931
932
933static void
936{
939 Oid relid;
940 bool found;
941
943
944
947
948
949
950
951
952 if (!found)
953 {
955 Oid dboid;
956
957 if (state->rs_old_rel->rd_rel->relisshared)
959 else
961
967
969 src->off = 0;
970 memcpy(src->path, path, sizeof(path));
972 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
973 if (src->vfd < 0)
976 errmsg("could not create file \"%s\": %m", path)));
977 }
978
983 state->rs_num_rewrite_mappings++;
984
985
986
987
988
989 if (state->rs_num_rewrite_mappings >= 1000 )
991}
992
993
994
995
996
997static void
1000{
1005 bool do_log_xmin = false;
1006 bool do_log_xmax = false;
1008
1009
1010 if (->rs_logical_rewrite)
1011 return;
1012
1014
1016
1017
1018
1019
1021 do_log_xmin = true;
1022
1024 {
1025
1026
1027
1028
1029 }
1031 {
1032
1033 }
1035 {
1036
1037 do_log_xmax = true;
1038 }
1039
1040
1041 if (!do_log_xmin && !do_log_xmax)
1042 return;
1043
1044
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061 if (do_log_xmin)
1063
1066}
1067
1068
1069
1070
1071void
1073{
1075 int fd;
1079
1081
1087
1089 O_CREAT | O_WRONLY | PG_BINARY);
1090 if (fd < 0)
1093 errmsg("could not create file \"%s\": %m", path)));
1094
1095
1096
1097
1098
1100 if (ftruncate(fd, xlrec->offset) != 0)
1103 errmsg("could not truncate file \"%s\" to %u: %m",
1106
1108
1110
1111
1112 errno = 0;
1115 {
1116
1117 if (errno == 0)
1118 errno = ENOSPC;
1121 errmsg("could not write to file \"%s\": %m", path)));
1122 }
1124
1125
1126
1127
1128
1129
1134 errmsg("could not fsync file \"%s\": %m", path)));
1136
1140 errmsg("could not close file \"%s\": %m", path)));
1141}
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153void
1155{
1158 DIR *mappings_dir;
1159 struct dirent *mapping_de;
1161
1162
1163
1164
1165
1167
1168
1170
1171
1173 cutoff = redo;
1174
1177 {
1178 Oid dboid;
1179 Oid relid;
1184 lo;
1186
1187 if (strcmp(mapping_de->d_name, ".") == 0 ||
1188 strcmp(mapping_de->d_name, "..") == 0)
1189 continue;
1190
1193
1195 continue;
1196
1197
1198 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
1199 continue;
1200
1202 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
1203 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
1204
1205 lsn = ((uint64) hi) << 32 | lo;
1206
1208 {
1209 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
1210 if (unlink(path) < 0)
1213 errmsg("could not remove file \"%s\": %m", path)));
1214 }
1215 else
1216 {
1217
1219
1220
1221
1222
1223
1224
1225 if (fd < 0)
1228 errmsg("could not open file \"%s\": %m", path)));
1229
1230
1231
1232
1233
1234
1239 errmsg("could not fsync file \"%s\": %m", path)));
1241
1245 errmsg("could not close file \"%s\": %m", path)));
1246 }
1247 }
1249
1250
1252}
#define RelationGetNumberOfBlocks(reln)
Size PageGetHeapFreeSpace(const PageData *page)
void PageInit(Page page, Size pageSize, Size specialSize)
static void * PageGetItem(const PageData *page, const ItemIdData *itemId)
static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)
BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)
void smgr_bulk_finish(BulkWriteState *bulkstate)
TransactionId MultiXactId
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errcode_for_file_access(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int FileSync(File file, uint32 wait_event_info)
int CloseTransientFile(int fd)
void FileClose(File file)
void fsync_fname(const char *fname, bool isdir)
int data_sync_elevel(int elevel)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileWrite(File file, const void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
#define palloc0_object(type)
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Assert(PointerIsAligned(start, uint64))
bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)
#define HEAP_INSERT_SKIP_FSM
#define HEAP_INSERT_NO_LOGICAL
bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
#define XLOG_HEAP2_REWRITE
HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)
#define TOAST_TUPLE_THRESHOLD
HeapTuple heap_copytuple(HeapTuple tuple)
void heap_freetuple(HeapTuple htup)
HeapTupleHeaderData * HeapTupleHeader
static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)
static bool HeapTupleHasExternal(const HeapTupleData *tuple)
static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)
static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)
#define HEAP_XMAX_INVALID
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
#define dclist_container(type, membername, ptr)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dclist_foreach_modify(iter, lhead)
if(TABLE==NULL||TABLE_index==NULL)
bool ItemPointerEquals(const ItemPointerData *pointer1, const ItemPointerData *pointer2)
static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)
static void ItemPointerSetInvalid(ItemPointerData *pointer)
static bool ItemPointerIsValid(const ItemPointerData *pointer)
void * MemoryContextAlloc(MemoryContext context, Size size)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define InvalidOffsetNumber
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int fd(const char *x, int i)
void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)
#define RelationGetRelid(relation)
#define RelationGetTargetPageFreeSpace(relation, defaultff)
#define RelationIsAccessibleInLogicalDecoding(relation)
#define HEAP_DEFAULT_FILLFACTOR
#define PG_LOGICAL_MAPPINGS_DIR
struct RewriteMappingDataEntry RewriteMappingDataEntry
static void raw_heap_insert(RewriteState state, HeapTuple tup)
void end_heap_rewrite(RewriteState state)
bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
UnresolvedTupData * UnresolvedTup
RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)
static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)
static void logical_heap_rewrite_flush_mappings(RewriteState state)
void heap_xlog_logical_rewrite(XLogReaderState *r)
static void logical_begin_heap_rewrite(RewriteState state)
void CheckPointLogicalRewriteHeap(void)
struct RewriteMappingFile RewriteMappingFile
static void logical_end_heap_rewrite(RewriteState state)
OldToNewMappingData * OldToNewMapping
struct RewriteStateData RewriteStateData
void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)
static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)
#define LOGICAL_REWRITE_FORMAT
struct LogicalRewriteMappingData LogicalRewriteMappingData
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
RelFileLocator old_locator
RelFileLocator new_locator
LogicalRewriteMappingData map
TransactionId rs_freeze_xid
TransactionId rs_oldest_xmin
HTAB * rs_logical_mappings
HTAB * rs_unresolved_tups
uint32 rs_num_rewrite_mappings
TransactionId rs_logical_xmin
BulkWriteState * rs_bulkstate
BulkWriteBuffer rs_buffer
HTAB * rs_old_new_tid_map
MultiXactId rs_cutoff_multi
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsNormal(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
TransactionId GetCurrentTransactionId(void)
XLogRecPtr GetRedoRecPtr(void)
XLogRecPtr GetXLogInsertRecPtr(void)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetData(decoder)
#define XLogRecGetXid(decoder)