PostgreSQL Source Code: src/backend/access/heap/rewriteheap.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

104

106

125

126

127

128

129

131{

139

141

143

145

147

154

155

156

157

158

159

160

161typedef struct

162{

166

167

168

169

170typedef struct

171{

176

178

179typedef struct

180{

184

186

187

188

189

190

192{

194 int vfd;

195 off_t off;

199

200

201

202

203

205{

207

210

211

212

214

215

219

220

221

222

223

224

225

226

227

228

229

230

231

232

236{

241

242

243

244

245

247 "Table rewrite",

250

251

253

254 state->rs_old_rel = old_heap;

255 state->rs_new_rel = new_heap;

256 state->rs_buffer = NULL;

257

259 state->rs_oldest_xmin = oldest_xmin;

260 state->rs_freeze_xid = freeze_xid;

261 state->rs_cutoff_multi = cutoff_multi;

262 state->rs_cxt = rw_cxt;

264

265

269

270 state->rs_unresolved_tups =

271 hash_create("Rewrite / Unresolved ctids",

272 128,

273 &hash_ctl,

275

277

278 state->rs_old_new_tid_map =

279 hash_create("Rewrite / Old to new tid map",

280 128,

281 &hash_ctl,

283

285

287

289}

290

291

292

293

294

295

296void

298{

301

302

303

304

305

307

308 while ((unresolved = hash_seq_search(&seq_status)) != NULL)

309 {

312 }

313

314

315 if (state->rs_buffer)

316 {

318 state->rs_buffer = NULL;

319 }

320

322

324

325

327}

328

329

330

331

332

333

334

335

336

337

338

339

340void

343{

347 bool found;

348 bool free_new;

349

351

352

353

354

355

356

357

361

366

367

368

369

370

372 state->rs_old_rel->rd_rel->relfrozenxid,

373 state->rs_old_rel->rd_rel->relminmxid,

374 state->rs_freeze_xid,

375 state->rs_cutoff_multi);

376

377

378

379

380

382

383

384

385

391 {

393

394 memset(&hashkey, 0, sizeof(hashkey));

397

401

402 if (mapping != NULL)

403 {

404

405

406

407

408

410

411

415 }

416 else

417 {

418

419

420

421

423

424 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

427

430

431

432

433

434

436 return;

437 }

438 }

439

440

441

442

443

444

445

446 old_tid = old_tuple->t_self;

447 free_new = false;

448

449 for (;;)

450 {

452

453

455 new_tid = new_tuple->t_self;

456

458

459

460

461

462

463

464

465

466

469 state->rs_oldest_xmin))

470 {

471

472

473

475

476 memset(&hashkey, 0, sizeof(hashkey));

478 hashkey.tid = old_tid;

479

480 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

482

483 if (unresolved != NULL)

484 {

485

486

487

488

489

490 if (free_new)

492 new_tuple = unresolved->tuple;

493 free_new = true;

494 old_tid = unresolved->old_tid;

496

497

498

499

500

504

505

506 continue;

507 }

508 else

509 {

510

511

512

513

515

519

520 mapping->new_tid = new_tid;

521 }

522 }

523

524

525 if (free_new)

527 break;

528 }

529

531}

532

533

534

535

536

537

538

539

540

541

542bool

544{

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

562 bool found;

563

564 memset(&hashkey, 0, sizeof(hashkey));

567

568 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

570

571 if (unresolved != NULL)

572 {

573

578 return true;

579 }

580

581 return false;

582}

583

584

585

586

587

588

589

590

591

592static void

594{

596 Size pageFreeSpace,

597 saveFreeSpace;

601

602

603

604

605

606

607

608

609 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)

610 {

611

613 heaptup = tup;

614 }

616 {

618

619

620

621

622

623

625

628 }

629 else

630 heaptup = tup;

631

633

634

635

636

639 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

640 errmsg("row is too big: size %zu, maximum size %zu",

642

643

646

647

649 if (page)

650 {

652

653 if (len + saveFreeSpace > pageFreeSpace)

654 {

655

656

657

658

659

661 state->rs_buffer = NULL;

662 page = NULL;

663 state->rs_blockno++;

664 }

665 }

666

667 if (!page)

668 {

669

673 }

674

675

679 elog(ERROR, "failed to add tuple");

680

681

683

684

685

686

687

689 {

692

695

697 }

698

699

700 if (heaptup != tup)

702}

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757

758static void

760{

763

764

765

766

767

768

769 state->rs_logical_rewrite =

771

772 if (state->rs_logical_rewrite)

773 return;

774

776

777

778

779

780

781

783 {

784 state->rs_logical_rewrite = false;

785 return;

786 }

787

788 state->rs_logical_xmin = logical_xmin;

790 state->rs_num_rewrite_mappings = 0;

791

795

796 state->rs_logical_mappings =

798 128,

799 &hash_ctl,

801}

802

803

804

805

806static void

808{

812

814

815

816 if (state->rs_num_rewrite_mappings == 0)

817 return;

818

819 elog(DEBUG1, "flushing %u logical rewrite mapping entries",

820 state->rs_num_rewrite_mappings);

821

824 {

825 char *waldata;

826 char *waldata_start;

828 Oid dboid;

830 int written;

832

833

834 if (num_mappings == 0)

835 continue;

836

837 if (state->rs_old_rel->rd_rel->relisshared)

839 else

841

848

849

851 waldata_start = waldata = palloc(len);

852

853

854

855

857 {

859

861

862 memcpy(waldata, &pmap->map, sizeof(pmap->map));

863 waldata += sizeof(pmap->map);

864

865

868

869

870 state->rs_num_rewrite_mappings--;

871 }

872

874 Assert(waldata == waldata_start + len);

875

876

877

878

879

881 WAIT_EVENT_LOGICAL_REWRITE_WRITE);

882 if (written != len)

885 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,

886 written, len)));

888

892

893

895

896 pfree(waldata_start);

897 }

898 Assert(state->rs_num_rewrite_mappings == 0);

899}

900

901

902

903

904static void

906{

909

910

911 if (state->rs_logical_rewrite)

912 return;

913

914

915 if (state->rs_num_rewrite_mappings > 0)

917

918

921 {

922 if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)

925 errmsg("could not fsync file \"%s\": %m", src->path)));

927 }

928

929}

930

931

932

933

934static void

937{

940 Oid relid;

941 bool found;

942

944

945

948

949

950

951

952

953 if (!found)

954 {

956 Oid dboid;

957

958 if (state->rs_old_rel->rd_rel->relisshared)

960 else

962

968

970 src->off = 0;

971 memcpy(src->path, path, sizeof(path));

973 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);

974 if (src->vfd < 0)

977 errmsg("could not create file \"%s\": %m", path)));

978 }

979

984 state->rs_num_rewrite_mappings++;

985

986

987

988

989

990 if (state->rs_num_rewrite_mappings >= 1000 )

992}

993

994

995

996

997

998static void

1001{

1006 bool do_log_xmin = false;

1007 bool do_log_xmax = false;

1009

1010

1011 if (state->rs_logical_rewrite)

1012 return;

1013

1015

1017

1018

1019

1020

1022 do_log_xmin = true;

1023

1025 {

1026

1027

1028

1029

1030 }

1032 {

1033

1034 }

1036 {

1037

1038 do_log_xmax = true;

1039 }

1040

1041

1042 if (!do_log_xmin && !do_log_xmax)

1043 return;

1044

1045

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061

1062 if (do_log_xmin)

1064

1067}

1068

1069

1070

1071

1072void

1074{

1076 int fd;

1080

1082

1088

1090 O_CREAT | O_WRONLY | PG_BINARY);

1091 if (fd < 0)

1094 errmsg("could not create file \"%s\": %m", path)));

1095

1096

1097

1098

1099

1101 if (ftruncate(fd, xlrec->offset) != 0)

1104 errmsg("could not truncate file \"%s\" to %u: %m",

1107

1109

1111

1112

1113 errno = 0;

1116 {

1117

1118 if (errno == 0)

1119 errno = ENOSPC;

1122 errmsg("could not write to file \"%s\": %m", path)));

1123 }

1125

1126

1127

1128

1129

1130

1135 errmsg("could not fsync file \"%s\": %m", path)));

1137

1141 errmsg("could not close file \"%s\": %m", path)));

1142}

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153

1154void

1156{

1159 DIR *mappings_dir;

1160 struct dirent *mapping_de;

1162

1163

1164

1165

1166

1168

1169

1171

1172

1174 cutoff = redo;

1175

1178 {

1179 Oid dboid;

1180 Oid relid;

1185 lo;

1187

1188 if (strcmp(mapping_de->d_name, ".") == 0 ||

1189 strcmp(mapping_de->d_name, "..") == 0)

1190 continue;

1191

1194

1196 continue;

1197

1198

1199 if (strncmp(mapping_de->d_name, "map-", 4) != 0)

1200 continue;

1201

1203 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)

1204 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

1205

1206 lsn = ((uint64) hi) << 32 | lo;

1207

1209 {

1210 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);

1211 if (unlink(path) < 0)

1214 errmsg("could not remove file \"%s\": %m", path)));

1215 }

1216 else

1217 {

1218

1220

1221

1222

1223

1224

1225

1226 if (fd < 0)

1229 errmsg("could not open file \"%s\": %m", path)));

1230

1231

1232

1233

1234

1235

1240 errmsg("could not fsync file \"%s\": %m", path)));

1242

1246 errmsg("could not close file \"%s\": %m", path)));

1247 }

1248 }

1250

1251

1253}

#define RelationGetNumberOfBlocks(reln)

Size PageGetHeapFreeSpace(const PageData *page)

void PageInit(Page page, Size pageSize, Size specialSize)

static Item PageGetItem(const PageData *page, const ItemIdData *itemId)

static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)

#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)

BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)

void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)

BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)

void smgr_bulk_finish(BulkWriteState *bulkstate)

TransactionId MultiXactId

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

void * hash_seq_search(HASH_SEQ_STATUS *status)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)

int errcode_for_file_access(void)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

int FileSync(File file, uint32 wait_event_info)

int CloseTransientFile(int fd)

void FileClose(File file)

void fsync_fname(const char *fname, bool isdir)

int data_sync_elevel(int elevel)

File PathNameOpenFile(const char *fileName, int fileFlags)

DIR * AllocateDir(const char *dirname)

struct dirent * ReadDir(DIR *dir, const char *dirname)

int OpenTransientFile(const char *fileName, int fileFlags)

static ssize_t FileWrite(File file, const void *buffer, size_t amount, off_t offset, uint32 wait_event_info)

PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)

Assert(PointerIsAligned(start, uint64))

bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)

#define HEAP_INSERT_SKIP_FSM

#define HEAP_INSERT_NO_LOGICAL

bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)

#define XLOG_HEAP2_REWRITE

HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)

#define TOAST_TUPLE_THRESHOLD

HeapTuple heap_copytuple(HeapTuple tuple)

void heap_freetuple(HeapTuple htup)

HeapTupleHeaderData * HeapTupleHeader

static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)

static bool HeapTupleHasExternal(const HeapTupleData *tuple)

static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)

static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)

#define HEAP_XMAX_INVALID

static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)

#define dclist_container(type, membername, ptr)

static void dclist_push_tail(dclist_head *head, dlist_node *node)

static uint32 dclist_count(const dclist_head *head)

static void dclist_delete_from(dclist_head *head, dlist_node *node)

static void dclist_init(dclist_head *head)

#define dclist_foreach_modify(iter, lhead)

if(TABLE==NULL||TABLE_index==NULL)

bool ItemPointerEquals(ItemPointer pointer1, ItemPointer pointer2)

static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)

static void ItemPointerSetInvalid(ItemPointerData *pointer)

static bool ItemPointerIsValid(const ItemPointerData *pointer)

void * MemoryContextAlloc(MemoryContext context, Size size)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

void MemoryContextDelete(MemoryContext context)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define InvalidOffsetNumber

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int fd(const char *x, int i)

void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)

#define RelationGetRelid(relation)

#define RelationGetTargetPageFreeSpace(relation, defaultff)

#define RelationIsAccessibleInLogicalDecoding(relation)

#define HEAP_DEFAULT_FILLFACTOR

#define PG_LOGICAL_MAPPINGS_DIR

struct RewriteMappingDataEntry RewriteMappingDataEntry

static void raw_heap_insert(RewriteState state, HeapTuple tup)

void end_heap_rewrite(RewriteState state)

bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)

UnresolvedTupData * UnresolvedTup

RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)

static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)

static void logical_heap_rewrite_flush_mappings(RewriteState state)

void heap_xlog_logical_rewrite(XLogReaderState *r)

static void logical_begin_heap_rewrite(RewriteState state)

void CheckPointLogicalRewriteHeap(void)

struct RewriteMappingFile RewriteMappingFile

static void logical_end_heap_rewrite(RewriteState state)

OldToNewMappingData * OldToNewMapping

struct RewriteStateData RewriteStateData

void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)

static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)

#define LOGICAL_REWRITE_FORMAT

struct LogicalRewriteMappingData LogicalRewriteMappingData

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)

RelFileLocator old_locator

RelFileLocator new_locator

LogicalRewriteMappingData map

TransactionId rs_freeze_xid

TransactionId rs_oldest_xmin

HTAB * rs_logical_mappings

HTAB * rs_unresolved_tups

uint32 rs_num_rewrite_mappings

TransactionId rs_logical_xmin

BulkWriteState * rs_bulkstate

BulkWriteBuffer rs_buffer

HTAB * rs_old_new_tid_map

MultiXactId rs_cutoff_multi

bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

#define InvalidTransactionId

#define TransactionIdEquals(id1, id2)

#define TransactionIdIsNormal(xid)

static void pgstat_report_wait_start(uint32 wait_event_info)

static void pgstat_report_wait_end(void)

TransactionId GetCurrentTransactionId(void)

XLogRecPtr GetRedoRecPtr(void)

XLogRecPtr GetXLogInsertRecPtr(void)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr

XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)

void XLogRegisterData(const void *data, uint32 len)

void XLogBeginInsert(void)

#define XLogRecGetData(decoder)

#define XLogRecGetXid(decoder)