PostgreSQL Source Code: src/backend/access/transam/parallel.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

49

50

51

52

53

54

55

56

57#define PARALLEL_ERROR_QUEUE_SIZE 16384

58

59

60#define PARALLEL_MAGIC 0x50477c7c

61

62

63

64

65

66

67#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)

68#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)

69#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)

70#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)

71#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)

72#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)

73#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)

74#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)

75#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)

76#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)

77#define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)

78#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)

79#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)

80#define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)

81#define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)

82

83

110

111

112

113

114

115

116

118

119

121

122

124

125

127

128

130

131

133

134

135

136

137

138static const struct

139{

143

144{

145 {

147 },

148 {

150 },

151 {

153 },

154 {

156 },

157 {

159 }

161

162

167

168

169

170

171

172

173

176 int nworkers)

177{

180

181

183

184

185 Assert(nworkers >= 0);

186

187

189

190

200

201

203

204 return pcxt;

205}

206

207

208

209

210

211

212void

214{

228 int i;

233

234

236

237

240

241

242

243

244

245

246

249

250

251

252

253

255 {

256

258

259

260

261

262

263

264

267 }

268

270 {

273 "parallel error queue size not buffer-aligned");

274

275

283 {

286 }

302

304

305

310

311

315 }

316

317

318

319

320

321

322

323

324

325

326

327

335 else

336 {

341 }

342

343

354 &fps->temp_toast_namespace_id);

355 fps->parallel_leader_pgproc = MyProc;

364

365

367 {

383

384

388

389

393

394

398

399

400

401

402

404 {

409 }

410

411

415

416

422

423

427

428

433

434

438

439

444

445

451

452

457

458

460

461

462

463

464

465

466

467

473 {

476

481 }

483

484

485

486

487

488

489

490

497 }

498

499

501

502

504}

505

506

507

508

509

510void

512{

515

516

518

519

521 {

526 {

530 }

531 }

532

533

536

537

539 {

541 int i;

542

546 {

549

554 }

555 }

556

557

559}

560

561

562

563

564

565

566

567void

569{

570

571

572

573

574

575

577}

578

579

580

581

582void

584{

587 int i;

589

590

592 return;

593

594

596

597

599

600

602

603

604 memset(&worker, 0, sizeof(worker));

617

618

619

620

621

622

623

624

625

627 {

632 {

636 }

637 else

638 {

639

640

641

642

643

644

645

646

647

652 }

653 }

654

655

656

657

658

660 {

663 }

664

665

667}

668

669

670

671

672

673

674

675

676

677

678

679

680

681

682

683

684

685

686

687

688

689

690

691

692

693

694

695

696

697

698

699

700

701void

703{

704 int i;

705

706

708 return;

709

710 for (;;)

711 {

712

713

714

715

717

719 {

722 int rc;

724

726 continue;

727

728

729

730

731

733 {

736 continue;

737 }

738

741 {

742

745 {

746

749 }

750 }

752 {

753

754

755

756

761 errmsg("parallel worker failed to initialize"),

762 errhint("More details may be available in the server log.")));

763

766 }

767 else

768 {

769

770

771

772

773

774

778

781 }

782 }

783

784

786 {

788 break;

789 }

790 }

791}

792

793

794

795

796

797

798

799

800

801

802

803

804void

806{

807 for (;;)

808 {

811 int i;

812

813

814

815

816

817

819

821 {

822

823

824

825

826

827

831 {

833 break;

834 }

835 }

836

838 {

839

841 {

843 break;

844 }

845

846

847

848

849

850

851

853 {

856

857

858

859

860

861

866 continue;

867

868

869

870

871

872

873

874

875

880 errmsg("parallel worker failed to initialize"),

881 errhint("More details may be available in the server log.")));

882

883

884

885

886

887

888

889

890

891

892 }

893 }

894

898 }

899

901 {

903

907 }

908}

909

910

911

912

913

914

915

916

917

918static void

920{

921 int i;

922

923

925 {

927

929 continue;

930

932

933

934

935

936

937

938

942 errmsg("postmaster exited during a parallel transaction")));

943

944

947 }

948}

949

950

951

952

953

954

955

956

957

958void

960{

961 int i;

962

963

964

965

966

967

968

970

971

973 {

975 {

977 {

979

982 }

983 }

984 }

985

986

987

988

989

990

992 {

995 }

996

997

998

999

1000

1002 {

1005 }

1006

1007

1008

1009

1010

1011

1015

1016

1018 {

1021 }

1022

1023

1027}

1028

1029

1030

1031

1032bool

1037

1038

1039

1040

1041

1042

1043

1044

1045void

1052

1053

1054

1055

1056void

1058{

1061

1063

1064

1065

1066

1067

1068

1069

1070

1072

1073

1074

1075

1076

1077

1080 "ProcessParallelMessages",

1082 else

1084

1086

1087

1089

1091 {

1093 int i;

1094

1097 continue;

1098

1100 {

1101

1102

1103

1104

1105

1106

1108 {

1110 Size nbytes;

1112

1114 &data, true);

1116 break;

1118 {

1120

1125 }

1126 else

1129 errmsg("lost connection to parallel worker")));

1130 }

1131 }

1132 }

1133

1135

1136

1138

1140}

1141

1142

1143

1144

1145static void

1147{

1149

1152 {

1155 }

1156

1158

1160 {

1163 {

1166

1167

1169

1170

1172

1173

1174

1175

1176

1177

1178

1179

1180

1182 {

1183 if (edata.context)

1185 _("parallel worker"));

1186 else

1188 }

1189

1190

1191

1192

1193

1194

1197

1198

1200

1201

1203

1204 break;

1205 }

1206

1208 {

1209

1211 const char *channel;

1212 const char *payload;

1213

1218

1220

1221 break;

1222 }

1223

1225 {

1226

1227

1228

1229

1230

1233

1235

1237

1238 break;

1239 }

1240

1242 {

1245 break;

1246 }

1247

1248 default:

1249 {

1250 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",

1252 }

1253 }

1254}

1255

1256

1257

1258

1259

1260

1261

1262void

1264{

1266 {

1268

1271 break;

1273 elog(WARNING, "leaked parallel context");

1275 }

1276}

1277

1278

1279

1280

1281

1282

1283void

1285{

1287 {

1289

1292 elog(WARNING, "leaked parallel context");

1294 }

1295}

1296

1297

1298

1299

1300void

1302{

1311 char *library_name;

1312 char *function_name;

1327

1328

1330

1331

1333

1334

1337

1338

1340 "Parallel worker",

1342

1343

1344

1345

1346

1347

1348

1349

1350

1351

1353 if (seg == NULL)

1356 errmsg("could not map dynamic shared memory segment")));

1358 if (toc == NULL)

1361 errmsg("invalid magic number in dynamic shared memory segment")));

1362

1363

1366

1367

1371

1372

1373

1374

1375

1376

1377

1385 fps->parallel_leader_proc_number);

1386

1387

1388

1389

1390

1391

1392

1393

1394

1395

1396

1397

1398

1399

1400

1402 fps->parallel_leader_pid))

1403 return;

1404

1405

1406

1407

1408

1409

1411

1412

1413

1414

1415

1416

1420

1422

1423

1424

1425

1426

1427

1428

1431 fps->session_user_is_superuser);

1433

1434

1435

1436

1437

1438

1439

1441 fps->authenticated_user_id,

1444

1445

1446

1447

1448

1449

1450

1451

1454

1455

1456

1457

1458

1459

1464

1465

1468

1469

1470

1471

1472

1473

1474

1476 false);

1484

1485

1489

1490

1491

1492

1493

1494

1495

1496

1497

1498

1499

1500

1501

1507 fps->parallel_leader_pgproc);

1509

1510

1511

1512

1513

1515

1516

1517

1518

1519

1520

1521

1522

1525

1526

1527

1528

1529

1530

1531

1532

1533

1535

1536

1538 fps->temp_toast_namespace_id);

1539

1540

1542 false);

1544

1545

1547 false);

1549

1550

1551

1552

1553

1557

1558

1560

1561

1562

1563

1564

1567

1568

1569

1570

1572

1573

1575

1576

1578

1579

1581

1582

1584

1585

1587}

1588

1589

1590

1591

1592

1593void

1595{

1597

1600 if (fps->last_xlog_end < last_xlog_end)

1601 fps->last_xlog_end = last_xlog_end;

1603}

1604

1605

1606

1607

1608

1609

1610

1611

1612

1613

1614

1615

1616

1617

1618

1619

1620

1621static void

1630

1631

1632

1633

1634

1635

1636

1637

1638

1639

1640

1641

1642

1643

1644

1645

1646

1647

1648

1651{

1652

1653

1654

1655

1657 {

1658 int i;

1659

1661 {

1664 }

1665

1666

1668 }

1669

1670

1673}

void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)

static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)

#define PARALLEL_KEY_TRANSACTION_STATE

void HandleParallelMessageInterrupt(void)

bool InitializingParallelWorker

parallel_worker_main_type fn_addr

#define PARALLEL_KEY_UNCOMMITTEDENUMS

#define PARALLEL_KEY_TRANSACTION_SNAPSHOT

void ProcessParallelMessages(void)

void InitializeParallelDSM(ParallelContext *pcxt)

#define PARALLEL_KEY_CLIENTCONNINFO

static FixedParallelState * MyFixedParallelState

#define PARALLEL_KEY_PENDING_SYNCS

void WaitForParallelWorkersToFinish(ParallelContext *pcxt)

void LaunchParallelWorkers(ParallelContext *pcxt)

void ReinitializeParallelDSM(ParallelContext *pcxt)

void DestroyParallelContext(ParallelContext *pcxt)

#define PARALLEL_KEY_ACTIVE_SNAPSHOT

void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)

static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)

#define PARALLEL_KEY_ERROR_QUEUE

#define PARALLEL_KEY_SESSION_DSM

ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)

bool ParallelContextActive(void)

void ParallelWorkerMain(Datum main_arg)

static void WaitForParallelWorkersToExit(ParallelContext *pcxt)

static pid_t ParallelLeaderPid

#define PARALLEL_KEY_REINDEX_STATE

#define PARALLEL_KEY_LIBRARY

static void ParallelWorkerShutdown(int code, Datum arg)

static dlist_head pcxt_list

#define PARALLEL_KEY_FIXED

#define PARALLEL_KEY_ENTRYPOINT

volatile sig_atomic_t ParallelMessagePending

void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)

#define PARALLEL_KEY_COMBO_CID

static const struct @16 InternalParallelWorkers[]

void WaitForParallelWorkersToAttach(ParallelContext *pcxt)

#define PARALLEL_ERROR_QUEUE_SIZE

void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)

void AtEOXact_Parallel(bool isCommit)

#define PARALLEL_KEY_RELMAPPER_STATE

void pgstat_progress_incr_param(int index, int64 incr)

void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)

BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)

void BackgroundWorkerUnblockSignals(void)

void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)

BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)

bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)

#define BGW_NEVER_RESTART

#define BGWORKER_BYPASS_ROLELOGINCHECK

#define BGWORKER_CLASS_PARALLEL

@ BgWorkerStart_ConsistentState

#define BGWORKER_BACKEND_DATABASE_CONNECTION

#define BGWORKER_BYPASS_ALLOWCONN

#define BGWORKER_SHMEM_ACCESS

void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)

#define Assert(condition)

#define StaticAssertDecl(condition, errmessage)

void RestoreComboCIDState(char *comboCIDstate)

void SerializeComboCIDState(Size maxsize, char *start_address)

Size EstimateComboCIDStateSpace(void)

void RestoreLibraryState(char *start_address)

void SerializeLibraryState(Size maxsize, char *start_address)

Size EstimateLibraryStateSpace(void)

void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)

dsm_handle dsm_segment_handle(dsm_segment *seg)

void dsm_detach(dsm_segment *seg)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

dsm_segment * dsm_attach(dsm_handle h)

#define DSM_CREATE_NULL_IF_MAXSEGMENTS

#define DSM_HANDLE_INVALID

ErrorContextCallback * error_context_stack

void ThrowErrorData(ErrorData *edata)

int errcode(int sqlerrcode)

int errhint(const char *fmt,...) pg_attribute_printf(1

#define ereport(elevel,...)

void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)

#define palloc0_array(type, count)

#define palloc0_object(type)

void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)

ProcNumber ParallelLeaderProcNumber

volatile sig_atomic_t InterruptPending

void RestoreGUCState(void *gucstate)

void SerializeGUCState(Size maxsize, char *start_address)

Size EstimateGUCStateSpace(void)

bool current_role_is_superuser

const char * hba_authname(UserAuth auth_method)

#define dlist_foreach(iter, lhead)

#define dlist_head_element(type, membername, lhead)

static void dlist_delete(dlist_node *node)

static void dlist_push_head(dlist_head *head, dlist_node *node)

static bool dlist_is_empty(const dlist_head *head)

#define DLIST_STATIC_INIT(name)

#define dlist_container(type, membername, ptr)

void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)

void SerializeReindexState(Size maxsize, char *start_address)

void RestoreReindexState(const void *reindexstate)

Size EstimateReindexStateSpace(void)

void InvalidateSystemCaches(void)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

#define pq_putmessage(msgtype, s, len)

int GetDatabaseEncoding(void)

int SetClientEncoding(int encoding)

void * MemoryContextAlloc(MemoryContext context, Size size)

void MemoryContextReset(MemoryContext context)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

MemoryContext TopMemoryContext

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define RESUME_INTERRUPTS()

#define INTERRUPTS_CAN_BE_PROCESSED()

#define CHECK_FOR_INTERRUPTS()

#define HOLD_INTERRUPTS()

void InitializeSystemUser(const char *authn_id, const char *auth_method)

void GetUserIdAndSecContext(Oid *userid, int *sec_context)

void SetSessionAuthorization(Oid userid, bool is_superuser)

bool GetSessionUserIsSuperuser(void)

Size EstimateClientConnectionInfoSpace(void)

Oid GetSessionUserId(void)

void SetCurrentRoleId(Oid roleid, bool is_superuser)

Oid GetAuthenticatedUserId(void)

ClientConnectionInfo MyClientConnectionInfo

void RestoreClientConnectionInfo(char *conninfo)

void SetAuthenticatedUserId(Oid userid)

Oid GetCurrentRoleId(void)

void SerializeClientConnectionInfo(Size maxsize PG_USED_FOR_ASSERTS_ONLY, char *start_address)

void SetUserIdAndSecContext(Oid userid, int sec_context)

void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)

void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)

void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

void RestoreUncommittedEnums(void *space)

Size EstimateUncommittedEnumsSpace(void)

void SerializeUncommittedEnums(void *space, Size size)

static uint32 DatumGetUInt32(Datum X)

static Datum PointerGetDatum(const void *X)

static Pointer DatumGetPointer(Datum X)

static Datum UInt32GetDatum(uint32 X)

BackgroundWorker * MyBgworkerEntry

unsigned int pq_getmsgint(StringInfo msg, int b)

void pq_getmsgend(StringInfo msg)

void pq_endmessage(StringInfo buf)

int pq_getmsgbyte(StringInfo msg)

const char * pq_getmsgrawstring(StringInfo msg)

int64 pq_getmsgint64(StringInfo msg)

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

void AttachSerializableXact(SerializableXactHandle handle)

SerializableXactHandle ShareSerializableXact(void)

void * SerializableXactHandle

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_PARALLEL_MESSAGE

#define PqMsg_NotificationResponse

#define PqMsg_ErrorResponse

#define PqMsg_NoticeResponse

char * psprintf(const char *fmt,...)

Size EstimateRelationMapSpace(void)

void SerializeRelationMap(Size maxSize, char *startAddress)

void RestoreRelationMap(char *startAddress)

void AttachSession(dsm_handle handle)

dsm_handle GetSessionDsmHandle(void)

shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)

PGPROC * shm_mq_get_sender(shm_mq *mq)

void shm_mq_detach(shm_mq_handle *mqh)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_initialize_estimator(e)

#define shm_toc_estimate_keys(e, cnt)

Size mul_size(Size s1, Size s2)

void SerializeSnapshot(Snapshot snapshot, char *start_address)

Snapshot GetTransactionSnapshot(void)

void PushActiveSnapshot(Snapshot snapshot)

Snapshot RestoreSnapshot(char *start_address)

void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)

void PopActiveSnapshot(void)

Size EstimateSnapshotSpace(Snapshot snapshot)

Snapshot GetActiveSnapshot(void)

static void SpinLockRelease(volatile slock_t *lock)

static void SpinLockAcquire(volatile slock_t *lock)

static void SpinLockInit(volatile slock_t *lock)

bool BecomeLockGroupMember(PGPROC *leader, int pid)

void BecomeLockGroupLeader(void)

void SerializePendingSyncs(Size maxSize, char *startAddress)

Size EstimatePendingSyncsSpace(void)

void RestorePendingSyncs(char *startAddress)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

char bgw_function_name[BGW_MAXLEN]

char bgw_name[BGW_MAXLEN]

char bgw_type[BGW_MAXLEN]

BgWorkerStartTime bgw_start_time

char bgw_extra[BGW_EXTRALEN]

char bgw_library_name[MAXPGPATH]

Oid temp_toast_namespace_id

SerializableXactHandle serializable_xact_handle

PGPROC * parallel_leader_pgproc

bool session_user_is_superuser

pid_t parallel_leader_pid

Oid authenticated_user_id

ProcNumber parallel_leader_proc_number

bool * known_attached_workers

ErrorContextCallback * error_context_stack

shm_toc_estimator estimator

int nknown_attached_workers

ParallelWorkerInfo * worker

BackgroundWorkerHandle * bgwhandle

shm_mq_handle * error_mqh

void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)

#define WL_EXIT_ON_PM_DEATH

void SerializeTransactionState(Size maxsize, char *start_address)

void ExitParallelMode(void)

SubTransactionId GetCurrentSubTransactionId(void)

void EnterParallelMode(void)

Size EstimateTransactionStateSpace(void)

void StartTransactionCommand(void)

void StartParallelWorkerTransaction(char *tstatespace)

void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)

bool IsInParallelMode(void)

TimestampTz GetCurrentStatementStartTimestamp(void)

TimestampTz GetCurrentTransactionStartTimestamp(void)

void EndParallelWorkerTransaction(void)

void CommitTransactionCommand(void)

#define IsolationUsesXactSnapshot()

XLogRecPtr XactLastRecEnd

#define InvalidXLogRecPtr