PostgreSQL Source Code: src/backend/commands/async.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

127

128#include <limits.h>

130#include <signal.h>

131

153

154

155

156

157

158

159

160

161

162

163#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)

164

165

166

167

168

169

170

171

172

173

174

175

176

178{

179 int length;

185

186

187#define QUEUEALIGN(len) INTALIGN(len)

188

189#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)

190

191

192

193

195{

197 int offset;

199

200#define QUEUE_POS_PAGE(x) ((x).page)

201#define QUEUE_POS_OFFSET(x) ((x).offset)

202

203#define SET_QUEUE_POS(x,y,z) \

204 do { \

205 (x).page = (y); \

206 (x).offset = (z); \

207 } while (0)

208

209#define QUEUE_POS_EQUAL(x,y) \

210 ((x).page == (y).page && (x).offset == (y).offset)

211

212#define QUEUE_POS_IS_ZERO(x) \

213 ((x).page == 0 && (x).offset == 0)

214

215

216#define QUEUE_POS_MIN(x,y) \

217 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \

218 (x).page != (y).page ? (y) : \

219 (x).offset < (y).offset ? (x) : (y))

220

221

222#define QUEUE_POS_MAX(x,y) \

223 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \

224 (x).page != (y).page ? (x) : \

225 (x).offset > (y).offset ? (x) : (y))

226

227

228

229

230

231

232

233

234

235

236

237

238#define QUEUE_CLEANUP_DELAY 4

239

240

241

242

244{

245 int32 pid;

246 Oid dboid;

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

282{

285

287

289

293

295

296#define QUEUE_HEAD (asyncQueueControl->head)

297#define QUEUE_TAIL (asyncQueueControl->tail)

298#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)

299#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)

300#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)

301#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)

302#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)

303#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)

304

305

306

307

309

310#define NotifyCtl (&NotifyCtlData)

311#define QUEUE_PAGESIZE BLCKSZ

312

313#define QUEUE_FULL_WARN_INTERVAL 5000

314

315

316

317

318

319

321

322

323

324

325

326

327

328

329

330

331

332typedef enum

333{

338

339typedef struct

340{

344

346{

347 int nestingLevel;

351

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

382{

385

388

390{

391 int nestingLevel;

393 HTAB *hashtab;

396

397#define MIN_HASHABLE_NOTIFIES 16

398

400{

402};

403

405

406

407

408

409

410

411

412

414

415

417

418

420

421

423

424

426

427

429

430

459

460

461

462

463

464static inline int64

466{

467 return p - q;

468}

469

470

471

472

473

474static inline bool

476{

477 return p < q;

478}

479

480

481

482

485{

487

488

491

493

494 return size;

495}

496

497

498

499

500void

502{

503 bool found;

505

506

507

508

511

514

515 if (!found)

516 {

517

524 {

529 }

530 }

531

532

533

534

535

538 "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,

540

541 if (!found)

542 {

543

544

545

547 }

548}

549

550

551

552

553

554

557{

558 const char *channel;

559 const char *payload;

560

562 channel = "";

563 else

565

567 payload = "";

568 else

570

571

573

575

577}

578

579

580

581

582

583

584

585

586

587

588

589void

591{

593 size_t channel_len;

594 size_t payload_len;

597

599 elog(ERROR, "cannot send notifications from a parallel worker");

600

602 elog(DEBUG1, "Async_Notify(%s)", channel);

603

604 channel_len = channel ? strlen(channel) : 0;

605 payload_len = payload ? strlen(payload) : 0;

606

607

608 if (channel_len == 0)

610 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

611 errmsg("channel name cannot be empty")));

612

613

616 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

617 errmsg("channel name too long")));

618

621 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

622 errmsg("payload string too long")));

623

624

625

626

627

628

629

630

632

634 channel_len + payload_len + 2);

637 strcpy(n->data, channel);

638 if (payload)

639 strcpy(n->data + channel_len + 1, payload);

640 else

641 n->data[channel_len + 1] = '\0';

642

644 {

646

647

648

649

650

651

657

658 notifies->hashtab = NULL;

661 }

662 else

663 {

664

666 {

667

670 return;

671 }

672

673

675 }

676

678}

679

680

681

682

683

684

685

686

687

688static void

690{

694

695

696

697

698

699

700

702

703

705 strlen(channel) + 1);

707 strcpy(actrec->channel, channel);

708

710 {

712

713

714

715

716

717

724 }

725 else

727

729}

730

731

732

733

734

735

736void

738{

741

743}

744

745

746

747

748

749

750void

752{

755

756

758 return;

759

761}

762

763

764

765

766

767

768void

770{

773

774

776 return;

777

779}

780

781

782

783

784

785

786

787

790{

792

793

795 {

796

798 }

799

800

802

804 {

807

809 }

810

812}

813

814

815

816

817

818

819

820

821static void

823{

826}

827

828

829

830

831

832

833

834void

836{

837

840 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

841 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));

842}

843

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859void

861{

863

865 return;

866

869

870

872 {

874 {

876

877 switch (actrec->action)

878 {

881 break;

883

884 break;

886

887 break;

888 }

889 }

890 }

891

892

894 {

896

897

898

899

900

901

902

904

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

923

924

926 while (nextNotify != NULL)

927 {

928

929

930

931

932

933

934

935

936

937

938

939

944 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

945 errmsg("too many notifications in the NOTIFY queue")));

948 }

949

950

951 }

952}

953

954

955

956

957

958

959

960

961

962

963

964

965

966void

968{

970

971

972

973

974

976 return;

977

980

981

983 {

985 {

987

988 switch (actrec->action)

989 {

992 break;

995 break;

998 break;

999 }

1000 }

1001 }

1002

1003

1006

1007

1008

1009

1010

1011

1014

1015

1016

1017

1018

1019

1020

1021

1022

1023

1025 {

1028 }

1029

1030

1032}

1033

1034

1035

1036

1037

1038

1039static void

1041{

1045

1046

1047

1048

1049

1051 return;

1052

1055

1056

1057

1058

1059

1061 {

1064 }

1065

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1091 {

1094

1096 prevListener = i;

1097 }

1101

1103 {

1106 }

1107 else

1108 {

1111 }

1113

1114

1116

1117

1118

1119

1120

1121

1122

1123

1124

1127}

1128

1129

1130

1131

1132

1133

1134static void

1136{

1138

1139

1141 return;

1142

1143

1144

1145

1146

1147

1148

1149

1150

1154}

1155

1156

1157

1158

1159

1160

1161static void

1163{

1165

1168

1170 {

1171 char *lchan = (char *) lfirst(q);

1172

1173 if (strcmp(lchan, channel) == 0)

1174 {

1177 break;

1178 }

1179 }

1180

1181

1182

1183

1184

1185}

1186

1187

1188

1189

1190

1191

1192static void

1194{

1197

1200}

1201

1202

1203

1204

1205

1206

1207

1208

1209

1210static bool

1212{

1214

1216 {

1217 char *lchan = (char *) lfirst(p);

1218

1219 if (strcmp(lchan, channel) == 0)

1220 return true;

1221 }

1222 return false;

1223}

1224

1225

1226

1227

1228

1229static void

1231{

1233

1235 return;

1236

1237

1238

1239

1241

1244

1247 else

1248 {

1250 {

1252 {

1254 break;

1255 }

1256 }

1257 }

1260

1261

1263}

1264

1265

1266

1267

1268

1269

1270static bool

1272{

1275 int64 occupied = headPage - tailPage;

1276

1278}

1279

1280

1281

1282

1283

1284

1285static bool

1287{

1290 bool pageJump = false;

1291

1292

1293

1294

1295

1296 offset += entryLength;

1298

1299

1300

1301

1302

1303

1305 {

1306 pageno++;

1307 offset = 0;

1308 pageJump = true;

1309 }

1310

1312 return pageJump;

1313}

1314

1315

1316

1317

1318static void

1320{

1323 int entryLength;

1324

1327

1328

1330 entryLength = QUEUEALIGN(entryLength);

1331 qe->length = entryLength;

1335 memcpy(qe->data, n->data, channellen + payloadlen + 2);

1336}

1337

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1356{

1360 int offset;

1361 int slotno;

1363

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1376

1377

1378

1379

1380

1381

1384

1385

1387

1390 else

1393

1394

1395 NotifyCtl->shared->page_dirty[slotno] = true;

1396

1397 while (nextNotify != NULL)

1398 {

1400

1401

1403

1405

1406

1408 {

1409

1411 }

1412 else

1413 {

1414

1415

1416

1417

1418

1422 qe.data[0] = '\0';

1423 qe.data[1] = '\0';

1424 }

1425

1426

1427 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,

1428 &qe,

1430

1431

1433 {

1435

1438 if (lock != prevlock)

1439 {

1442 prevlock = lock;

1443 }

1444

1445

1446

1447

1448

1449

1450

1451

1452

1454

1455

1456

1457

1458

1459

1462

1463

1464 break;

1465 }

1466 }

1467

1468

1470

1472

1473 return nextNotify;

1474}

1475

1476

1477

1478

1479

1482{

1484

1485

1487

1491

1493}

1494

1495

1496

1497

1498

1499

1500

1501

1502

1503

1504

1505static double

1507{

1510 int64 occupied = headPage - tailPage;

1511

1512 if (occupied == 0)

1513 return (double) 0;

1514

1516}

1517

1518

1519

1520

1521

1522

1523

1524

1525

1526static void

1528{

1529 double fillDegree;

1531

1533 if (fillDegree < 0.5)

1534 return;

1535

1537

1540 {

1543

1545 {

1550 }

1551

1553 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),

1555 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)

1556 : 0),

1558 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")

1559 : 0)));

1560

1562 }

1563}

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580static void

1582{

1585 int count;

1586

1587

1588

1589

1590

1591

1592

1593

1594

1597 count = 0;

1598

1601 {

1604

1608 {

1609

1610

1611

1612

1614 continue;

1615 }

1616 else

1617 {

1618

1619

1620

1621

1624 continue;

1625 }

1626

1627 pids[count] = pid;

1628 procnos[count] = i;

1629 count++;

1630 }

1632

1633

1634 for (int i = 0; i < count; i++)

1635 {

1636 int32 pid = pids[i];

1637

1638

1639

1640

1641

1643 {

1645 continue;

1646 }

1647

1648

1649

1650

1651

1652

1653

1655 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);

1656 }

1657

1660}

1661

1662

1663

1664

1665

1666

1667

1668

1669

1670void

1672{

1673

1674

1675

1676

1677

1680

1681

1683}

1684

1685

1686

1687

1688

1689

1690void

1692{

1694

1695

1698 {

1701 {

1702

1704 }

1705 else

1706 {

1708

1710

1711

1712

1713

1716 childPendingActions->actions);

1717 pfree(childPendingActions);

1718 }

1719 }

1720

1721

1724 {

1726

1729 {

1730

1732 }

1733 else

1734 {

1735

1736

1737

1738

1739

1742

1744

1745 foreach(l, childPendingNotifies->events)

1746 {

1748

1751 }

1752 pfree(childPendingNotifies);

1753 }

1754 }

1755}

1756

1757

1758

1759

1760void

1762{

1764

1765

1766

1767

1768

1769

1770

1771

1772

1773

1774

1775

1778 {

1780

1782 pfree(childPendingActions);

1783 }

1784

1787 {

1789

1791 pfree(childPendingNotifies);

1792 }

1793}

1794

1795

1796

1797

1798

1799

1800

1801

1802

1803void

1805{

1806

1807

1808

1809

1810

1811

1813

1814

1816}

1817

1818

1819

1820

1821

1822

1823

1824

1825

1826

1827

1828

1829

1830

1831

1832

1833void

1835{

1837 return;

1838

1839

1842}

1843

1844

1845

1846

1847

1848

1849

1850static void

1852{

1856

1857

1859

1864

1866 {

1867

1868 return;

1869 }

1870

1871

1872

1873

1874

1875

1876

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1911

1912

1913

1914

1915

1916

1917

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1929 {

1931 bool reachedStop;

1932

1934

1935 do

1936 {

1937

1938

1939

1940

1941

1942

1943

1944

1945

1946

1947

1948

1949

1950

1951

1953 } while (!reachedStop);

1954

1955

1959

1961 }

1962

1963

1965}

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976static bool

1980{

1982 int slotno;

1983 char *page_buffer;

1984 bool reachedStop = false;

1985 bool reachedEndOfPage;

1986

1987

1988

1989

1990

1991

1993 char *local_buf_end = local_buf;

1994

1997 page_buffer = NotifyCtl->shared->page_buffer[slotno];

1998

1999 do

2000 {

2003

2005 break;

2006

2008

2009

2010

2011

2012

2013

2015

2016

2018 {

2020 {

2021

2022

2023

2024

2025

2026

2027

2028

2029

2030

2031

2032

2033

2034

2035

2036

2037

2038

2039

2040 *current = thisentry;

2041 reachedStop = true;

2042 break;

2043 }

2044

2045

2046

2047

2048

2049

2050

2051

2052

2053

2055 continue;

2056

2058 {

2059 memcpy(local_buf_end, qe, qe->length);

2060 local_buf_end += qe->length;

2061 }

2062 else

2063 {

2064

2065

2066

2067

2068 }

2069 }

2070

2071

2072 } while (!reachedEndOfPage);

2073

2074

2076

2077

2078

2079

2080

2081 Assert(local_buf_end - local_buf <= BLCKSZ);

2082 for (char *p = local_buf; p < local_buf_end;)

2083 {

2085

2086

2087 char *channel = qe->data;

2088

2090 {

2091

2092 char *payload = qe->data + strlen(channel) + 1;

2093

2095 }

2096

2098 }

2099

2101 reachedStop = true;

2102

2103 return reachedStop;

2104}

2105

2106

2107

2108

2109

2110

2111

2112

2113static void

2115{

2117 int64 oldtailpage;

2118 int64 newtailpage;

2120

2121

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2135

2136

2137

2138

2139

2140

2144 {

2147 }

2151

2152

2153

2154

2155

2156

2157

2158

2162 {

2163

2164

2165

2166

2168

2172 }

2173

2175}

2176

2177

2178

2179

2180

2181

2182

2183

2184

2185

2186

2187

2188

2189

2190

2191

2192

2193

2194

2195void

2197{

2200 int64 curpage = -1;

2201 int slotno = -1;

2202 char *page_buffer = NULL;

2203 bool page_dirty = false;

2204

2205

2206

2207

2208

2209

2210

2211

2212

2215

2218

2219

2221

2222

2223

2224

2225

2226

2228 {

2233

2234

2235 if (pageno != curpage)

2236 {

2238

2239

2240 if (slotno >= 0)

2241 {

2242 if (page_dirty)

2243 {

2244 NotifyCtl->shared->page_dirty[slotno] = true;

2245 page_dirty = false;

2246 }

2248 }

2249

2254 page_buffer = NotifyCtl->shared->page_buffer[slotno];

2255 curpage = pageno;

2256 }

2257

2259 xid = qe->xid;

2260

2263 {

2265 {

2267 page_dirty = true;

2268 }

2269 else

2270 {

2272 page_dirty = true;

2273 }

2274 }

2275

2276

2278 }

2279

2280

2281 if (slotno >= 0)

2282 {

2283 if (page_dirty)

2284 NotifyCtl->shared->page_dirty[slotno] = true;

2286 }

2287

2289}

2290

2291

2292

2293

2294

2295

2296

2297

2298

2299

2300

2301

2302static void

2304{

2305

2307

2308

2310 return;

2311

2313 elog(DEBUG1, "ProcessIncomingNotify");

2314

2316

2317

2318

2319

2320

2322

2324

2326

2327

2328

2329

2330

2331 if (flush)

2333

2335

2337 elog(DEBUG1, "ProcessIncomingNotify: done");

2338}

2339

2340

2341

2342

2343void

2345{

2347 {

2349

2355

2356

2357

2358

2359

2360

2361 }

2362 else

2363 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);

2364}

2365

2366

2367static bool

2369{

2371 return false;

2372

2374 {

2375

2377 &n,

2379 NULL))

2380 return true;

2381 }

2382 else

2383 {

2384

2386

2388 {

2390

2395 return true;

2396 }

2397 }

2398

2399 return false;

2400}

2401

2402

2403

2404

2405

2406

2407

2408static void

2410{

2412

2413

2416 {

2419

2420

2428 256L,

2429 &hash_ctl,

2431

2432

2434 {

2436 bool found;

2437

2439 &oldn,

2441 &found);

2443 }

2444 }

2445

2446

2448

2449

2451 {

2452 bool found;

2453

2455 &n,

2457 &found);

2459 }

2460}

2461

2462

2463

2464

2465

2466

2469{

2471

2473

2476}

2477

2478

2479

2480

2481static int

2483{

2486

2492 return 0;

2493 return 1;

2494}

2495

2496

2497static void

2499{

2500

2501

2502

2503

2504

2505

2508}

2509

2510

2511

2512

2513bool

2515{

2517}

static void SignalBackends(void)

static double asyncQueueUsage(void)

#define MIN_HASHABLE_NOTIFIES

static void Exec_ListenCommit(const char *channel)

static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)

#define QUEUE_FIRST_LISTENER

#define QUEUE_POS_MAX(x, y)

static bool tryAdvanceTail

struct QueuePosition QueuePosition

void HandleNotifyInterrupt(void)

static void Exec_UnlistenCommit(const char *channel)

static void asyncQueueAdvanceTail(void)

int max_notify_queue_pages

static void Exec_ListenPreCommit(void)

static ActionList * pendingActions

static uint32 notification_hash(const void *key, Size keysize)

void Async_UnlistenAll(void)

static SlruCtlData NotifyCtlData

void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)

void AtCommit_Notify(void)

#define QUEUE_POS_MIN(x, y)

void ProcessNotifyInterrupt(bool flush)

static bool AsyncExistsPendingNotify(Notification *n)

#define QUEUE_BACKEND_POS(i)

static int notification_match(const void *key1, const void *key2, Size keysize)

#define SET_QUEUE_POS(x, y, z)

static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, Snapshot snapshot)

static void ProcessIncomingNotify(bool flush)

static void asyncQueueReadAllNotifications(void)

static void Async_UnlistenOnExit(int code, Datum arg)

#define QUEUE_POS_OFFSET(x)

static ListCell * asyncQueueAddEntries(ListCell *nextNotify)

static void ClearPendingActionsAndNotifies(void)

static List * listenChannels

Datum pg_listening_channels(PG_FUNCTION_ARGS)

Datum pg_notify(PG_FUNCTION_ARGS)

static NotificationList * pendingNotifies

#define AsyncQueueEntryEmptySize

static void AddEventToPendingNotifies(Notification *n)

static AsyncQueueControl * asyncQueueControl

static bool unlistenExitRegistered

static bool asyncQueuePagePrecedes(int64 p, int64 q)

static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)

void AtAbort_Notify(void)

#define QUEUE_POS_PAGE(x)

void PreCommit_Notify(void)

#define QUEUE_CLEANUP_DELAY

struct AsyncQueueControl AsyncQueueControl

static void asyncQueueFillWarning(void)

#define QUEUE_BACKEND_PID(i)

static void Exec_UnlistenAllCommit(void)

struct ActionList ActionList

Size AsyncShmemSize(void)

#define QUEUE_FULL_WARN_INTERVAL

void Async_Unlisten(const char *channel)

void Async_Listen(const char *channel)

#define NOTIFY_PAYLOAD_MAX_LENGTH

#define QUEUE_POS_IS_ZERO(x)

static int64 asyncQueuePageDiff(int64 p, int64 q)

static void queue_listen(ListenActionKind action, const char *channel)

static bool amRegisteredListener

#define QUEUE_NEXT_LISTENER(i)

#define QUEUE_BACKEND_DBOID(i)

void AtSubAbort_Notify(void)

struct NotificationList NotificationList

void AtPrepare_Notify(void)

void AtSubCommit_Notify(void)

static bool asyncQueueIsFull(void)

void AsyncShmemInit(void)

static void asyncQueueUnregister(void)

Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)

struct AsyncQueueEntry AsyncQueueEntry

#define QUEUE_POS_EQUAL(x, y)

struct Notification Notification

static bool IsListeningOn(const char *channel)

void Async_Notify(const char *channel, const char *payload)

volatile sig_atomic_t notifyInterruptPending

void AsyncNotifyFreezeXids(TransactionId newFrozenXid)

bool check_notify_buffers(int *newval, void **extra, GucSource source)

struct QueueBackendStatus QueueBackendStatus

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

#define CStringGetTextDatum(s)

#define FLEXIBLE_ARRAY_MEMBER

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_TEXT_PP(n)

#define PG_RETURN_FLOAT8(x)

#define SRF_IS_FIRSTCALL()

#define SRF_PERCALL_SETUP()

#define SRF_RETURN_NEXT(_funcctx, _result)

#define SRF_FIRSTCALL_INIT()

#define SRF_RETURN_DONE(_funcctx)

static Datum hash_any(const unsigned char *k, int keylen)

Assert(PointerIsAligned(start, uint64))

#define IsParallelWorker()

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

List * lappend(List *list, void *datum)

List * list_concat(List *list1, const List *list2)

void list_free_deep(List *list)

void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)

#define AccessExclusiveLock

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

void * MemoryContextAlloc(MemoryContext context, Size size)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

MemoryContext TopMemoryContext

MemoryContext CurTransactionContext

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

#define SLRU_PAGES_PER_SEGMENT

static int list_length(const List *l)

#define foreach_delete_current(lst, var_or_cell)

static void * list_nth(const List *list, int n)

static ListCell * list_head(const List *l)

static ListCell * lnext(const List *l, const ListCell *c)

static rewind_source * source

static char buf[DEFAULT_XLOG_SEG_SIZE]

CommandDest whereToSendOutput

static uint32 DatumGetUInt32(Datum X)

void pq_sendstring(StringInfo buf, const char *str)

void pq_endmessage(StringInfo buf)

void pq_beginmessage(StringInfo buf, char msgtype)

static void pq_sendint32(StringInfo buf, uint32 i)

#define INVALID_PROC_NUMBER

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_NOTIFY_INTERRUPT

#define PqMsg_NotificationResponse

static void set_ps_display(const char *activity)

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)

int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)

bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)

bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)

int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)

int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)

void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)

Size SimpleLruShmemSize(int nslots, int nlsns)

bool check_slru_buffers(const char *name, int *newval)

static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)

bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)

Snapshot GetLatestSnapshot(void)

void UnregisterSnapshot(Snapshot snapshot)

Snapshot RegisterSnapshot(Snapshot snapshot)

struct ActionList * upper

QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]

TimestampTz lastQueueFillWarn

char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]

char channel[FLEXIBLE_ARRAY_MEMBER]

struct NotificationList * upper

char data[FLEXIBLE_ARRAY_MEMBER]

bool TransactionIdDidCommit(TransactionId transactionId)

#define FrozenTransactionId

#define InvalidTransactionId

#define TransactionIdIsNormal(xid)

static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)

void PreventCommandDuringRecovery(const char *cmdname)

static void usage(const char *progname)

char * text_to_cstring(const text *t)

bool IsTransactionOrTransactionBlock(void)

int GetCurrentTransactionNestLevel(void)

void StartTransactionCommand(void)

void CommitTransactionCommand(void)

TransactionId GetCurrentTransactionId(void)