PostgreSQL Source Code: src/backend/access/transam/parallel.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
49
50
51
52
53
54
55
56
57#define PARALLEL_ERROR_QUEUE_SIZE 16384
58
59
60#define PARALLEL_MAGIC 0x50477c7c
61
62
63
64
65
66
67#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
68#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
69#define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
70#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
71#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
72#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
73#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
74#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
75#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
76#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
77#define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
78#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
79#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
80#define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
81#define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
82
83
110
111
112
113
114
115
116
118
119
121
122
124
125
127
128
130
131
133
134
135
136
137
138static const struct
139{
143
144{
145 {
147 },
148 {
150 },
151 {
153 },
154 {
156 },
157 {
159 }
161
162
167
168
169
170
171
172
173
176 int nworkers)
177{
180
181
183
184
185 Assert(nworkers >= 0);
186
187
189
190
200
201
203
204 return pcxt;
205}
206
207
208
209
210
211
212void
214{
228 int i;
233
234
236
237
240
241
242
243
244
245
246
249
250
251
252
253
255 {
256
258
259
260
261
262
263
264
267 }
268
270 {
273 "parallel error queue size not buffer-aligned");
274
275
283 {
286 }
302
304
305
310
311
315 }
316
317
318
319
320
321
322
323
324
325
326
327
335 else
336 {
341 }
342
343
354 &fps->temp_toast_namespace_id);
355 fps->parallel_leader_pgproc = MyProc;
364
365
367 {
383
384
388
389
393
394
398
399
400
401
402
404 {
409 }
410
411
415
416
422
423
427
428
433
434
438
439
444
445
451
452
457
458
460
461
462
463
464
465
466
467
473 {
476
481 }
483
484
485
486
487
488
489
490
497 }
498
499
501
502
504}
505
506
507
508
509
510void
512{
515
516
518
519
521 {
526 {
530 }
531 }
532
533
536
537
539 {
541 int i;
542
546 {
549
554 }
555 }
556
557
559}
560
561
562
563
564
565
566
567void
569{
570
571
572
573
574
575
577}
578
579
580
581
582void
584{
587 int i;
589
590
592 return;
593
594
596
597
599
600
602
603
604 memset(&worker, 0, sizeof(worker));
617
618
619
620
621
622
623
624
625
627 {
632 {
636 }
637 else
638 {
639
640
641
642
643
644
645
646
647
652 }
653 }
654
655
656
657
658
660 {
663 }
664
665
667}
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701void
703{
704 int i;
705
706
708 return;
709
710 for (;;)
711 {
712
713
714
715
717
719 {
722 int rc;
724
726 continue;
727
728
729
730
731
733 {
736 continue;
737 }
738
741 {
742
745 {
746
749 }
750 }
752 {
753
754
755
756
761 errmsg("parallel worker failed to initialize"),
762 errhint("More details may be available in the server log.")));
763
766 }
767 else
768 {
769
770
771
772
773
774
778
781 }
782 }
783
784
786 {
788 break;
789 }
790 }
791}
792
793
794
795
796
797
798
799
800
801
802
803
804void
806{
807 for (;;)
808 {
811 int i;
812
813
814
815
816
817
819
821 {
822
823
824
825
826
827
831 {
833 break;
834 }
835 }
836
838 {
839
841 {
843 break;
844 }
845
846
847
848
849
850
851
853 {
856
857
858
859
860
861
866 continue;
867
868
869
870
871
872
873
874
875
880 errmsg("parallel worker failed to initialize"),
881 errhint("More details may be available in the server log.")));
882
883
884
885
886
887
888
889
890
891
892 }
893 }
894
898 }
899
901 {
903
907 }
908}
909
910
911
912
913
914
915
916
917
918static void
920{
921 int i;
922
923
925 {
927
929 continue;
930
932
933
934
935
936
937
938
942 errmsg("postmaster exited during a parallel transaction")));
943
944
947 }
948}
949
950
951
952
953
954
955
956
957
958void
960{
961 int i;
962
963
964
965
966
967
968
970
971
973 {
975 {
977 {
979
982 }
983 }
984 }
985
986
987
988
989
990
992 {
995 }
996
997
998
999
1000
1002 {
1005 }
1006
1007
1008
1009
1010
1011
1015
1016
1018 {
1021 }
1022
1023
1027}
1028
1029
1030
1031
1032bool
1037
1038
1039
1040
1041
1042
1043
1044
1045void
1052
1053
1054
1055
1056void
1058{
1061
1063
1064
1065
1066
1067
1068
1069
1070
1072
1073
1074
1075
1076
1077
1080 "ProcessParallelMessages",
1082 else
1084
1086
1087
1089
1091 {
1093 int i;
1094
1097 continue;
1098
1100 {
1101
1102
1103
1104
1105
1106
1108 {
1110 Size nbytes;
1112
1114 &data, true);
1116 break;
1118 {
1120
1125 }
1126 else
1129 errmsg("lost connection to parallel worker")));
1130 }
1131 }
1132 }
1133
1135
1136
1138
1140}
1141
1142
1143
1144
1145static void
1147{
1149
1152 {
1155 }
1156
1158
1160 {
1163 {
1166
1167
1169
1170
1172
1173
1174
1175
1176
1177
1178
1179
1180
1182 {
1183 if (edata.context)
1185 _("parallel worker"));
1186 else
1188 }
1189
1190
1191
1192
1193
1194
1197
1198
1200
1201
1203
1204 break;
1205 }
1206
1208 {
1209
1211 const char *channel;
1212 const char *payload;
1213
1218
1220
1221 break;
1222 }
1223
1225 {
1226
1227
1228
1229
1230
1233
1235
1237
1238 break;
1239 }
1240
1242 {
1245 break;
1246 }
1247
1248 default:
1249 {
1250 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1252 }
1253 }
1254}
1255
1256
1257
1258
1259
1260
1261
1262void
1264{
1266 {
1268
1271 break;
1273 elog(WARNING, "leaked parallel context");
1275 }
1276}
1277
1278
1279
1280
1281
1282
1283void
1285{
1287 {
1289
1292 elog(WARNING, "leaked parallel context");
1294 }
1295}
1296
1297
1298
1299
1300void
1302{
1311 char *library_name;
1312 char *function_name;
1327
1328
1330
1331
1333
1334
1337
1338
1340 "Parallel worker",
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1353 if (seg == NULL)
1356 errmsg("could not map dynamic shared memory segment")));
1358 if (toc == NULL)
1361 errmsg("invalid magic number in dynamic shared memory segment")));
1362
1363
1366
1367
1371
1372
1373
1374
1375
1376
1377
1385 fps->parallel_leader_proc_number);
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1402 fps->parallel_leader_pid))
1403 return;
1404
1405
1406
1407
1408
1409
1411
1412
1413
1414
1415
1416
1420
1422
1423
1424
1425
1426
1427
1428
1431 fps->session_user_is_superuser);
1433
1434
1435
1436
1437
1438
1439
1441 fps->authenticated_user_id,
1444
1445
1446
1447
1448
1449
1450
1451
1454
1455
1456
1457
1458
1459
1464
1465
1468
1469
1470
1471
1472
1473
1474
1476 false);
1484
1485
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1507 fps->parallel_leader_pgproc);
1509
1510
1511
1512
1513
1515
1516
1517
1518
1519
1520
1521
1522
1525
1526
1527
1528
1529
1530
1531
1532
1533
1535
1536
1538 fps->temp_toast_namespace_id);
1539
1540
1542 false);
1544
1545
1547 false);
1549
1550
1551
1552
1553
1557
1558
1560
1561
1562
1563
1564
1567
1568
1569
1570
1572
1573
1575
1576
1578
1579
1581
1582
1584
1585
1587}
1588
1589
1590
1591
1592
1593void
1595{
1597
1600 if (fps->last_xlog_end < last_xlog_end)
1601 fps->last_xlog_end = last_xlog_end;
1603}
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621static void
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1651{
1652
1653
1654
1655
1657 {
1658 int i;
1659
1661 {
1664 }
1665
1666
1668 }
1669
1670
1673}
void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
#define PARALLEL_KEY_TRANSACTION_STATE
void HandleParallelMessageInterrupt(void)
bool InitializingParallelWorker
parallel_worker_main_type fn_addr
#define PARALLEL_KEY_UNCOMMITTEDENUMS
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT
void ProcessParallelMessages(void)
void InitializeParallelDSM(ParallelContext *pcxt)
#define PARALLEL_KEY_CLIENTCONNINFO
static FixedParallelState * MyFixedParallelState
#define PARALLEL_KEY_PENDING_SYNCS
void WaitForParallelWorkersToFinish(ParallelContext *pcxt)
void LaunchParallelWorkers(ParallelContext *pcxt)
void ReinitializeParallelDSM(ParallelContext *pcxt)
void DestroyParallelContext(ParallelContext *pcxt)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT
void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
#define PARALLEL_KEY_ERROR_QUEUE
#define PARALLEL_KEY_SESSION_DSM
ParallelContext * CreateParallelContext(const char *library_name, const char *function_name, int nworkers)
bool ParallelContextActive(void)
void ParallelWorkerMain(Datum main_arg)
static void WaitForParallelWorkersToExit(ParallelContext *pcxt)
static pid_t ParallelLeaderPid
#define PARALLEL_KEY_REINDEX_STATE
#define PARALLEL_KEY_LIBRARY
static void ParallelWorkerShutdown(int code, Datum arg)
static dlist_head pcxt_list
#define PARALLEL_KEY_FIXED
#define PARALLEL_KEY_ENTRYPOINT
volatile sig_atomic_t ParallelMessagePending
void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
#define PARALLEL_KEY_COMBO_CID
static const struct @16 InternalParallelWorkers[]
void WaitForParallelWorkersToAttach(ParallelContext *pcxt)
#define PARALLEL_ERROR_QUEUE_SIZE
void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
void AtEOXact_Parallel(bool isCommit)
#define PARALLEL_KEY_RELMAPPER_STATE
void pgstat_progress_incr_param(int index, int64 incr)
void TerminateBackgroundWorker(BackgroundWorkerHandle *handle)
BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_BYPASS_ROLELOGINCHECK
#define BGWORKER_CLASS_PARALLEL
@ BgWorkerStart_ConsistentState
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_BYPASS_ALLOWCONN
#define BGWORKER_SHMEM_ACCESS
void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
#define Assert(condition)
#define StaticAssertDecl(condition, errmessage)
void RestoreComboCIDState(char *comboCIDstate)
void SerializeComboCIDState(Size maxsize, char *start_address)
Size EstimateComboCIDStateSpace(void)
void RestoreLibraryState(char *start_address)
void SerializeLibraryState(Size maxsize, char *start_address)
Size EstimateLibraryStateSpace(void)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
#define DSM_CREATE_NULL_IF_MAXSEGMENTS
#define DSM_HANDLE_INVALID
ErrorContextCallback * error_context_stack
void ThrowErrorData(ErrorData *edata)
int errcode(int sqlerrcode)
int errhint(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
void ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
#define palloc0_array(type, count)
#define palloc0_object(type)
void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
ProcNumber ParallelLeaderProcNumber
volatile sig_atomic_t InterruptPending
void RestoreGUCState(void *gucstate)
void SerializeGUCState(Size maxsize, char *start_address)
Size EstimateGUCStateSpace(void)
bool current_role_is_superuser
const char * hba_authname(UserAuth auth_method)
#define dlist_foreach(iter, lhead)
#define dlist_head_element(type, membername, lhead)
static void dlist_delete(dlist_node *node)
static void dlist_push_head(dlist_head *head, dlist_node *node)
static bool dlist_is_empty(const dlist_head *head)
#define DLIST_STATIC_INIT(name)
#define dlist_container(type, membername, ptr)
void(* parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc)
void SerializeReindexState(Size maxsize, char *start_address)
void RestoreReindexState(const void *reindexstate)
Size EstimateReindexStateSpace(void)
void InvalidateSystemCaches(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define pq_putmessage(msgtype, s, len)
int GetDatabaseEncoding(void)
int SetClientEncoding(int encoding)
void * MemoryContextAlloc(MemoryContext context, Size size)
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define INTERRUPTS_CAN_BE_PROCESSED()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
void InitializeSystemUser(const char *authn_id, const char *auth_method)
void GetUserIdAndSecContext(Oid *userid, int *sec_context)
void SetSessionAuthorization(Oid userid, bool is_superuser)
bool GetSessionUserIsSuperuser(void)
Size EstimateClientConnectionInfoSpace(void)
Oid GetSessionUserId(void)
void SetCurrentRoleId(Oid roleid, bool is_superuser)
Oid GetAuthenticatedUserId(void)
ClientConnectionInfo MyClientConnectionInfo
void RestoreClientConnectionInfo(char *conninfo)
void SetAuthenticatedUserId(Oid userid)
Oid GetCurrentRoleId(void)
void SerializeClientConnectionInfo(Size maxsize PG_USED_FOR_ASSERTS_ONLY, char *start_address)
void SetUserIdAndSecContext(Oid userid, int sec_context)
void GetTempNamespaceState(Oid *tempNamespaceId, Oid *tempToastNamespaceId)
void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId)
void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
void RestoreUncommittedEnums(void *space)
Size EstimateUncommittedEnumsSpace(void)
void SerializeUncommittedEnums(void *space, Size size)
static uint32 DatumGetUInt32(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static Datum UInt32GetDatum(uint32 X)
BackgroundWorker * MyBgworkerEntry
unsigned int pq_getmsgint(StringInfo msg, int b)
void pq_getmsgend(StringInfo msg)
void pq_endmessage(StringInfo buf)
int pq_getmsgbyte(StringInfo msg)
const char * pq_getmsgrawstring(StringInfo msg)
int64 pq_getmsgint64(StringInfo msg)
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
void AttachSerializableXact(SerializableXactHandle handle)
SerializableXactHandle ShareSerializableXact(void)
void * SerializableXactHandle
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_MESSAGE
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
Size EstimateRelationMapSpace(void)
void SerializeRelationMap(Size maxSize, char *startAddress)
void RestoreRelationMap(char *startAddress)
void AttachSession(dsm_handle handle)
dsm_handle GetSessionDsmHandle(void)
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
PGPROC * shm_mq_get_sender(shm_mq *mq)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
Size mul_size(Size s1, Size s2)
void SerializeSnapshot(Snapshot snapshot, char *start_address)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
Snapshot RestoreSnapshot(char *start_address)
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
void PopActiveSnapshot(void)
Size EstimateSnapshotSpace(Snapshot snapshot)
Snapshot GetActiveSnapshot(void)
static void SpinLockRelease(volatile slock_t *lock)
static void SpinLockAcquire(volatile slock_t *lock)
static void SpinLockInit(volatile slock_t *lock)
bool BecomeLockGroupMember(PGPROC *leader, int pid)
void BecomeLockGroupLeader(void)
void SerializePendingSyncs(Size maxSize, char *startAddress)
Size EstimatePendingSyncsSpace(void)
void RestorePendingSyncs(char *startAddress)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
Oid temp_toast_namespace_id
SerializableXactHandle serializable_xact_handle
PGPROC * parallel_leader_pgproc
bool session_user_is_superuser
pid_t parallel_leader_pid
Oid authenticated_user_id
ProcNumber parallel_leader_proc_number
bool * known_attached_workers
ErrorContextCallback * error_context_stack
shm_toc_estimator estimator
int nknown_attached_workers
ParallelWorkerInfo * worker
BackgroundWorkerHandle * bgwhandle
shm_mq_handle * error_mqh
void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
#define WL_EXIT_ON_PM_DEATH
void SerializeTransactionState(Size maxsize, char *start_address)
void ExitParallelMode(void)
SubTransactionId GetCurrentSubTransactionId(void)
void EnterParallelMode(void)
Size EstimateTransactionStateSpace(void)
void StartTransactionCommand(void)
void StartParallelWorkerTransaction(char *tstatespace)
void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts)
bool IsInParallelMode(void)
TimestampTz GetCurrentStatementStartTimestamp(void)
TimestampTz GetCurrentTransactionStartTimestamp(void)
void EndParallelWorkerTransaction(void)
void CommitTransactionCommand(void)
#define IsolationUsesXactSnapshot()
XLogRecPtr XactLastRecEnd
#define InvalidXLogRecPtr