PostgreSQL Source Code: src/backend/replication/logical/reorderbuffer.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

88

90#include <sys/stat.h>

91

115

116

118{

122

123

125{

129

131{

137

138

140{

141 File vfd;

142 off_t curOffset;

143

145

146

148{

155

157{

163

164

166{

169

171 Size size;

174

176

177

179{

182

184

185#define IsSpecInsert(action) \

186( \

187 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \

189#define IsSpecConfirmOrAbort(action) \

190( \

191 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \

192 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \

194#define IsInsertOrUpdate(action) \

195( \

196 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \

197 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \

198 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \

200

201

202

203

204

205

206

207

208

209

210

211

212

215

216

218

219

220

221

222

230

232

233

234

235

236

237

238

239

240

247

248

249

250

251

252

263 bool txn_prepared);

270

274

275

276

277

278

279

284

285

286

287

288

295

296

297

298

299

300

305 bool addition, Size sz);

306

307

308

309

310

313{

317

319

320

322 "ReorderBuffer",

324

325 buffer =

327

328 memset(&hash_ctl, 0, sizeof(hash_ctl));

329

330 buffer->context = new_ctx;

331

333 "Change",

336

338 "TXN",

341

342

343

344

345

346

347

348

349

350

351

353 "Tuples",

357

361

362 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,

364

367

368 buffer->outbuf = NULL;

370 buffer->size = 0;

371

372

374

383

385

389

390

391

392

393

394

396

397 return buffer;

398}

399

400

401

402

403void

405{

407

408

409

410

411

413

414

416}

417

418

419

420

423{

425

428

430

434

435

438

439 return txn;

440}

441

442

443

444

445static void

447{

448

450 {

453 }

454

455

456

457 if (txn->gid != NULL)

458 {

460 txn->gid = NULL;

461 }

462

464 {

467 }

468

470 {

473 }

474

475

477

478

480

482}

483

484

485

486

489{

491

494

496 return change;

497}

498

499

500

501

502void

504 bool upd_mem)

505{

506

507 if (upd_mem)

510

511

512 switch (change->action)

513 {

519 {

522 }

523

525 {

528 }

529 break;

537 break;

542 break;

545 {

548 }

549 break;

550

553 {

556 }

557 break;

562 break;

563 }

564

566}

567

568

569

570

571

574{

576 Size alloc_len;

577

579

583

584 return tuple;

585}

586

587

588

589

590void

592{

594}

595

596

597

598

599

600

601

602

603

604

607{

608 Oid *relids;

609 Size alloc_len;

610

611 alloc_len = sizeof(Oid) * nrelids;

612

614

615 return relids;

616}

617

618

619

620

621void

623{

625}

626

627

628

629

630

631

632

635 bool *is_new, XLogRecPtr lsn, bool create_as_top)

636{

639 bool found;

640

642

643

644

645

648 {

650

651 if (txn != NULL)

652 {

653

654 if (is_new)

655 *is_new = false;

656 return txn;

657 }

658

659

660

661

662

663 if (!create)

664 return NULL;

665

666 }

667

668

669

670

671

672

673

676 &xid,

678 &found);

679 if (found)

680 txn = ent->txn;

681 else if (create)

682 {

683

686

689 txn = ent->txn;

692

693 if (create_as_top)

694 {

697 }

698 }

699 else

700 txn = NULL;

701

702

705

706 if (is_new)

707 *is_new = !found;

708

709 Assert(!create || txn != NULL);

710 return txn;

711}

712

713

714

715

716

717

718

719

720

721static void

724 bool toast_insert)

725{

727

728

729

730

731

733 return;

734

735

737

738

739

740

741

742

743

744

745

746

747

748

749

750

751

752 if (toast_insert)

757 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

758

759

760

761

762

763

768 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

769

770

771

772

773

774

775

776

777

778

784}

785

786

787

788

789

790void

793{

795

797

798

799

800

801

802

804 {

805

806

807

808

810 return;

811 }

812

813

814

815

816

817

824 {

826

828 }

829

830 change->lsn = lsn;

831 change->txn = txn;

832

837

838

841

842

844

845

847}

848

849

850

851

852

853void

856 bool transactional, const char *prefix,

857 Size message_size, const char *message)

858{

859 if (transactional)

860 {

863

865

866

867

868

869

870

872

874

880 memcpy(change->data.msg.message, message, message_size);

881

883

885 }

886 else

887 {

889 volatile Snapshot snapshot_now = snap;

890

891

892 Assert(snapshot_now);

893

896

897

900 {

901 rb->message(rb, txn, lsn, false, prefix, message_size, message);

902

904 }

906 {

909 }

911 }

912}

913

914

915

916

917

918

919

920

921

922static void

924{

925#ifdef USE_ASSERT_CHECKING

930

931

932

933

934

935

936

937

938

939

940

942 return;

943

945 {

947 iter.cur);

948

949

951

952

955

956

958 Assert(prev_first_lsn < cur_txn->first_lsn);

959

960

962

963 prev_first_lsn = cur_txn->first_lsn;

964 }

965

967 {

969 base_snapshot_node,

970 iter.cur);

971

972

975

976

978 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

979

980

982

984 }

985#endif

986}

987

988

989

990

991

992

993static void

995{

996#ifdef USE_ASSERT_CHECKING

999

1001 {

1003

1005

1009

1012

1013 Assert(prev_lsn <= cur_change->lsn);

1014

1015 prev_lsn = cur_change->lsn;

1016 }

1017#endif

1018}

1019

1020

1021

1022

1023

1026{

1028

1030

1032 return NULL;

1033

1035

1038 return txn;

1039}

1040

1041

1042

1043

1044

1045

1046

1047

1048

1049

1050

1051

1054{

1056

1058

1061

1065}

1066

1067void

1069{

1071}

1072

1073

1074

1075

1076

1077

1078

1079void

1082{

1085 bool new_top;

1086 bool new_sub;

1087

1090

1091 if (!new_sub)

1092 {

1094 {

1095

1096 return;

1097 }

1098 else

1099 {

1100

1101

1102

1103

1104

1106 }

1107 }

1108

1112

1113

1114 subtxn->toptxn = txn;

1115

1116

1119

1120

1122

1123

1125}

1126

1127

1128

1129

1130

1131

1132

1133

1134

1135

1136

1137

1138

1139

1140

1141

1142

1143

1144

1145static void

1148{

1150

1152 {

1155 {

1156

1157

1158

1159

1161 {

1164 }

1165

1166

1167

1168

1169

1170

1175

1176

1177

1178

1179

1183 }

1184 else

1185 {

1186

1191 }

1192 }

1193}

1194

1195

1196

1197

1198

1199void

1203{

1205

1208

1209

1210

1211

1212 if (!subtxn)

1213 return;

1214

1216 subtxn->end_lsn = end_lsn;

1217

1218

1219

1220

1221

1223}

1224

1225

1226

1227

1228

1229

1230

1231

1232

1233

1234

1235

1236

1237

1238

1239

1240

1241static int

1243{

1247

1248 if (pos_a < pos_b)

1249 return 1;

1250 else if (pos_a == pos_b)

1251 return 0;

1252 return -1;

1253}

1254

1255

1256

1257

1258

1259

1260

1261

1262

1263

1264static void

1267{

1268 Size nr_txns = 0;

1272

1273 *iter_state = NULL;

1274

1275

1277

1278

1279

1280

1281

1282

1284 nr_txns++;

1285

1287 {

1289

1291

1292

1294

1296 nr_txns++;

1297 }

1298

1299

1304

1305 state->nr_txns = nr_txns;

1307

1308 for (off = 0; off < state->nr_txns; off++)

1309 {

1310 state->entries[off].file.vfd = -1;

1311 state->entries[off].segno = 0;

1312 }

1313

1314

1318

1319

1320 *iter_state = state;

1321

1322

1323

1324

1325

1326

1327 off = 0;

1328

1329

1331 {

1333

1335 {

1336

1339 &state->entries[off].segno);

1340 }

1341

1344

1345 state->entries[off].lsn = cur_change->lsn;

1346 state->entries[off].change = cur_change;

1347 state->entries[off].txn = txn;

1348

1350 }

1351

1352

1354 {

1356

1358

1360 {

1362

1364 {

1365

1368 &state->entries[off].file,

1369 &state->entries[off].segno);

1370 }

1373

1374 state->entries[off].lsn = cur_change->lsn;

1375 state->entries[off].change = cur_change;

1376 state->entries[off].txn = cur_txn;

1377

1379 }

1380 }

1381

1382

1384}

1385

1386

1387

1388

1389

1390

1391

1394{

1398

1399

1400 if (state->heap->bh_size == 0)

1401 return NULL;

1402

1404 entry = &state->entries[off];

1405

1406

1408 {

1413 }

1414

1415 change = entry->change;

1416

1417

1418

1419

1420

1421

1422

1424 {

1428

1429

1430 state->entries[off].lsn = next_change->lsn;

1431 state->entries[off].change = next_change;

1432

1434 return change;

1435 }

1436

1437

1439 {

1440

1441

1442

1443

1446

1447

1448

1449

1450

1451

1454 &state->entries[off].segno))

1455 {

1456

1460

1461 elog(DEBUG2, "restored %u/%u changes from disk",

1464

1466

1467 state->entries[off].lsn = next_change->lsn;

1468 state->entries[off].change = next_change;

1470

1471 return change;

1472 }

1473 }

1474

1475

1477

1478 return change;

1479}

1480

1481

1482

1483

1484static void

1487{

1489

1490 for (off = 0; off < state->nr_txns; off++)

1491 {

1492 if (state->entries[off].file.vfd != -1)

1494 }

1495

1496

1498 {

1500

1505 }

1506

1509}

1510

1511

1512

1513

1514

1515static void

1517{

1518 bool found;

1520 Size mem_freed = 0;

1521

1522

1524 {

1526

1528

1529

1530

1531

1532

1533

1536

1538 }

1539

1540

1542 {

1544

1546

1547

1549

1550

1551

1552

1553

1554

1555

1557

1559 }

1560

1561

1563

1564

1565

1566

1567

1569 {

1571

1573

1574

1577

1579 }

1580

1581

1582

1583

1585 {

1588 }

1589

1590

1591

1592

1594 {

1597 }

1598

1599

1600

1601

1602

1603

1604

1605

1606

1607

1611

1612

1615

1616

1619

1620

1622}

1623

1624

1625

1626

1627

1628

1629

1630

1631

1632

1633

1634

1635

1636static void

1638{

1640 Size mem_freed = 0;

1641

1642

1644 {

1646

1648

1649

1650

1651

1652

1653

1656

1659 }

1660

1661

1663 {

1665

1667

1668

1670

1671

1673

1674

1675

1676

1677

1678

1679

1681

1683 }

1684

1685

1687

1688 if (txn_prepared)

1689 {

1690

1691

1692

1693

1694

1696 {

1698

1700

1701

1704

1705

1707

1709 }

1710 }

1711

1712

1713

1714

1715

1716

1718 {

1721 }

1722

1723

1725 {

1727 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;

1728

1729

1730

1731

1732

1733

1735 }

1736

1737

1740}

1741

1742

1743

1744

1745

1746

1747

1748

1749

1750

1751

1752

1753

1754static bool

1756{

1757

1759 return false;

1760

1761

1762

1763

1764

1766 return false;

1768 {

1769

1771

1772 return true;

1773 }

1774

1775

1776

1778 return false;

1779

1781 {

1782

1783

1784

1785

1788 return false;

1789 }

1790

1791

1792

1793

1794

1795

1798

1799

1801

1802

1803

1804

1805

1808

1809 return true;

1810}

1811

1812

1813

1814

1815

1816static void

1818{

1821

1823 return;

1824

1828

1829

1830

1831

1832

1836

1838 {

1841 bool found;

1843

1845

1847

1848

1850

1852

1854 &key.tid);

1855

1858 if (!found)

1859 {

1863 }

1864 else

1865 {

1866

1867

1868

1869

1871

1872

1873

1874

1875

1880 }

1881 }

1882}

1883

1884

1885

1886

1887

1888

1892{

1895 int i = 0;

1897

1901

1903 memcpy(snap, orig_snap, sizeof(SnapshotData));

1904

1905 snap->copied = true;

1906 snap->active_count = 1;

1909

1911

1912

1913

1914

1915

1916

1919

1920

1921

1922

1923

1924

1926

1928 {

1930

1934 }

1935

1936

1938

1939

1941

1942 return snap;

1943}

1944

1945

1946

1947

1948static void

1950{

1953 else

1955}

1956

1957

1958

1959

1960

1961

1962

1963static void

1965{

1966

1968

1970

1972 {

1973

1974

1975

1976

1980

1981

1982

1983

1984

1985

1987

1989 }

1990 else

1991 {

1994 }

1995}

1996

1997

1998

1999

2000

2001

2002

2003

2004

2005

2006

2007

2008

2009

2010

2011

2012

2013

2014

2015

2016

2017

2018

2019

2020

2021

2022

2023

2024

2025

2026

2027

2028

2029static inline void

2031{

2032

2033

2034

2035

2037 return;

2038

2039

2040

2041

2042

2045 else

2047}

2048

2049

2050

2051

2052static inline void

2055 bool streaming)

2056{

2057 if (streaming)

2059 else

2060 rb->apply_change(rb, txn, relation, change);

2061}

2062

2063

2064

2065

2066static inline void

2068 int nrelations, Relation *relations,

2070{

2071 if (streaming)

2072 rb->stream_truncate(rb, txn, nrelations, relations, change);

2073 else

2074 rb->apply_truncate(rb, txn, nrelations, relations, change);

2075}

2076

2077

2078

2079

2080static inline void

2083{

2084 if (streaming)

2089 else

2090 rb->message(rb, txn, change->lsn, true,

2094}

2095

2096

2097

2098

2099

2100static inline void

2103{

2105

2106

2107 if (snapshot_now->copied)

2109 else

2111 txn, command_id);

2112}

2113

2114

2115

2116

2117

2118static void

2120{

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2136}

2137

2138

2139

2140

2141

2142

2143

2144

2145static void

2151{

2152

2154

2155

2157

2158

2159 if (specinsert != NULL)

2160 {

2162 specinsert = NULL;

2163 }

2164

2165

2166

2167

2168

2170 {

2173 }

2174

2175

2177}

2178

2179

2180

2181

2182

2183

2184

2185

2186

2187

2188

2189

2190

2191static void

2194 volatile Snapshot snapshot_now,

2196 bool streaming)

2197{

2198 bool using_subtxn;

2203 volatile bool stream_started = false;

2205

2206

2208

2209

2211

2212

2213

2214

2215

2216

2217

2218

2219

2220

2221

2223

2225 {

2227 int changes_count = 0;

2228

2229

2230 if (using_subtxn)

2232 else

2234

2235

2236

2237

2238

2239 if (!streaming)

2240 {

2243 else

2244 rb->begin(rb, txn);

2245 }

2246

2249 {

2251 Oid reloid;

2252

2254

2255

2256

2257

2258

2260 {

2261 if (streaming)

2262 {

2265 stream_started = true;

2266 }

2267 }

2268

2269

2270

2271

2272

2273

2275

2276 prev_lsn = change->lsn;

2277

2278

2279

2280

2281

2282

2284 {

2285 curtxn = change->txn;

2287 }

2288

2289 switch (change->action)

2290 {

2292

2293

2294

2295

2296

2297

2298 if (specinsert == NULL)

2299 elog(ERROR, "invalid ordering of speculative insertion changes");

2301 change = specinsert;

2303

2304

2308 Assert(snapshot_now);

2309

2312

2313

2314

2315

2316

2317

2318

2319

2320

2321

2322

2323

2324

2328 goto change_done;

2330 elog(ERROR, "could not map filenumber \"%s\" to relation OID",

2333

2335

2337 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",

2338 reloid,

2341

2343 goto change_done;

2344

2345

2346

2347

2348

2350 goto change_done;

2351

2352

2353

2354

2355

2356

2357

2358 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)

2359 goto change_done;

2360

2361

2363 {

2366 streaming);

2367

2368

2369

2370

2371

2372

2375 }

2376

2378 {

2379

2380

2381

2382

2383

2384

2385

2386

2388

2391 change);

2392 }

2393

2394 change_done:

2395

2396

2397

2398

2399

2400 if (specinsert != NULL)

2401 {

2403 specinsert = NULL;

2404 }

2405

2407 {

2409 relation = NULL;

2410 }

2411 break;

2412

2414

2415

2416

2417

2418

2419

2420

2421

2422

2423

2424

2425

2426

2427

2428

2429 if (specinsert != NULL)

2430 {

2432 specinsert = NULL;

2433 }

2434

2435

2437 specinsert = change;

2438 break;

2439

2441

2442

2443

2444

2445

2446

2447

2448

2449

2450 if (specinsert != NULL)

2451 {

2452

2453

2454

2455

2456

2459

2460

2462 specinsert = NULL;

2463 }

2464 break;

2465

2467 {

2468 int i;

2470 int nrelations = 0;

2472

2474 for (i = 0; i < nrelids; i++)

2475 {

2478

2480

2482 elog(ERROR, "could not open relation with OID %u", relid);

2483

2485 continue;

2486

2487 relations[nrelations++] = rel;

2488 }

2489

2490

2492 relations, change,

2493 streaming);

2494

2495 for (i = 0; i < nrelations; i++)

2497

2498 break;

2499 }

2500

2503 break;

2504

2506

2509 break;

2510

2512

2514

2515 if (snapshot_now->copied)

2516 {

2518 snapshot_now =

2520 txn, command_id);

2521 }

2522

2523

2524

2525

2526

2527

2529 {

2530 snapshot_now =

2532 txn, command_id);

2533 }

2534 else

2535 {

2537 }

2538

2539

2541 break;

2542

2545

2546 if (command_id < change->data.command_id)

2547 {

2549

2550 if (!snapshot_now->copied)

2551 {

2552

2554 txn, command_id);

2555 }

2556

2557 snapshot_now->curcid = command_id;

2558

2561 }

2562

2563 break;

2564

2566 elog(ERROR, "tuplecid value in changequeue");

2567 break;

2568 }

2569

2570

2571

2572

2573

2574

2575

2576

2577

2578

2579

2580#define CHANGES_THRESHOLD 100

2581

2583 {

2585 changes_count = 0;

2586 }

2587 }

2588

2589

2590 Assert(!specinsert);

2591

2592

2594 iterstate = NULL;

2595

2596

2597

2598

2599

2600

2601

2602

2603

2604

2607

2609

2610

2611

2612

2613

2614 if (streaming)

2615 {

2616 if (stream_started)

2617 {

2619 stream_started = false;

2620 }

2621 }

2622 else

2623 {

2624

2625

2626

2627

2629 {

2631 rb->prepare(rb, txn, commit_lsn);

2633 }

2634 else

2635 rb->commit(rb, txn, commit_lsn);

2636 }

2637

2638

2640 elog(ERROR, "output plugin used XID %u",

2642

2643

2644

2645

2646

2647 if (streaming)

2649 else if (snapshot_now->copied)

2651

2652

2654

2655

2656

2657

2658

2659

2660

2662

2663

2665

2666 if (using_subtxn)

2668

2669

2670

2671

2672

2673

2674

2675

2676

2677

2678

2679

2680

2681

2682

2684 {

2685 if (streaming)

2687

2689

2691 }

2692 else

2694 }

2696 {

2699

2700

2701 if (iterstate)

2703

2705

2706

2707

2708

2709

2711

2712

2715

2716 if (using_subtxn)

2718

2719

2720

2721

2722

2723

2724

2725

2726

2727

2728

2729

2730

2731 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&

2733 {

2734

2736

2737

2740 errdata = NULL;

2741

2742

2745

2746

2747 if (stream_started)

2749

2750

2752 command_id, prev_lsn,

2753 specinsert);

2754 }

2755 else

2756 {

2760 }

2761 }

2763}

2764

2765

2766

2767

2768

2769

2770

2771

2772

2773

2774

2775static void

2781{

2784

2790

2791

2792

2793

2794

2795

2796

2797

2798

2800 {

2802 return;

2803 }

2804

2805

2806

2807

2808

2809

2810

2812 {

2814

2815

2816

2817

2818

2821 return;

2822 }

2823

2825

2826

2828 command_id, false);

2829}

2830

2831

2832

2833

2834

2835

2836void

2841{

2843

2845 false);

2846

2847

2848 if (txn == NULL)

2849 return;

2850

2852 origin_id, origin_lsn);

2853}

2854

2855

2856

2857

2858

2859bool

2864{

2866

2868

2869

2870 if (txn == NULL)

2871 return false;

2872

2873

2874

2875

2876

2882

2883

2886

2887 return true;

2888}

2889

2890

2891void

2893{

2895

2897

2898

2899 if (txn == NULL)

2900 return;

2901

2902

2905}

2906

2907

2908

2909

2910

2911

2912void

2914 char *gid)

2915{

2917

2919 false);

2920

2921

2922 if (txn == NULL)

2923 return;

2924

2925

2926

2927

2928

2929

2932

2934

2937

2938

2939

2940

2941

2942

2944 {

2947 }

2948}

2949

2950

2951

2952

2953void

2958 XLogRecPtr origin_lsn, char *gid, bool is_commit)

2959{

2963

2965

2966

2967 if (txn == NULL)

2968 return;

2969

2970

2971

2972

2973

2974 prepare_end_lsn = txn->end_lsn;

2976

2977

2979

2980

2981

2982

2983

2984

2985

2986

2987

2988 if ((txn->final_lsn < two_phase_at) && is_commit)

2989 {

2990

2991

2992

2993

2994

2998

2999

3000

3001

3002

3003

3004

3005

3008 }

3009

3015

3016 if (is_commit)

3018 else

3020

3021

3025}

3026

3027

3028

3029

3030

3031

3032

3033

3034

3035

3036

3037

3038

3039void

3042{

3044

3046 false);

3047

3048

3049 if (txn == NULL)

3050 return;

3051

3053

3054

3056 {

3058

3059

3060

3061

3062

3063

3064

3068 }

3069

3070

3072

3073

3075}

3076

3077

3078

3079

3080

3081

3082

3083

3084void

3086{

3088

3089

3090

3091

3092

3093

3094

3095

3097 {

3099

3101

3103 {

3104 elog(DEBUG2, "aborting old transaction %u", txn->xid);

3105

3106

3109

3110

3112 }

3113 else

3114 return;

3115 }

3116}

3117

3118

3119

3120

3121

3122

3123

3124

3125

3126

3127

3128

3129

3130

3131void

3133{

3135

3137 false);

3138

3139

3140 if (txn == NULL)

3141 return;

3142

3143

3145

3146

3148

3149

3150

3151

3152

3153

3157 else

3159

3160

3162}

3163

3164

3165

3166

3167

3168

3169

3170

3171

3172void

3174{

3176

3178 false);

3179

3180

3181 if (txn == NULL)

3182 return;

3183

3184

3185

3186

3187

3188

3192 else

3194}

3195

3196

3197

3198

3199

3200

3201

3202

3203void

3206{

3208 int i;

3209

3210 if (use_subtxn)

3212

3213

3214

3215

3216

3217

3218

3219 if (use_subtxn)

3221

3222 for (i = 0; i < ninvalidations; i++)

3224

3225 if (use_subtxn)

3227}

3228

3229

3230

3231

3232

3233

3234

3235

3236

3237

3238

3239void

3241{

3242

3245}

3246

3247

3248

3249

3250

3251

3252void

3255{

3257

3260

3262}

3263

3264

3265

3266

3267

3268

3269

3270void

3273{

3275 bool is_new;

3276

3277 Assert(snap != NULL);

3278

3279

3280

3281

3282

3288

3292

3294}

3295

3296

3297

3298

3299

3300

3301void

3304{

3306

3309

3311}

3312

3313

3314

3315

3316

3317

3318

3319

3320

3321

3322

3323

3324

3325

3326

3327

3328

3329static void

3333 bool addition, Size sz)

3334{

3336

3337 Assert(txn || change);

3338

3339

3340

3341

3342

3343

3345 return;

3346

3347 if (sz == 0)

3348 return;

3349

3350 if (txn == NULL)

3351 txn = change->txn;

3352 Assert(txn != NULL);

3353

3354

3355

3356

3357

3359

3360 if (addition)

3361 {

3363

3364 txn->size += sz;

3365 rb->size += sz;

3366

3367

3369

3370

3371 if (oldsize != 0)

3374 }

3375 else

3376 {

3378 txn->size -= sz;

3379 rb->size -= sz;

3380

3381

3383

3384

3386 if (txn->size != 0)

3388 }

3389

3391}

3392

3393

3394

3395

3396

3397

3398

3399

3400void

3405{

3408

3410

3416 change->lsn = lsn;

3417 change->txn = txn;

3419

3422}

3423

3424

3425

3426

3427

3428

3429

3430

3431

3432

3433

3434

3435

3436

3437void

3441{

3445

3447

3449

3450

3451

3452

3453

3454

3456

3458

3459

3461 {

3467 }

3468 else

3469 {

3473

3477 }

3478

3486

3488

3490}

3491

3492

3493

3494

3495

3496static void

3498{

3499 int i;

3500

3501 for (i = 0; i < nmsgs; i++)

3503}

3504

3505

3506

3507

3508void

3511{

3513

3515

3517 {

3520 }

3521

3522

3523

3524

3525

3526

3527

3529 {

3531

3533 {

3536 }

3537 }

3538}

3539

3540

3541

3542

3543

3544

3545

3548{

3551 size_t xcnt = 0;

3552

3553

3555 return NULL;

3556

3557

3561 {

3563 catchange_node,

3564 iter.cur);

3565

3567

3568 xids[xcnt++] = txn->xid;

3569 }

3570

3572

3574 return xids;

3575}

3576

3577

3578

3579

3580

3581bool

3583{

3585

3587 false);

3588 if (txn == NULL)

3589 return false;

3590

3592}

3593

3594

3595

3596

3597

3598bool

3600{

3602

3605

3606

3607 if (txn == NULL)

3608 return false;

3609

3610

3614

3616}

3617

3618

3619

3620

3621

3622

3623

3624

3625

3626

3627

3628static void

3630{

3632 {

3635 }

3637 {

3640 }

3641}

3642

3643

3644

3645static int

3647{

3650

3652 return -1;

3654 return 1;

3655 return 0;

3656}

3657

3658

3659

3660

3663{

3665

3666

3669

3673

3674 return largest;

3675}

3676

3677

3678

3679

3680

3681

3682

3683

3684

3685

3686

3687

3688

3689

3690

3691

3692

3693

3694

3695

3696

3697

3698

3699

3700

3701

3704{

3706 Size largest_size = 0;

3708

3709

3711 {

3713

3715

3716

3718

3720

3721

3725 continue;

3726

3727

3728 if ((largest == NULL || txn->total_size > largest_size) &&

3730 {

3731 largest = txn;

3733 }

3734 }

3735

3736 return largest;

3737}

3738

3739

3740

3741

3742

3743

3744

3745

3746

3747

3748

3749

3750

3751

3752static void

3754{

3756

3757

3758

3759

3760

3763 return;

3764

3765

3766

3767

3768

3769

3770

3771

3772

3773

3774

3777 rb->size > 0))

3778 {

3779

3780

3781

3782

3785 {

3786

3790

3791

3793 continue;

3794

3796 }

3797 else

3798 {

3799

3800

3801

3802

3804

3805

3809

3810

3812 continue;

3813

3815 }

3816

3817

3818

3819

3820

3823 }

3824

3825

3827}

3828

3829

3830

3831

3832static void

3834{

3837 int fd = -1;

3839 Size spilled = 0;

3841

3842 elog(DEBUG2, "spill %u changes in XID %u to disk",

3844

3845

3847 {

3849

3852 }

3853

3854

3856 {

3858

3860

3861

3862

3863

3864

3865 if (fd == -1 ||

3867 {

3869

3870 if (fd != -1)

3872

3874

3875

3876

3877

3878

3880 curOpenSegNo);

3881

3882

3884 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);

3885

3886 if (fd < 0)

3889 errmsg("could not open file \"%s\": %m", path)));

3890 }

3891

3895

3896 spilled++;

3897 }

3898

3899

3901

3902

3903 if (spilled)

3904 {

3907

3908

3910

3911

3913 }

3914

3919

3920 if (fd != -1)

3922}

3923

3924

3925

3926

3927static void

3930{

3933

3935

3938

3939 switch (change->action)

3940 {

3941

3946 {

3949 newtup;

3950 Size oldlen = 0;

3951 Size newlen = 0;

3952

3955

3956 if (oldtup)

3957 {

3959 oldlen = oldtup->t_len;

3960 sz += oldlen;

3961 }

3962

3963 if (newtup)

3964 {

3966 newlen = newtup->t_len;

3967 sz += newlen;

3968 }

3969

3970

3972

3974

3976

3977 if (oldlen)

3978 {

3981

3982 memcpy(data, oldtup->t_data, oldlen);

3983 data += oldlen;

3984 }

3985

3986 if (newlen)

3987 {

3990

3991 memcpy(data, newtup->t_data, newlen);

3992 data += newlen;

3993 }

3994 break;

3995 }

3997 {

4000

4002 sizeof(Size) + sizeof(Size);

4004

4006

4007

4009

4010

4011 memcpy(data, &prefix_size, sizeof(Size));

4014 prefix_size);

4015 data += prefix_size;

4016

4017

4023

4024 break;

4025 }

4027 {

4031

4032 sz += inval_size;

4033

4036

4037

4040 data += inval_size;

4041

4042 break;

4043 }

4045 {

4048

4050

4054

4055

4058

4060

4063

4064 if (snap->xcnt)

4065 {

4069 }

4070

4072 {

4076 }

4077 break;

4078 }

4080 {

4083

4084

4086 sz += size;

4087

4088

4090

4092

4094

4096 data += size;

4097

4098 break;

4099 }

4104

4105 break;

4106 }

4107

4108 ondisk->size = sz;

4109

4110 errno = 0;

4113 {

4114 int save_errno = errno;

4115

4117

4118

4119 errno = save_errno ? save_errno : ENOSPC;

4122 errmsg("could not write to data file for XID %u: %m",

4123 txn->xid)));

4124 }

4126

4127

4128

4129

4130

4131

4132

4133

4134

4137

4139}

4140

4141

4142static inline bool

4144{

4146

4148}

4149

4150

4151static inline bool

4153{

4156

4157

4159 return false;

4160

4161

4162

4163

4164

4165

4168 return true;

4169

4170 return false;

4171}

4172

4173

4174

4175

4176

4177static void

4179{

4182 Size stream_bytes;

4183 bool txn_is_streamed;

4184

4185

4187

4188

4189

4190

4191

4192

4193

4194

4195

4196

4197

4198

4199

4200

4201

4202

4203

4205 {

4207

4208

4210

4211

4213

4215 {

4217

4220 }

4221

4222

4223

4224

4225

4227 {

4229 return;

4230 }

4231

4234 txn, command_id);

4235 }

4236 else

4237 {

4238

4240

4241

4242

4243

4244

4245

4246

4247

4249

4250

4251

4252

4253

4254

4256 txn, command_id);

4257

4258

4262 }

4263

4264

4265

4266

4267

4268

4269

4272

4273

4275 command_id, true);

4276

4279

4280

4281 rb->streamTxns += (txn_is_streamed) ? 0 : 1;

4282

4283

4285

4289}

4290

4291

4292

4293

4296{

4298

4299 switch (change->action)

4300 {

4301

4306 {

4308 newtup;

4309 Size oldlen = 0;

4310 Size newlen = 0;

4311

4314

4315 if (oldtup)

4316 {

4318 oldlen = oldtup->t_len;

4319 sz += oldlen;

4320 }

4321

4322 if (newtup)

4323 {

4325 newlen = newtup->t_len;

4326 sz += newlen;

4327 }

4328

4329 break;

4330 }

4332 {

4334

4336 sizeof(Size) + sizeof(Size);

4337

4338 break;

4339 }

4341 {

4344 break;

4345 }

4347 {

4349

4351

4355

4356 break;

4357 }

4359 {

4361

4362 break;

4363 }

4368

4369 break;

4370 }

4371

4372 return sz;

4373}

4374

4375

4376

4377

4378

4382{

4383 Size restored = 0;

4387

4390

4391

4393 {

4396

4399 }

4402

4404

4406 {

4407 int readBytes;

4409

4411

4412 if (*fd == -1)

4413 {

4415

4416

4417 if (*segno == 0)

4419

4421

4422

4423

4424

4425

4427 *segno);

4428

4430

4431

4433

4434 if (*fd < 0 && errno == ENOENT)

4435 {

4436 *fd = -1;

4437 (*segno)++;

4438 continue;

4439 }

4440 else if (*fd < 0)

4443 errmsg("could not open file \"%s\": %m",

4444 path)));

4445 }

4446

4447

4448

4449

4450

4451

4455 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);

4456

4457

4458 if (readBytes == 0)

4459 {

4461 *fd = -1;

4462 (*segno)++;

4463 continue;

4464 }

4465 else if (readBytes < 0)

4468 errmsg("could not read from reorderbuffer spill file: %m")));

4472 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",

4473 readBytes,

4475

4477

4479

4483

4488 WAIT_EVENT_REORDER_BUFFER_READ);

4489

4490 if (readBytes < 0)

4493 errmsg("could not read from reorderbuffer spill file: %m")));

4497 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",

4498 readBytes,

4500

4502

4503

4504

4505

4506

4508 restored++;

4509 }

4510

4511 return restored;

4512}

4513

4514

4515

4516

4517

4518

4519

4520

4521

4522static void

4525{

4528

4530

4532

4533

4535

4537

4538

4539 switch (change->action)

4540 {

4541

4547 {

4549

4552

4553

4557

4558

4561

4562

4564 data += tuplelen;

4565 }

4566

4568 {

4569

4571

4574

4577

4578

4582

4583

4586

4587

4589 data += tuplelen;

4590 }

4591

4592 break;

4594 {

4595 Size prefix_size;

4596

4597

4598 memcpy(&prefix_size, data, sizeof(Size));

4601 prefix_size);

4604 data += prefix_size;

4605

4606

4614

4615 break;

4616 }

4618 {

4621

4624

4625

4627

4628 break;

4629 }

4631 {

4635

4637

4641

4643

4645

4646 memcpy(newsnap, data, size);

4648 (((char *) newsnap) + sizeof(SnapshotData));

4650 newsnap->copied = true;

4651 break;

4652 }

4653

4655 {

4656 Oid *relids;

4657

4661

4662 break;

4663 }

4668 break;

4669 }

4670

4673

4674

4675

4676

4677

4678

4679

4680

4681

4684}

4685

4686

4687

4688

4689static void

4691{

4695

4698

4701

4702

4703 for (cur = first; cur <= last; cur++)

4704 {

4706

4708 if (unlink(path) != 0 && errno != ENOENT)

4711 errmsg("could not remove file \"%s\": %m", path)));

4712 }

4713}

4714

4715

4716

4717

4718

4719static void

4721{

4722 DIR *spill_dir;

4723 struct dirent *spill_de;

4724 struct stat statbuf;

4726

4728

4729

4731 return;

4732

4735 {

4736

4737 if (strncmp(spill_de->d_name, "xid", 3) == 0)

4738 {

4739 snprintf(path, sizeof(path),

4742

4743 if (unlink(path) != 0)

4746 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",

4748 }

4749 }

4751}

4752

4753

4754

4755

4756

4757

4758static void

4761{

4763

4765

4770}

4771

4772

4773

4774

4775

4776void

4778{

4779 DIR *logical_dir;

4780 struct dirent *logical_de;

4781

4784 {

4785 if (strcmp(logical_de->d_name, ".") == 0 ||

4786 strcmp(logical_de->d_name, "..") == 0)

4787 continue;

4788

4789

4791 continue;

4792

4793

4794

4795

4796

4798 }

4800}

4801

4802

4803

4804

4805

4806

4807

4808

4809

4810static void

4812{

4814

4816

4822}

4823

4824

4825

4826

4827

4828

4829

4830static void

4833{

4836 bool found;

4837 int32 chunksize;

4838 bool isnull;

4841 Oid chunk_id;

4842 int32 chunk_seq;

4843

4846

4848

4854

4857

4858 if (!found)

4859 {

4863 ent->size = 0;

4866

4867 if (chunk_seq != 0)

4868 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",

4869 chunk_seq, chunk_id);

4870 }

4871 else if (found && chunk_seq != ent->last_chunk_seq + 1)

4872 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",

4874

4877

4878

4882

4884 else

4885 elog(ERROR, "unexpected type of toast chunk");

4886

4887 ent->size += chunksize;

4891}

4892

4893

4894

4895

4896

4897

4898

4899

4900

4901

4902

4903

4904

4905

4906

4907

4908

4909

4910

4911

4912

4913static void

4916{

4918 int natt;

4920 bool *isnull;

4927 Size old_size;

4928

4929

4931 return;

4932

4933

4934

4935

4936

4937

4938

4939

4940

4941

4942

4944

4946

4947

4949

4951

4954 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",

4956

4958

4959

4961 isnull = palloc0(sizeof(bool) * desc->natts);

4963

4965

4967

4968 for (natt = 0; natt < desc->natts; natt++)

4969 {

4973

4974

4977 struct varlena *new_datum = NULL;

4978 struct varlena *reconstructed;

4980 Size data_done = 0;

4981

4982

4983 if (attr->attnum < 0)

4984 continue;

4985

4986 if (attr->attisdropped)

4987 continue;

4988

4989

4990 if (attr->attlen != -1)

4991 continue;

4992

4993

4994 if (isnull[natt])

4995 continue;

4996

4997

4999

5000

5002 continue;

5003

5005

5006

5007

5008

5013 NULL);

5014 if (ent == NULL)

5015 continue;

5016

5017 new_datum =

5019

5020 free[natt] = true;

5021

5023

5025

5026

5028 {

5029 bool cisnull;

5033

5037

5041

5042 memcpy(VARDATA(reconstructed) + data_done,

5046 }

5048

5049

5052 else

5054

5055 memset(&redirect_pointer, 0, sizeof(redirect_pointer));

5056 redirect_pointer.pointer = reconstructed;

5057

5060 sizeof(redirect_pointer));

5061

5063 }

5064

5065

5066

5067

5068

5069

5073

5076

5077

5078

5079

5080

5083 for (natt = 0; natt < desc->natts; natt++)

5084 {

5085 if (free[natt])

5087 }

5091

5093

5094

5096

5099}

5100

5101

5102

5103

5104static void

5106{

5109

5111 return;

5112

5113

5116 {

5118

5121

5123 {

5126

5129 }

5130 }

5131

5134}

5135

5136

5137

5138

5139

5140

5141

5142

5143

5144

5145

5146

5147

5148

5149

5150

5151

5152

5153

5154

5155

5156

5157

5158

5159

5160

5161

5162

5163

5164

5165

5166

5167

5169{

5173

5174#ifdef NOT_USED

5175static void

5177{

5180

5183 {

5184 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",

5192 );

5193 }

5194}

5195#endif

5196

5197

5198

5199

5200

5201

5202

5203static void

5205{

5207 int fd;

5208 int readBytes;

5210

5213 if (fd < 0)

5216 errmsg("could not open file \"%s\": %m", path)));

5217

5218 while (true)

5219 {

5223 bool found;

5224

5225

5227

5228

5232

5233 if (readBytes < 0)

5236 errmsg("could not read file \"%s\": %m",

5237 path)));

5238 else if (readBytes == 0)

5239 break;

5243 errmsg("could not read from file \"%s\": read %d instead of %d bytes",

5244 path, readBytes,

5246

5249 &key.tid);

5250

5251

5254

5255

5256 if (!ent)

5257 continue;

5258

5261 &key.tid);

5262

5265

5266 if (found)

5267 {

5268

5269

5270

5271

5272

5275 }

5276 else

5277 {

5278

5282 }

5283 }

5284

5288 errmsg("could not close file \"%s\": %m", path)));

5289}

5290

5291

5292

5293

5294

5295static bool

5297{

5298 return bsearch(&xid, xip, num,

5300}

5301

5302

5303

5304

5305static int

5307{

5310

5312}

5313

5314

5315

5316

5317

5318static void

5320{

5321 DIR *mapping_dir;

5322 struct dirent *mapping_de;

5326

5329 {

5330 Oid f_dboid;

5331 Oid f_relid;

5336 f_lo;

5338

5339 if (strcmp(mapping_de->d_name, ".") == 0 ||

5340 strcmp(mapping_de->d_name, "..") == 0)

5341 continue;

5342

5343

5344 if (strncmp(mapping_de->d_name, "map-", 4) != 0)

5345 continue;

5346

5348 &f_dboid, &f_relid, &f_hi, &f_lo,

5349 &f_mapped_xid, &f_create_xid) != 6)

5350 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);

5351

5352 f_lsn = ((uint64) f_hi) << 32 | f_lo;

5353

5354

5355 if (f_dboid != dboid)

5356 continue;

5357

5358

5359 if (f_relid != relid)

5360 continue;

5361

5362

5364 continue;

5365

5366

5368 continue;

5369

5370

5372 f->lsn = f_lsn;

5374 files = lappend(files, f);

5375 }

5377

5378

5380

5381 foreach(file, files)

5382 {

5384

5385 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,

5386 snapshot->subxip[0]);

5389 }

5390}

5391

5392

5393

5394

5395

5396bool

5401{

5406 bool updated_mapping = false;

5407

5408

5409

5410

5411

5412

5413

5414

5415

5417 return false;

5418

5419

5420 memset(&key, 0, sizeof(key));

5421

5423

5424

5425

5426

5427

5429

5430

5433

5435 &key.tid);

5436

5437restart:

5440

5441

5442

5443

5444

5445

5446

5447 if (ent == NULL && !updated_mapping)

5448 {

5450

5451 updated_mapping = true;

5452 goto restart;

5453 }

5454 else if (ent == NULL)

5455 return false;

5456

5457 if (cmin)

5458 *cmin = ent->cmin;

5459 if (cmax)

5460 *cmax = ent->cmax;

5461 return true;

5462}

5463

5464

5465

5466

5467

5468

5469

5473{

5475

5477 false);

5478

5479 if (txn == NULL)

5480 return 0;

5481

5483

5485}

void binaryheap_build(binaryheap *heap)

void binaryheap_replace_first(binaryheap *heap, bh_node_type d)

bh_node_type binaryheap_first(binaryheap *heap)

bh_node_type binaryheap_remove_first(binaryheap *heap)

void binaryheap_free(binaryheap *heap)

void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)

binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)

static void cleanup(void)

#define BufferIsLocal(buffer)

void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)

#define FLEXIBLE_ARRAY_MEMBER

bool IsToastRelation(Relation relation)

bool IsSharedRelation(Oid relationId)

#define INDIRECT_POINTER_SIZE

#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

void hash_destroy(HTAB *hashp)

void * hash_seq_search(HASH_SEQ_STATUS *status)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)

void FreeErrorData(ErrorData *edata)

int errcode_for_file_access(void)

ErrorData * CopyErrorData(void)

void FlushErrorState(void)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

int CloseTransientFile(int fd)

struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)

void FileClose(File file)

File PathNameOpenFile(const char *fileName, int fileFlags)

DIR * AllocateDir(const char *dirname)

struct dirent * ReadDir(DIR *dir, const char *dirname)

int OpenTransientFile(const char *fileName, int fileFlags)

static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)

MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)

Assert(PointerIsAligned(start, uint64))

HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)

void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)

HeapTupleData * HeapTuple

struct HeapTupleData HeapTupleData

HeapTupleHeaderData * HeapTupleHeader

#define SizeofHeapTupleHeader

static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)

static dlist_node * dlist_pop_head_node(dlist_head *head)

#define dlist_foreach(iter, lhead)

static void dlist_init(dlist_head *head)

#define dclist_container(type, membername, ptr)

static bool dlist_has_next(const dlist_head *head, const dlist_node *node)

static void dclist_push_tail(dclist_head *head, dlist_node *node)

static void dlist_insert_before(dlist_node *before, dlist_node *node)

#define dlist_head_element(type, membername, lhead)

static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)

static void dlist_delete(dlist_node *node)

static uint32 dclist_count(const dclist_head *head)

#define dlist_foreach_modify(iter, lhead)

static bool dlist_is_empty(const dlist_head *head)

static void dlist_push_tail(dlist_head *head, dlist_node *node)

static void dclist_delete_from(dclist_head *head, dlist_node *node)

static void dclist_init(dclist_head *head)

#define dlist_container(type, membername, ptr)

#define dclist_foreach(iter, lhead)

static int pg_cmp_u64(uint64 a, uint64 b)

void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)

if(TABLE==NULL||TABLE_index==NULL)

static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)

static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)

static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)

List * lappend(List *list, void *datum)

void list_sort(List *list, list_sort_comparator cmp)

void UpdateDecodingStats(LogicalDecodingContext *ctx)

void * MemoryContextAlloc(MemoryContext context, Size size)

void * MemoryContextAllocZero(MemoryContext context, Size size)

char * pstrdup(const char *in)

void * repalloc(void *pointer, Size size)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

void MemoryContextDelete(MemoryContext context)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define SLAB_DEFAULT_BLOCK_SIZE

#define CHECK_FOR_INTERRUPTS()

void pairingheap_remove(pairingheap *heap, pairingheap_node *node)

void pairingheap_add(pairingheap *heap, pairingheap_node *node)

pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)

pairingheap_node * pairingheap_first(pairingheap *heap)

#define pairingheap_container(type, membername, ptr)

#define pairingheap_const_container(type, membername, ptr)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

FormData_pg_attribute * Form_pg_attribute

#define qsort(a, b, c, d)

static Datum PointerGetDatum(const void *X)

static Oid DatumGetObjectId(Datum X)

static Pointer DatumGetPointer(Datum X)

static Datum Int32GetDatum(int32 X)

static int32 DatumGetInt32(Datum X)

static int fd(const char *x, int i)

bool TransactionIdIsInProgress(TransactionId xid)

#define RelationIsLogicallyLogged(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define RelationIsValid(relation)

Relation RelationIdGetRelation(Oid relationId)

void RelationClose(Relation relation)

Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)

#define relpathperm(rlocator, forknum)

static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)

void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)

void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)

static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)

void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)

static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)

void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)

static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)

void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)

static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)

static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)

static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)

static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)

struct ReorderBufferDiskChange ReorderBufferDiskChange

bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)

void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)

static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)

static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)

bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)

static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

void ReorderBufferFreeTupleBuf(HeapTuple tuple)

void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)

static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)

uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)

void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)

TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)

static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)

#define IsSpecInsert(action)

static Size ReorderBufferChangeSize(ReorderBufferChange *change)

ReorderBuffer * ReorderBufferAllocate(void)

int logical_decoding_work_mem

static void AssertChangeLsnOrder(ReorderBufferTXN *txn)

static bool ReorderBufferCanStream(ReorderBuffer *rb)

static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)

static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)

void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)

static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)

struct ReorderBufferIterTXNState ReorderBufferIterTXNState

void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)

struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt

int debug_logical_replication_streaming

struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry

#define IsInsertOrUpdate(action)

static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)

void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)

bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)

static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)

static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)

struct RewriteMappingFile RewriteMappingFile

void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)

static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)

#define CHANGES_THRESHOLD

static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)

static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)

HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)

void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)

static void AssertTXNLsnOrder(ReorderBuffer *rb)

static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)

static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)

static void ReorderBufferCleanupSerializedTXNs(const char *slotname)

ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)

void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)

static void SetupCheckXidLive(TransactionId xid)

static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)

static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)

static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)

static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)

static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)

static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)

static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)

bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)

static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)

struct TXNEntryFile TXNEntryFile

static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)

static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)

Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)

static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)

static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)

struct ReorderBufferToastEnt ReorderBufferToastEnt

static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)

void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)

static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

void ReorderBufferFree(ReorderBuffer *rb)

static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)

#define IsSpecConfirmOrAbort(action)

struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt

struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey

static const Size max_changes_in_memory

void StartupReorderBuffer(void)

void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)

static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)

static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)

ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)

static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

#define rbtxn_is_committed(txn)

#define rbtxn_has_streamable_change(txn)

#define rbtxn_has_catalog_changes(txn)

@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE

@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED

#define RBTXN_PREPARE_STATUS_MASK

#define rbtxn_is_serialized_clear(txn)

#define RBTXN_IS_STREAMED

#define rbtxn_is_prepared(txn)

#define RBTXN_HAS_PARTIAL_CHANGE

#define rbtxn_is_streamed(txn)

struct ReorderBufferChange ReorderBufferChange

#define RBTXN_SENT_PREPARE

#define rbtxn_is_toptxn(txn)

#define rbtxn_get_toptxn(txn)

#define rbtxn_is_known_subxact(txn)

#define rbtxn_is_subtxn(txn)

#define RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_IS_COMMITTED

#define PG_LOGICAL_MAPPINGS_DIR

#define RBTXN_IS_SERIALIZED_CLEAR

#define rbtxn_sent_prepare(txn)

#define RBTXN_IS_PREPARED

#define RBTXN_SKIPPED_PREPARE

#define RBTXN_HAS_STREAMABLE_CHANGE

@ REORDER_BUFFER_CHANGE_INVALIDATION

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM

@ REORDER_BUFFER_CHANGE_INSERT

@ REORDER_BUFFER_CHANGE_MESSAGE

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT

@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID

@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID

@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT

@ REORDER_BUFFER_CHANGE_TRUNCATE

@ REORDER_BUFFER_CHANGE_DELETE

@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT

@ REORDER_BUFFER_CHANGE_UPDATE

#define rbtxn_is_aborted(txn)

#define RBTXN_IS_SERIALIZED

#define rbtxn_is_serialized(txn)

#define rbtxn_has_partial_change(txn)

#define LOGICAL_REWRITE_FORMAT

MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)

ReplicationSlot * MyReplicationSlot

bool ReplicationSlotValidateName(const char *name, int elevel)

void SnapBuildSnapDecRefcount(Snapshot snap)

bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)

SnapBuildState SnapBuildCurrentState(SnapBuild *builder)

void TeardownHistoricSnapshot(bool is_error)

void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)

static HTAB * tuplecid_data

struct SnapshotData * Snapshot

struct SnapshotData SnapshotData

struct SnapBuild * snapshot_builder

RelFileLocator old_locator

RelFileLocator new_locator

struct ReorderBufferChange::@110::@112 truncate

ReorderBufferChangeType action

bool clear_toast_afterwards

union ReorderBufferChange::@110 data

struct ReorderBufferTXN * txn

struct ReorderBufferChange::@110::@111 tp

struct ReorderBufferChange::@110::@114 tuplecid

struct ReorderBufferChange::@110::@113 msg

struct ReorderBufferChange::@110::@115 inval

SharedInvalidationMessage * invalidations

ReorderBufferChange change

ReorderBufferChange * change

ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]

XLogRecPtr restart_decoding_lsn

pairingheap_node txn_node

XLogRecPtr base_snapshot_lsn

TransactionId toplevel_xid

dlist_node catchange_node

SharedInvalidationMessage * invalidations

struct ReorderBufferTXN * toptxn

void * output_plugin_private

dlist_node base_snapshot_node

union ReorderBufferTXN::@116 xact_time

struct varlena * reconstructed

ReorderBufferTupleCidKey key

ReorderBufferStreamMessageCB stream_message

ReorderBufferStreamChangeCB stream_change

ReorderBufferBeginCB begin_prepare

ReorderBufferStreamTruncateCB stream_truncate

ReorderBufferCommitPreparedCB commit_prepared

ReorderBufferUpdateProgressTxnCB update_progress_txn

ReorderBufferMessageCB message

dlist_head txns_by_base_snapshot_lsn

dclist_head catchange_txns

ReorderBufferRollbackPreparedCB rollback_prepared

ReorderBufferPrepareCB prepare

ReorderBufferStreamStopCB stream_stop

ReorderBufferApplyChangeCB apply_change

MemoryContext change_context

ReorderBufferTXN * by_txn_last_txn

TransactionId by_txn_last_xid

ReorderBufferStreamPrepareCB stream_prepare

ReorderBufferStreamAbortCB stream_abort

MemoryContext tup_context

ReorderBufferCommitCB commit

ReorderBufferStreamStartCB stream_start

ReorderBufferStreamCommitCB stream_commit

ReorderBufferApplyTruncateCB apply_truncate

dlist_head toplevel_by_lsn

ReorderBufferBeginCB begin

MemoryContext txn_context

XLogRecPtr current_restart_decoding_lsn

ReplicationSlotPersistentData data

bool TransactionIdDidCommit(TransactionId transactionId)

bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

#define InvalidTransactionId

#define TransactionIdEquals(id1, id2)

#define TransactionIdIsValid(xid)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

#define VARSIZE_SHORT(PTR)

#define VARATT_IS_EXTENDED(PTR)

#define VARATT_IS_SHORT(PTR)

#define SET_VARSIZE_COMPRESSED(PTR, len)

#define SET_VARTAG_EXTERNAL(PTR, tag)

#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)

#define VARDATA_EXTERNAL(PTR)

#define SET_VARSIZE(PTR, len)

#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)

#define VARATT_IS_EXTERNAL(PTR)

static void pgstat_report_wait_start(uint32 wait_event_info)

static void pgstat_report_wait_end(void)

bool IsTransactionOrTransactionBlock(void)

void BeginInternalSubTransaction(const char *name)

TransactionId CheckXidAlive

void RollbackAndReleaseCurrentSubTransaction(void)

void StartTransactionCommand(void)

TransactionId GetCurrentTransactionIdIfAny(void)

TransactionId GetCurrentTransactionId(void)

void AbortCurrentTransaction(void)

int xidComparator(const void *arg1, const void *arg2)

#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)

#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr