PostgreSQL Source Code: src/backend/access/transam/parallel.c File Reference (original) (raw)

Go to the source code of this file.
| Variables | |
|---|---|
| int | ParallelWorkerNumber = -1 |
| volatile sig_atomic_t | ParallelMessagePending = false |
| bool | InitializingParallelWorker = false |
| static FixedParallelState * | MyFixedParallelState |
| static dlist_head | pcxt_list = DLIST_STATIC_INIT(pcxt_list) |
| static pid_t | ParallelLeaderPid |
| struct { | |
| const char * fn_name | |
| parallel_worker_main_type fn_addr | |
| } | InternalParallelWorkers [] |
◆ PARALLEL_ERROR_QUEUE_SIZE
#define PARALLEL_ERROR_QUEUE_SIZE 16384
◆ PARALLEL_KEY_ACTIVE_SNAPSHOT
◆ PARALLEL_KEY_CLIENTCONNINFO
◆ PARALLEL_KEY_COMBO_CID
◆ PARALLEL_KEY_ENTRYPOINT
◆ PARALLEL_KEY_ERROR_QUEUE
◆ PARALLEL_KEY_FIXED
◆ PARALLEL_KEY_GUC
◆ PARALLEL_KEY_LIBRARY
◆ PARALLEL_KEY_PENDING_SYNCS
◆ PARALLEL_KEY_REINDEX_STATE
◆ PARALLEL_KEY_RELMAPPER_STATE
◆ PARALLEL_KEY_SESSION_DSM
◆ PARALLEL_KEY_TRANSACTION_SNAPSHOT
◆ PARALLEL_KEY_TRANSACTION_STATE
◆ PARALLEL_KEY_UNCOMMITTEDENUMS
◆ PARALLEL_MAGIC
#define PARALLEL_MAGIC 0x50477c7c
◆ FixedParallelState
◆ AtEOSubXact_Parallel()
Definition at line 1263 of file parallel.c.
1264{
1266 {
1268
1271 break;
1273 elog(WARNING, "leaked parallel context");
1275 }
1276}
void DestroyParallelContext(ParallelContext *pcxt)
static dlist_head pcxt_list
#define dlist_head_element(type, membername, lhead)
static bool dlist_is_empty(const dlist_head *head)
References DestroyParallelContext(), dlist_head_element, dlist_is_empty(), elog, fb(), pcxt_list, ParallelContext::subid, and WARNING.
Referenced by AbortSubTransaction(), and CommitSubTransaction().
◆ AtEOXact_Parallel()
◆ CreateParallelContext()
Definition at line 175 of file parallel.c.
177{
180
181
183
184
185 Assert(nworkers >= 0);
186
187
189
190
200
201
203
204 return pcxt;
205}
#define Assert(condition)
ErrorContextCallback * error_context_stack
#define palloc0_object(type)
static void dlist_push_head(dlist_head *head, dlist_node *node)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
#define shm_toc_initialize_estimator(e)
ErrorContextCallback * error_context_stack
shm_toc_estimator estimator
SubTransactionId GetCurrentSubTransactionId(void)
bool IsInParallelMode(void)
References Assert, dlist_push_head(), error_context_stack, ParallelContext::error_context_stack, ParallelContext::estimator, ParallelContext::function_name, GetCurrentSubTransactionId(), IsInParallelMode(), ParallelContext::library_name, MemoryContextSwitchTo(), ParallelContext::node, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_object, pcxt_list, pstrdup(), shm_toc_initialize_estimator, ParallelContext::subid, and TopTransactionContext.
Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().
◆ DestroyParallelContext()
Definition at line 959 of file parallel.c.
960{
961 int i;
962
963
964
965
966
967
968
970
971
973 {
975 {
977 {
979
982 }
983 }
984 }
985
986
987
988
989
990
992 {
995 }
996
997
998
999
1000
1002 {
1005 }
1006
1007
1008
1009
1010
1011
1015
1016
1018 {
1021 }
1022
1023
1027}
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
void dsm_detach(dsm_segment *seg)
static void dlist_delete(dlist_node *node)
void pfree(void *pointer)
#define RESUME_INTERRUPTS()
#define HOLD_INTERRUPTS()
void shm_mq_detach(shm_mq_handle *mqh)
ParallelWorkerInfo * worker
BackgroundWorkerHandle * bgwhandle
shm_mq_handle * error_mqh
References ParallelWorkerInfo::bgwhandle, dlist_delete(), dsm_detach(), ParallelWorkerInfo::error_mqh, fb(), ParallelContext::function_name, HOLD_INTERRUPTS, i, ParallelContext::library_name, ParallelContext::node, ParallelContext::nworkers_launched, pfree(), ParallelContext::private_memory, RESUME_INTERRUPTS, ParallelContext::seg, shm_mq_detach(), TerminateBackgroundWorker(), WaitForParallelWorkersToExit(), and ParallelContext::worker.
Referenced by _brin_begin_parallel(), _brin_end_parallel(), _bt_begin_parallel(), _bt_end_parallel(), _gin_begin_parallel(), _gin_end_parallel(), AtEOSubXact_Parallel(), AtEOXact_Parallel(), ExecParallelCleanup(), and parallel_vacuum_end().
◆ HandleParallelMessageInterrupt()
| void HandleParallelMessageInterrupt | ( | void | ) |
|---|
◆ InitializeParallelDSM()
Definition at line 213 of file parallel.c.
214{
228 int i;
233
234
236
237
240
241
242
243
244
245
246
249
250
251
252
253
255 {
256
258
259
260
261
262
263
264
267 }
268
270 {
273 "parallel error queue size not buffer-aligned");
274
275
283 {
286 }
302
304
305
310
311
315 }
316
317
318
319
320
321
322
323
324
325
326
327
335 else
336 {
341 }
342
343
354 &fps->temp_toast_namespace_id);
355 fps->parallel_leader_pgproc = MyProc;
364
365
367 {
383
384
388
389
393
394
398
399
400
401
402
404 {
409 }
410
411
415
416
422
423
427
428
433
434
438
439
444
445
451
452
457
458
460
461
462
463
464
465
466
467
473 {
476
481 }
483
484
485
486
487
488
489
490
497 }
498
499
501
502
504}
#define PARALLEL_KEY_TRANSACTION_STATE
#define PARALLEL_KEY_UNCOMMITTEDENUMS
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
#define PARALLEL_KEY_CLIENTCONNINFO
#define PARALLEL_KEY_PENDING_SYNCS
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
#define PARALLEL_KEY_ERROR_QUEUE
#define PARALLEL_KEY_SESSION_DSM
#define PARALLEL_KEY_REINDEX_STATE
#define PARALLEL_KEY_LIBRARY
#define PARALLEL_KEY_FIXED
#define PARALLEL_KEY_ENTRYPOINT
#define PARALLEL_KEY_COMBO_CID
#define PARALLEL_ERROR_QUEUE_SIZE
#define PARALLEL_KEY_RELMAPPER_STATE
#define StaticAssertDecl(condition, errmessage)
void SerializeComboCIDState(Size maxsize, char *start_address)
Size EstimateComboCIDStateSpace(void)
void SerializeLibraryState(Size maxsize, char *start_address)
Size EstimateLibraryStateSpace(void)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
#define DSM_HANDLE_INVALID
#define palloc0_array(type, count)
void SerializeGUCState(Size maxsize, char *start_address)
Size EstimateGUCStateSpace(void)
bool current_role_is_superuser
void SerializeReindexState(Size maxsize, char *start_address)
Size EstimateReindexStateSpace(void)
void * MemoryContextAlloc(MemoryContext context, Size size)
MemoryContext TopMemoryContext
#define INTERRUPTS_CAN_BE_PROCESSED()
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
bool GetSessionUserIsSuperuser(void)
Size EstimateClientConnectionInfoSpace(void)
Oid GetSessionUserId(void)
Oid GetAuthenticatedUserId(void)
Oid GetCurrentRoleId(void)
void SerializeClientConnectionInfo(Size maxsize PG_USED_FOR_ASSERTS_ONLY, char *start_address)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
Size EstimateUncommittedEnumsSpace(void)
void SerializeUncommittedEnums(void *space, Size size)
SerializableXactHandle ShareSerializableXact(void)
Size EstimateRelationMapSpace(void)
void SerializeRelationMap(Size maxSize, char *startAddress)
dsm_handle GetSessionDsmHandle(void)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Snapshot GetTransactionSnapshot(void)
Size EstimateSnapshotSpace(Snapshot snapshot)
Snapshot GetActiveSnapshot(void)
static void SpinLockInit(volatile slock_t *lock)
void SerializePendingSyncs(Size maxSize, char *startAddress)
Size EstimatePendingSyncsSpace(void)
void SerializeTransactionState(Size maxsize, char *start_address)
Size EstimateTransactionStateSpace(void)
TimestampTz GetCurrentStatementStartTimestamp(void)
TimestampTz GetCurrentTransactionStartTimestamp(void)
#define IsolationUsesXactSnapshot()
#define InvalidXLogRecPtr
References BUFFERALIGN, current_role_is_superuser, dsm_create(), DSM_CREATE_NULL_IF_MAXSEGMENTS, DSM_HANDLE_INVALID, dsm_segment_address(), ParallelWorkerInfo::error_mqh, EstimateClientConnectionInfoSpace(), EstimateComboCIDStateSpace(), EstimateGUCStateSpace(), EstimateLibraryStateSpace(), EstimatePendingSyncsSpace(), EstimateReindexStateSpace(), EstimateRelationMapSpace(), EstimateSnapshotSpace(), EstimateTransactionStateSpace(), EstimateUncommittedEnumsSpace(), ParallelContext::estimator, fb(), ParallelContext::function_name, GetActiveSnapshot(), GetAuthenticatedUserId(), GetCurrentRoleId(), GetCurrentStatementStartTimestamp(), GetCurrentTransactionStartTimestamp(), GetSessionDsmHandle(), GetSessionUserId(), GetSessionUserIsSuperuser(), GetTempNamespaceState(), GetTransactionSnapshot(), GetUserIdAndSecContext(), i, INTERRUPTS_CAN_BE_PROCESSED, InvalidXLogRecPtr, IsolationUsesXactSnapshot, ParallelContext::library_name, MemoryContextAlloc(), MemoryContextSwitchTo(), mul_size(), MyDatabaseId, MyProc, MyProcNumber, MyProcPid, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_array, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, PARALLEL_MAGIC, ParallelContext::private_memory, ParallelContext::seg, SerializeClientConnectionInfo(), SerializeComboCIDState(), SerializeGUCState(), SerializeLibraryState(), SerializePendingSyncs(), SerializeReindexState(), SerializeRelationMap(), SerializeSnapshot(), SerializeTransactionState(), SerializeUncommittedEnums(), ShareSerializableXact(), shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), SpinLockInit(), start, StaticAssertDecl, ParallelContext::toc, TopMemoryContext, TopTransactionContext, and ParallelContext::worker.
Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().
◆ LaunchParallelWorkers()
Definition at line 583 of file parallel.c.
584{
587 int i;
589
590
592 return;
593
594
596
597
599
600
602
603
604 memset(&worker, 0, sizeof(worker));
617
618
619
620
621
622
623
624
625
627 {
632 {
636 }
637 else
638 {
639
640
641
642
643
644
645
646
647
652 }
653 }
654
655
656
657
658
660 {
663 }
664
665
667}
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_CLASS_PARALLEL
@ BgWorkerStart_ConsistentState
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_SHMEM_ACCESS
dsm_handle dsm_segment_handle(dsm_segment *seg)
static Datum UInt32GetDatum(uint32 X)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
void BecomeLockGroupLeader(void)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
bool * known_attached_workers
int nknown_attached_workers
References Assert, BecomeLockGroupLeader(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, ParallelWorkerInfo::bgwhandle, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_CLASS_PARALLEL, BGWORKER_SHMEM_ACCESS, BgWorkerStart_ConsistentState, dsm_segment_handle(), ParallelWorkerInfo::error_mqh, fb(), i, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProcPid, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, ParallelContext::nworkers_to_launch, palloc0_array, RegisterDynamicBackgroundWorker(), ParallelContext::seg, shm_mq_detach(), shm_mq_set_handle(), snprintf, sprintf, TopTransactionContext, UInt32GetDatum(), and ParallelContext::worker.
Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecGather(), ExecGatherMerge(), and parallel_vacuum_process_all_indexes().
◆ LookupParallelWorkerFunction()
Definition at line 1650 of file parallel.c.
1651{
1652
1653
1654
1655
1657 {
1658 int i;
1659
1661 {
1664 }
1665
1666
1668 }
1669
1670
1673}
parallel_worker_main_type fn_addr
static const struct @16 InternalParallelWorkers[]
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
References elog, ERROR, fb(), fn_addr, fn_name, funcname, i, InternalParallelWorkers, lengthof, and load_external_function().
Referenced by ParallelWorkerMain().
◆ ParallelContextActive()
◆ ParallelWorkerMain()
Definition at line 1301 of file parallel.c.
1302{
1311 char *library_name;
1312 char *function_name;
1327
1328
1330
1331
1333
1334
1337
1338
1340 "Parallel worker",
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1353 if (seg == NULL)
1356 errmsg("could not map dynamic shared memory segment")));
1358 if (toc == NULL)
1361 errmsg("invalid magic number in dynamic shared memory segment")));
1362
1363
1366
1367
1371
1372
1373
1374
1375
1376
1377
1385 fps->parallel_leader_proc_number);
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1402 fps->parallel_leader_pid))
1403 return;
1404
1405
1406
1407
1408
1409
1411
1412
1413
1414
1415
1416
1420
1422
1423
1424
1425
1426
1427
1428
1431 fps->session_user_is_superuser);
1433
1434
1435
1436
1437
1438
1439
1441 fps->authenticated_user_id,
1444
1445
1446
1447
1448
1449
1450
1451
1454
1455
1456
1457
1458
1459
1464
1465
1468
1469
1470
1471
1472
1473
1474
1476 false);
1484
1485
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1507 fps->parallel_leader_pgproc);
1509
1510
1511
1512
1513
1515
1516
1517
1518
1519
1520
1521
1522
1525
1526
1527
1528
1529
1530
1531
1532
1533
1535
1536
1538 fps->temp_toast_namespace_id);
1539
1540
1542 false);
1544
1545
1547 false);
1549
1550
1551
1552
1553
1557
1558
1560
1561
1562
1563
1564
1567
1568
1569
1570
1572
1573
1575
1576
1578
1579
1581
1582
1584
1585
1587}
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
bool InitializingParallelWorker
static FixedParallelState * MyFixedParallelState
static pid_t ParallelLeaderPid
static void ParallelWorkerShutdown(int code, Datum arg)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
#define BGWORKER_BYPASS_ROLELOGINCHECK
#define BGWORKER_BYPASS_ALLOWCONN
void RestoreComboCIDState(char *comboCIDstate)
void RestoreLibraryState(char *start_address)
dsm_segment * dsm_attach(dsm_handle h)
int errcode(int sqlerrcode)
#define ereport(elevel,...)
ProcNumber ParallelLeaderProcNumber
void RestoreGUCState(void *gucstate)
const char * hba_authname(UserAuth auth_method)
void RestoreReindexState(const void *reindexstate)
void InvalidateSystemCaches(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define pq_putmessage(msgtype, s, len)
int GetDatabaseEncoding(void)
int SetClientEncoding(int encoding)
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
void InitializeSystemUser(const char *authn_id, const char *auth_method)
void SetSessionAuthorization(Oid userid, bool is_superuser)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
ClientConnectionInfo MyClientConnectionInfo
void RestoreClientConnectionInfo(char *conninfo)
void SetAuthenticatedUserId(Oid userid)
void SetUserIdAndSecContext(Oid userid, int sec_context)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
void RestoreUncommittedEnums(void *space)
static uint32 DatumGetUInt32(Datum X)
static Datum PointerGetDatum(const void *X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void AttachSerializableXact(SerializableXactHandle handle)
void RestoreRelationMap(char *startAddress)
void AttachSession(dsm_handle handle)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
void PushActiveSnapshot(Snapshot snapshot)
Snapshot RestoreSnapshot(char *start_address)
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
void PopActiveSnapshot(void)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void RestorePendingSyncs(char *startAddress)
pid_t parallel_leader_pid
void ExitParallelMode(void)
void EnterParallelMode(void)
void StartTransactionCommand(void)
void StartParallelWorkerTransaction(char *tstatespace)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
void EndParallelWorkerTransaction(void)
void CommitTransactionCommand(void)
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AttachSerializableXact(), AttachSession(), ClientConnectionInfo::auth_method, ClientConnectionInfo::authn_id, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BackgroundWorker::bgw_extra, BGWORKER_BYPASS_ALLOWCONN, BGWORKER_BYPASS_ROLELOGINCHECK, CommitTransactionCommand(), CurrentMemoryContext, DatumGetUInt32(), DetachSession(), dsm_attach(), dsm_segment_address(), elog, EndParallelWorkerTransaction(), EnterParallelMode(), ereport, errcode(), errmsg, ERROR, ExitParallelMode(), fb(), GetDatabaseEncoding(), hba_authname(), InitializeSystemUser(), InitializingParallelWorker, InvalidateSystemCaches(), LookupParallelWorkerFunction(), MyBgworkerEntry, MyClientConnectionInfo, MyFixedParallelState, MyProc, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, FixedParallelState::parallel_leader_pid, PARALLEL_MAGIC, ParallelLeaderPid, ParallelLeaderProcNumber, ParallelWorkerNumber, ParallelWorkerShutdown(), PointerGetDatum(), PopActiveSnapshot(), pq_putmessage, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), PqMsg_Terminate, PushActiveSnapshot(), RestoreClientConnectionInfo(), RestoreComboCIDState(), RestoreGUCState(), RestoreLibraryState(), RestorePendingSyncs(), RestoreReindexState(), RestoreRelationMap(), RestoreSnapshot(), RestoreTransactionSnapshot(), RestoreUncommittedEnums(), SetAuthenticatedUserId(), SetClientEncoding(), SetCurrentRoleId(), SetParallelStartTimestamps(), SetSessionAuthorization(), SetTempNamespaceState(), SetUserIdAndSecContext(), shm_mq_attach(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), StartParallelWorkerTransaction(), StartTransactionCommand(), and TopMemoryContext.
◆ ParallelWorkerReportLastRecEnd()
◆ ParallelWorkerShutdown()
◆ ProcessParallelMessage()
Definition at line 1146 of file parallel.c.
1147{
1149
1152 {
1155 }
1156
1158
1160 {
1163 {
1166
1167
1169
1170
1172
1173
1174
1175
1176
1177
1178
1179
1180
1182 {
1183 if (edata.context)
1185 _("parallel worker"));
1186 else
1188 }
1189
1190
1191
1192
1193
1194
1197
1198
1200
1201
1203
1204 break;
1205 }
1206
1208 {
1209
1211 const char *channel;
1212 const char *payload;
1213
1218
1220
1221 break;
1222 }
1223
1225 {
1226
1227
1228
1229
1230
1233
1235
1237
1238 break;
1239 }
1240
1242 {
1245 break;
1246 }
1247
1248 default:
1249 {
1250 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1252 }
1253 }
1254}
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
void pgstat_progress_incr_param(int index, int64 incr)
void ThrowErrorData(ErrorData *edata)
unsigned int pq_getmsgint(StringInfo msg, int b)
void pq_getmsgend(StringInfo msg)
void pq_endmessage(StringInfo buf)
int pq_getmsgbyte(StringInfo msg)
const char * pq_getmsgrawstring(StringInfo msg)
int64 pq_getmsgint64(StringInfo msg)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
References _, debug_parallel_query, DEBUG_PARALLEL_REGRESS, elog, ERROR, error_context_stack, ParallelContext::error_context_stack, ParallelWorkerInfo::error_mqh, fb(), i, ParallelContext::known_attached_workers, StringInfoData::len, Min, ParallelContext::nknown_attached_workers, NotifyMyFrontEnd(), pgstat_progress_incr_param(), pq_endmessage(), pq_getmsgbyte(), pq_getmsgend(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgrawstring(), pq_parse_errornotice(), PqMsg_ErrorResponse, PqMsg_NoticeResponse, PqMsg_NotificationResponse, PqMsg_Progress, PqMsg_Terminate, psprintf(), pstrdup(), shm_mq_detach(), ThrowErrorData(), and ParallelContext::worker.
Referenced by ProcessParallelMessages().
◆ ProcessParallelMessages()
Definition at line 1057 of file parallel.c.
1058{
1061
1063
1064
1065
1066
1067
1068
1069
1070
1072
1073
1074
1075
1076
1077
1080 "ProcessParallelMessages",
1082 else
1084
1086
1087
1089
1091 {
1093 int i;
1094
1097 continue;
1098
1100 {
1101
1102
1103
1104
1105
1106
1108 {
1110 Size nbytes;
1112
1114 &data, true);
1116 break;
1118 {
1120
1125 }
1126 else
1129 errmsg("lost connection to parallel worker")));
1130 }
1131 }
1132 }
1133
1135
1136
1138
1140}
static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
#define dlist_foreach(iter, lhead)
#define dlist_container(type, membername, ptr)
void MemoryContextReset(MemoryContext context)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), dlist_iter::cur, StringInfoData::data, data, dlist_container, dlist_foreach, ereport, errcode(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), HOLD_INTERRUPTS, i, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), ParallelContext::nworkers_launched, ParallelMessagePending, pcxt_list, pfree(), ProcessParallelMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, TopMemoryContext, and ParallelContext::worker.
Referenced by ProcessInterrupts().
◆ ReinitializeParallelDSM()
Definition at line 511 of file parallel.c.
512{
515
516
518
519
521 {
526 {
530 }
531 }
532
533
536
537
539 {
541 int i;
542
546 {
549
554 }
555 }
556
557
559}
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
References ParallelWorkerInfo::error_mqh, fb(), i, InvalidXLogRecPtr, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProc, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, pfree(), ParallelContext::seg, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_lookup(), start, ParallelContext::toc, TopTransactionContext, WaitForParallelWorkersToExit(), WaitForParallelWorkersToFinish(), and ParallelContext::worker.
Referenced by ExecParallelReinitialize(), and parallel_vacuum_process_all_indexes().
◆ ReinitializeParallelWorkers()
◆ WaitForParallelWorkersToAttach()
Definition at line 702 of file parallel.c.
703{
704 int i;
705
706
708 return;
709
710 for (;;)
711 {
712
713
714
715
717
719 {
722 int rc;
724
726 continue;
727
728
729
730
731
733 {
736 continue;
737 }
738
741 {
742
745 {
746
749 }
750 }
752 {
753
754
755
756
761 errmsg("parallel worker failed to initialize"),
762 errhint("More details may be available in the server log.")));
763
766 }
767 else
768 {
769
770
771
772
773
774
778
781 }
782 }
783
784
786 {
788 break;
789 }
790 }
791}
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
int errhint(const char *fmt,...) pg_attribute_printf(1
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define CHECK_FOR_INTERRUPTS()
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
PGPROC * shm_mq_get_sender(shm_mq *mq)
#define WL_EXIT_ON_PM_DEATH
References Assert, BGWH_STARTED, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nknown_attached_workers, ParallelContext::nworkers_launched, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and ParallelContext::worker.
Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().
◆ WaitForParallelWorkersToExit()
Definition at line 919 of file parallel.c.
920{
921 int i;
922
923
925 {
927
929 continue;
930
932
933
934
935
936
937
938
942 errmsg("postmaster exited during a parallel transaction")));
943
944
947 }
948}
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
References BGWH_POSTMASTER_DIED, ParallelWorkerInfo::bgwhandle, ereport, errcode(), errmsg, FATAL, fb(), i, ParallelContext::nworkers_launched, pfree(), WaitForBackgroundWorkerShutdown(), and ParallelContext::worker.
Referenced by DestroyParallelContext(), and ReinitializeParallelDSM().
◆ WaitForParallelWorkersToFinish()
Definition at line 805 of file parallel.c.
806{
807 for (;;)
808 {
811 int i;
812
813
814
815
816
817
819
821 {
822
823
824
825
826
827
831 {
833 break;
834 }
835 }
836
838 {
839
841 {
843 break;
844 }
845
846
847
848
849
850
851
853 {
856
857
858
859
860
861
866 continue;
867
868
869
870
871
872
873
874
875
880 errmsg("parallel worker failed to initialize"),
881 errhint("More details may be available in the server log.")));
882
883
884
885
886
887
888
889
890
891
892 }
893 }
894
898 }
899
901 {
903
907 }
908}
XLogRecPtr XactLastRecEnd
References Assert, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nworkers_launched, PARALLEL_KEY_FIXED, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), shm_toc_lookup(), ParallelContext::toc, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, ParallelContext::worker, and XactLastRecEnd.
Referenced by _brin_end_parallel(), _bt_end_parallel(), _gin_end_parallel(), ExecParallelFinish(), parallel_vacuum_process_all_indexes(), and ReinitializeParallelDSM().
◆ fn_addr
◆ fn_name
◆ InitializingParallelWorker
◆ [struct]
const struct { ... } InternalParallelWorkers[]
Initial value:
=
{
{
},
{
},
{
},
{
},
{
}
}
void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
Referenced by LookupParallelWorkerFunction().
◆ MyFixedParallelState
◆ ParallelLeaderPid
◆ ParallelMessagePending
◆ ParallelWorkerNumber
int ParallelWorkerNumber = -1
Definition at line 117 of file parallel.c.
Referenced by _brin_parallel_build_main(), _bt_parallel_build_main(), _gin_parallel_build_main(), BuildTupleHashTable(), ExecEndAgg(), ExecEndBitmapHeapScan(), ExecEndBitmapIndexScan(), ExecEndIndexOnlyScan(), ExecEndIndexScan(), ExecEndMemoize(), ExecHashInitializeWorker(), ExecParallelGetReceiver(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashRepartitionRest(), ExecParallelReportInstrumentation(), ExecSort(), parallel_vacuum_main(), ParallelQueryMain(), and ParallelWorkerMain().