PostgreSQL Source Code: src/backend/replication/logical/reorderbuffer.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
88
90#include <sys/stat.h>
91
115
116
118{
122
123
125{
129
131{
137
138
140{
143
145
146
148{
155
157{
163
164
166{
169
174
176
177
179{
182
184
185#define IsSpecInsert(action) \
186( \
187 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
189#define IsSpecConfirmOrAbort(action) \
190( \
191 (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
192 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
194#define IsInsertOrUpdate(action) \
195( \
196 (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
197 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
198 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
200
201
202
203
204
205
206
207
208
209
210
211
212
215
216
218
219
220
221
222
230
232
233
234
235
236
237
238
239
240
247
248
249
250
251
252
263 bool txn_prepared);
270
274
275
276
277
278
279
284
285
286
287
288
295
296
297
298
299
300
305 bool addition, Size sz);
306
307
308
309
310
313{
317
319
320
322 "ReorderBuffer",
324
325 buffer =
327
328 memset(&hash_ctl, 0, sizeof(hash_ctl));
329
330 buffer->context = new_ctx;
331
333 "Change",
336
338 "TXN",
341
342
343
344
345
346
347
348
349
350
351
353 "Tuples",
357
361
362 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
364
367
368 buffer->outbuf = NULL;
370 buffer->size = 0;
371
372
374
383
385
389
390
391
392
393
394
396
397 return buffer;
398}
399
400
401
402
403void
405{
407
408
409
410
411
413
414
416}
417
418
419
420
423{
425
428
430
434
435
438
439 return txn;
440}
441
442
443
444
445static void
447{
448
450 {
453 }
454
455
456
457 if (txn->gid != NULL)
458 {
460 txn->gid = NULL;
461 }
462
464 {
467 }
468
470 {
473 }
474
475
477
478
480
482}
483
484
485
486
489{
491
494
496 return change;
497}
498
499
500
501
502void
504 bool upd_mem)
505{
506
507 if (upd_mem)
510
511
512 switch (change->action)
513 {
519 {
522 }
523
525 {
528 }
529 break;
537 break;
542 break;
545 {
548 }
549 break;
550
553 {
556 }
557 break;
562 break;
563 }
564
566}
567
568
569
570
571
574{
576 Size alloc_len;
577
579
583
584 return tuple;
585}
586
587
588
589
590void
592{
594}
595
596
597
598
599
600
601
602
603
604
607{
608 Oid *relids;
609 Size alloc_len;
610
611 alloc_len = sizeof(Oid) * nrelids;
612
614
615 return relids;
616}
617
618
619
620
621void
623{
625}
626
627
628
629
630
631
632
635 bool *is_new, XLogRecPtr lsn, bool create_as_top)
636{
639 bool found;
640
642
643
644
645
648 {
650
651 if (txn != NULL)
652 {
653
654 if (is_new)
655 *is_new = false;
656 return txn;
657 }
658
659
660
661
662
663 if (!create)
664 return NULL;
665
666 }
667
668
669
670
671
672
673
676 &xid,
678 &found);
679 if (found)
680 txn = ent->txn;
681 else if (create)
682 {
683
686
689 txn = ent->txn;
692
693 if (create_as_top)
694 {
697 }
698 }
699 else
700 txn = NULL;
701
702
705
706 if (is_new)
707 *is_new = !found;
708
709 Assert(!create || txn != NULL);
710 return txn;
711}
712
713
714
715
716
717
718
719
720
721static void
724 bool toast_insert)
725{
727
728
729
730
731
733 return;
734
735
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752 if (toast_insert)
757 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
758
759
760
761
762
763
768 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
769
770
771
772
773
774
775
776
777
778
784}
785
786
787
788
789
790void
793{
795
797
798
799
800
801
802
804 {
805
806
807
808
810 return;
811 }
812
813
814
815
816
817
824 {
826
828 }
829
830 change->lsn = lsn;
831 change->txn = txn;
832
837
838
841
842
844
845
847}
848
849
850
851
852
853void
856 bool transactional, const char *prefix,
857 Size message_size, const char *message)
858{
859 if (transactional)
860 {
863
865
866
867
868
869
870
872
874
880 memcpy(change->data.msg.message, message, message_size);
881
883
885 }
886 else
887 {
889 volatile Snapshot snapshot_now = snap;
890
891
892 Assert(snapshot_now);
893
896
897
900 {
901 rb->message(rb, txn, lsn, false, prefix, message_size, message);
902
904 }
906 {
909 }
911 }
912}
913
914
915
916
917
918
919
920
921
922static void
924{
925#ifdef USE_ASSERT_CHECKING
930
931
932
933
934
935
936
937
938
939
940
942 return;
943
945 {
947 iter.cur);
948
949
951
952
955
956
958 Assert(prev_first_lsn < cur_txn->first_lsn);
959
960
962
963 prev_first_lsn = cur_txn->first_lsn;
964 }
965
967 {
969 base_snapshot_node,
970 iter.cur);
971
972
975
976
978 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
979
980
982
984 }
985#endif
986}
987
988
989
990
991
992
993static void
995{
996#ifdef USE_ASSERT_CHECKING
999
1001 {
1003
1005
1009
1012
1013 Assert(prev_lsn <= cur_change->lsn);
1014
1015 prev_lsn = cur_change->lsn;
1016 }
1017#endif
1018}
1019
1020
1021
1022
1023
1026{
1028
1030
1032 return NULL;
1033
1035
1038 return txn;
1039}
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1054{
1056
1058
1061
1065}
1066
1067void
1069{
1071}
1072
1073
1074
1075
1076
1077
1078
1079void
1082{
1085 bool new_top;
1086 bool new_sub;
1087
1090
1091 if (!new_sub)
1092 {
1094 {
1095
1096 return;
1097 }
1098 else
1099 {
1100
1101
1102
1103
1104
1106 }
1107 }
1108
1112
1113
1114 subtxn->toptxn = txn;
1115
1116
1119
1120
1122
1123
1125}
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145static void
1148{
1150
1152 {
1155 {
1156
1157
1158
1159
1161 {
1164 }
1165
1166
1167
1168
1169
1170
1175
1176
1177
1178
1179
1183 }
1184 else
1185 {
1186
1191 }
1192 }
1193}
1194
1195
1196
1197
1198
1199void
1203{
1205
1208
1209
1210
1211
1212 if (!subtxn)
1213 return;
1214
1216 subtxn->end_lsn = end_lsn;
1217
1218
1219
1220
1221
1223}
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241static int
1243{
1247
1248 if (pos_a < pos_b)
1249 return 1;
1250 else if (pos_a == pos_b)
1251 return 0;
1252 return -1;
1253}
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264static void
1267{
1268 Size nr_txns = 0;
1272
1273 *iter_state = NULL;
1274
1275
1277
1278
1279
1280
1281
1282
1284 nr_txns++;
1285
1287 {
1289
1291
1292
1294
1296 nr_txns++;
1297 }
1298
1299
1304
1305 state->nr_txns = nr_txns;
1307
1308 for (off = 0; off < state->nr_txns; off++)
1309 {
1310 state->entries[off].file.vfd = -1;
1311 state->entries[off].segno = 0;
1312 }
1313
1314
1318
1319
1320 *iter_state = state;
1321
1322
1323
1324
1325
1326
1327 off = 0;
1328
1329
1331 {
1333
1335 {
1336
1339 &state->entries[off].segno);
1340 }
1341
1344
1345 state->entries[off].lsn = cur_change->lsn;
1346 state->entries[off].change = cur_change;
1347 state->entries[off].txn = txn;
1348
1350 }
1351
1352
1354 {
1356
1358
1360 {
1362
1364 {
1365
1368 &state->entries[off].file,
1369 &state->entries[off].segno);
1370 }
1373
1374 state->entries[off].lsn = cur_change->lsn;
1375 state->entries[off].change = cur_change;
1376 state->entries[off].txn = cur_txn;
1377
1379 }
1380 }
1381
1382
1384}
1385
1386
1387
1388
1389
1390
1391
1394{
1398
1399
1400 if (state->heap->bh_size == 0)
1401 return NULL;
1402
1404 entry = &state->entries[off];
1405
1406
1408 {
1413 }
1414
1415 change = entry->change;
1416
1417
1418
1419
1420
1421
1422
1424 {
1428
1429
1430 state->entries[off].lsn = next_change->lsn;
1431 state->entries[off].change = next_change;
1432
1434 return change;
1435 }
1436
1437
1439 {
1440
1441
1442
1443
1446
1447
1448
1449
1450
1451
1454 &state->entries[off].segno))
1455 {
1456
1460
1461 elog(DEBUG2, "restored %u/%u changes from disk",
1464
1466
1467 state->entries[off].lsn = next_change->lsn;
1468 state->entries[off].change = next_change;
1470
1471 return change;
1472 }
1473 }
1474
1475
1477
1478 return change;
1479}
1480
1481
1482
1483
1484static void
1487{
1489
1490 for (off = 0; off < state->nr_txns; off++)
1491 {
1492 if (state->entries[off].file.vfd != -1)
1494 }
1495
1496
1498 {
1500
1505 }
1506
1509}
1510
1511
1512
1513
1514
1515static void
1517{
1518 bool found;
1520 Size mem_freed = 0;
1521
1522
1524 {
1526
1528
1529
1530
1531
1532
1533
1536
1538 }
1539
1540
1542 {
1544
1546
1547
1549
1550
1551
1552
1553
1554
1555
1557
1559 }
1560
1561
1563
1564
1565
1566
1567
1569 {
1571
1573
1574
1577
1579 }
1580
1581
1582
1583
1585 {
1588 }
1589
1590
1591
1592
1594 {
1597 }
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1611
1612
1615
1616
1619
1620
1622}
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636static void
1638{
1640 Size mem_freed = 0;
1641
1642
1644 {
1646
1648
1649
1650
1651
1652
1653
1656
1659 }
1660
1661
1663 {
1665
1667
1668
1670
1671
1673
1674
1675
1676
1677
1678
1679
1681
1683 }
1684
1685
1687
1688 if (txn_prepared)
1689 {
1690
1691
1692
1693
1694
1696 {
1698
1700
1701
1704
1705
1707
1709 }
1710 }
1711
1712
1713
1714
1715
1716
1718 {
1721 }
1722
1723
1725 {
1727 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1728
1729
1730
1731
1732
1733
1735 }
1736
1737
1740}
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754static bool
1756{
1757
1759 return false;
1760
1761
1762
1763
1764
1766 return false;
1768 {
1769
1771
1772 return true;
1773 }
1774
1775
1776
1778 return false;
1779
1781 {
1782
1783
1784
1785
1788 return false;
1789 }
1790
1791
1792
1793
1794
1795
1798
1799
1801
1802
1803
1804
1805
1808
1809 return true;
1810}
1811
1812
1813
1814
1815
1816static void
1818{
1821
1823 return;
1824
1828
1829
1830
1831
1832
1836
1838 {
1841 bool found;
1843
1845
1847
1848
1850
1852
1854 &key.tid);
1855
1858 if (!found)
1859 {
1863 }
1864 else
1865 {
1866
1867
1868
1869
1871
1872
1873
1874
1875
1880 }
1881 }
1882}
1883
1884
1885
1886
1887
1888
1892{
1895 int i = 0;
1897
1901
1903 memcpy(snap, orig_snap, sizeof(SnapshotData));
1904
1905 snap->copied = true;
1906 snap->active_count = 1;
1909
1911
1912
1913
1914
1915
1916
1919
1920
1921
1922
1923
1924
1926
1928 {
1930
1934 }
1935
1936
1938
1939
1941
1942 return snap;
1943}
1944
1945
1946
1947
1948static void
1950{
1953 else
1955}
1956
1957
1958
1959
1960
1961
1962
1963static void
1965{
1966
1968
1970
1972 {
1973
1974
1975
1976
1980
1981
1982
1983
1984
1985
1987
1989 }
1990 else
1991 {
1994 }
1995}
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029static inline void
2031{
2032
2033
2034
2035
2037 return;
2038
2039
2040
2041
2042
2045 else
2047}
2048
2049
2050
2051
2052static inline void
2055 bool streaming)
2056{
2057 if (streaming)
2059 else
2060 rb->apply_change(rb, txn, relation, change);
2061}
2062
2063
2064
2065
2066static inline void
2068 int nrelations, Relation *relations,
2070{
2071 if (streaming)
2072 rb->stream_truncate(rb, txn, nrelations, relations, change);
2073 else
2074 rb->apply_truncate(rb, txn, nrelations, relations, change);
2075}
2076
2077
2078
2079
2080static inline void
2083{
2084 if (streaming)
2089 else
2090 rb->message(rb, txn, change->lsn, true,
2094}
2095
2096
2097
2098
2099
2100static inline void
2103{
2105
2106
2107 if (snapshot_now->copied)
2109 else
2111 txn, command_id);
2112}
2113
2114
2115
2116
2117
2118static void
2120{
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2136}
2137
2138
2139
2140
2141
2142
2143
2144
2145static void
2151{
2152
2154
2155
2157
2158
2159 if (specinsert != NULL)
2160 {
2162 specinsert = NULL;
2163 }
2164
2165
2166
2167
2168
2170 {
2173 }
2174
2175
2177}
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191static void
2194 volatile Snapshot snapshot_now,
2196 bool streaming)
2197{
2198 bool using_subtxn;
2203 volatile bool stream_started = false;
2205
2206
2208
2209
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2223
2225 {
2227 int changes_count = 0;
2228
2229
2230 if (using_subtxn)
2232 else
2234
2235
2236
2237
2238
2239 if (!streaming)
2240 {
2243 else
2244 rb->begin(rb, txn);
2245 }
2246
2249 {
2251 Oid reloid;
2252
2254
2255
2256
2257
2258
2260 {
2261 if (streaming)
2262 {
2265 stream_started = true;
2266 }
2267 }
2268
2269
2270
2271
2272
2273
2275
2276 prev_lsn = change->lsn;
2277
2278
2279
2280
2281
2282
2284 {
2285 curtxn = change->txn;
2287 }
2288
2289 switch (change->action)
2290 {
2292
2293
2294
2295
2296
2297
2298 if (specinsert == NULL)
2299 elog(ERROR, "invalid ordering of speculative insertion changes");
2301 change = specinsert;
2303
2304
2308 Assert(snapshot_now);
2309
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2328 goto change_done;
2330 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2333
2335
2337 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2338 reloid,
2341
2343 goto change_done;
2344
2345
2346
2347
2348
2350 goto change_done;
2351
2352
2353
2354
2355
2356
2357
2358 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2359 goto change_done;
2360
2361
2363 {
2366 streaming);
2367
2368
2369
2370
2371
2372
2375 }
2376
2378 {
2379
2380
2381
2382
2383
2384
2385
2386
2388
2391 change);
2392 }
2393
2394 change_done:
2395
2396
2397
2398
2399
2400 if (specinsert != NULL)
2401 {
2403 specinsert = NULL;
2404 }
2405
2407 {
2409 relation = NULL;
2410 }
2411 break;
2412
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429 if (specinsert != NULL)
2430 {
2432 specinsert = NULL;
2433 }
2434
2435
2437 specinsert = change;
2438 break;
2439
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450 if (specinsert != NULL)
2451 {
2452
2453
2454
2455
2456
2459
2460
2462 specinsert = NULL;
2463 }
2464 break;
2465
2467 {
2468 int i;
2470 int nrelations = 0;
2472
2474 for (i = 0; i < nrelids; i++)
2475 {
2478
2480
2482 elog(ERROR, "could not open relation with OID %u", relid);
2483
2485 continue;
2486
2487 relations[nrelations++] = rel;
2488 }
2489
2490
2492 relations, change,
2493 streaming);
2494
2495 for (i = 0; i < nrelations; i++)
2497
2498 break;
2499 }
2500
2503 break;
2504
2506
2509 break;
2510
2512
2514
2515 if (snapshot_now->copied)
2516 {
2518 snapshot_now =
2520 txn, command_id);
2521 }
2522
2523
2524
2525
2526
2527
2529 {
2530 snapshot_now =
2532 txn, command_id);
2533 }
2534 else
2535 {
2537 }
2538
2539
2541 break;
2542
2545
2546 if (command_id < change->data.command_id)
2547 {
2549
2550 if (!snapshot_now->copied)
2551 {
2552
2554 txn, command_id);
2555 }
2556
2557 snapshot_now->curcid = command_id;
2558
2561 }
2562
2563 break;
2564
2566 elog(ERROR, "tuplecid value in changequeue");
2567 break;
2568 }
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580#define CHANGES_THRESHOLD 100
2581
2583 {
2585 changes_count = 0;
2586 }
2587 }
2588
2589
2590 Assert(!specinsert);
2591
2592
2594 iterstate = NULL;
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2607
2609
2610
2611
2612
2613
2614 if (streaming)
2615 {
2616 if (stream_started)
2617 {
2619 stream_started = false;
2620 }
2621 }
2622 else
2623 {
2624
2625
2626
2627
2629 {
2631 rb->prepare(rb, txn, commit_lsn);
2633 }
2634 else
2635 rb->commit(rb, txn, commit_lsn);
2636 }
2637
2638
2640 elog(ERROR, "output plugin used XID %u",
2642
2643
2644
2645
2646
2647 if (streaming)
2649 else if (snapshot_now->copied)
2651
2652
2654
2655
2656
2657
2658
2659
2660
2662
2663
2665
2666 if (using_subtxn)
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2684 {
2685 if (streaming)
2687
2689
2691 }
2692 else
2694 }
2696 {
2699
2700
2701 if (iterstate)
2703
2705
2706
2707
2708
2709
2711
2712
2715
2716 if (using_subtxn)
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2733 {
2734
2736
2737
2740 errdata = NULL;
2741
2742
2745
2746
2747 if (stream_started)
2749
2750
2752 command_id, prev_lsn,
2753 specinsert);
2754 }
2755 else
2756 {
2760 }
2761 }
2763}
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775static void
2781{
2784
2790
2791
2792
2793
2794
2795
2796
2797
2798
2800 {
2802 return;
2803 }
2804
2805
2806
2807
2808
2809
2810
2812 {
2814
2815
2816
2817
2818
2821 return;
2822 }
2823
2825
2826
2828 command_id, false);
2829}
2830
2831
2832
2833
2834
2835
2836void
2841{
2843
2845 false);
2846
2847
2848 if (txn == NULL)
2849 return;
2850
2852 origin_id, origin_lsn);
2853}
2854
2855
2856
2857
2858
2859bool
2864{
2866
2868
2869
2870 if (txn == NULL)
2871 return false;
2872
2873
2874
2875
2876
2882
2883
2886
2887 return true;
2888}
2889
2890
2891void
2893{
2895
2897
2898
2899 if (txn == NULL)
2900 return;
2901
2902
2905}
2906
2907
2908
2909
2910
2911
2912void
2914 char *gid)
2915{
2917
2919 false);
2920
2921
2922 if (txn == NULL)
2923 return;
2924
2925
2926
2927
2928
2929
2932
2934
2937
2938
2939
2940
2941
2942
2944 {
2947 }
2948}
2949
2950
2951
2952
2953void
2958 XLogRecPtr origin_lsn, char *gid, bool is_commit)
2959{
2963
2965
2966
2967 if (txn == NULL)
2968 return;
2969
2970
2971
2972
2973
2974 prepare_end_lsn = txn->end_lsn;
2976
2977
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988 if ((txn->final_lsn < two_phase_at) && is_commit)
2989 {
2990
2991
2992
2993
2994
2998
2999
3000
3001
3002
3003
3004
3005
3008 }
3009
3015
3016 if (is_commit)
3018 else
3020
3021
3025}
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039void
3042{
3044
3046 false);
3047
3048
3049 if (txn == NULL)
3050 return;
3051
3053
3054
3056 {
3058
3059
3060
3061
3062
3063
3064
3068 }
3069
3070
3072
3073
3075}
3076
3077
3078
3079
3080
3081
3082
3083
3084void
3086{
3088
3089
3090
3091
3092
3093
3094
3095
3097 {
3099
3101
3103 {
3104 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3105
3106
3109
3110
3112 }
3113 else
3114 return;
3115 }
3116}
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131void
3133{
3135
3137 false);
3138
3139
3140 if (txn == NULL)
3141 return;
3142
3143
3145
3146
3148
3149
3150
3151
3152
3153
3157 else
3159
3160
3162}
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172void
3174{
3176
3178 false);
3179
3180
3181 if (txn == NULL)
3182 return;
3183
3184
3185
3186
3187
3188
3192 else
3194}
3195
3196
3197
3198
3199
3200
3201
3202
3203void
3206{
3208 int i;
3209
3210 if (use_subtxn)
3212
3213
3214
3215
3216
3217
3218
3219 if (use_subtxn)
3221
3222 for (i = 0; i < ninvalidations; i++)
3224
3225 if (use_subtxn)
3227}
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239void
3241{
3242
3245}
3246
3247
3248
3249
3250
3251
3252void
3255{
3257
3260
3262}
3263
3264
3265
3266
3267
3268
3269
3270void
3273{
3275 bool is_new;
3276
3277 Assert(snap != NULL);
3278
3279
3280
3281
3282
3288
3292
3294}
3295
3296
3297
3298
3299
3300
3301void
3304{
3306
3309
3311}
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329static void
3333 bool addition, Size sz)
3334{
3336
3337 Assert(txn || change);
3338
3339
3340
3341
3342
3343
3345 return;
3346
3347 if (sz == 0)
3348 return;
3349
3350 if (txn == NULL)
3351 txn = change->txn;
3352 Assert(txn != NULL);
3353
3354
3355
3356
3357
3359
3360 if (addition)
3361 {
3363
3364 txn->size += sz;
3365 rb->size += sz;
3366
3367
3369
3370
3371 if (oldsize != 0)
3374 }
3375 else
3376 {
3378 txn->size -= sz;
3379 rb->size -= sz;
3380
3381
3383
3384
3386 if (txn->size != 0)
3388 }
3389
3391}
3392
3393
3394
3395
3396
3397
3398
3399
3400void
3405{
3408
3410
3416 change->lsn = lsn;
3417 change->txn = txn;
3419
3422}
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
3433
3434
3435
3436
3437void
3441{
3445
3447
3449
3450
3451
3452
3453
3454
3456
3458
3459
3461 {
3467 }
3468 else
3469 {
3473
3477 }
3478
3486
3488
3490}
3491
3492
3493
3494
3495
3496static void
3498{
3499 int i;
3500
3501 for (i = 0; i < nmsgs; i++)
3503}
3504
3505
3506
3507
3508void
3511{
3513
3515
3517 {
3520 }
3521
3522
3523
3524
3525
3526
3527
3529 {
3531
3533 {
3536 }
3537 }
3538}
3539
3540
3541
3542
3543
3544
3545
3548{
3551 size_t xcnt = 0;
3552
3553
3555 return NULL;
3556
3557
3561 {
3563 catchange_node,
3564 iter.cur);
3565
3567
3568 xids[xcnt++] = txn->xid;
3569 }
3570
3572
3574 return xids;
3575}
3576
3577
3578
3579
3580
3581bool
3583{
3585
3587 false);
3588 if (txn == NULL)
3589 return false;
3590
3592}
3593
3594
3595
3596
3597
3598bool
3600{
3602
3605
3606
3607 if (txn == NULL)
3608 return false;
3609
3610
3614
3616}
3617
3618
3619
3620
3621
3622
3623
3624
3625
3626
3627
3628static void
3630{
3632 {
3635 }
3637 {
3640 }
3641}
3642
3643
3644
3645static int
3647{
3650
3652 return -1;
3654 return 1;
3655 return 0;
3656}
3657
3658
3659
3660
3663{
3665
3666
3669
3673
3674 return largest;
3675}
3676
3677
3678
3679
3680
3681
3682
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699
3700
3701
3704{
3706 Size largest_size = 0;
3708
3709
3711 {
3713
3715
3716
3718
3720
3721
3725 continue;
3726
3727
3728 if ((largest == NULL || txn->total_size > largest_size) &&
3730 {
3731 largest = txn;
3733 }
3734 }
3735
3736 return largest;
3737}
3738
3739
3740
3741
3742
3743
3744
3745
3746
3747
3748
3749
3750
3751
3752static void
3754{
3756
3757
3758
3759
3760
3763 return;
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774
3777 rb->size > 0))
3778 {
3779
3780
3781
3782
3785 {
3786
3790
3791
3793 continue;
3794
3796 }
3797 else
3798 {
3799
3800
3801
3802
3804
3805
3809
3810
3812 continue;
3813
3815 }
3816
3817
3818
3819
3820
3823 }
3824
3825
3827}
3828
3829
3830
3831
3832static void
3834{
3837 int fd = -1;
3839 Size spilled = 0;
3841
3842 elog(DEBUG2, "spill %u changes in XID %u to disk",
3844
3845
3847 {
3849
3852 }
3853
3854
3856 {
3858
3860
3861
3862
3863
3864
3865 if (fd == -1 ||
3867 {
3869
3870 if (fd != -1)
3872
3874
3875
3876
3877
3878
3880 curOpenSegNo);
3881
3882
3884 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3885
3886 if (fd < 0)
3889 errmsg("could not open file \"%s\": %m", path)));
3890 }
3891
3895
3896 spilled++;
3897 }
3898
3899
3901
3902
3903 if (spilled)
3904 {
3907
3908
3910
3911
3913 }
3914
3919
3920 if (fd != -1)
3922}
3923
3924
3925
3926
3927static void
3930{
3933
3935
3938
3939 switch (change->action)
3940 {
3941
3946 {
3949 newtup;
3950 Size oldlen = 0;
3951 Size newlen = 0;
3952
3955
3956 if (oldtup)
3957 {
3959 oldlen = oldtup->t_len;
3960 sz += oldlen;
3961 }
3962
3963 if (newtup)
3964 {
3966 newlen = newtup->t_len;
3967 sz += newlen;
3968 }
3969
3970
3972
3974
3976
3977 if (oldlen)
3978 {
3981
3982 memcpy(data, oldtup->t_data, oldlen);
3983 data += oldlen;
3984 }
3985
3986 if (newlen)
3987 {
3990
3991 memcpy(data, newtup->t_data, newlen);
3992 data += newlen;
3993 }
3994 break;
3995 }
3997 {
4000
4002 sizeof(Size) + sizeof(Size);
4004
4006
4007
4009
4010
4011 memcpy(data, &prefix_size, sizeof(Size));
4014 prefix_size);
4015 data += prefix_size;
4016
4017
4023
4024 break;
4025 }
4027 {
4031
4032 sz += inval_size;
4033
4036
4037
4040 data += inval_size;
4041
4042 break;
4043 }
4045 {
4048
4050
4054
4055
4058
4060
4063
4064 if (snap->xcnt)
4065 {
4069 }
4070
4072 {
4076 }
4077 break;
4078 }
4080 {
4083
4084
4086 sz += size;
4087
4088
4090
4092
4094
4096 data += size;
4097
4098 break;
4099 }
4104
4105 break;
4106 }
4107
4108 ondisk->size = sz;
4109
4110 errno = 0;
4113 {
4114 int save_errno = errno;
4115
4117
4118
4119 errno = save_errno ? save_errno : ENOSPC;
4122 errmsg("could not write to data file for XID %u: %m",
4123 txn->xid)));
4124 }
4126
4127
4128
4129
4130
4131
4132
4133
4134
4137
4139}
4140
4141
4142static inline bool
4144{
4146
4148}
4149
4150
4151static inline bool
4153{
4156
4157
4159 return false;
4160
4161
4162
4163
4164
4165
4168 return true;
4169
4170 return false;
4171}
4172
4173
4174
4175
4176
4177static void
4179{
4182 Size stream_bytes;
4183 bool txn_is_streamed;
4184
4185
4187
4188
4189
4190
4191
4192
4193
4194
4195
4196
4197
4198
4199
4200
4201
4202
4203
4205 {
4207
4208
4210
4211
4213
4215 {
4217
4220 }
4221
4222
4223
4224
4225
4227 {
4229 return;
4230 }
4231
4234 txn, command_id);
4235 }
4236 else
4237 {
4238
4240
4241
4242
4243
4244
4245
4246
4247
4249
4250
4251
4252
4253
4254
4256 txn, command_id);
4257
4258
4262 }
4263
4264
4265
4266
4267
4268
4269
4272
4273
4275 command_id, true);
4276
4279
4280
4281 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4282
4283
4285
4289}
4290
4291
4292
4293
4296{
4298
4299 switch (change->action)
4300 {
4301
4306 {
4308 newtup;
4309 Size oldlen = 0;
4310 Size newlen = 0;
4311
4314
4315 if (oldtup)
4316 {
4318 oldlen = oldtup->t_len;
4319 sz += oldlen;
4320 }
4321
4322 if (newtup)
4323 {
4325 newlen = newtup->t_len;
4326 sz += newlen;
4327 }
4328
4329 break;
4330 }
4332 {
4334
4336 sizeof(Size) + sizeof(Size);
4337
4338 break;
4339 }
4341 {
4344 break;
4345 }
4347 {
4349
4351
4355
4356 break;
4357 }
4359 {
4361
4362 break;
4363 }
4368
4369 break;
4370 }
4371
4372 return sz;
4373}
4374
4375
4376
4377
4378
4382{
4383 Size restored = 0;
4387
4390
4391
4393 {
4396
4399 }
4402
4404
4406 {
4407 int readBytes;
4409
4411
4412 if (*fd == -1)
4413 {
4415
4416
4417 if (*segno == 0)
4419
4421
4422
4423
4424
4425
4427 *segno);
4428
4430
4431
4433
4434 if (*fd < 0 && errno == ENOENT)
4435 {
4436 *fd = -1;
4437 (*segno)++;
4438 continue;
4439 }
4440 else if (*fd < 0)
4443 errmsg("could not open file \"%s\": %m",
4444 path)));
4445 }
4446
4447
4448
4449
4450
4451
4455 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4456
4457
4458 if (readBytes == 0)
4459 {
4461 *fd = -1;
4462 (*segno)++;
4463 continue;
4464 }
4465 else if (readBytes < 0)
4468 errmsg("could not read from reorderbuffer spill file: %m")));
4472 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4473 readBytes,
4475
4477
4479
4483
4488 WAIT_EVENT_REORDER_BUFFER_READ);
4489
4490 if (readBytes < 0)
4493 errmsg("could not read from reorderbuffer spill file: %m")));
4497 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4498 readBytes,
4500
4502
4503
4504
4505
4506
4508 restored++;
4509 }
4510
4511 return restored;
4512}
4513
4514
4515
4516
4517
4518
4519
4520
4521
4522static void
4525{
4528
4530
4532
4533
4535
4537
4538
4539 switch (change->action)
4540 {
4541
4547 {
4549
4552
4553
4557
4558
4561
4562
4564 data += tuplelen;
4565 }
4566
4568 {
4569
4571
4574
4577
4578
4582
4583
4586
4587
4589 data += tuplelen;
4590 }
4591
4592 break;
4594 {
4595 Size prefix_size;
4596
4597
4598 memcpy(&prefix_size, data, sizeof(Size));
4601 prefix_size);
4604 data += prefix_size;
4605
4606
4614
4615 break;
4616 }
4618 {
4621
4624
4625
4627
4628 break;
4629 }
4631 {
4635
4637
4641
4643
4645
4646 memcpy(newsnap, data, size);
4648 (((char *) newsnap) + sizeof(SnapshotData));
4650 newsnap->copied = true;
4651 break;
4652 }
4653
4655 {
4656 Oid *relids;
4657
4661
4662 break;
4663 }
4668 break;
4669 }
4670
4673
4674
4675
4676
4677
4678
4679
4680
4681
4684}
4685
4686
4687
4688
4689static void
4691{
4695
4698
4701
4702
4703 for (cur = first; cur <= last; cur++)
4704 {
4706
4708 if (unlink(path) != 0 && errno != ENOENT)
4711 errmsg("could not remove file \"%s\": %m", path)));
4712 }
4713}
4714
4715
4716
4717
4718
4719static void
4721{
4722 DIR *spill_dir;
4723 struct dirent *spill_de;
4724 struct stat statbuf;
4726
4728
4729
4731 return;
4732
4735 {
4736
4737 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4738 {
4739 snprintf(path, sizeof(path),
4742
4743 if (unlink(path) != 0)
4746 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4748 }
4749 }
4751}
4752
4753
4754
4755
4756
4757
4758static void
4761{
4763
4765
4770}
4771
4772
4773
4774
4775
4776void
4778{
4779 DIR *logical_dir;
4780 struct dirent *logical_de;
4781
4784 {
4785 if (strcmp(logical_de->d_name, ".") == 0 ||
4786 strcmp(logical_de->d_name, "..") == 0)
4787 continue;
4788
4789
4791 continue;
4792
4793
4794
4795
4796
4798 }
4800}
4801
4802
4803
4804
4805
4806
4807
4808
4809
4810static void
4812{
4814
4816
4822}
4823
4824
4825
4826
4827
4828
4829
4830static void
4833{
4836 bool found;
4837 int32 chunksize;
4838 bool isnull;
4841 Oid chunk_id;
4842 int32 chunk_seq;
4843
4846
4848
4854
4857
4858 if (!found)
4859 {
4863 ent->size = 0;
4866
4867 if (chunk_seq != 0)
4868 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4869 chunk_seq, chunk_id);
4870 }
4871 else if (found && chunk_seq != ent->last_chunk_seq + 1)
4872 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4874
4877
4878
4882
4884 else
4885 elog(ERROR, "unexpected type of toast chunk");
4886
4887 ent->size += chunksize;
4891}
4892
4893
4894
4895
4896
4897
4898
4899
4900
4901
4902
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913static void
4916{
4918 int natt;
4920 bool *isnull;
4927 Size old_size;
4928
4929
4931 return;
4932
4933
4934
4935
4936
4937
4938
4939
4940
4941
4942
4944
4946
4947
4949
4951
4954 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4956
4958
4959
4961 isnull = palloc0(sizeof(bool) * desc->natts);
4963
4965
4967
4968 for (natt = 0; natt < desc->natts; natt++)
4969 {
4973
4974
4977 struct varlena *new_datum = NULL;
4978 struct varlena *reconstructed;
4980 Size data_done = 0;
4981
4982
4983 if (attr->attnum < 0)
4984 continue;
4985
4986 if (attr->attisdropped)
4987 continue;
4988
4989
4990 if (attr->attlen != -1)
4991 continue;
4992
4993
4994 if (isnull[natt])
4995 continue;
4996
4997
4999
5000
5002 continue;
5003
5005
5006
5007
5008
5013 NULL);
5014 if (ent == NULL)
5015 continue;
5016
5017 new_datum =
5019
5020 free[natt] = true;
5021
5023
5025
5026
5028 {
5029 bool cisnull;
5033
5037
5041
5042 memcpy(VARDATA(reconstructed) + data_done,
5046 }
5048
5049
5052 else
5054
5055 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
5056 redirect_pointer.pointer = reconstructed;
5057
5060 sizeof(redirect_pointer));
5061
5063 }
5064
5065
5066
5067
5068
5069
5073
5076
5077
5078
5079
5080
5083 for (natt = 0; natt < desc->natts; natt++)
5084 {
5085 if (free[natt])
5087 }
5091
5093
5094
5096
5099}
5100
5101
5102
5103
5104static void
5106{
5109
5111 return;
5112
5113
5116 {
5118
5121
5123 {
5126
5129 }
5130 }
5131
5134}
5135
5136
5137
5138
5139
5140
5141
5142
5143
5144
5145
5146
5147
5148
5149
5150
5151
5152
5153
5154
5155
5156
5157
5158
5159
5160
5161
5162
5163
5164
5165
5166
5167
5169{
5173
5174#ifdef NOT_USED
5175static void
5177{
5180
5183 {
5184 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5192 );
5193 }
5194}
5195#endif
5196
5197
5198
5199
5200
5201
5202
5203static void
5205{
5207 int fd;
5208 int readBytes;
5210
5213 if (fd < 0)
5216 errmsg("could not open file \"%s\": %m", path)));
5217
5218 while (true)
5219 {
5223 bool found;
5224
5225
5227
5228
5232
5233 if (readBytes < 0)
5236 errmsg("could not read file \"%s\": %m",
5237 path)));
5238 else if (readBytes == 0)
5239 break;
5243 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5244 path, readBytes,
5246
5249 &key.tid);
5250
5251
5254
5255
5256 if (!ent)
5257 continue;
5258
5261 &key.tid);
5262
5265
5266 if (found)
5267 {
5268
5269
5270
5271
5272
5275 }
5276 else
5277 {
5278
5282 }
5283 }
5284
5288 errmsg("could not close file \"%s\": %m", path)));
5289}
5290
5291
5292
5293
5294
5295static bool
5297{
5298 return bsearch(&xid, xip, num,
5300}
5301
5302
5303
5304
5305static int
5307{
5310
5312}
5313
5314
5315
5316
5317
5318static void
5320{
5321 DIR *mapping_dir;
5322 struct dirent *mapping_de;
5326
5329 {
5330 Oid f_dboid;
5331 Oid f_relid;
5336 f_lo;
5338
5339 if (strcmp(mapping_de->d_name, ".") == 0 ||
5340 strcmp(mapping_de->d_name, "..") == 0)
5341 continue;
5342
5343
5344 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5345 continue;
5346
5348 &f_dboid, &f_relid, &f_hi, &f_lo,
5349 &f_mapped_xid, &f_create_xid) != 6)
5350 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5351
5352 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5353
5354
5355 if (f_dboid != dboid)
5356 continue;
5357
5358
5359 if (f_relid != relid)
5360 continue;
5361
5362
5364 continue;
5365
5366
5368 continue;
5369
5370
5372 f->lsn = f_lsn;
5374 files = lappend(files, f);
5375 }
5377
5378
5380
5381 foreach(file, files)
5382 {
5384
5385 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5386 snapshot->subxip[0]);
5389 }
5390}
5391
5392
5393
5394
5395
5396bool
5401{
5406 bool updated_mapping = false;
5407
5408
5409
5410
5411
5412
5413
5414
5415
5417 return false;
5418
5419
5420 memset(&key, 0, sizeof(key));
5421
5423
5424
5425
5426
5427
5429
5430
5433
5435 &key.tid);
5436
5437restart:
5440
5441
5442
5443
5444
5445
5446
5447 if (ent == NULL && !updated_mapping)
5448 {
5450
5451 updated_mapping = true;
5452 goto restart;
5453 }
5454 else if (ent == NULL)
5455 return false;
5456
5457 if (cmin)
5458 *cmin = ent->cmin;
5459 if (cmax)
5460 *cmax = ent->cmax;
5461 return true;
5462}
5463
5464
5465
5466
5467
5468
5469
5473{
5475
5477 false);
5478
5479 if (txn == NULL)
5480 return 0;
5481
5483
5485}
void binaryheap_build(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
bh_node_type binaryheap_first(binaryheap *heap)
bh_node_type binaryheap_remove_first(binaryheap *heap)
void binaryheap_free(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
static void cleanup(void)
#define BufferIsLocal(buffer)
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
#define FLEXIBLE_ARRAY_MEMBER
bool IsToastRelation(Relation relation)
bool IsSharedRelation(Oid relationId)
#define INDIRECT_POINTER_SIZE
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
void * hash_seq_search(HASH_SEQ_STATUS *status)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
void FreeErrorData(ErrorData *edata)
int errcode_for_file_access(void)
ErrorData * CopyErrorData(void)
void FlushErrorState(void)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int CloseTransientFile(int fd)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
void FileClose(File file)
File PathNameOpenFile(const char *fileName, int fileFlags)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
int OpenTransientFile(const char *fileName, int fileFlags)
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
HeapTupleData * HeapTuple
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
#define SizeofHeapTupleHeader
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
static dlist_node * dlist_pop_head_node(dlist_head *head)
#define dlist_foreach(iter, lhead)
static void dlist_init(dlist_head *head)
#define dclist_container(type, membername, ptr)
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
static void dclist_push_tail(dclist_head *head, dlist_node *node)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
#define dlist_head_element(type, membername, lhead)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
static void dlist_delete(dlist_node *node)
static uint32 dclist_count(const dclist_head *head)
#define dlist_foreach_modify(iter, lhead)
static bool dlist_is_empty(const dlist_head *head)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
static void dclist_delete_from(dclist_head *head, dlist_node *node)
static void dclist_init(dclist_head *head)
#define dlist_container(type, membername, ptr)
#define dclist_foreach(iter, lhead)
static int pg_cmp_u64(uint64 a, uint64 b)
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
if(TABLE==NULL||TABLE_index==NULL)
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
List * lappend(List *list, void *datum)
void list_sort(List *list, list_sort_comparator cmp)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
void * MemoryContextAlloc(MemoryContext context, Size size)
void * MemoryContextAllocZero(MemoryContext context, Size size)
char * pstrdup(const char *in)
void * repalloc(void *pointer, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define SLAB_DEFAULT_BLOCK_SIZE
#define CHECK_FOR_INTERRUPTS()
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
pairingheap_node * pairingheap_first(pairingheap *heap)
#define pairingheap_container(type, membername, ptr)
#define pairingheap_const_container(type, membername, ptr)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Oid DatumGetObjectId(Datum X)
static Pointer DatumGetPointer(Datum X)
static Datum Int32GetDatum(int32 X)
static int32 DatumGetInt32(Datum X)
static int fd(const char *x, int i)
bool TransactionIdIsInProgress(TransactionId xid)
#define RelationIsLogicallyLogged(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationIsValid(relation)
Relation RelationIdGetRelation(Oid relationId)
void RelationClose(Relation relation)
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
#define relpathperm(rlocator, forknum)
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferDiskChange ReorderBufferDiskChange
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
#define IsSpecInsert(action)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
ReorderBuffer * ReorderBufferAllocate(void)
int logical_decoding_work_mem
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
int debug_logical_replication_streaming
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define IsInsertOrUpdate(action)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
struct RewriteMappingFile RewriteMappingFile
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
static void SetupCheckXidLive(TransactionId xid)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
struct TXNEntryFile TXNEntryFile
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
struct ReorderBufferToastEnt ReorderBufferToastEnt
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
static const Size max_changes_in_memory
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
#define rbtxn_is_committed(txn)
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_catalog_changes(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
#define RBTXN_PREPARE_STATUS_MASK
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_STREAMED
#define rbtxn_is_prepared(txn)
#define RBTXN_HAS_PARTIAL_CHANGE
#define rbtxn_is_streamed(txn)
struct ReorderBufferChange ReorderBufferChange
#define RBTXN_SENT_PREPARE
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_known_subxact(txn)
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES
#define RBTXN_IS_COMMITTED
#define PG_LOGICAL_MAPPINGS_DIR
#define RBTXN_IS_SERIALIZED_CLEAR
#define rbtxn_sent_prepare(txn)
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
#define RBTXN_HAS_STREAMABLE_CHANGE
@ REORDER_BUFFER_CHANGE_INVALIDATION
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_MESSAGE
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
@ REORDER_BUFFER_CHANGE_TRUNCATE
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
@ REORDER_BUFFER_CHANGE_UPDATE
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_SERIALIZED
#define rbtxn_is_serialized(txn)
#define rbtxn_has_partial_change(txn)
#define LOGICAL_REWRITE_FORMAT
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
ReplicationSlot * MyReplicationSlot
bool ReplicationSlotValidateName(const char *name, int elevel)
void SnapBuildSnapDecRefcount(Snapshot snap)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
void TeardownHistoricSnapshot(bool is_error)
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
static HTAB * tuplecid_data
struct SnapshotData * Snapshot
struct SnapshotData SnapshotData
struct SnapBuild * snapshot_builder
RelFileLocator old_locator
RelFileLocator new_locator
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
bool clear_toast_afterwards
union ReorderBufferChange::@110 data
struct ReorderBufferTXN * txn
struct ReorderBufferChange::@110::@111 tp
struct ReorderBufferChange::@110::@114 tuplecid
struct ReorderBufferChange::@110::@113 msg
struct ReorderBufferChange::@110::@115 inval
SharedInvalidationMessage * invalidations
ReorderBufferChange change
ReorderBufferChange * change
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
XLogRecPtr restart_decoding_lsn
pairingheap_node txn_node
XLogRecPtr base_snapshot_lsn
TransactionId toplevel_xid
dlist_node catchange_node
SharedInvalidationMessage * invalidations
struct ReorderBufferTXN * toptxn
void * output_plugin_private
dlist_node base_snapshot_node
union ReorderBufferTXN::@116 xact_time
struct varlena * reconstructed
ReorderBufferTupleCidKey key
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
bool TransactionIdDidCommit(TransactionId transactionId)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdEquals(id1, id2)
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
#define VARSIZE_SHORT(PTR)
#define VARATT_IS_EXTENDED(PTR)
#define VARATT_IS_SHORT(PTR)
#define SET_VARSIZE_COMPRESSED(PTR, len)
#define SET_VARTAG_EXTERNAL(PTR, tag)
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
#define VARDATA_EXTERNAL(PTR)
#define SET_VARSIZE(PTR, len)
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
#define VARATT_IS_EXTERNAL(PTR)
static void pgstat_report_wait_start(uint32 wait_event_info)
static void pgstat_report_wait_end(void)
bool IsTransactionOrTransactionBlock(void)
void BeginInternalSubTransaction(const char *name)
TransactionId CheckXidAlive
void RollbackAndReleaseCurrentSubTransaction(void)
void StartTransactionCommand(void)
TransactionId GetCurrentTransactionIdIfAny(void)
TransactionId GetCurrentTransactionId(void)
void AbortCurrentTransaction(void)
int xidComparator(const void *arg1, const void *arg2)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr