PostgreSQL Source Code: src/backend/commands/async.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

127

128#include <limits.h>

130#include <signal.h>

131

153

154

155

156

157

158

159

160

161

162

163#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)

164

165

166

167

168

169

170

171

172

173

174

175

176

178{

179 int length;

185

186

187#define QUEUEALIGN(len) INTALIGN(len)

188

189#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)

190

191

192

193

195{

197 int offset;

199

200#define QUEUE_POS_PAGE(x) ((x).page)

201#define QUEUE_POS_OFFSET(x) ((x).offset)

202

203#define SET_QUEUE_POS(x,y,z) \

204 do { \

205 (x).page = (y); \

206 (x).offset = (z); \

207 } while (0)

208

209#define QUEUE_POS_EQUAL(x,y) \

210 ((x).page == (y).page && (x).offset == (y).offset)

211

212#define QUEUE_POS_IS_ZERO(x) \

213 ((x).page == 0 && (x).offset == 0)

214

215

216#define QUEUE_POS_MIN(x,y) \

217 (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \

218 (x).page != (y).page ? (y) : \

219 (x).offset < (y).offset ? (x) : (y))

220

221

222#define QUEUE_POS_MAX(x,y) \

223 (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \

224 (x).page != (y).page ? (x) : \

225 (x).offset > (y).offset ? (x) : (y))

226

227

228

229

230

231

232

233

234

235

236

237

238#define QUEUE_CLEANUP_DELAY 4

239

240

241

242

244{

245 int32 pid;

246 Oid dboid;

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

282{

285

287

289

293

295

296#define QUEUE_HEAD (asyncQueueControl->head)

297#define QUEUE_TAIL (asyncQueueControl->tail)

298#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)

299#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)

300#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)

301#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)

302#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)

303#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)

304

305

306

307

309

310#define NotifyCtl (&NotifyCtlData)

311#define QUEUE_PAGESIZE BLCKSZ

312

313#define QUEUE_FULL_WARN_INTERVAL 5000

314

315

316

317

318

319

321

322

323

324

325

326

327

328

329

330

331

332typedef enum

333{

338

339typedef struct

340{

344

346{

347 int nestingLevel;

351

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

382{

385

388

390{

391 int nestingLevel;

393 HTAB *hashtab;

396

397#define MIN_HASHABLE_NOTIFIES 16

398

400{

402};

403

405

406

407

408

409

410

411

412

414

415

417

418

420

421

423

424

426

427

429

430

451 char *page_buffer,

460

461

462

463

464

465static inline int64

467{

468 return p - q;

469}

470

471

472

473

474

475static inline bool

477{

478 return p < q;

479}

480

481

482

483

486{

488

489

492

494

495 return size;

496}

497

498

499

500

501void

503{

504 bool found;

506

507

508

509

512

515

516 if (!found)

517 {

518

525 {

530 }

531 }

532

533

534

535

536

541

542 if (!found)

543 {

544

545

546

548 }

549}

550

551

552

553

554

555

558{

559 const char *channel;

560 const char *payload;

561

563 channel = "";

564 else

566

568 payload = "";

569 else

571

572

574

576

578}

579

580

581

582

583

584

585

586

587

588

589

590void

592{

594 size_t channel_len;

595 size_t payload_len;

598

600 elog(ERROR, "cannot send notifications from a parallel worker");

601

603 elog(DEBUG1, "Async_Notify(%s)", channel);

604

605 channel_len = channel ? strlen(channel) : 0;

606 payload_len = payload ? strlen(payload) : 0;

607

608

609 if (channel_len == 0)

611 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

612 errmsg("channel name cannot be empty")));

613

614

617 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

618 errmsg("channel name too long")));

619

622 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

623 errmsg("payload string too long")));

624

625

626

627

628

629

630

631

633

635 channel_len + payload_len + 2);

638 strcpy(n->data, channel);

639 if (payload)

640 strcpy(n->data + channel_len + 1, payload);

641 else

642 n->data[channel_len + 1] = '\0';

643

645 {

647

648

649

650

651

652

658

659 notifies->hashtab = NULL;

662 }

663 else

664 {

665

667 {

668

671 return;

672 }

673

674

676 }

677

679}

680

681

682

683

684

685

686

687

688

689static void

691{

695

696

697

698

699

700

701

703

704

706 strlen(channel) + 1);

708 strcpy(actrec->channel, channel);

709

711 {

713

714

715

716

717

718

725 }

726 else

728

730}

731

732

733

734

735

736

737void

739{

742

744}

745

746

747

748

749

750

751void

753{

756

757

759 return;

760

762}

763

764

765

766

767

768

769void

771{

774

775

777 return;

778

780}

781

782

783

784

785

786

787

788

791{

793

794

796 {

797

799 }

800

801

803

805 {

808

810 }

811

813}

814

815

816

817

818

819

820

821

822static void

824{

827}

828

829

830

831

832

833

834

835void

837{

838

841 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

842 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));

843}

844

845

846

847

848

849

850

851

852

853

854

855

856

857

858

859

860void

862{

864

866 return;

867

870

871

873 {

875 {

877

878 switch (actrec->action)

879 {

882 break;

884

885 break;

887

888 break;

889 }

890 }

891 }

892

893

895 {

897

898

899

900

901

902

903

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

924

925

927 while (nextNotify != NULL)

928 {

929

930

931

932

933

934

935

936

937

938

939

940

945 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

946 errmsg("too many notifications in the NOTIFY queue")));

949 }

950

951

952 }

953}

954

955

956

957

958

959

960

961

962

963

964

965

966

967void

969{

971

972

973

974

975

977 return;

978

981

982

984 {

986 {

988

989 switch (actrec->action)

990 {

993 break;

996 break;

999 break;

1000 }

1001 }

1002 }

1003

1004

1007

1008

1009

1010

1011

1012

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1026 {

1029 }

1030

1031

1033}

1034

1035

1036

1037

1038

1039

1040static void

1042{

1046

1047

1048

1049

1050

1052 return;

1053

1056

1057

1058

1059

1060

1062 {

1065 }

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1092 {

1095

1097 prevListener = i;

1098 }

1102

1104 {

1107 }

1108 else

1109 {

1112 }

1114

1115

1117

1118

1119

1120

1121

1122

1123

1124

1125

1128}

1129

1130

1131

1132

1133

1134

1135static void

1137{

1139

1140

1142 return;

1143

1144

1145

1146

1147

1148

1149

1150

1151

1155}

1156

1157

1158

1159

1160

1161

1162static void

1164{

1166

1169

1171 {

1172 char *lchan = (char *) lfirst(q);

1173

1174 if (strcmp(lchan, channel) == 0)

1175 {

1178 break;

1179 }

1180 }

1181

1182

1183

1184

1185

1186}

1187

1188

1189

1190

1191

1192

1193static void

1195{

1198

1201}

1202

1203

1204

1205

1206

1207

1208

1209

1210

1211static bool

1213{

1215

1217 {

1218 char *lchan = (char *) lfirst(p);

1219

1220 if (strcmp(lchan, channel) == 0)

1221 return true;

1222 }

1223 return false;

1224}

1225

1226

1227

1228

1229

1230static void

1232{

1234

1236 return;

1237

1238

1239

1240

1242

1245

1248 else

1249 {

1251 {

1253 {

1255 break;

1256 }

1257 }

1258 }

1261

1262

1264}

1265

1266

1267

1268

1269

1270

1271static bool

1273{

1276 int64 occupied = headPage - tailPage;

1277

1279}

1280

1281

1282

1283

1284

1285

1286static bool

1288{

1291 bool pageJump = false;

1292

1293

1294

1295

1296

1297 offset += entryLength;

1299

1300

1301

1302

1303

1304

1306 {

1307 pageno++;

1308 offset = 0;

1309 pageJump = true;

1310 }

1311

1313 return pageJump;

1314}

1315

1316

1317

1318

1319static void

1321{

1324 int entryLength;

1325

1328

1329

1331 entryLength = QUEUEALIGN(entryLength);

1332 qe->length = entryLength;

1336 memcpy(qe->data, n->data, channellen + payloadlen + 2);

1337}

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1352

1353

1354

1357{

1361 int offset;

1362 int slotno;

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1377

1378

1379

1380

1381

1382

1385

1386

1388

1391 else

1394

1395

1396 NotifyCtl->shared->page_dirty[slotno] = true;

1397

1398 while (nextNotify != NULL)

1399 {

1401

1402

1404

1406

1407

1409 {

1410

1412 }

1413 else

1414 {

1415

1416

1417

1418

1419

1422 qe.data[0] = '\0';

1423 qe.data[1] = '\0';

1424 }

1425

1426

1427 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,

1428 &qe,

1430

1431

1433 {

1435

1438 if (lock != prevlock)

1439 {

1442 prevlock = lock;

1443 }

1444

1445

1446

1447

1448

1449

1450

1451

1452

1454

1455

1456

1457

1458

1459

1462

1463

1464 break;

1465 }

1466 }

1467

1468

1470

1472

1473 return nextNotify;

1474}

1475

1476

1477

1478

1479

1482{

1484

1485

1487

1491

1493}

1494

1495

1496

1497

1498

1499

1500

1501

1502

1503

1504

1505static double

1507{

1510 int64 occupied = headPage - tailPage;

1511

1512 if (occupied == 0)

1513 return (double) 0;

1514

1516}

1517

1518

1519

1520

1521

1522

1523

1524

1525

1526static void

1528{

1529 double fillDegree;

1531

1533 if (fillDegree < 0.5)

1534 return;

1535

1537

1540 {

1543

1545 {

1550 }

1551

1553 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),

1555 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)

1556 : 0),

1558 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")

1559 : 0)));

1560

1562 }

1563}

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580static void

1582{

1585 int count;

1586

1587

1588

1589

1590

1591

1592

1593

1594

1597 count = 0;

1598

1601 {

1604

1608 {

1609

1610

1611

1612

1614 continue;

1615 }

1616 else

1617 {

1618

1619

1620

1621

1624 continue;

1625 }

1626

1627 pids[count] = pid;

1628 procnos[count] = i;

1629 count++;

1630 }

1632

1633

1634 for (int i = 0; i < count; i++)

1635 {

1636 int32 pid = pids[i];

1637

1638

1639

1640

1641

1643 {

1645 continue;

1646 }

1647

1648

1649

1650

1651

1652

1653

1655 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);

1656 }

1657

1660}

1661

1662

1663

1664

1665

1666

1667

1668

1669

1670void

1672{

1673

1674

1675

1676

1677

1680

1681

1683}

1684

1685

1686

1687

1688

1689

1690void

1692{

1694

1695

1698 {

1701 {

1702

1704 }

1705 else

1706 {

1708

1710

1711

1712

1713

1716 childPendingActions->actions);

1717 pfree(childPendingActions);

1718 }

1719 }

1720

1721

1724 {

1726

1729 {

1730

1732 }

1733 else

1734 {

1735

1736

1737

1738

1739

1742

1744

1745 foreach(l, childPendingNotifies->events)

1746 {

1748

1751 }

1752 pfree(childPendingNotifies);

1753 }

1754 }

1755}

1756

1757

1758

1759

1760void

1762{

1764

1765

1766

1767

1768

1769

1770

1771

1772

1773

1774

1775

1778 {

1780

1782 pfree(childPendingActions);

1783 }

1784

1787 {

1789

1791 pfree(childPendingNotifies);

1792 }

1793}

1794

1795

1796

1797

1798

1799

1800

1801

1802

1803void

1805{

1806

1807

1808

1809

1810

1811

1813

1814

1816}

1817

1818

1819

1820

1821

1822

1823

1824

1825

1826

1827

1828

1829

1830

1831

1832

1833void

1835{

1837 return;

1838

1839

1842}

1843

1844

1845

1846

1847

1848

1849

1850static void

1852{

1856

1857

1858 union

1859 {

1862 } page_buffer;

1863

1864

1866

1871

1873 {

1874

1875 return;

1876 }

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1910

1911

1912

1913

1914

1915

1916

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1930 {

1931 bool reachedStop;

1932

1933 do

1934 {

1937 int slotno;

1938 int copysize;

1939

1940

1941

1942

1943

1944

1945

1949 {

1950

1952 if (copysize < 0)

1953 copysize = 0;

1954 }

1955 else

1956 {

1957

1959 }

1960 memcpy(page_buffer.buf + curoffset,

1961 NotifyCtl->shared->page_buffer[slotno] + curoffset,

1962 copysize);

1963

1965

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976

1977

1978

1979

1980

1982 page_buffer.buf,

1983 snapshot);

1984 } while (!reachedStop);

1985 }

1987 {

1988

1992 }

1994

1995

1997}

1998

1999

2000

2001

2002

2003

2004

2005

2006

2007

2008

2009

2010

2011

2012

2013

2014

2015static bool

2018 char *page_buffer,

2020{

2021 bool reachedStop = false;

2022 bool reachedEndOfPage;

2024

2025 do

2026 {

2028

2030 break;

2031

2033

2034

2035

2036

2037

2038

2040

2041

2043 {

2045 {

2046

2047

2048

2049

2050

2051

2052

2053

2054

2055

2056

2057

2058

2059

2060

2061

2062

2063

2064

2065 *current = thisentry;

2066 reachedStop = true;

2067 break;

2068 }

2070 {

2071

2072 char *channel = qe->data;

2073

2075 {

2076

2077 char *payload = qe->data + strlen(channel) + 1;

2078

2080 }

2081 }

2082 else

2083 {

2084

2085

2086

2087

2088 }

2089 }

2090

2091

2092 } while (!reachedEndOfPage);

2093

2095 reachedStop = true;

2096

2097 return reachedStop;

2098}

2099

2100

2101

2102

2103

2104

2105

2106

2107static void

2109{

2111 int64 oldtailpage;

2112 int64 newtailpage;

2114

2115

2117

2118

2119

2120

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2138 {

2141 }

2145

2146

2147

2148

2149

2150

2151

2152

2156 {

2157

2158

2159

2160

2162

2166 }

2167

2169}

2170

2171

2172

2173

2174

2175

2176

2177

2178

2179

2180

2181

2182static void

2184{

2185

2187

2188

2190 return;

2191

2193 elog(DEBUG1, "ProcessIncomingNotify");

2194

2196

2197

2198

2199

2200

2202

2204

2206

2207

2208

2209

2210

2211 if (flush)

2213

2215

2217 elog(DEBUG1, "ProcessIncomingNotify: done");

2218}

2219

2220

2221

2222

2223void

2225{

2227 {

2229

2235

2236

2237

2238

2239

2240

2241 }

2242 else

2243 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);

2244}

2245

2246

2247static bool

2249{

2251 return false;

2252

2254 {

2255

2257 &n,

2259 NULL))

2260 return true;

2261 }

2262 else

2263 {

2264

2266

2268 {

2270

2275 return true;

2276 }

2277 }

2278

2279 return false;

2280}

2281

2282

2283

2284

2285

2286

2287

2288static void

2290{

2292

2293

2296 {

2299

2300

2308 256L,

2309 &hash_ctl,

2311

2312

2314 {

2316 bool found;

2317

2319 &oldn,

2321 &found);

2323 }

2324 }

2325

2326

2328

2329

2331 {

2332 bool found;

2333

2335 &n,

2337 &found);

2339 }

2340}

2341

2342

2343

2344

2345

2346

2349{

2351

2353

2356}

2357

2358

2359

2360

2361static int

2363{

2366

2372 return 0;

2373 return 1;

2374}

2375

2376

2377static void

2379{

2380

2381

2382

2383

2384

2385

2388}

2389

2390

2391

2392

2393bool

2395{

2397}

static void SignalBackends(void)

static double asyncQueueUsage(void)

#define MIN_HASHABLE_NOTIFIES

static void Exec_ListenCommit(const char *channel)

static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)

#define QUEUE_FIRST_LISTENER

#define QUEUE_POS_MAX(x, y)

static bool tryAdvanceTail

struct QueuePosition QueuePosition

void HandleNotifyInterrupt(void)

static void Exec_UnlistenCommit(const char *channel)

static void asyncQueueAdvanceTail(void)

int max_notify_queue_pages

static void Exec_ListenPreCommit(void)

static ActionList * pendingActions

static uint32 notification_hash(const void *key, Size keysize)

void Async_UnlistenAll(void)

static SlruCtlData NotifyCtlData

void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)

void AtCommit_Notify(void)

#define QUEUE_POS_MIN(x, y)

void ProcessNotifyInterrupt(bool flush)

static bool AsyncExistsPendingNotify(Notification *n)

#define QUEUE_BACKEND_POS(i)

static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)

static int notification_match(const void *key1, const void *key2, Size keysize)

#define SET_QUEUE_POS(x, y, z)

static void ProcessIncomingNotify(bool flush)

static void asyncQueueReadAllNotifications(void)

static void Async_UnlistenOnExit(int code, Datum arg)

#define QUEUE_POS_OFFSET(x)

static ListCell * asyncQueueAddEntries(ListCell *nextNotify)

static void ClearPendingActionsAndNotifies(void)

static List * listenChannels

Datum pg_listening_channels(PG_FUNCTION_ARGS)

Datum pg_notify(PG_FUNCTION_ARGS)

static NotificationList * pendingNotifies

#define AsyncQueueEntryEmptySize

static void AddEventToPendingNotifies(Notification *n)

static AsyncQueueControl * asyncQueueControl

static bool unlistenExitRegistered

static bool asyncQueuePagePrecedes(int64 p, int64 q)

static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)

void AtAbort_Notify(void)

#define QUEUE_POS_PAGE(x)

void PreCommit_Notify(void)

#define QUEUE_CLEANUP_DELAY

struct AsyncQueueControl AsyncQueueControl

static void asyncQueueFillWarning(void)

#define QUEUE_BACKEND_PID(i)

static void Exec_UnlistenAllCommit(void)

struct ActionList ActionList

Size AsyncShmemSize(void)

#define QUEUE_FULL_WARN_INTERVAL

void Async_Unlisten(const char *channel)

void Async_Listen(const char *channel)

#define NOTIFY_PAYLOAD_MAX_LENGTH

#define QUEUE_POS_IS_ZERO(x)

static int64 asyncQueuePageDiff(int64 p, int64 q)

static void queue_listen(ListenActionKind action, const char *channel)

static bool amRegisteredListener

#define QUEUE_NEXT_LISTENER(i)

#define QUEUE_BACKEND_DBOID(i)

void AtSubAbort_Notify(void)

struct NotificationList NotificationList

void AtPrepare_Notify(void)

void AtSubCommit_Notify(void)

static bool asyncQueueIsFull(void)

void AsyncShmemInit(void)

static void asyncQueueUnregister(void)

Datum pg_notification_queue_usage(PG_FUNCTION_ARGS)

struct AsyncQueueEntry AsyncQueueEntry

#define QUEUE_POS_EQUAL(x, y)

struct Notification Notification

static bool IsListeningOn(const char *channel)

void Async_Notify(const char *channel, const char *payload)

volatile sig_atomic_t notifyInterruptPending

bool check_notify_buffers(int *newval, void **extra, GucSource source)

struct QueueBackendStatus QueueBackendStatus

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

#define CStringGetTextDatum(s)

#define FLEXIBLE_ARRAY_MEMBER

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_TEXT_PP(n)

#define PG_RETURN_FLOAT8(x)

#define SRF_IS_FIRSTCALL()

#define SRF_PERCALL_SETUP()

#define SRF_RETURN_NEXT(_funcctx, _result)

#define SRF_FIRSTCALL_INIT()

#define SRF_RETURN_DONE(_funcctx)

static Datum hash_any(const unsigned char *k, int keylen)

Assert(PointerIsAligned(start, uint64))

#define IsParallelWorker()

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

List * lappend(List *list, void *datum)

List * list_concat(List *list1, const List *list2)

void list_free_deep(List *list)

void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)

#define AccessExclusiveLock

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

@ LWTRANCHE_NOTIFY_BUFFER

void * MemoryContextAlloc(MemoryContext context, Size size)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

MemoryContext TopMemoryContext

MemoryContext CurTransactionContext

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

#define foreach_delete_current(lst, var_or_cell)

static void * list_nth(const List *list, int n)

static ListCell * list_head(const List *l)

static ListCell * lnext(const List *l, const ListCell *c)

static rewind_source * source

CommandDest whereToSendOutput

static uint32 DatumGetUInt32(Datum X)

void pq_sendstring(StringInfo buf, const char *str)

void pq_endmessage(StringInfo buf)

void pq_beginmessage(StringInfo buf, char msgtype)

static void pq_sendint32(StringInfo buf, uint32 i)

#define INVALID_PROC_NUMBER

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_NOTIFY_INTERRUPT

#define PqMsg_NotificationResponse

static void set_ps_display(const char *activity)

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)

int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)

bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)

bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)

int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)

int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)

void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)

Size SimpleLruShmemSize(int nslots, int nlsns)

bool check_slru_buffers(const char *name, int *newval)

static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)

#define SLRU_PAGES_PER_SEGMENT

bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)

Snapshot GetLatestSnapshot(void)

void UnregisterSnapshot(Snapshot snapshot)

Snapshot RegisterSnapshot(Snapshot snapshot)

struct ActionList * upper

QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]

TimestampTz lastQueueFillWarn

char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]

char channel[FLEXIBLE_ARRAY_MEMBER]

struct NotificationList * upper

char data[FLEXIBLE_ARRAY_MEMBER]

bool TransactionIdDidCommit(TransactionId transactionId)

#define InvalidTransactionId

void PreventCommandDuringRecovery(const char *cmdname)

static void usage(const char *progname)

char * text_to_cstring(const text *t)

bool IsTransactionOrTransactionBlock(void)

int GetCurrentTransactionNestLevel(void)

void StartTransactionCommand(void)

void CommitTransactionCommand(void)

TransactionId GetCurrentTransactionId(void)