PostgreSQL Source Code: src/include/access/parallel.h File Reference (original) (raw)

Go to the source code of this file.

Data Structures
struct ParallelWorkerInfo
struct ParallelContext
struct ParallelWorkerContext
Typedefs
typedef void(* parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc)
typedef struct ParallelWorkerInfo ParallelWorkerInfo
typedef struct ParallelContext ParallelContext
typedef struct ParallelWorkerContext ParallelWorkerContext
Functions
ParallelContext * CreateParallelContext (const char *library_name, const char *function_name, int nworkers)
void InitializeParallelDSM (ParallelContext *pcxt)
void ReinitializeParallelDSM (ParallelContext *pcxt)
void ReinitializeParallelWorkers (ParallelContext *pcxt, int nworkers_to_launch)
void LaunchParallelWorkers (ParallelContext *pcxt)
void WaitForParallelWorkersToAttach (ParallelContext *pcxt)
void WaitForParallelWorkersToFinish (ParallelContext *pcxt)
void DestroyParallelContext (ParallelContext *pcxt)
bool ParallelContextActive (void)
void HandleParallelMessageInterrupt (void)
void ProcessParallelMessages (void)
void AtEOXact_Parallel (bool isCommit)
void AtEOSubXact_Parallel (bool isCommit, SubTransactionId mySubId)
void ParallelWorkerReportLastRecEnd (XLogRecPtr last_xlog_end)
void ParallelWorkerMain (Datum main_arg)
Variables
PGDLLIMPORT volatile sig_atomic_t ParallelMessagePending
PGDLLIMPORT int ParallelWorkerNumber
PGDLLIMPORT bool InitializingParallelWorker

IsParallelWorker

parallel_worker_main_type

ParallelContext

ParallelWorkerContext

ParallelWorkerInfo

AtEOSubXact_Parallel()

Definition at line 1263 of file parallel.c.

1264{

1266 {

1268

1271 break;

1273 elog(WARNING, "leaked parallel context");

1275 }

1276}

void DestroyParallelContext(ParallelContext *pcxt)

static dlist_head pcxt_list

#define dlist_head_element(type, membername, lhead)

static bool dlist_is_empty(const dlist_head *head)

References DestroyParallelContext(), dlist_head_element, dlist_is_empty(), elog, fb(), pcxt_list, ParallelContext::subid, and WARNING.

Referenced by AbortSubTransaction(), and CommitSubTransaction().

AtEOXact_Parallel()

void AtEOXact_Parallel ( bool isCommit) extern

CreateParallelContext()

Definition at line 175 of file parallel.c.

177{

180

181

183

184

185 Assert(nworkers >= 0);

186

187

189

190

200

201

203

204 return pcxt;

205}

#define Assert(condition)

ErrorContextCallback * error_context_stack

#define palloc0_object(type)

static void dlist_push_head(dlist_head *head, dlist_node *node)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

#define shm_toc_initialize_estimator(e)

ErrorContextCallback * error_context_stack

shm_toc_estimator estimator

SubTransactionId GetCurrentSubTransactionId(void)

bool IsInParallelMode(void)

References Assert, dlist_push_head(), error_context_stack, ParallelContext::error_context_stack, ParallelContext::estimator, ParallelContext::function_name, GetCurrentSubTransactionId(), IsInParallelMode(), ParallelContext::library_name, MemoryContextSwitchTo(), ParallelContext::node, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_object, pcxt_list, pstrdup(), shm_toc_initialize_estimator, ParallelContext::subid, and TopTransactionContext.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().

DestroyParallelContext()

Definition at line 959 of file parallel.c.

960{

961 int i;

962

963

964

965

966

967

968

970

971

973 {

975 {

977 {

979

982 }

983 }

984 }

985

986

987

988

989

990

992 {

995 }

996

997

998

999

1000

1002 {

1005 }

1006

1007

1008

1009

1010

1011

1015

1016

1018 {

1021 }

1022

1023

1027}

static void WaitForParallelWorkersToExit(ParallelContext *pcxt)

void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)

void dsm_detach(dsm_segment *seg)

static void dlist_delete(dlist_node *node)

void pfree(void *pointer)

#define RESUME_INTERRUPTS()

#define HOLD_INTERRUPTS()

void shm_mq_detach(shm_mq_handle *mqh)

ParallelWorkerInfo * worker

BackgroundWorkerHandle * bgwhandle

shm_mq_handle * error_mqh

References ParallelWorkerInfo::bgwhandle, dlist_delete(), dsm_detach(), ParallelWorkerInfo::error_mqh, fb(), ParallelContext::function_name, HOLD_INTERRUPTS, i, ParallelContext::library_name, ParallelContext::node, ParallelContext::nworkers_launched, pfree(), ParallelContext::private_memory, RESUME_INTERRUPTS, ParallelContext::seg, shm_mq_detach(), TerminateBackgroundWorker(), WaitForParallelWorkersToExit(), and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _brin_end_parallel(), _bt_begin_parallel(), _bt_end_parallel(), _gin_begin_parallel(), _gin_end_parallel(), AtEOSubXact_Parallel(), AtEOXact_Parallel(), ExecParallelCleanup(), and parallel_vacuum_end().

HandleParallelMessageInterrupt()

void HandleParallelMessageInterrupt ( void ) extern

InitializeParallelDSM()

Definition at line 213 of file parallel.c.

214{

228 int i;

233

234

236

237

240

241

242

243

244

245

246

249

250

251

252

253

255 {

256

258

259

260

261

262

263

264

267 }

268

270 {

273 "parallel error queue size not buffer-aligned");

274

275

283 {

286 }

302

304

305

310

311

315 }

316

317

318

319

320

321

322

323

324

325

326

327

335 else

336 {

341 }

342

343

354 &fps->temp_toast_namespace_id);

355 fps->parallel_leader_pgproc = MyProc;

364

365

367 {

383

384

388

389

393

394

398

399

400

401

402

404 {

409 }

410

411

415

416

422

423

427

428

433

434

438

439

444

445

451

452

457

458

460

461

462

463

464

465

466

467

473 {

476

481 }

483

484

485

486

487

488

489

490

497 }

498

499

501

502

504}

#define PARALLEL_KEY_TRANSACTION_STATE

#define PARALLEL_KEY_UNCOMMITTEDENUMS

#define PARALLEL_KEY_TRANSACTION_SNAPSHOT

#define PARALLEL_KEY_CLIENTCONNINFO

#define PARALLEL_KEY_PENDING_SYNCS

#define PARALLEL_KEY_ACTIVE_SNAPSHOT

#define PARALLEL_KEY_ERROR_QUEUE

#define PARALLEL_KEY_SESSION_DSM

#define PARALLEL_KEY_REINDEX_STATE

#define PARALLEL_KEY_LIBRARY

#define PARALLEL_KEY_FIXED

#define PARALLEL_KEY_ENTRYPOINT

#define PARALLEL_KEY_COMBO_CID

#define PARALLEL_ERROR_QUEUE_SIZE

#define PARALLEL_KEY_RELMAPPER_STATE

#define StaticAssertDecl(condition, errmessage)

void SerializeComboCIDState(Size maxsize, char *start_address)

Size EstimateComboCIDStateSpace(void)

void SerializeLibraryState(Size maxsize, char *start_address)

Size EstimateLibraryStateSpace(void)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

#define DSM_CREATE_NULL_IF_MAXSEGMENTS

#define DSM_HANDLE_INVALID

#define palloc0_array(type, count)

void SerializeGUCState(Size maxsize, char *start_address)

Size EstimateGUCStateSpace(void)

bool current_role_is_superuser

void SerializeReindexState(Size maxsize, char *start_address)

Size EstimateReindexStateSpace(void)

void * MemoryContextAlloc(MemoryContext context, Size size)

MemoryContext TopMemoryContext

#define INTERRUPTS_CAN_BE_PROCESSED()

void GetUserIdAndSecContext(Oid *userid, int *sec_context)

bool GetSessionUserIsSuperuser(void)

Size EstimateClientConnectionInfoSpace(void)

Oid GetSessionUserId(void)

Oid GetAuthenticatedUserId(void)

Oid GetCurrentRoleId(void)

void SerializeClientConnectionInfo(Size maxsize PG_USED_FOR_ASSERTS_ONLY, char *start_address)

void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)

Size EstimateUncommittedEnumsSpace(void)

void SerializeUncommittedEnums(void *space, Size size)

SerializableXactHandle ShareSerializableXact(void)

Size EstimateRelationMapSpace(void)

void SerializeRelationMap(Size maxSize, char *startAddress)

dsm_handle GetSessionDsmHandle(void)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size mul_size(Size s1, Size s2)

void SerializeSnapshot(Snapshot snapshot, char *start_address)

Snapshot GetTransactionSnapshot(void)

Size EstimateSnapshotSpace(Snapshot snapshot)

Snapshot GetActiveSnapshot(void)

static void SpinLockInit(volatile slock_t *lock)

void SerializePendingSyncs(Size maxSize, char *startAddress)

Size EstimatePendingSyncsSpace(void)

void SerializeTransactionState(Size maxsize, char *start_address)

Size EstimateTransactionStateSpace(void)

TimestampTz GetCurrentStatementStartTimestamp(void)

TimestampTz GetCurrentTransactionStartTimestamp(void)

#define IsolationUsesXactSnapshot()

#define InvalidXLogRecPtr

References BUFFERALIGN, current_role_is_superuser, dsm_create(), DSM_CREATE_NULL_IF_MAXSEGMENTS, DSM_HANDLE_INVALID, dsm_segment_address(), ParallelWorkerInfo::error_mqh, EstimateClientConnectionInfoSpace(), EstimateComboCIDStateSpace(), EstimateGUCStateSpace(), EstimateLibraryStateSpace(), EstimatePendingSyncsSpace(), EstimateReindexStateSpace(), EstimateRelationMapSpace(), EstimateSnapshotSpace(), EstimateTransactionStateSpace(), EstimateUncommittedEnumsSpace(), ParallelContext::estimator, fb(), ParallelContext::function_name, GetActiveSnapshot(), GetAuthenticatedUserId(), GetCurrentRoleId(), GetCurrentStatementStartTimestamp(), GetCurrentTransactionStartTimestamp(), GetSessionDsmHandle(), GetSessionUserId(), GetSessionUserIsSuperuser(), GetTempNamespaceState(), GetTransactionSnapshot(), GetUserIdAndSecContext(), i, INTERRUPTS_CAN_BE_PROCESSED, InvalidXLogRecPtr, IsolationUsesXactSnapshot, ParallelContext::library_name, MemoryContextAlloc(), MemoryContextSwitchTo(), mul_size(), MyDatabaseId, MyProc, MyProcNumber, MyProcPid, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_array, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, PARALLEL_MAGIC, ParallelContext::private_memory, ParallelContext::seg, SerializeClientConnectionInfo(), SerializeComboCIDState(), SerializeGUCState(), SerializeLibraryState(), SerializePendingSyncs(), SerializeReindexState(), SerializeRelationMap(), SerializeSnapshot(), SerializeTransactionState(), SerializeUncommittedEnums(), ShareSerializableXact(), shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), SpinLockInit(), start, StaticAssertDecl, ParallelContext::toc, TopMemoryContext, TopTransactionContext, and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().

LaunchParallelWorkers()

Definition at line 583 of file parallel.c.

584{

587 int i;

589

590

592 return;

593

594

596

597

599

600

602

603

604 memset(&worker, 0, sizeof(worker));

617

618

619

620

621

622

623

624

625

627 {

632 {

636 }

637 else

638 {

639

640

641

642

643

644

645

646

647

652 }

653 }

654

655

656

657

658

660 {

663 }

664

665

667}

bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)

#define BGW_NEVER_RESTART

#define BGWORKER_CLASS_PARALLEL

@ BgWorkerStart_ConsistentState

#define BGWORKER_BACKEND_DATABASE_CONNECTION

#define BGWORKER_SHMEM_ACCESS

dsm_handle dsm_segment_handle(dsm_segment *seg)

static Datum UInt32GetDatum(uint32 X)

void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)

void BecomeLockGroupLeader(void)

char bgw_function_name[BGW_MAXLEN]

char bgw_name[BGW_MAXLEN]

char bgw_type[BGW_MAXLEN]

BgWorkerStartTime bgw_start_time

char bgw_extra[BGW_EXTRALEN]

char bgw_library_name[MAXPGPATH]

bool * known_attached_workers

int nknown_attached_workers

References Assert, BecomeLockGroupLeader(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, ParallelWorkerInfo::bgwhandle, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_CLASS_PARALLEL, BGWORKER_SHMEM_ACCESS, BgWorkerStart_ConsistentState, dsm_segment_handle(), ParallelWorkerInfo::error_mqh, fb(), i, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProcPid, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, ParallelContext::nworkers_to_launch, palloc0_array, RegisterDynamicBackgroundWorker(), ParallelContext::seg, shm_mq_detach(), shm_mq_set_handle(), snprintf, sprintf, TopTransactionContext, UInt32GetDatum(), and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecGather(), ExecGatherMerge(), and parallel_vacuum_process_all_indexes().

ParallelContextActive()

bool ParallelContextActive ( void ) extern

ParallelWorkerMain()

void ParallelWorkerMain ( Datum main_arg) extern

Definition at line 1301 of file parallel.c.

1302{

1311 char *library_name;

1312 char *function_name;

1327

1328

1330

1331

1333

1334

1337

1338

1340 "Parallel worker",

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1353 if (seg == NULL)

1356 errmsg("could not map dynamic shared memory segment")));

1358 if (toc == NULL)

1361 errmsg("invalid magic number in dynamic shared memory segment")));

1362

1363

1366

1367

1371

1372

1373

1374

1375

1376

1377

1385 fps->parallel_leader_proc_number);

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1402 fps->parallel_leader_pid))

1403 return;

1404

1405

1406

1407

1408

1409

1411

1412

1413

1414

1415

1416

1420

1422

1423

1424

1425

1426

1427

1428

1431 fps->session_user_is_superuser);

1433

1434

1435

1436

1437

1438

1439

1441 fps->authenticated_user_id,

1444

1445

1446

1447

1448

1449

1450

1451

1454

1455

1456

1457

1458

1459

1464

1465

1468

1469

1470

1471

1472

1473

1474

1476 false);

1484

1485

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1507 fps->parallel_leader_pgproc);

1509

1510

1511

1512

1513

1515

1516

1517

1518

1519

1520

1521

1522

1525

1526

1527

1528

1529

1530

1531

1532

1533

1535

1536

1538 fps->temp_toast_namespace_id);

1539

1540

1542 false);

1544

1545

1547 false);

1549

1550

1551

1552

1553

1557

1558

1560

1561

1562

1563

1564

1567

1568

1569

1570

1572

1573

1575

1576

1578

1579

1581

1582

1584

1585

1587}

static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)

bool InitializingParallelWorker

static FixedParallelState * MyFixedParallelState

static pid_t ParallelLeaderPid

static void ParallelWorkerShutdown(int code, Datum arg)

void BackgroundWorkerUnblockSignals(void)

void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)

#define BGWORKER_BYPASS_ROLELOGINCHECK

#define BGWORKER_BYPASS_ALLOWCONN

void RestoreComboCIDState(char *comboCIDstate)

void RestoreLibraryState(char *start_address)

dsm_segment * dsm_attach(dsm_handle h)

int errcode(int sqlerrcode)

#define ereport(elevel,...)

ProcNumber ParallelLeaderProcNumber

void RestoreGUCState(void *gucstate)

const char * hba_authname(UserAuth auth_method)

void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)

void RestoreReindexState(const void *reindexstate)

void InvalidateSystemCaches(void)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

#define pq_putmessage(msgtype, s, len)

int GetDatabaseEncoding(void)

int SetClientEncoding(int encoding)

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

void InitializeSystemUser(const char *authn_id, const char *auth_method)

void SetSessionAuthorization(Oid userid, bool is_superuser)

void SetCurrentRoleId(Oid roleid, bool is_superuser)

ClientConnectionInfo MyClientConnectionInfo

void RestoreClientConnectionInfo(char *conninfo)

void SetAuthenticatedUserId(Oid userid)

void SetUserIdAndSecContext(Oid userid, int sec_context)

void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)

void RestoreUncommittedEnums(void *space)

static uint32 DatumGetUInt32(Datum X)

static Datum PointerGetDatum(const void *X)

BackgroundWorker * MyBgworkerEntry

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

void AttachSerializableXact(SerializableXactHandle handle)

void RestoreRelationMap(char *startAddress)

void AttachSession(dsm_handle handle)

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

void PushActiveSnapshot(Snapshot snapshot)

Snapshot RestoreSnapshot(char *start_address)

void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)

void PopActiveSnapshot(void)

bool BecomeLockGroupMember(PGPROC *leader, int pid)

void RestorePendingSyncs(char *startAddress)

pid_t parallel_leader_pid

void ExitParallelMode(void)

void EnterParallelMode(void)

void StartTransactionCommand(void)

void StartParallelWorkerTransaction(char *tstatespace)

void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)

void EndParallelWorkerTransaction(void)

void CommitTransactionCommand(void)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AttachSerializableXact(), AttachSession(), ClientConnectionInfo::auth_method, ClientConnectionInfo::authn_id, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BackgroundWorker::bgw_extra, BGWORKER_BYPASS_ALLOWCONN, BGWORKER_BYPASS_ROLELOGINCHECK, CommitTransactionCommand(), CurrentMemoryContext, DatumGetUInt32(), DetachSession(), dsm_attach(), dsm_segment_address(), elog, EndParallelWorkerTransaction(), EnterParallelMode(), ereport, errcode(), errmsg, ERROR, ExitParallelMode(), fb(), GetDatabaseEncoding(), hba_authname(), InitializeSystemUser(), InitializingParallelWorker, InvalidateSystemCaches(), LookupParallelWorkerFunction(), MyBgworkerEntry, MyClientConnectionInfo, MyFixedParallelState, MyProc, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, FixedParallelState::parallel_leader_pid, PARALLEL_MAGIC, ParallelLeaderPid, ParallelLeaderProcNumber, ParallelWorkerNumber, ParallelWorkerShutdown(), PointerGetDatum(), PopActiveSnapshot(), pq_putmessage, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), PqMsg_Terminate, PushActiveSnapshot(), RestoreClientConnectionInfo(), RestoreComboCIDState(), RestoreGUCState(), RestoreLibraryState(), RestorePendingSyncs(), RestoreReindexState(), RestoreRelationMap(), RestoreSnapshot(), RestoreTransactionSnapshot(), RestoreUncommittedEnums(), SetAuthenticatedUserId(), SetClientEncoding(), SetCurrentRoleId(), SetParallelStartTimestamps(), SetSessionAuthorization(), SetTempNamespaceState(), SetUserIdAndSecContext(), shm_mq_attach(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), StartParallelWorkerTransaction(), StartTransactionCommand(), and TopMemoryContext.

ParallelWorkerReportLastRecEnd()

void ParallelWorkerReportLastRecEnd ( XLogRecPtr last_xlog_end) extern

ProcessParallelMessages()

void ProcessParallelMessages ( void ) extern

Definition at line 1057 of file parallel.c.

1058{

1061

1063

1064

1065

1066

1067

1068

1069

1070

1072

1073

1074

1075

1076

1077

1080 "ProcessParallelMessages",

1082 else

1084

1086

1087

1089

1091 {

1093 int i;

1094

1097 continue;

1098

1100 {

1101

1102

1103

1104

1105

1106

1108 {

1110 Size nbytes;

1112

1114 &data, true);

1116 break;

1118 {

1120

1125 }

1126 else

1129 errmsg("lost connection to parallel worker")));

1130 }

1131 }

1132 }

1133

1135

1136

1138

1140}

static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

#define dlist_foreach(iter, lhead)

#define dlist_container(type, membername, ptr)

void MemoryContextReset(MemoryContext context)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), dlist_iter::cur, StringInfoData::data, data, dlist_container, dlist_foreach, ereport, errcode(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), HOLD_INTERRUPTS, i, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), ParallelContext::nworkers_launched, ParallelMessagePending, pcxt_list, pfree(), ProcessParallelMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, TopMemoryContext, and ParallelContext::worker.

Referenced by ProcessInterrupts().

ReinitializeParallelDSM()

Definition at line 511 of file parallel.c.

512{

515

516

518

519

521 {

526 {

530 }

531 }

532

533

536

537

539 {

541 int i;

542

546 {

549

554 }

555 }

556

557

559}

void WaitForParallelWorkersToFinish(ParallelContext *pcxt)

References ParallelWorkerInfo::error_mqh, fb(), i, InvalidXLogRecPtr, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProc, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, pfree(), ParallelContext::seg, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_lookup(), start, ParallelContext::toc, TopTransactionContext, WaitForParallelWorkersToExit(), WaitForParallelWorkersToFinish(), and ParallelContext::worker.

Referenced by ExecParallelReinitialize(), and parallel_vacuum_process_all_indexes().

ReinitializeParallelWorkers()

WaitForParallelWorkersToAttach()

Definition at line 702 of file parallel.c.

703{

704 int i;

705

706

708 return;

709

710 for (;;)

711 {

712

713

714

715

717

719 {

722 int rc;

724

726 continue;

727

728

729

730

731

733 {

736 continue;

737 }

738

741 {

742

745 {

746

749 }

750 }

752 {

753

754

755

756

761 errmsg("parallel worker failed to initialize"),

762 errhint("More details may be available in the server log.")));

763

766 }

767 else

768 {

769

770

771

772

773

774

778

781 }

782 }

783

784

786 {

788 break;

789 }

790 }

791}

BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)

int errhint(const char *fmt,...) pg_attribute_printf(1

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

#define CHECK_FOR_INTERRUPTS()

shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)

PGPROC * shm_mq_get_sender(shm_mq *mq)

#define WL_EXIT_ON_PM_DEATH

References Assert, BGWH_STARTED, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nknown_attached_workers, ParallelContext::nworkers_launched, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().

WaitForParallelWorkersToFinish()

Definition at line 805 of file parallel.c.

806{

807 for (;;)

808 {

811 int i;

812

813

814

815

816

817

819

821 {

822

823

824

825

826

827

831 {

833 break;

834 }

835 }

836

838 {

839

841 {

843 break;

844 }

845

846

847

848

849

850

851

853 {

856

857

858

859

860

861

866 continue;

867

868

869

870

871

872

873

874

875

880 errmsg("parallel worker failed to initialize"),

881 errhint("More details may be available in the server log.")));

882

883

884

885

886

887

888

889

890

891

892 }

893 }

894

898 }

899

901 {

903

907 }

908}

XLogRecPtr XactLastRecEnd

References Assert, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nworkers_launched, PARALLEL_KEY_FIXED, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), shm_toc_lookup(), ParallelContext::toc, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, ParallelContext::worker, and XactLastRecEnd.

Referenced by _brin_end_parallel(), _bt_end_parallel(), _gin_end_parallel(), ExecParallelFinish(), parallel_vacuum_process_all_indexes(), and ReinitializeParallelDSM().

InitializingParallelWorker

ParallelMessagePending

ParallelWorkerNumber

Definition at line 117 of file parallel.c.

Referenced by _brin_parallel_build_main(), _bt_parallel_build_main(), _gin_parallel_build_main(), BuildTupleHashTable(), ExecEndAgg(), ExecEndBitmapHeapScan(), ExecEndBitmapIndexScan(), ExecEndIndexOnlyScan(), ExecEndIndexScan(), ExecEndMemoize(), ExecHashInitializeWorker(), ExecParallelGetReceiver(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashRepartitionRest(), ExecParallelReportInstrumentation(), ExecSort(), parallel_vacuum_main(), ParallelQueryMain(), and ParallelWorkerMain().