PostgreSQL Source Code: src/backend/access/transam/parallel.c File Reference

Go to the source code of this file.

Macros
#define PARALLEL_ERROR_QUEUE_SIZE 16384
#define PARALLEL_MAGIC 0x50477c7c
#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
#define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
#define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
Functions
static void ProcessParallelMessage (ParallelContext *pcxt, int i, StringInfo msg)
static void WaitForParallelWorkersToExit (ParallelContext *pcxt)
static parallel_worker_main_type LookupParallelWorkerFunction (const char *libraryname, const char *funcname)
static void ParallelWorkerShutdown (int code, Datum arg)
ParallelContext * CreateParallelContext (const char *library_name, const char *function_name, int nworkers)
void InitializeParallelDSM (ParallelContext *pcxt)
void ReinitializeParallelDSM (ParallelContext *pcxt)
void ReinitializeParallelWorkers (ParallelContext *pcxt, int nworkers_to_launch)
void LaunchParallelWorkers (ParallelContext *pcxt)
void WaitForParallelWorkersToAttach (ParallelContext *pcxt)
void WaitForParallelWorkersToFinish (ParallelContext *pcxt)
void DestroyParallelContext (ParallelContext *pcxt)
bool ParallelContextActive (void)
void HandleParallelMessageInterrupt (void)
void ProcessParallelMessages (void)
void AtEOSubXact_Parallel (bool isCommit, SubTransactionId mySubId)
void AtEOXact_Parallel (bool isCommit)
void ParallelWorkerMain (Datum main_arg)
void ParallelWorkerReportLastRecEnd (XLogRecPtr last_xlog_end)
Variables
int ParallelWorkerNumber = -1
volatile sig_atomic_t ParallelMessagePending = false
bool InitializingParallelWorker = false
static FixedParallelState * MyFixedParallelState
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list)
static pid_t ParallelLeaderPid
struct {
const char * fn_name
parallel_worker_main_type fn_addr
} InternalParallelWorkers []

PARALLEL_ERROR_QUEUE_SIZE

#define PARALLEL_ERROR_QUEUE_SIZE 16384

PARALLEL_KEY_ACTIVE_SNAPSHOT

PARALLEL_KEY_CLIENTCONNINFO

PARALLEL_KEY_COMBO_CID

PARALLEL_KEY_ENTRYPOINT

PARALLEL_KEY_ERROR_QUEUE

PARALLEL_KEY_FIXED

PARALLEL_KEY_GUC

PARALLEL_KEY_LIBRARY

PARALLEL_KEY_PENDING_SYNCS

PARALLEL_KEY_REINDEX_STATE

PARALLEL_KEY_RELMAPPER_STATE

PARALLEL_KEY_SESSION_DSM

PARALLEL_KEY_TRANSACTION_SNAPSHOT

PARALLEL_KEY_TRANSACTION_STATE

PARALLEL_KEY_UNCOMMITTEDENUMS

PARALLEL_MAGIC

#define PARALLEL_MAGIC 0x50477c7c

FixedParallelState

AtEOSubXact_Parallel()

Definition at line 1263 of file parallel.c.

1264{

1266 {

1268

1271 break;

1273 elog(WARNING, "leaked parallel context");

1275 }

1276}

void DestroyParallelContext(ParallelContext *pcxt)

static dlist_head pcxt_list

#define dlist_head_element(type, membername, lhead)

static bool dlist_is_empty(const dlist_head *head)

References DestroyParallelContext(), dlist_head_element, dlist_is_empty(), elog, fb(), pcxt_list, ParallelContext::subid, and WARNING.

Referenced by AbortSubTransaction(), and CommitSubTransaction().

AtEOXact_Parallel()

CreateParallelContext()

Definition at line 175 of file parallel.c.

177{

180

181

183

184

185 Assert(nworkers >= 0);

186

187

189

190

200

201

203

204 return pcxt;

205}

#define Assert(condition)

ErrorContextCallback * error_context_stack

#define palloc0_object(type)

static void dlist_push_head(dlist_head *head, dlist_node *node)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

#define shm_toc_initialize_estimator(e)

ErrorContextCallback * error_context_stack

shm_toc_estimator estimator

SubTransactionId GetCurrentSubTransactionId(void)

bool IsInParallelMode(void)

References Assert, dlist_push_head(), error_context_stack, ParallelContext::error_context_stack, ParallelContext::estimator, ParallelContext::function_name, GetCurrentSubTransactionId(), IsInParallelMode(), ParallelContext::library_name, MemoryContextSwitchTo(), ParallelContext::node, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_object, pcxt_list, pstrdup(), shm_toc_initialize_estimator, ParallelContext::subid, and TopTransactionContext.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().

DestroyParallelContext()

Definition at line 959 of file parallel.c.

960{

961 int i;

962

963

964

965

966

967

968

970

971

973 {

975 {

977 {

979

982 }

983 }

984 }

985

986

987

988

989

990

992 {

995 }

996

997

998

999

1000

1002 {

1005 }

1006

1007

1008

1009

1010

1011

1015

1016

1018 {

1021 }

1022

1023

1027}

static void WaitForParallelWorkersToExit(ParallelContext *pcxt)

void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)

void dsm_detach(dsm_segment *seg)

static void dlist_delete(dlist_node *node)

void pfree(void *pointer)

#define RESUME_INTERRUPTS()

#define HOLD_INTERRUPTS()

void shm_mq_detach(shm_mq_handle *mqh)

ParallelWorkerInfo * worker

BackgroundWorkerHandle * bgwhandle

shm_mq_handle * error_mqh

References ParallelWorkerInfo::bgwhandle, dlist_delete(), dsm_detach(), ParallelWorkerInfo::error_mqh, fb(), ParallelContext::function_name, HOLD_INTERRUPTS, i, ParallelContext::library_name, ParallelContext::node, ParallelContext::nworkers_launched, pfree(), ParallelContext::private_memory, RESUME_INTERRUPTS, ParallelContext::seg, shm_mq_detach(), TerminateBackgroundWorker(), WaitForParallelWorkersToExit(), and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _brin_end_parallel(), _bt_begin_parallel(), _bt_end_parallel(), _gin_begin_parallel(), _gin_end_parallel(), AtEOSubXact_Parallel(), AtEOXact_Parallel(), ExecParallelCleanup(), and parallel_vacuum_end().

HandleParallelMessageInterrupt()

void HandleParallelMessageInterrupt ( void )

InitializeParallelDSM()

Definition at line 213 of file parallel.c.

214{

228 int i;

233

234

236

237

240

241

242

243

244

245

246

249

250

251

252

253

255 {

256

258

259

260

261

262

263

264

267 }

268

270 {

273 "parallel error queue size not buffer-aligned");

274

275

283 {

286 }

302

304

305

310

311

315 }

316

317

318

319

320

321

322

323

324

325

326

327

335 else

336 {

341 }

342

343

354 &fps->temp_toast_namespace_id);

355 fps->parallel_leader_pgproc = MyProc;

364

365

367 {

383

384

388

389

393

394

398

399

400

401

402

404 {

409 }

410

411

415

416

422

423

427

428

433

434

438

439

444

445

451

452

457

458

460

461

462

463

464

465

466

467

473 {

476

481 }

483

484

485

486

487

488

489

490

497 }

498

499

501

502

504}

#define PARALLEL_KEY_TRANSACTION_STATE

#define PARALLEL_KEY_UNCOMMITTEDENUMS

#define PARALLEL_KEY_TRANSACTION_SNAPSHOT

#define PARALLEL_KEY_CLIENTCONNINFO

#define PARALLEL_KEY_PENDING_SYNCS

#define PARALLEL_KEY_ACTIVE_SNAPSHOT

#define PARALLEL_KEY_ERROR_QUEUE

#define PARALLEL_KEY_SESSION_DSM

#define PARALLEL_KEY_REINDEX_STATE

#define PARALLEL_KEY_LIBRARY

#define PARALLEL_KEY_FIXED

#define PARALLEL_KEY_ENTRYPOINT

#define PARALLEL_KEY_COMBO_CID

#define PARALLEL_ERROR_QUEUE_SIZE

#define PARALLEL_KEY_RELMAPPER_STATE

#define StaticAssertDecl(condition, errmessage)

void SerializeComboCIDState(Size maxsize, char *start_address)

Size EstimateComboCIDStateSpace(void)

void SerializeLibraryState(Size maxsize, char *start_address)

Size EstimateLibraryStateSpace(void)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

#define DSM_CREATE_NULL_IF_MAXSEGMENTS

#define DSM_HANDLE_INVALID

#define palloc0_array(type, count)

void SerializeGUCState(Size maxsize, char *start_address)

Size EstimateGUCStateSpace(void)

bool current_role_is_superuser

void SerializeReindexState(Size maxsize, char *start_address)

Size EstimateReindexStateSpace(void)

void * MemoryContextAlloc(MemoryContext context, Size size)

MemoryContext TopMemoryContext

#define INTERRUPTS_CAN_BE_PROCESSED()

void GetUserIdAndSecContext(Oid *userid, int *sec_context)

bool GetSessionUserIsSuperuser(void)

Size EstimateClientConnectionInfoSpace(void)

Oid GetSessionUserId(void)

Oid GetAuthenticatedUserId(void)

Oid GetCurrentRoleId(void)

void SerializeClientConnectionInfo(Size maxsize PG_USED_FOR_ASSERTS_ONLY, char *start_address)

void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)

Size EstimateUncommittedEnumsSpace(void)

void SerializeUncommittedEnums(void *space, Size size)

SerializableXactHandle ShareSerializableXact(void)

Size EstimateRelationMapSpace(void)

void SerializeRelationMap(Size maxSize, char *startAddress)

dsm_handle GetSessionDsmHandle(void)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_estimate_keys(e, cnt)

Size mul_size(Size s1, Size s2)

void SerializeSnapshot(Snapshot snapshot, char *start_address)

Snapshot GetTransactionSnapshot(void)

Size EstimateSnapshotSpace(Snapshot snapshot)

Snapshot GetActiveSnapshot(void)

static void SpinLockInit(volatile slock_t *lock)

void SerializePendingSyncs(Size maxSize, char *startAddress)

Size EstimatePendingSyncsSpace(void)

void SerializeTransactionState(Size maxsize, char *start_address)

Size EstimateTransactionStateSpace(void)

TimestampTz GetCurrentStatementStartTimestamp(void)

TimestampTz GetCurrentTransactionStartTimestamp(void)

#define IsolationUsesXactSnapshot()

#define InvalidXLogRecPtr

References BUFFERALIGN, current_role_is_superuser, dsm_create(), DSM_CREATE_NULL_IF_MAXSEGMENTS, DSM_HANDLE_INVALID, dsm_segment_address(), ParallelWorkerInfo::error_mqh, EstimateClientConnectionInfoSpace(), EstimateComboCIDStateSpace(), EstimateGUCStateSpace(), EstimateLibraryStateSpace(), EstimatePendingSyncsSpace(), EstimateReindexStateSpace(), EstimateRelationMapSpace(), EstimateSnapshotSpace(), EstimateTransactionStateSpace(), EstimateUncommittedEnumsSpace(), ParallelContext::estimator, fb(), ParallelContext::function_name, GetActiveSnapshot(), GetAuthenticatedUserId(), GetCurrentRoleId(), GetCurrentStatementStartTimestamp(), GetCurrentTransactionStartTimestamp(), GetSessionDsmHandle(), GetSessionUserId(), GetSessionUserIsSuperuser(), GetTempNamespaceState(), GetTransactionSnapshot(), GetUserIdAndSecContext(), i, INTERRUPTS_CAN_BE_PROCESSED, InvalidXLogRecPtr, IsolationUsesXactSnapshot, ParallelContext::library_name, MemoryContextAlloc(), MemoryContextSwitchTo(), mul_size(), MyDatabaseId, MyProc, MyProcNumber, MyProcPid, ParallelContext::nworkers, ParallelContext::nworkers_to_launch, palloc0_array, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, PARALLEL_MAGIC, ParallelContext::private_memory, ParallelContext::seg, SerializeClientConnectionInfo(), SerializeComboCIDState(), SerializeGUCState(), SerializeLibraryState(), SerializePendingSyncs(), SerializeReindexState(), SerializeRelationMap(), SerializeSnapshot(), SerializeTransactionState(), SerializeUncommittedEnums(), ShareSerializableXact(), shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_allocate(), 
shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_insert(), SpinLockInit(), start, StaticAssertDecl, ParallelContext::toc, TopMemoryContext, TopTransactionContext, and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecInitParallelPlan(), and parallel_vacuum_init().

LaunchParallelWorkers()

Definition at line 583 of file parallel.c.

584{

587 int i;

589

590

592 return;

593

594

596

597

599

600

602

603

604 memset(&worker, 0, sizeof(worker));

617

618

619

620

621

622

623

624

625

627 {

632 {

636 }

637 else

638 {

639

640

641

642

643

644

645

646

647

652 }

653 }

654

655

656

657

658

660 {

663 }

664

665

667}

bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)

#define BGW_NEVER_RESTART

#define BGWORKER_CLASS_PARALLEL

BgWorkerStart_ConsistentState

#define BGWORKER_BACKEND_DATABASE_CONNECTION

#define BGWORKER_SHMEM_ACCESS

dsm_handle dsm_segment_handle(dsm_segment *seg)

static Datum UInt32GetDatum(uint32 X)

void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)

void BecomeLockGroupLeader(void)

char bgw_function_name[BGW_MAXLEN]

char bgw_name[BGW_MAXLEN]

char bgw_type[BGW_MAXLEN]

BgWorkerStartTime bgw_start_time

char bgw_extra[BGW_EXTRALEN]

char bgw_library_name[MAXPGPATH]

bool * known_attached_workers

int nknown_attached_workers

References Assert, BecomeLockGroupLeader(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, ParallelWorkerInfo::bgwhandle, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_CLASS_PARALLEL, BGWORKER_SHMEM_ACCESS, BgWorkerStart_ConsistentState, dsm_segment_handle(), ParallelWorkerInfo::error_mqh, fb(), i, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProcPid, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, ParallelContext::nworkers_to_launch, palloc0_array, RegisterDynamicBackgroundWorker(), ParallelContext::seg, shm_mq_detach(), shm_mq_set_handle(), snprintf, sprintf, TopTransactionContext, UInt32GetDatum(), and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), _gin_begin_parallel(), ExecGather(), ExecGatherMerge(), and parallel_vacuum_process_all_indexes().

LookupParallelWorkerFunction()

Definition at line 1650 of file parallel.c.

1651{

1652

1653

1654

1655

1657 {

1658 int i;

1659

1661 {

1664 }

1665

1666

1668 }

1669

1670

1673}

parallel_worker_main_type fn_addr

static const struct { ... } InternalParallelWorkers[]

void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)

void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)

References elog, ERROR, fb(), fn_addr, fn_name, funcname, i, InternalParallelWorkers, lengthof, and load_external_function().

Referenced by ParallelWorkerMain().

ParallelContextActive()

ParallelWorkerMain()

Definition at line 1301 of file parallel.c.

1302{

1311 char *library_name;

1312 char *function_name;

1327

1328

1330

1331

1333

1334

1337

1338

1340 "Parallel worker",

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1353 if (seg == NULL)

1356 errmsg("could not map dynamic shared memory segment")));

1358 if (toc == NULL)

1361 errmsg("invalid magic number in dynamic shared memory segment")));

1362

1363

1366

1367

1371

1372

1373

1374

1375

1376

1377

1385 fps->parallel_leader_proc_number);

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1402 fps->parallel_leader_pid))

1403 return;

1404

1405

1406

1407

1408

1409

1411

1412

1413

1414

1415

1416

1420

1422

1423

1424

1425

1426

1427

1428

1431 fps->session_user_is_superuser);

1433

1434

1435

1436

1437

1438

1439

1441 fps->authenticated_user_id,

1444

1445

1446

1447

1448

1449

1450

1451

1454

1455

1456

1457

1458

1459

1464

1465

1468

1469

1470

1471

1472

1473

1474

1476 false);

1484

1485

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1507 fps->parallel_leader_pgproc);

1509

1510

1511

1512

1513

1515

1516

1517

1518

1519

1520

1521

1522

1525

1526

1527

1528

1529

1530

1531

1532

1533

1535

1536

1538 fps->temp_toast_namespace_id);

1539

1540

1542 false);

1544

1545

1547 false);

1549

1550

1551

1552

1553

1557

1558

1560

1561

1562

1563

1564

1567

1568

1569

1570

1572

1573

1575

1576

1578

1579

1581

1582

1584

1585

1587}

static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)

bool InitializingParallelWorker

static FixedParallelState * MyFixedParallelState

static pid_t ParallelLeaderPid

static void ParallelWorkerShutdown(int code, Datum arg)

void BackgroundWorkerUnblockSignals(void)

void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)

#define BGWORKER_BYPASS_ROLELOGINCHECK

#define BGWORKER_BYPASS_ALLOWCONN

void RestoreComboCIDState(char *comboCIDstate)

void RestoreLibraryState(char *start_address)

dsm_segment * dsm_attach(dsm_handle h)

int errcode(int sqlerrcode)

#define ereport(elevel,...)

ProcNumber ParallelLeaderProcNumber

void RestoreGUCState(void *gucstate)

const char * hba_authname(UserAuth auth_method)

void RestoreReindexState(const void *reindexstate)

void InvalidateSystemCaches(void)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

#define pq_putmessage(msgtype, s, len)

int GetDatabaseEncoding(void)

int SetClientEncoding(int encoding)

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

void InitializeSystemUser(const char *authn_id, const char *auth_method)

void SetSessionAuthorization(Oid userid, bool is_superuser)

void SetCurrentRoleId(Oid roleid, bool is_superuser)

ClientConnectionInfo MyClientConnectionInfo

void RestoreClientConnectionInfo(char *conninfo)

void SetAuthenticatedUserId(Oid userid)

void SetUserIdAndSecContext(Oid userid, int sec_context)

void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)

void RestoreUncommittedEnums(void *space)

static uint32 DatumGetUInt32(Datum X)

static Datum PointerGetDatum(const void *X)

BackgroundWorker * MyBgworkerEntry

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

void AttachSerializableXact(SerializableXactHandle handle)

void RestoreRelationMap(char *startAddress)

void AttachSession(dsm_handle handle)

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

void PushActiveSnapshot(Snapshot snapshot)

Snapshot RestoreSnapshot(char *start_address)

void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)

void PopActiveSnapshot(void)

bool BecomeLockGroupMember(PGPROC *leader, int pid)

void RestorePendingSyncs(char *startAddress)

pid_t parallel_leader_pid

void ExitParallelMode(void)

void EnterParallelMode(void)

void StartTransactionCommand(void)

void StartParallelWorkerTransaction(char *tstatespace)

void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)

void EndParallelWorkerTransaction(void)

void CommitTransactionCommand(void)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AttachSerializableXact(), AttachSession(), ClientConnectionInfo::auth_method, ClientConnectionInfo::authn_id, BackgroundWorkerInitializeConnectionByOid(), BackgroundWorkerUnblockSignals(), BecomeLockGroupMember(), before_shmem_exit(), BackgroundWorker::bgw_extra, BGWORKER_BYPASS_ALLOWCONN, BGWORKER_BYPASS_ROLELOGINCHECK, CommitTransactionCommand(), CurrentMemoryContext, DatumGetUInt32(), DetachSession(), dsm_attach(), dsm_segment_address(), elog, EndParallelWorkerTransaction(), EnterParallelMode(), ereport, errcode(), errmsg, ERROR, ExitParallelMode(), fb(), GetDatabaseEncoding(), hba_authname(), InitializeSystemUser(), InitializingParallelWorker, InvalidateSystemCaches(), LookupParallelWorkerFunction(), MyBgworkerEntry, MyClientConnectionInfo, MyFixedParallelState, MyProc, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ACTIVE_SNAPSHOT, PARALLEL_KEY_CLIENTCONNINFO, PARALLEL_KEY_COMBO_CID, PARALLEL_KEY_ENTRYPOINT, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, PARALLEL_KEY_GUC, PARALLEL_KEY_LIBRARY, PARALLEL_KEY_PENDING_SYNCS, PARALLEL_KEY_REINDEX_STATE, PARALLEL_KEY_RELMAPPER_STATE, PARALLEL_KEY_SESSION_DSM, PARALLEL_KEY_TRANSACTION_SNAPSHOT, PARALLEL_KEY_TRANSACTION_STATE, PARALLEL_KEY_UNCOMMITTEDENUMS, FixedParallelState::parallel_leader_pid, PARALLEL_MAGIC, ParallelLeaderPid, ParallelLeaderProcNumber, ParallelWorkerNumber, ParallelWorkerShutdown(), PointerGetDatum(), PopActiveSnapshot(), pq_putmessage, pq_redirect_to_shm_mq(), pq_set_parallel_leader(), PqMsg_Terminate, PushActiveSnapshot(), RestoreClientConnectionInfo(), RestoreComboCIDState(), RestoreGUCState(), RestoreLibraryState(), RestorePendingSyncs(), RestoreReindexState(), RestoreRelationMap(), RestoreSnapshot(), RestoreTransactionSnapshot(), RestoreUncommittedEnums(), SetAuthenticatedUserId(), SetClientEncoding(), SetCurrentRoleId(), SetParallelStartTimestamps(), SetSessionAuthorization(), SetTempNamespaceState(), SetUserIdAndSecContext(), 
shm_mq_attach(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), StartParallelWorkerTransaction(), StartTransactionCommand(), and TopMemoryContext.

ParallelWorkerReportLastRecEnd()

ParallelWorkerShutdown()

ProcessParallelMessage()

Definition at line 1146 of file parallel.c.

1147{

1149

1152 {

1155 }

1156

1158

1160 {

1163 {

1166

1167

1169

1170

1172

1173

1174

1175

1176

1177

1178

1179

1180

1182 {

1183 if (edata.context)

1185 _("parallel worker"));

1186 else

1188 }

1189

1190

1191

1192

1193

1194

1197

1198

1200

1201

1203

1204 break;

1205 }

1206

1208 {

1209

1211 const char *channel;

1212 const char *payload;

1213

1218

1220

1221 break;

1222 }

1223

1225 {

1226

1227

1228

1229

1230

1233

1235

1237

1238 break;

1239 }

1240

1242 {

1245 break;

1246 }

1247

1248 default:

1249 {

1250 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",

1252 }

1253 }

1254}

void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)

void pgstat_progress_incr_param(int index, int64 incr)

void ThrowErrorData(ErrorData *edata)

unsigned int pq_getmsgint(StringInfo msg, int b)

void pq_getmsgend(StringInfo msg)

void pq_endmessage(StringInfo buf)

int pq_getmsgbyte(StringInfo msg)

const char * pq_getmsgrawstring(StringInfo msg)

int64 pq_getmsgint64(StringInfo msg)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

#define PqMsg_NotificationResponse

#define PqMsg_ErrorResponse

#define PqMsg_NoticeResponse

char * psprintf(const char *fmt,...)

References _, debug_parallel_query, DEBUG_PARALLEL_REGRESS, elog, ERROR, error_context_stack, ParallelContext::error_context_stack, ParallelWorkerInfo::error_mqh, fb(), i, ParallelContext::known_attached_workers, StringInfoData::len, Min, ParallelContext::nknown_attached_workers, NotifyMyFrontEnd(), pgstat_progress_incr_param(), pq_endmessage(), pq_getmsgbyte(), pq_getmsgend(), pq_getmsgint(), pq_getmsgint64(), pq_getmsgrawstring(), pq_parse_errornotice(), PqMsg_ErrorResponse, PqMsg_NoticeResponse, PqMsg_NotificationResponse, PqMsg_Progress, PqMsg_Terminate, psprintf(), pstrdup(), shm_mq_detach(), ThrowErrorData(), and ParallelContext::worker.

Referenced by ProcessParallelMessages().

ProcessParallelMessages()

Definition at line 1057 of file parallel.c.

1058{

1061

1063

1064

1065

1066

1067

1068

1069

1070

1072

1073

1074

1075

1076

1077

1080 "ProcessParallelMessages",

1082 else

1084

1086

1087

1089

1091 {

1093 int i;

1094

1097 continue;

1098

1100 {

1101

1102

1103

1104

1105

1106

1108 {

1110 Size nbytes;

1112

1114 &data, true);

1116 break;

1118 {

1120

1125 }

1126 else

1129 errmsg("lost connection to parallel worker")));

1130 }

1131 }

1132 }

1133

1135

1136

1138

1140}

static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

#define dlist_foreach(iter, lhead)

#define dlist_container(type, membername, ptr)

void MemoryContextReset(MemoryContext context)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), dlist_iter::cur, StringInfoData::data, data, dlist_container, dlist_foreach, ereport, errcode(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), HOLD_INTERRUPTS, i, initStringInfo(), MemoryContextReset(), MemoryContextSwitchTo(), ParallelContext::nworkers_launched, ParallelMessagePending, pcxt_list, pfree(), ProcessParallelMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, TopMemoryContext, and ParallelContext::worker.

Referenced by ProcessInterrupts().

ReinitializeParallelDSM()

Definition at line 511 of file parallel.c.

512{

515

516

518

519

521 {

526 {

530 }

531 }

532

533

536

537

539 {

541 int i;

542

546 {

549

554 }

555 }

556

557

559}

void WaitForParallelWorkersToFinish(ParallelContext *pcxt)

References ParallelWorkerInfo::error_mqh, fb(), i, InvalidXLogRecPtr, ParallelContext::known_attached_workers, MemoryContextSwitchTo(), MyProc, ParallelContext::nknown_attached_workers, ParallelContext::nworkers, ParallelContext::nworkers_launched, PARALLEL_ERROR_QUEUE_SIZE, PARALLEL_KEY_ERROR_QUEUE, PARALLEL_KEY_FIXED, pfree(), ParallelContext::seg, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_toc_lookup(), start, ParallelContext::toc, TopTransactionContext, WaitForParallelWorkersToExit(), WaitForParallelWorkersToFinish(), and ParallelContext::worker.

Referenced by ExecParallelReinitialize(), and parallel_vacuum_process_all_indexes().

ReinitializeParallelWorkers()

WaitForParallelWorkersToAttach()

Definition at line 702 of file parallel.c.

703{

704 int i;

705

706

708 return;

709

710 for (;;)

711 {

712

713

714

715

717

719 {

722 int rc;

724

726 continue;

727

728

729

730

731

733 {

736 continue;

737 }

738

741 {

742

745 {

746

749 }

750 }

752 {

753

754

755

756

761 errmsg("parallel worker failed to initialize"),

762 errhint("More details may be available in the server log.")));

763

766 }

767 else

768 {

769

770

771

772

773

774

778

781 }

782 }

783

784

786 {

788 break;

789 }

790 }

791}

BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)

int errhint(const char *fmt,...) pg_attribute_printf(1, 2)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

#define CHECK_FOR_INTERRUPTS()

shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)

PGPROC * shm_mq_get_sender(shm_mq *mq)

#define WL_EXIT_ON_PM_DEATH

References Assert, BGWH_STARTED, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nknown_attached_workers, ParallelContext::nworkers_launched, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and ParallelContext::worker.

Referenced by _brin_begin_parallel(), _bt_begin_parallel(), and _gin_begin_parallel().

WaitForParallelWorkersToExit()

Definition at line 919 of file parallel.c.

920{

921 int i;

922

923

925 {

927

929 continue;

930

932

933

934

935

936

937

938

942 errmsg("postmaster exited during a parallel transaction")));

943

944

947 }

948}

BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)

References BGWH_POSTMASTER_DIED, ParallelWorkerInfo::bgwhandle, ereport, errcode(), errmsg, FATAL, fb(), i, ParallelContext::nworkers_launched, pfree(), WaitForBackgroundWorkerShutdown(), and ParallelContext::worker.

Referenced by DestroyParallelContext(), and ReinitializeParallelDSM().

WaitForParallelWorkersToFinish()

Definition at line 805 of file parallel.c.

806{

807 for (;;)

808 {

811 int i;

812

813

814

815

816

817

819

821 {

822

823

824

825

826

827

831 {

833 break;

834 }

835 }

836

838 {

839

841 {

843 break;

844 }

845

846

847

848

849

850

851

853 {

856

857

858

859

860

861

866 continue;

867

868

869

870

871

872

873

874

875

880 errmsg("parallel worker failed to initialize"),

881 errhint("More details may be available in the server log.")));

882

883

884

885

886

887

888

889

890

891

892 }

893 }

894

898 }

899

901 {

903

907 }

908}

XLogRecPtr XactLastRecEnd

References Assert, BGWH_STOPPED, ParallelWorkerInfo::bgwhandle, CHECK_FOR_INTERRUPTS, ereport, errcode(), errhint(), errmsg, ERROR, ParallelWorkerInfo::error_mqh, fb(), GetBackgroundWorkerPid(), i, ParallelContext::known_attached_workers, MyLatch, ParallelContext::nworkers_launched, PARALLEL_KEY_FIXED, ResetLatch(), shm_mq_get_queue(), shm_mq_get_sender(), shm_toc_lookup(), ParallelContext::toc, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, ParallelContext::worker, and XactLastRecEnd.

Referenced by _brin_end_parallel(), _bt_end_parallel(), _gin_end_parallel(), ExecParallelFinish(), parallel_vacuum_process_all_indexes(), and ReinitializeParallelDSM().

fn_addr

fn_name

InitializingParallelWorker

[struct]

const struct { ... } InternalParallelWorkers[]

Initial value:

=

{

{

},

{

},

{

},

{

},

{

}

}

void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)

void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)

void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)

void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)

void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)

Referenced by LookupParallelWorkerFunction().

MyFixedParallelState

ParallelLeaderPid

ParallelMessagePending

ParallelWorkerNumber

int ParallelWorkerNumber = -1

Definition at line 117 of file parallel.c.

Referenced by _brin_parallel_build_main(), _bt_parallel_build_main(), _gin_parallel_build_main(), BuildTupleHashTable(), ExecEndAgg(), ExecEndBitmapHeapScan(), ExecEndBitmapIndexScan(), ExecEndIndexOnlyScan(), ExecEndIndexScan(), ExecEndMemoize(), ExecHashInitializeWorker(), ExecParallelGetReceiver(), ExecParallelHashEnsureBatchAccessors(), ExecParallelHashJoinSetUpBatches(), ExecParallelHashRepartitionRest(), ExecParallelReportInstrumentation(), ExecSort(), parallel_vacuum_main(), ParallelQueryMain(), and ParallelWorkerMain().

pcxt_list