PostgreSQL Source Code: src/backend/commands/async.c File Reference (original) (raw)

Go to the source code of this file.

Data Structures
struct AsyncQueueEntry
struct QueuePosition
struct QueueBackendStatus
struct AsyncQueueControl
struct ListenAction
struct ActionList
struct Notification
struct NotificationList
struct NotificationHash
Macros
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
#define QUEUEALIGN(len) INTALIGN(len)
#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2)
#define QUEUE_POS_PAGE(x) ((x).page)
#define QUEUE_POS_OFFSET(x) ((x).offset)
#define SET_QUEUE_POS(x, y, z)
#define QUEUE_POS_EQUAL(x, y) ((x).page == (y).page && (x).offset == (y).offset)
#define QUEUE_POS_IS_ZERO(x) ((x).page == 0 && (x).offset == 0)
#define QUEUE_POS_MIN(x, y)
#define QUEUE_POS_MAX(x, y)
#define QUEUE_CLEANUP_DELAY 4
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
#define NotifyCtl (&NotifyCtlData)
#define QUEUE_PAGESIZE BLCKSZ
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */
Typedefs
typedef struct AsyncQueueEntry AsyncQueueEntry
typedef struct QueuePosition QueuePosition
typedef struct QueueBackendStatus QueueBackendStatus
typedef struct AsyncQueueControl AsyncQueueControl
typedef struct ActionList ActionList
typedef struct Notification Notification
typedef struct NotificationList NotificationList
Functions
static int64 asyncQueuePageDiff (int64 p, int64 q)
static bool asyncQueuePagePrecedes (int64 p, int64 q)
static void queue_listen (ListenActionKind action, const char *channel)
static void Async_UnlistenOnExit (int code, Datum arg)
static void Exec_ListenPreCommit (void)
static void Exec_ListenCommit (const char *channel)
static void Exec_UnlistenCommit (const char *channel)
static void Exec_UnlistenAllCommit (void)
static bool IsListeningOn (const char *channel)
static void asyncQueueUnregister (void)
static bool asyncQueueIsFull (void)
static bool asyncQueueAdvance (volatile QueuePosition *position, int entryLength)
static void asyncQueueNotificationToEntry (Notification *n, AsyncQueueEntry *qe)
static ListCell * asyncQueueAddEntries (ListCell *nextNotify)
static double asyncQueueUsage (void)
static void asyncQueueFillWarning (void)
static void SignalBackends (void)
static void asyncQueueReadAllNotifications (void)
static bool asyncQueueProcessPageEntries (volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)
static void asyncQueueAdvanceTail (void)
static void ProcessIncomingNotify (bool flush)
static bool AsyncExistsPendingNotify (Notification *n)
static void AddEventToPendingNotifies (Notification *n)
static uint32 notification_hash (const void *key, Size keysize)
static int notification_match (const void *key1, const void *key2, Size keysize)
static void ClearPendingActionsAndNotifies (void)
Size AsyncShmemSize (void)
void AsyncShmemInit (void)
Datum pg_notify (PG_FUNCTION_ARGS)
void Async_Notify (const char *channel, const char *payload)
void Async_Listen (const char *channel)
void Async_Unlisten (const char *channel)
void Async_UnlistenAll (void)
Datum pg_listening_channels (PG_FUNCTION_ARGS)
void AtPrepare_Notify (void)
void PreCommit_Notify (void)
void AtCommit_Notify (void)
Datum pg_notification_queue_usage (PG_FUNCTION_ARGS)
void AtAbort_Notify (void)
void AtSubCommit_Notify (void)
void AtSubAbort_Notify (void)
void HandleNotifyInterrupt (void)
void ProcessNotifyInterrupt (bool flush)
void NotifyMyFrontEnd (const char *channel, const char *payload, int32 srcPid)
bool check_notify_buffers (int *newval, void **extra, GucSource source)
Variables
static AsyncQueueControl * asyncQueueControl
static SlruCtlData NotifyCtlData
static List * listenChannels = NIL
static ActionList * pendingActions = NULL
static NotificationList * pendingNotifies = NULL
volatile sig_atomic_t notifyInterruptPending = false
static bool unlistenExitRegistered = false
static bool amRegisteredListener = false
static bool tryAdvanceTail = false
bool Trace_notify = false
int max_notify_queue_pages = 1048576

AsyncQueueEntryEmptySize

MIN_HASHABLE_NOTIFIES

#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */

NOTIFY_PAYLOAD_MAX_LENGTH

#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)

NotifyCtl

QUEUE_BACKEND_DBOID

QUEUE_BACKEND_PID

QUEUE_BACKEND_POS

QUEUE_CLEANUP_DELAY

#define QUEUE_CLEANUP_DELAY 4

QUEUE_FIRST_LISTENER

QUEUE_FULL_WARN_INTERVAL

#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

QUEUE_HEAD

QUEUE_NEXT_LISTENER

QUEUE_PAGESIZE

#define QUEUE_PAGESIZE BLCKSZ

QUEUE_POS_EQUAL

| #define QUEUE_POS_EQUAL | ( | | x, | | ------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | ----------------------------------------------------- | | | y | | | | | ) | ((x).page == (y).page && (x).offset == (y).offset) | | |

QUEUE_POS_IS_ZERO

| #define QUEUE_POS_IS_ZERO | ( | | x | ) | ((x).page == 0 && (x).offset == 0) | | ---------------------------- | - | | ---------------------------------------------------- | - | ---------------------------------------------------------------------------------------------------------------------------------------- |

QUEUE_POS_MAX

| #define QUEUE_POS_MAX | ( | | x, | | ------------------------------------------------------- | - | | ----------------------------------------------------- | | | y | | | | | ) | | | |

Value:

(x).page != (y).page ? (x) : \

(x).offset > (y).offset ? (x) : (y))

static bool asyncQueuePagePrecedes(int64 p, int64 q)

Definition at line 222 of file async.c.

QUEUE_POS_MIN

| #define QUEUE_POS_MIN | ( | | x, | | ------------------------------------------------------- | - | | ----------------------------------------------------- | | | y | | | | | ) | | | |

Value:

(x).page != (y).page ? (y) : \

(x).offset < (y).offset ? (x) : (y))

Definition at line 216 of file async.c.

QUEUE_POS_OFFSET

| #define QUEUE_POS_OFFSET | ( | | x | ) | ((x).offset) | | -------------------------- | - | | ---------------------------------------------------- | - | --------------------------------------------------------------- |

QUEUE_POS_PAGE

| #define QUEUE_POS_PAGE | ( | | x | ) | ((x).page) | | ------------------------ | - | | ---------------------------------------------------- | - | ------------------------------------------------------------- |

QUEUE_STOP_PAGE

QUEUE_TAIL

QUEUEALIGN

SET_QUEUE_POS

| #define SET_QUEUE_POS | ( | | x, | | -------------------------------------------------------- | - | | ----------------------------------------------------- | | | y, | | | | | | z | | | | | ) | | | |

Value:

do { \

(x).page = (y); \

(x).offset = (z); \

} while (0)

Definition at line 203 of file async.c.

ActionList

AsyncQueueControl

AsyncQueueEntry

Notification

NotificationList

QueueBackendStatus

QueuePosition

ListenActionKind

Enumerator
LISTEN_LISTEN
LISTEN_UNLISTEN
LISTEN_UNLISTEN_ALL

Definition at line 332 of file async.c.

AddEventToPendingNotifies()

static void AddEventToPendingNotifies ( Notification * n) static

Definition at line 2289 of file async.c.

2290{

2292

2293

2296 {

2299

2300

2308 256L,

2309 &hash_ctl,

2311

2312

2314 {

2316 bool found;

2317

2319 &oldn,

2321 &found);

2323 }

2324 }

2325

2326

2328

2329

2331 {

2332 bool found;

2333

2335 &n,

2337 &found);

2339 }

2340}

#define MIN_HASHABLE_NOTIFIES

static uint32 notification_hash(const void *key, Size keysize)

static int notification_match(const void *key1, const void *key2, Size keysize)

static NotificationList * pendingNotifies

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

Assert(PointerIsAligned(start, uint64))

List * lappend(List *list, void *datum)

MemoryContext CurTransactionContext

static int list_length(const List *l)

References Assert(), CurTransactionContext, HASHCTL::entrysize, NotificationList::events, HASHCTL::hash, HASH_COMPARE, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), NotificationList::hashtab, HASHCTL::hcxt, HASHCTL::keysize, lappend(), lfirst, list_length(), HASHCTL::match, MIN_HASHABLE_NOTIFIES, NIL, notification_hash(), notification_match(), and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

Async_Listen()

void Async_Listen ( const char * channel )

Async_Notify()

void Async_Notify ( const char * channel,
const char * payload
)

Definition at line 591 of file async.c.

592{

594 size_t channel_len;

595 size_t payload_len;

598

600 elog(ERROR, "cannot send notifications from a parallel worker");

601

603 elog(DEBUG1, "Async_Notify(%s)", channel);

604

605 channel_len = channel ? strlen(channel) : 0;

606 payload_len = payload ? strlen(payload) : 0;

607

608

609 if (channel_len == 0)

611 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

612 errmsg("channel name cannot be empty")));

613

614

617 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

618 errmsg("channel name too long")));

619

622 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

623 errmsg("payload string too long")));

624

625

626

627

628

629

630

631

633

635 channel_len + payload_len + 2);

638 strcpy(n->data, channel);

639 if (payload)

640 strcpy(n->data + channel_len + 1, payload);

641 else

642 n->data[channel_len + 1] = '\0';

643

645 {

647

648

649

650

651

652

658

659 notifies->hashtab = NULL;

662 }

663 else

664 {

665

667 {

668

671 return;

672 }

673

674

676 }

677

679}

static bool AsyncExistsPendingNotify(Notification *n)

static void AddEventToPendingNotifies(Notification *n)

#define NOTIFY_PAYLOAD_MAX_LENGTH

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define IsParallelWorker()

void * MemoryContextAlloc(MemoryContext context, Size size)

MemoryContext TopTransactionContext

void pfree(void *pointer)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

struct NotificationList * upper

char data[FLEXIBLE_ARRAY_MEMBER]

int GetCurrentTransactionNestLevel(void)

References AddEventToPendingNotifies(), AsyncExistsPendingNotify(), Notification::channel_len, CurTransactionContext, Notification::data, data, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, GetCurrentTransactionNestLevel(), NotificationList::hashtab, IsParallelWorker, list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), NAMEDATALEN, NotificationList::nestingLevel, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload_len, pendingNotifies, pfree(), TopTransactionContext, Trace_notify, and NotificationList::upper.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

Async_Unlisten()

void Async_Unlisten ( const char * channel )

Async_UnlistenAll()

void Async_UnlistenAll ( void )

Async_UnlistenOnExit()

static void Async_UnlistenOnExit ( int code, Datum arg ) static

AsyncExistsPendingNotify()

static bool AsyncExistsPendingNotify ( Notification * n) static

Definition at line 2248 of file async.c.

2249{

2251 return false;

2252

2254 {

2255

2257 &n,

2259 NULL))

2260 return true;

2261 }

2262 else

2263 {

2264

2266

2268 {

2270

2275 return true;

2276 }

2277 }

2278

2279 return false;

2280}

References Notification::channel_len, Notification::data, NotificationList::events, HASH_FIND, hash_search(), NotificationList::hashtab, lfirst, Notification::payload_len, and pendingNotifies.

Referenced by Async_Notify(), and AtSubCommit_Notify().

asyncQueueAddEntries()

Definition at line 1356 of file async.c.

1357{

1361 int offset;

1362 int slotno;

1364

1365

1366

1367

1368

1369

1370

1371

1372

1373

1374

1375

1377

1378

1379

1380

1381

1382

1385

1386

1388

1391 else

1394

1395

1396 NotifyCtl->shared->page_dirty[slotno] = true;

1397

1398 while (nextNotify != NULL)

1399 {

1401

1402

1404

1406

1407

1409 {

1410

1412 }

1413 else

1414 {

1415

1416

1417

1418

1419

1422 qe.data[0] = '\0';

1423 qe.data[1] = '\0';

1424 }

1425

1426

1427 memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,

1428 &qe,

1430

1431

1433 {

1435

1438 if (lock != prevlock)

1439 {

1442 prevlock = lock;

1443 }

1444

1445

1446

1447

1448

1449

1450

1451

1452

1454

1455

1456

1457

1458

1459

1462

1463

1464 break;

1465 }

1466 }

1467

1468

1470

1472

1473 return nextNotify;

1474}

static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)

static bool tryAdvanceTail

#define QUEUE_POS_OFFSET(x)

static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength)

#define QUEUE_POS_PAGE(x)

#define QUEUE_CLEANUP_DELAY

#define QUEUE_POS_IS_ZERO(x)

bool LWLockAcquire(LWLock *lock, LWLockMode mode)

void LWLockRelease(LWLock *lock)

static ListCell * lnext(const List *l, const ListCell *c)

int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)

int SimpleLruZeroPage(SlruCtl ctl, int64 pageno)

static LWLock * SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)

char data[NAMEDATALEN+NOTIFY_PAYLOAD_MAX_LENGTH]

#define InvalidTransactionId

References asyncQueueAdvance(), asyncQueueNotificationToEntry(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, NotificationList::events, InvalidOid, InvalidTransactionId, AsyncQueueEntry::length, lfirst, lnext(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, pendingNotifies, QUEUE_CLEANUP_DELAY, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_IS_ZERO, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, SimpleLruGetBankLock(), SimpleLruReadPage(), SimpleLruZeroPage(), and tryAdvanceTail.

Referenced by PreCommit_Notify().

asyncQueueAdvance()

static bool asyncQueueAdvance ( volatile QueuePosition * position, int entryLength ) static

asyncQueueAdvanceTail()

static void asyncQueueAdvanceTail ( void ) static

Definition at line 2108 of file async.c.

2109{

2111 int64 oldtailpage;

2112 int64 newtailpage;

2114

2115

2117

2118

2119

2120

2121

2122

2123

2124

2125

2126

2127

2128

2129

2130

2131

2132

2133

2134

2138 {

2141 }

2145

2146

2147

2148

2149

2150

2151

2152

2156 {

2157

2158

2159

2160

2162

2166 }

2167

2169}

#define QUEUE_FIRST_LISTENER

#define QUEUE_POS_MIN(x, y)

#define QUEUE_BACKEND_POS(i)

#define QUEUE_BACKEND_PID(i)

#define QUEUE_NEXT_LISTENER(i)

#define INVALID_PROC_NUMBER

void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)

#define SLRU_PAGES_PER_SEGMENT

References Assert(), asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NotifyCtl, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_MIN, QUEUE_POS_PAGE, QUEUE_STOP_PAGE, QUEUE_TAIL, SimpleLruTruncate(), and SLRU_PAGES_PER_SEGMENT.

Referenced by AtCommit_Notify(), and pg_notification_queue_usage().

asyncQueueFillWarning()

static void asyncQueueFillWarning ( void ) static

Definition at line 1527 of file async.c.

1528{

1529 double fillDegree;

1531

1533 if (fillDegree < 0.5)

1534 return;

1535

1537

1540 {

1543

1545 {

1550 }

1551

1553 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),

1555 errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)

1556 : 0),

1558 errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")

1559 : 0)));

1560

1562 }

1563}

static double asyncQueueUsage(void)

static AsyncQueueControl * asyncQueueControl

#define QUEUE_FULL_WARN_INTERVAL

#define QUEUE_POS_EQUAL(x, y)

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

int errdetail(const char *fmt,...)

int errhint(const char *fmt,...)

TimestampTz lastQueueFillWarn

References Assert(), asyncQueueControl, asyncQueueUsage(), ereport, errdetail(), errhint(), errmsg(), GetCurrentTimestamp(), i, INVALID_PROC_NUMBER, InvalidPid, AsyncQueueControl::lastQueueFillWarn, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_FULL_WARN_INTERVAL, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MIN, TimestampDifferenceExceeds(), and WARNING.

Referenced by PreCommit_Notify().

asyncQueueIsFull()

static bool asyncQueueIsFull ( void ) static

asyncQueueNotificationToEntry()

Definition at line 1320 of file async.c.

1321{

1324 int entryLength;

1325

1328

1329

1331 entryLength = QUEUEALIGN(entryLength);

1332 qe->length = entryLength;

1336 memcpy(qe->data, n->data, channellen + payloadlen + 2);

1337}

TransactionId GetCurrentTransactionId(void)

References Assert(), AsyncQueueEntryEmptySize, Notification::channel_len, Notification::data, AsyncQueueEntry::data, AsyncQueueEntry::dboid, GetCurrentTransactionId(), AsyncQueueEntry::length, MyDatabaseId, MyProcPid, NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, Notification::payload_len, QUEUEALIGN, AsyncQueueEntry::srcPid, and AsyncQueueEntry::xid.

Referenced by asyncQueueAddEntries().

asyncQueuePageDiff()

asyncQueuePagePrecedes()

static bool asyncQueuePagePrecedes ( int64 p, int64 q ) inlinestatic

asyncQueueProcessPageEntries()

Definition at line 2016 of file async.c.

2020{

2021 bool reachedStop = false;

2022 bool reachedEndOfPage;

2024

2025 do

2026 {

2028

2030 break;

2031

2033

2034

2035

2036

2037

2038

2040

2041

2043 {

2045 {

2046

2047

2048

2049

2050

2051

2052

2053

2054

2055

2056

2057

2058

2059

2060

2061

2062

2063

2064

2065 *current = thisentry;

2066 reachedStop = true;

2067 break;

2068 }

2070 {

2071

2072 char *channel = qe->data;

2073

2075 {

2076

2077 char *payload = qe->data + strlen(channel) + 1;

2078

2080 }

2081 }

2082 else

2083 {

2084

2085

2086

2087

2088 }

2089 }

2090

2091

2092 } while (!reachedEndOfPage);

2093

2095 reachedStop = true;

2096

2097 return reachedStop;

2098}

void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)

static bool IsListeningOn(const char *channel)

bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)

bool TransactionIdDidCommit(TransactionId transactionId)

References asyncQueueAdvance(), AsyncQueueEntry::data, AsyncQueueEntry::dboid, IsListeningOn(), AsyncQueueEntry::length, MyDatabaseId, NotifyMyFrontEnd(), QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, AsyncQueueEntry::srcPid, TransactionIdDidCommit(), AsyncQueueEntry::xid, and XidInMVCCSnapshot().

Referenced by asyncQueueReadAllNotifications().

asyncQueueReadAllNotifications()

static void asyncQueueReadAllNotifications ( void ) static

Definition at line 1851 of file async.c.

1852{

1856

1857

1858 union

1859 {

1862 } page_buffer;

1863

1864

1866

1871

1873 {

1874

1875 return;

1876 }

1877

1878

1879

1880

1881

1882

1883

1884

1885

1886

1887

1888

1889

1890

1891

1892

1893

1894

1895

1896

1897

1898

1899

1900

1901

1902

1903

1904

1905

1906

1907

1908

1909

1910

1911

1912

1913

1914

1915

1916

1918

1919

1920

1921

1922

1923

1924

1925

1926

1927

1928

1930 {

1931 bool reachedStop;

1932

1933 do

1934 {

1937 int slotno;

1938 int copysize;

1939

1940

1941

1942

1943

1944

1945

1949 {

1950

1952 if (copysize < 0)

1953 copysize = 0;

1954 }

1955 else

1956 {

1957

1959 }

1960 memcpy(page_buffer.buf + curoffset,

1961 NotifyCtl->shared->page_buffer[slotno] + curoffset,

1962 copysize);

1963

1965

1966

1967

1968

1969

1970

1971

1972

1973

1974

1975

1976

1977

1978

1979

1980

1982 page_buffer.buf,

1983 snapshot);

1984 } while (!reachedStop);

1985 }

1987 {

1988

1992 }

1994

1995

1997}

static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot)

int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)

Snapshot GetLatestSnapshot(void)

void UnregisterSnapshot(Snapshot snapshot)

Snapshot RegisterSnapshot(Snapshot snapshot)

References Assert(), asyncQueueProcessPageEntries(), buf, GetLatestSnapshot(), InvalidTransactionId, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyProcNumber, MyProcPid, NotifyCtl, PG_END_TRY, PG_FINALLY, PG_TRY, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_PAGESIZE, QUEUE_POS_EQUAL, QUEUE_POS_OFFSET, QUEUE_POS_PAGE, RegisterSnapshot(), SimpleLruGetBankLock(), SimpleLruReadPage_ReadOnly(), and UnregisterSnapshot().

Referenced by Exec_ListenPreCommit(), and ProcessIncomingNotify().

asyncQueueUnregister()

static void asyncQueueUnregister ( void ) static

Definition at line 1231 of file async.c.

1232{

1234

1236 return;

1237

1238

1239

1240

1242

1245

1248 else

1249 {

1251 {

1253 {

1255 break;

1256 }

1257 }

1258 }

1261

1262

1264}

static List * listenChannels

static bool amRegisteredListener

#define QUEUE_BACKEND_DBOID(i)

References amRegisteredListener, Assert(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, listenChannels, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcNumber, NIL, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_FIRST_LISTENER, and QUEUE_NEXT_LISTENER.

Referenced by Async_UnlistenOnExit(), AtAbort_Notify(), and AtCommit_Notify().

asyncQueueUsage()

static double asyncQueueUsage ( void ) static

AsyncShmemInit()

void AsyncShmemInit ( void )

Definition at line 502 of file async.c.

503{

504 bool found;

506

507

508

509

512

515

516 if (!found)

517 {

518

525 {

530 }

531 }

532

533

534

535

536

541

542 if (!found)

543 {

544

545

546

548 }

549}

@ LWTRANCHE_NOTIFY_BUFFER

Size add_size(Size s1, Size s2)

Size mul_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler, bool long_segment_names)

bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)

bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)

References add_size(), asyncQueueControl, asyncQueuePagePrecedes(), i, INVALID_PROC_NUMBER, InvalidOid, InvalidPid, AsyncQueueControl::lastQueueFillWarn, LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, MaxBackends, mul_size(), notify_buffers, NotifyCtl, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_STOP_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SlruScanDirCbDeleteAll(), SlruScanDirectory(), and SYNC_HANDLER_NONE.

Referenced by CreateOrAttachShmemStructs().

AsyncShmemSize()

Size AsyncShmemSize ( void )

AtAbort_Notify()

void AtAbort_Notify ( void )

AtCommit_Notify()

void AtCommit_Notify ( void )

Definition at line 968 of file async.c.

969{

971

972

973

974

975

977 return;

978

981

982

984 {

986 {

988

989 switch (actrec->action)

990 {

993 break;

996 break;

999 break;

1000 }

1001 }

1002 }

1003

1004

1007

1008

1009

1010

1011

1012

1015

1016

1017

1018

1019

1020

1021

1022

1023

1024

1026 {

1029 }

1030

1031

1033}

static void SignalBackends(void)

static void Exec_ListenCommit(const char *channel)

static void Exec_UnlistenCommit(const char *channel)

static void asyncQueueAdvanceTail(void)

char channel[FLEXIBLE_ARRAY_MEMBER]

References ListenAction::action, ActionList::actions, amRegisteredListener, asyncQueueAdvanceTail(), asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, listenChannels, NIL, pendingActions, pendingNotifies, SignalBackends(), Trace_notify, and tryAdvanceTail.

Referenced by CommitTransaction().

AtPrepare_Notify()

void AtPrepare_Notify ( void )

AtSubAbort_Notify()

void AtSubAbort_Notify ( void )

AtSubCommit_Notify()

void AtSubCommit_Notify ( void )

Definition at line 1691 of file async.c.

1692{

1694

1695

1698 {

1701 {

1702

1704 }

1705 else

1706 {

1708

1710

1711

1712

1713

1716 childPendingActions->actions);

1717 pfree(childPendingActions);

1718 }

1719 }

1720

1721

1724 {

1726

1729 {

1730

1732 }

1733 else

1734 {

1735

1736

1737

1738

1739

1742

1744

1745 foreach(l, childPendingNotifies->events)

1746 {

1748

1751 }

1752 pfree(childPendingNotifies);

1753 }

1754 }

1755}

List * list_concat(List *list1, const List *list2)

References ActionList::actions, AddEventToPendingNotifies(), Assert(), AsyncExistsPendingNotify(), NotificationList::events, GetCurrentTransactionNestLevel(), lfirst, list_concat(), ActionList::nestingLevel, NotificationList::nestingLevel, pendingActions, pendingNotifies, pfree(), ActionList::upper, and NotificationList::upper.

Referenced by CommitSubTransaction().

check_notify_buffers()

bool check_notify_buffers ( int * newval,
void ** extra,
GucSource source
)

ClearPendingActionsAndNotifies()

static void ClearPendingActionsAndNotifies ( void ) static

Exec_ListenCommit()

static void Exec_ListenCommit ( const char * channel) static

Exec_ListenPreCommit()

static void Exec_ListenPreCommit ( void ) static

Definition at line 1041 of file async.c.

1042{

1046

1047

1048

1049

1050

1052 return;

1053

1056

1057

1058

1059

1060

1062 {

1065 }

1066

1067

1068

1069

1070

1071

1072

1073

1074

1075

1076

1077

1078

1079

1080

1081

1082

1083

1084

1085

1086

1092 {

1095

1097 prevListener = i;

1098 }

1102

1104 {

1107 }

1108 else

1109 {

1112 }

1114

1115

1117

1118

1119

1120

1121

1122

1123

1124

1125

1128}

#define QUEUE_POS_MAX(x, y)

static void asyncQueueReadAllNotifications(void)

static void Async_UnlistenOnExit(int code, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

References amRegisteredListener, Async_UnlistenOnExit(), asyncQueueReadAllNotifications(), before_shmem_exit(), DEBUG1, elog, i, INVALID_PROC_NUMBER, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyDatabaseId, MyProcNumber, MyProcPid, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_MAX, QUEUE_TAIL, Trace_notify, and unlistenExitRegistered.

Referenced by PreCommit_Notify().

Exec_UnlistenAllCommit()

static void Exec_UnlistenAllCommit ( void ) static

Exec_UnlistenCommit()

static void Exec_UnlistenCommit ( const char * channel) static

HandleNotifyInterrupt()

void HandleNotifyInterrupt ( void )

IsListeningOn()

static bool IsListeningOn ( const char * channel) static

notification_hash()

static uint32 notification_hash ( const void * key, Size keysize ) static

notification_match()

static int notification_match ( const void * key1, const void * key2, Size keysize ) static

NotifyMyFrontEnd()

void NotifyMyFrontEnd ( const char * channel,
const char * payload,
int32 srcPid
)

Definition at line 2224 of file async.c.

2225{

2227 {

2229

2235

2236

2237

2238

2239

2240

2241 }

2242 else

2243 elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);

2244}

CommandDest whereToSendOutput

void pq_sendstring(StringInfo buf, const char *str)

void pq_endmessage(StringInfo buf)

void pq_beginmessage(StringInfo buf, char msgtype)

static void pq_sendint32(StringInfo buf, uint32 i)

#define PqMsg_NotificationResponse

References buf, DestRemote, elog, INFO, pq_beginmessage(), pq_endmessage(), pq_sendint32(), pq_sendstring(), PqMsg_NotificationResponse, and whereToSendOutput.

Referenced by asyncQueueProcessPageEntries(), and ProcessParallelMessage().

pg_listening_channels()

Definition at line 790 of file async.c.

791{

793

794

796 {

797

799 }

800

801

803

805 {

808

810 }

811

813}

#define CStringGetTextDatum(s)

#define SRF_IS_FIRSTCALL()

#define SRF_PERCALL_SETUP()

#define SRF_RETURN_NEXT(_funcctx, _result)

#define SRF_FIRSTCALL_INIT()

#define SRF_RETURN_DONE(_funcctx)

static void * list_nth(const List *list, int n)

References FuncCallContext::call_cntr, CStringGetTextDatum, list_length(), list_nth(), listenChannels, SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, and SRF_RETURN_NEXT.

pg_notification_queue_usage()

pg_notify()

PreCommit_Notify()

void PreCommit_Notify ( void )

Definition at line 861 of file async.c.

862{

864

866 return;

867

870

871

873 {

875 {

877

878 switch (actrec->action)

879 {

882 break;

884

885 break;

887

888 break;

889 }

890 }

891 }

892

893

895 {

897

898

899

900

901

902

903

905

906

907

908

909

910

911

912

913

914

915

916

917

918

919

920

921

924

925

927 while (nextNotify != NULL)

928 {

929

930

931

932

933

934

935

936

937

938

939

940

945 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),

946 errmsg("too many notifications in the NOTIFY queue")));

949 }

950

951

952 }

953}

static void Exec_ListenPreCommit(void)

static ListCell * asyncQueueAddEntries(ListCell *nextNotify)

static void asyncQueueFillWarning(void)

static bool asyncQueueIsFull(void)

void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)

#define AccessExclusiveLock

static ListCell * list_head(const List *l)

References AccessExclusiveLock, ListenAction::action, ActionList::actions, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, NotificationList::events, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pendingActions, pendingNotifies, and Trace_notify.

Referenced by CommitTransaction().

ProcessIncomingNotify()

static void ProcessIncomingNotify ( bool flush) static

Definition at line 2183 of file async.c.

2184{

2185

2187

2188

2190 return;

2191

2193 elog(DEBUG1, "ProcessIncomingNotify");

2194

2196

2197

2198

2199

2200

2202

2204

2206

2207

2208

2209

2210

2211 if (flush)

2213

2215

2217 elog(DEBUG1, "ProcessIncomingNotify: done");

2218}

static void set_ps_display(const char *activity)

void StartTransactionCommand(void)

void CommitTransactionCommand(void)

References asyncQueueReadAllNotifications(), CommitTransactionCommand(), DEBUG1, elog, listenChannels, NIL, notifyInterruptPending, pq_flush, set_ps_display(), StartTransactionCommand(), and Trace_notify.

Referenced by ProcessNotifyInterrupt().

ProcessNotifyInterrupt()

void ProcessNotifyInterrupt ( bool flush )

queue_listen()

static void queue_listen ( ListenActionKind action, const char * channel ) static

Definition at line 690 of file async.c.

691{

695

696

697

698

699

700

701

703

704

706 strlen(channel) + 1);

708 strcpy(actrec->channel, channel);

709

711 {

713

714

715

716

717

718

725 }

726 else

728

730}

References generate_unaccent_rules::action, ListenAction::action, ActionList::actions, ListenAction::channel, CurTransactionContext, GetCurrentTransactionNestLevel(), lappend(), list_make1, MemoryContextAlloc(), MemoryContextSwitchTo(), ActionList::nestingLevel, palloc(), pendingActions, TopTransactionContext, and ActionList::upper.

Referenced by Async_Listen(), Async_Unlisten(), and Async_UnlistenAll().

SignalBackends()

static void SignalBackends ( void ) static

Definition at line 1581 of file async.c.

1582{

1585 int count;

1586

1587

1588

1589

1590

1591

1592

1593

1594

1597 count = 0;

1598

1601 {

1604

1608 {

1609

1610

1611

1612

1614 continue;

1615 }

1616 else

1617 {

1618

1619

1620

1621

1624 continue;

1625 }

1626

1627 pids[count] = pid;

1628 procnos[count] = i;

1629 count++;

1630 }

1632

1633

1634 for (int i = 0; i < count; i++)

1635 {

1636 int32 pid = pids[i];

1637

1638

1639

1640

1641

1643 {

1645 continue;

1646 }

1647

1648

1649

1650

1651

1652

1653

1655 elog(DEBUG3, "could not signal backend with PID %d: %m", pid);

1656 }

1657

1660}

static int64 asyncQueuePageDiff(int64 p, int64 q)

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_NOTIFY_INTERRUPT

References Assert(), asyncQueuePageDiff(), DEBUG3, elog, i, INVALID_PROC_NUMBER, InvalidPid, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, MyDatabaseId, MyProcPid, notifyInterruptPending, palloc(), pfree(), PROCSIG_NOTIFY_INTERRUPT, QUEUE_BACKEND_DBOID, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_CLEANUP_DELAY, QUEUE_FIRST_LISTENER, QUEUE_HEAD, QUEUE_NEXT_LISTENER, QUEUE_POS_EQUAL, QUEUE_POS_PAGE, and SendProcSignal().

Referenced by AtCommit_Notify().

amRegisteredListener

bool amRegisteredListener = false static

asyncQueueControl

listenChannels

max_notify_queue_pages

int max_notify_queue_pages = 1048576

NotifyCtlData

notifyInterruptPending

volatile sig_atomic_t notifyInterruptPending = false

pendingActions

pendingNotifies

Trace_notify

bool Trace_notify = false

tryAdvanceTail

bool tryAdvanceTail = false static

unlistenExitRegistered

bool unlistenExitRegistered = false static