PostgreSQL Source Code: src/backend/access/heap/rewriteheap.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

104

106

125

126

127

128

129

131{

139

141

143

145

147

154

155

156

157

158

159

160

161typedef struct

162{

166

167

168

169

170typedef struct

171{

176

178

179typedef struct

180{

184

186

187

188

189

190

192{

194 int vfd;

195 off_t off;

199

200

201

202

203

205{

207

210

211

212

214

215

219

220

221

222

223

224

225

226

227

228

229

230

231

232

236{

241

242

243

244

245

247 "Table rewrite",

250

251

253

254 state->rs_old_rel = old_heap;

255 state->rs_new_rel = new_heap;

256 state->rs_buffer = NULL;

257

259 state->rs_oldest_xmin = oldest_xmin;

260 state->rs_freeze_xid = freeze_xid;

261 state->rs_cutoff_multi = cutoff_multi;

262 state->rs_cxt = rw_cxt;

264

265

269

270 state->rs_unresolved_tups =

271 hash_create("Rewrite / Unresolved ctids",

272 128,

273 &hash_ctl,

275

277

278 state->rs_old_new_tid_map =

279 hash_create("Rewrite / Old to new tid map",

280 128,

281 &hash_ctl,

283

285

287

289}

290

291

292

293

294

295

296void

298{

301

302

303

304

305

307

308 while ((unresolved = hash_seq_search(&seq_status)) != NULL)

309 {

312 }

313

314

315 if (state->rs_buffer)

316 {

318 state->rs_buffer = NULL;

319 }

320

322

324

325

327}

328

329

330

331

332

333

334

335

336

337

338

339

340void

343{

347 bool found;

348 bool free_new;

349

351

352

353

354

355

356

357

361

366

367

368

369

370

372 state->rs_old_rel->rd_rel->relfrozenxid,

373 state->rs_old_rel->rd_rel->relminmxid,

374 state->rs_freeze_xid,

375 state->rs_cutoff_multi);

376

377

378

379

380

382

383

384

385

391 {

393

394 memset(&hashkey, 0, sizeof(hashkey));

397

401

402 if (mapping != NULL)

403 {

404

405

406

407

408

410

411

415 }

416 else

417 {

418

419

420

421

423

424 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

427

430

431

432

433

434

436 return;

437 }

438 }

439

440

441

442

443

444

445

446 old_tid = old_tuple->t_self;

447 free_new = false;

448

449 for (;;)

450 {

452

453

455 new_tid = new_tuple->t_self;

456

458

459

460

461

462

463

464

465

466

469 state->rs_oldest_xmin))

470 {

471

472

473

475

476 memset(&hashkey, 0, sizeof(hashkey));

478 hashkey.tid = old_tid;

479

480 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

482

483 if (unresolved != NULL)

484 {

485

486

487

488

489

490 if (free_new)

492 new_tuple = unresolved->tuple;

493 free_new = true;

494 old_tid = unresolved->old_tid;

496

497

498

499

500

504

505

506 continue;

507 }

508 else

509 {

510

511

512

513

515

519

520 mapping->new_tid = new_tid;

521 }

522 }

523

524

525 if (free_new)

527 break;

528 }

529

531}

532

533

534

535

536

537

538

539

540

541

542bool

544{

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

562 bool found;

563

564 memset(&hashkey, 0, sizeof(hashkey));

567

568 unresolved = hash_search(state->rs_unresolved_tups, &hashkey,

570

571 if (unresolved != NULL)

572 {

573

578 return true;

579 }

580

581 return false;

582}

583

584

585

586

587

588

589

590

591

592static void

594{

596 Size pageFreeSpace,

597 saveFreeSpace;

601

602

603

604

605

606

607

608

609 if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)

610 {

611

613 heaptup = tup;

614 }

616 {

618

619

620

621

622

623

625

628 }

629 else

630 heaptup = tup;

631

633

634

635

636

639 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

640 errmsg("row is too big: size %zu, maximum size %zu",

642

643

646

647

649 if (page)

650 {

652

653 if (len + saveFreeSpace > pageFreeSpace)

654 {

655

656

657

658

659

661 state->rs_buffer = NULL;

662 page = NULL;

663 state->rs_blockno++;

664 }

665 }

666

667 if (!page)

668 {

669

673 }

674

675

678 elog(ERROR, "failed to add tuple");

679

680

682

683

684

685

686

688 {

691

694

696 }

697

698

699 if (heaptup != tup)

701}

702

703

704

705

706

707

708

709

710

711

712

713

714

715

716

717

718

719

720

721

722

723

724

725

726

727

728

729

730

731

732

733

734

735

736

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752

753

754

755

756

757static void

759{

762

763

764

765

766

767

768 state->rs_logical_rewrite =

770

771 if (state->rs_logical_rewrite)

772 return;

773

775

776

777

778

779

780

782 {

783 state->rs_logical_rewrite = false;

784 return;

785 }

786

787 state->rs_logical_xmin = logical_xmin;

789 state->rs_num_rewrite_mappings = 0;

790

794

795 state->rs_logical_mappings =

797 128,

798 &hash_ctl,

800}

801

802

803

804

805static void

807{

811

813

814

815 if (state->rs_num_rewrite_mappings == 0)

816 return;

817

818 elog(DEBUG1, "flushing %u logical rewrite mapping entries",

819 state->rs_num_rewrite_mappings);

820

823 {

824 char *waldata;

825 char *waldata_start;

827 Oid dboid;

829 int written;

831

832

833 if (num_mappings == 0)

834 continue;

835

836 if (state->rs_old_rel->rd_rel->relisshared)

838 else

840

847

848

850 waldata_start = waldata = palloc(len);

851

852

853

854

856 {

858

860

861 memcpy(waldata, &pmap->map, sizeof(pmap->map));

862 waldata += sizeof(pmap->map);

863

864

867

868

869 state->rs_num_rewrite_mappings--;

870 }

871

873 Assert(waldata == waldata_start + len);

874

875

876

877

878

880 WAIT_EVENT_LOGICAL_REWRITE_WRITE);

881 if (written != len)

884 errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,

885 written, len)));

887

891

892

894

895 pfree(waldata_start);

896 }

897 Assert(state->rs_num_rewrite_mappings == 0);

898}

899

900

901

902

903static void

905{

908

909

910 if (state->rs_logical_rewrite)

911 return;

912

913

914 if (state->rs_num_rewrite_mappings > 0)

916

917

920 {

921 if (FileSync(src->vfd, WAIT_EVENT_LOGICAL_REWRITE_SYNC) != 0)

924 errmsg("could not fsync file \"%s\": %m", src->path)));

926 }

927

928}

929

930

931

932

933static void

936{

939 Oid relid;

940 bool found;

941

943

944

947

948

949

950

951

952 if (!found)

953 {

955 Oid dboid;

956

957 if (state->rs_old_rel->rd_rel->relisshared)

959 else

961

967

969 src->off = 0;

970 memcpy(src->path, path, sizeof(path));

972 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);

973 if (src->vfd < 0)

976 errmsg("could not create file \"%s\": %m", path)));

977 }

978

983 state->rs_num_rewrite_mappings++;

984

985

986

987

988

989 if (state->rs_num_rewrite_mappings >= 1000 )

991}

992

993

994

995

996

997static void

1000{

1005 bool do_log_xmin = false;

1006 bool do_log_xmax = false;

1008

1009

1010 if (state->rs_logical_rewrite)

1011 return;

1012

1014

1016

1017

1018

1019

1021 do_log_xmin = true;

1022

1024 {

1025

1026

1027

1028

1029 }

1031 {

1032

1033 }

1035 {

1036

1037 do_log_xmax = true;

1038 }

1039

1040

1041 if (!do_log_xmin && !do_log_xmax)

1042 return;

1043

1044

1049

1050

1051

1052

1053

1054

1055

1056

1057

1058

1059

1060

1061 if (do_log_xmin)

1063

1066}

1067

1068

1069

1070

1071void

1073{

1075 int fd;

1079

1081

1087

1089 O_CREAT | O_WRONLY | PG_BINARY);

1090 if (fd < 0)

1093 errmsg("could not create file \"%s\": %m", path)));

1094

1095

1096

1097

1098

1100 if (ftruncate(fd, xlrec->offset) != 0)

1103 errmsg("could not truncate file \"%s\" to %u: %m",

1106

1108

1110

1111

1112 errno = 0;

1115 {

1116

1117 if (errno == 0)

1118 errno = ENOSPC;

1121 errmsg("could not write to file \"%s\": %m", path)));

1122 }

1124

1125

1126

1127

1128

1129

1134 errmsg("could not fsync file \"%s\": %m", path)));

1136

1140 errmsg("could not close file \"%s\": %m", path)));

1141}

1142

1143

1144

1145

1146

1147

1148

1149

1150

1151

1152

1153void

1155{

1158 DIR *mappings_dir;

1159 struct dirent *mapping_de;

1161

1162

1163

1164

1165

1167

1168

1170

1171

1173 cutoff = redo;

1174

1177 {

1178 Oid dboid;

1179 Oid relid;

1184 lo;

1186

1187 if (strcmp(mapping_de->d_name, ".") == 0 ||

1188 strcmp(mapping_de->d_name, "..") == 0)

1189 continue;

1190

1193

1195 continue;

1196

1197

1198 if (strncmp(mapping_de->d_name, "map-", 4) != 0)

1199 continue;

1200

1202 &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)

1203 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

1204

1205 lsn = ((uint64) hi) << 32 | lo;

1206

1208 {

1209 elog(DEBUG1, "removing logical rewrite file \"%s\"", path);

1210 if (unlink(path) < 0)

1213 errmsg("could not remove file \"%s\": %m", path)));

1214 }

1215 else

1216 {

1217

1219

1220

1221

1222

1223

1224

1225 if (fd < 0)

1228 errmsg("could not open file \"%s\": %m", path)));

1229

1230

1231

1232

1233

1234

1239 errmsg("could not fsync file \"%s\": %m", path)));

1241

1245 errmsg("could not close file \"%s\": %m", path)));

1246 }

1247 }

1249

1250

1252}

#define RelationGetNumberOfBlocks(reln)

Size PageGetHeapFreeSpace(const PageData *page)

void PageInit(Page page, Size pageSize, Size specialSize)

static void * PageGetItem(const PageData *page, const ItemIdData *itemId)

static ItemId PageGetItemId(Page page, OffsetNumber offsetNumber)

#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap)

BulkWriteState * smgr_bulk_start_rel(Relation rel, ForkNumber forknum)

void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)

BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate)

void smgr_bulk_finish(BulkWriteState *bulkstate)

TransactionId MultiXactId

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)

void * hash_seq_search(HASH_SEQ_STATUS *status)

void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)

int errcode_for_file_access(void)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

int FileSync(File file, uint32 wait_event_info)

int CloseTransientFile(int fd)

void FileClose(File file)

void fsync_fname(const char *fname, bool isdir)

int data_sync_elevel(int elevel)

File PathNameOpenFile(const char *fileName, int fileFlags)

DIR * AllocateDir(const char *dirname)

struct dirent * ReadDir(DIR *dir, const char *dirname)

int OpenTransientFile(const char *fileName, int fileFlags)

static ssize_t FileWrite(File file, const void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)

#define palloc0_object(type)

PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)

Assert(PointerIsAligned(start, uint64))

bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId relfrozenxid, TransactionId relminmxid, TransactionId FreezeLimit, TransactionId MultiXactCutoff)

#define HEAP_INSERT_SKIP_FSM

#define HEAP_INSERT_NO_LOGICAL

bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)

#define XLOG_HEAP2_REWRITE

HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, int options)

#define TOAST_TUPLE_THRESHOLD

HeapTuple heap_copytuple(HeapTuple tuple)

void heap_freetuple(HeapTuple htup)

HeapTupleHeaderData * HeapTupleHeader

static bool HEAP_XMAX_IS_LOCKED_ONLY(uint16 infomask)

static bool HeapTupleHasExternal(const HeapTupleData *tuple)

static TransactionId HeapTupleHeaderGetXmin(const HeapTupleHeaderData *tup)

static bool HeapTupleHeaderIndicatesMovedPartitions(const HeapTupleHeaderData *tup)

#define HEAP_XMAX_INVALID

static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)

#define dclist_container(type, membername, ptr)

static void dclist_push_tail(dclist_head *head, dlist_node *node)

static uint32 dclist_count(const dclist_head *head)

static void dclist_delete_from(dclist_head *head, dlist_node *node)

static void dclist_init(dclist_head *head)

#define dclist_foreach_modify(iter, lhead)

if(TABLE==NULL||TABLE_index==NULL)

bool ItemPointerEquals(const ItemPointerData *pointer1, const ItemPointerData *pointer2)

static void ItemPointerSet(ItemPointerData *pointer, BlockNumber blockNumber, OffsetNumber offNum)

static void ItemPointerSetInvalid(ItemPointerData *pointer)

static bool ItemPointerIsValid(const ItemPointerData *pointer)

void * MemoryContextAlloc(MemoryContext context, Size size)

void pfree(void *pointer)

MemoryContext CurrentMemoryContext

void MemoryContextDelete(MemoryContext context)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define InvalidOffsetNumber

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int fd(const char *x, int i)

void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin)

#define RelationGetRelid(relation)

#define RelationGetTargetPageFreeSpace(relation, defaultff)

#define RelationIsAccessibleInLogicalDecoding(relation)

#define HEAP_DEFAULT_FILLFACTOR

#define PG_LOGICAL_MAPPINGS_DIR

struct RewriteMappingDataEntry RewriteMappingDataEntry

static void raw_heap_insert(RewriteState state, HeapTuple tup)

void end_heap_rewrite(RewriteState state)

bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)

UnresolvedTupData * UnresolvedTup

RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, TransactionId freeze_xid, MultiXactId cutoff_multi)

static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple)

static void logical_heap_rewrite_flush_mappings(RewriteState state)

void heap_xlog_logical_rewrite(XLogReaderState *r)

static void logical_begin_heap_rewrite(RewriteState state)

void CheckPointLogicalRewriteHeap(void)

struct RewriteMappingFile RewriteMappingFile

static void logical_end_heap_rewrite(RewriteState state)

OldToNewMappingData * OldToNewMapping

struct RewriteStateData RewriteStateData

void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple)

static void logical_rewrite_log_mapping(RewriteState state, TransactionId xid, LogicalRewriteMappingData *map)

#define LOGICAL_REWRITE_FORMAT

struct LogicalRewriteMappingData LogicalRewriteMappingData

XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)

RelFileLocator old_locator

RelFileLocator new_locator

LogicalRewriteMappingData map

TransactionId rs_freeze_xid

TransactionId rs_oldest_xmin

HTAB * rs_logical_mappings

HTAB * rs_unresolved_tups

uint32 rs_num_rewrite_mappings

TransactionId rs_logical_xmin

BulkWriteState * rs_bulkstate

BulkWriteBuffer rs_buffer

HTAB * rs_old_new_tid_map

MultiXactId rs_cutoff_multi

#define InvalidTransactionId

#define TransactionIdEquals(id1, id2)

#define TransactionIdIsNormal(xid)

static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

static void pgstat_report_wait_start(uint32 wait_event_info)

static void pgstat_report_wait_end(void)

TransactionId GetCurrentTransactionId(void)

XLogRecPtr GetRedoRecPtr(void)

XLogRecPtr GetXLogInsertRecPtr(void)

#define XLogRecPtrIsValid(r)

#define LSN_FORMAT_ARGS(lsn)

XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)

void XLogRegisterData(const void *data, uint32 len)

void XLogBeginInsert(void)

#define XLogRecGetData(decoder)

#define XLogRecGetXid(decoder)