PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

159

174

175#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

176

177

178

179

180

181

182#define PARALLEL_APPLY_KEY_SHARED 1

183#define PARALLEL_APPLY_KEY_MQ 2

184#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

185

186

187#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

188

189

190

191

192

193

194

195#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

196

197

198

199

200

201

202

203#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

204

205

206

207

208

209#define PARALLEL_APPLY_LOCK_STREAM 0

210#define PARALLEL_APPLY_LOCK_XACT 1

211

212

213

214

216{

220

221

222

223

224

226

227

228

229

230

231

232

233

235

236

237

238

240

241

242

243

244

246

247

248

249

250

251

253

254

256

260

261

262

263

264static bool

266{

267

269 return false;

270

271

272

273

274

275

276

277

278

279

281

282

283

284

285

286

288 return false;

289

290

291

292

293

294

295

296

297

298

299

300

301

303 return false;

304

305

306

307

308

309

310

311

313 return false;

314

315 return true;

316}

317

318

319

320

321

322

323

324

325

326static bool

328{

330 Size segsize;

337

338

339

340

341

342

343

344

345

346

347

352

355

356

358 if (!seg)

359 return false;

360

362 segsize);

363

364

367

372

374

375

379

380

382

383

385 error_queue_size);

388

389

391

392

394 winfo->shared = shared;

395

396 return true;

397}

398

399

400

401

402

405{

407 bool launched;

410

411

413 {

415

417 return winfo;

418 }

419

420

421

422

423

424

425

427

429

430

432 {

435 return NULL;

436 }

437

445

446 if (launched)

447 {

449 }

450 else

451 {

453 winfo = NULL;

454 }

455

457

458 return winfo;

459}

460

461

462

463

464

465

466

467

468

469void

471{

472 bool found;

475

477 return;

478

480 if (!winfo)

481 return;

482

483

485 {

487

492

494 16, &ctl,

496 }

497

498

500 if (found)

501 elog(ERROR, "hash table corrupted");

502

503

508

509 winfo->in_use = true;

511 entry->winfo = winfo;

512}

513

514

515

516

519{

520 bool found;

522

524 return NULL;

525

527 return NULL;

528

529

532

533

535 if (found)

536 {

537

539 return entry->winfo;

540 }

541

542 return NULL;

543}

544

545

546

547

548

549

550

551

552

553

554

555static void

557{

561

563 elog(ERROR, "hash table corrupted");

564

565

566

567

568

569

570

571

572

573

574

575

579 {

582

583 return;

584 }

585

586 winfo->in_use = false;

588}

589

590

591

592

593

594static void

596{

598

601

604

605

608

611

612

614

616}

617

618

619

620

621void

623{

625

627 {

629

631 {

634 }

635 }

636}

637

638

639

640

641static bool

643{

645

647

648 return (fileset_state != FS_EMPTY);

649}

650

651

652

653

654

655

656

657static bool

659{

661

663

664 if (fileset_state == FS_EMPTY)

665 return false;

666

667

668

669

670

671

672

673

674

675

676

677

679 {

682

684 }

685

686

687

688

689

690

691

692

694 {

696 }

697 else if (fileset_state == FS_READY)

698 {

703 }

704

705 return true;

706}

707

708

709

710

711static void

713{

715

717 {

719 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",

721

723 }

724

726 {

729 }

730}

731

732

733static void

735{

739

740

741

742

743

745 "ApplyMessageContext",

747

748

749

750

751

755

756 for (;;)

757 {

760

762

763

765

767

769 {

771 int c;

772

773 if (len == 0)

774 elog(ERROR, "invalid message length");

775

777

778

779

780

781

783 if (c != 'w')

784 elog(ERROR, "unexpected message \"%c\"", c);

785

786

787

788

789

790

791

792

793

794

796

798 }

800 {

801

803 {

804 int rc;

805

806

809 1000L,

810 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

811

814 }

815 }

816 else

817 {

819

821 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

822 errmsg("lost connection to the logical replication apply worker")));

823 }

824

827 }

828

829

831

833}

834

835

836

837

838

839

840

841

842

843static void

845{

849

851}

852

853

854

855

856void

858{

869

871

872

877

878

879

880

881

882

883

884

887 if (!seg)

889 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

890 errmsg("could not map dynamic shared memory segment")));

891

893 if (!toc)

895 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

896 errmsg("invalid magic number in dynamic shared memory segment")));

897

898

901

902

903

904

908

909

910

911

912

913

915

916

917

918

919

920

922

927

928

929

930

934

938

941

943

945

946

949 originname, sizeof(originname));

951

952

953

954

955

959

960

961

962

963

967

969

971

972

973

974

975

976

977

979}

980

981

982

983

984

985

986

987

988void

990{

994}

995

996

997

998

999

1000static void

1002{

1003 char msgtype;

1004

1006

1007 switch (msgtype)

1008 {

1009 case 'E':

1010 {

1012

1013

1015

1016

1017

1018

1019

1020

1021

1024 _("logical replication parallel apply worker"));

1025 else

1026 edata.context = pstrdup(_("logical replication parallel apply worker"));

1027

1028

1029

1030

1031

1033

1034

1035

1036

1037

1039 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1040 errmsg("logical replication parallel apply worker exited due to error"),

1042 }

1043

1044

1045

1046

1047

1048

1049 case 'N':

1050 case 'A':

1051 break;

1052

1053 default:

1054 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",

1055 msgtype, msg->len);

1056 }

1057}

1058

1059

1060

1061

1062void

1064{

1067

1069

1070

1071

1072

1073

1074

1075

1076

1078

1079

1080

1081

1082

1083

1084 if (!hpam_context)

1086 "ProcessParallelApplyMessages",

1088 else

1090

1092

1094

1096 {

1098 Size nbytes;

1101

1102

1103

1104

1105

1106

1107

1109 continue;

1110

1112

1114 continue;

1116 {

1118

1123 }

1124 else

1126 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1127 errmsg("lost connection to the logical replication parallel apply worker")));

1128 }

1129

1131

1132

1134

1136}

1137

1138

1139

1140

1141

1142

1143

1144

1145bool

1147{

1148 int rc;

1151

1154

1155

1156

1157

1158

1160 return false;

1161

1162

1163

1164

1165

1166

1167

1168#define SHM_SEND_RETRY_INTERVAL_MS 1000

1169#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

1170

1171 for (;;)

1172 {

1174

1176 return true;

1179 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1180 errmsg("could not send data to shared-memory queue")));

1181

1183

1184

1188 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);

1189

1191 {

1194 }

1195

1196 if (startTime == 0)

1200 return false;

1201 }

1202}

1203

1204

1205

1206

1207

1208

1209

1210void

1212 bool stream_locked)

1213{

1215 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",

1217

1218

1219

1220

1221

1222

1224

1225

1227

1228

1229

1230

1231

1232

1233 if (!stream_locked)

1235

1237}

1238

1239

1240

1241

1242

1243static void

1246{

1247 for (;;)

1248 {

1249

1250

1251

1252

1254 break;

1255

1256

1259 10L,

1260 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

1261

1262

1264

1265

1267 }

1268}

1269

1270

1271

1272

1273static void

1275{

1276

1277

1278

1279

1280

1281

1283

1284

1285

1286

1287

1288

1291

1292

1293

1294

1295

1296

1299 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1300 errmsg("lost connection to the logical replication parallel apply worker")));

1301}

1302

1303

1304

1305

1306void

1309{

1313}

1314

1315

1316

1317

1320{

1322

1326

1327 return xact_state;

1328}

1329

1330

1331

1332

1333void

1335{

1337}

1338

1339

1340

1341

1342

1343

1344

1345

1346

1347static void

1349{

1350 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);

1351}

1352

1353

1354

1355

1356

1357

1358

1359

1360

1361void

1363{

1364 if (current_xid != top_xid &&

1366 {

1369

1371 spname, sizeof(spname));

1372

1373 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

1374

1375

1377 {

1380

1383 }

1384

1386

1387

1388

1389

1390

1391

1393

1397 }

1398}

1399

1400

1401void

1403{

1404

1405

1406

1407

1409}

1410

1411

1412

1413

1414

1415void

1417{

1420

1421

1422

1423

1424

1427

1428

1429

1430

1431

1432 if (subxid == xid)

1433 {

1435

1436

1437

1438

1439

1440

1441

1442

1443

1444

1445

1447

1449

1451 {

1454 }

1455

1457

1459 }

1460 else

1461 {

1462

1463 int i;

1465

1467

1468 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

1469

1470

1471

1472

1473

1474

1475

1476

1478 {

1480

1481 if (xid_tmp == subxid)

1482 {

1486 break;

1487 }

1488 }

1489 }

1490}

1491

1492

1493

1494

1495

1496

1497void

1500{

1503

1505 {

1509 }

1510

1512}

1513

1514

1515

1516

1519{

1521

1523

1527

1528 return fileset_state;

1529}

1530

1531

1532

1533

1534

1535

1536

1537

1538

1539void

1541{

1544}

1545

1546void

1548{

1551}

1552

1553

1554

1555

1556

1557

1558

1559

1560

1561

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572void

1574{

1577}

1578

1579void

1581{

1584}

1585

1586

1587

1588

1589

1590void

1592{

1594

1595

1596

1597

1598

1600 {

1602 return;

1603

1604 elog(ERROR, "invalid pending streaming chunk 0");

1605 }

1606

1608 {

1611 }

1612}

1613

1614

1615

1616

1617void

1619{

1621

1622

1623

1624

1625

1627

1628

1629

1630

1631

1632

1634

1637

1639}

struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry

static ParallelApplyWorkerInfo * stream_apply_worker

static List * ParallelApplyWorkerPool

void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)

void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)

static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)

#define DSM_ERROR_QUEUE_SIZE

volatile sig_atomic_t ParallelApplyMessagePending

static bool pa_can_start(void)

void HandleParallelApplyMessageInterrupt(void)

void ProcessParallelApplyMessages(void)

#define SHM_SEND_TIMEOUT_MS

static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)

void pa_stream_abort(LogicalRepStreamAbortData *abort_data)

static void ProcessParallelApplyInterrupts(void)

static void ProcessParallelApplyMessage(StringInfo msg)

static PartialFileSetState pa_get_fileset_state(void)

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)

#define PARALLEL_APPLY_LOCK_XACT

void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)

static List * subxactlist

static bool pa_has_spooled_message_pending()

static void pa_shutdown(int code, Datum arg)

void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)

void pa_reset_subtrans(void)

static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)

#define PARALLEL_APPLY_KEY_SHARED

void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)

ParallelApplyWorkerShared * MyParallelShared

void pa_detach_all_error_mq(void)

static void LogicalParallelApplyLoop(shm_mq_handle *mqh)

static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)

void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)

#define PARALLEL_APPLY_KEY_ERROR_QUEUE

void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)

static void pa_free_worker(ParallelApplyWorkerInfo *winfo)

void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)

#define PARALLEL_APPLY_KEY_MQ

static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)

#define SIZE_STATS_MESSAGE

#define SHM_SEND_RETRY_INTERVAL_MS

bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

void pa_allocate_worker(TransactionId xid)

static bool pa_process_spooled_messages_if_required(void)

void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)

static HTAB * ParallelApplyTxnHash

#define PARALLEL_APPLY_LOCK_STREAM

ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)

static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)

void ParallelApplyWorkerMain(Datum main_arg)

#define PG_LOGICAL_APPLY_SHM_MAGIC

void pa_decr_and_wait_stream_block(void)

static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)

void stream_cleanup_files(Oid subid, TransactionId xid)

MemoryContext ApplyMessageContext

bool InitializingApplyWorker

void apply_dispatch(StringInfo s)

void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)

ErrorContextCallback * apply_error_context_stack

void stream_start_internal(TransactionId xid, bool first_segment)

void set_apply_error_context_origin(char *originname)

MemoryContext ApplyContext

void apply_error_callback(void *arg)

void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)

void maybe_reread_subscription(void)

void InitializeLogRepWorker(void)

void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)

Subscription * MySubscription

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

void pgstat_report_activity(BackendState state, const char *cmd_str)

void BackgroundWorkerUnblockSignals(void)

#define MemSet(start, val, len)

dsm_handle dsm_segment_handle(dsm_segment *seg)

void dsm_detach(dsm_segment *seg)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

dsm_segment * dsm_attach(dsm_handle h)

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)

ErrorContextCallback * error_context_stack

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

volatile sig_atomic_t InterruptPending

void ProcessConfigFile(GucContext context)

Assert(PointerIsAligned(start, uint64))

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

volatile sig_atomic_t ShutdownRequestPending

volatile sig_atomic_t ConfigReloadPending

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)

void logicalrep_worker_attach(int slot)

void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)

LogicalRepWorker * MyLogicalRepWorker

int max_parallel_apply_workers_per_subscription

List * list_delete_ptr(List *list, void *datum)

List * lappend(List *list, void *datum)

List * lappend_xid(List *list, TransactionId datum)

bool list_member_xid(const List *list, TransactionId datum)

List * list_truncate(List *list, int new_size)

void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

#define AccessExclusiveLock

void MemoryContextReset(MemoryContext context)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext TopMemoryContext

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define RESUME_INTERRUPTS()

#define CHECK_FOR_INTERRUPTS()

#define HOLD_INTERRUPTS()

TimestampTz replorigin_session_origin_timestamp

RepOriginId replorigin_by_name(const char *roname, bool missing_ok)

RepOriginId replorigin_session_origin

void replorigin_session_setup(RepOriginId node, int acquired_by)

XLogRecPtr replorigin_session_origin_lsn

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

static ListCell * list_nth_cell(const List *list, int n)

static Datum PointerGetDatum(const void *X)

static Pointer DatumGetPointer(Datum X)

static int32 DatumGetInt32(Datum X)

BackgroundWorker * MyBgworkerEntry

int pq_getmsgbyte(StringInfo msg)

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

#define INVALID_PROC_NUMBER

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_PARALLEL_APPLY_MESSAGE

char * psprintf(const char *fmt,...)

int debug_logical_replication_streaming

@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_detach(shm_mq_handle *mqh)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_initialize_estimator(e)

#define shm_toc_estimate_keys(e, cnt)

#define SpinLockInit(lock)

#define SpinLockRelease(lock)

#define SpinLockAcquire(lock)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

static void initReadOnlyStringInfo(StringInfo str, char *data, int len)

char bgw_extra[BGW_EXTRALEN]

struct ErrorContextCallback * previous

void(* callback)(void *arg)

TimestampTz last_recv_time

TimestampTz last_send_time

ParallelApplyWorkerInfo * winfo

shm_mq_handle * error_mq_handle

shm_mq_handle * mq_handle

ParallelApplyWorkerShared * shared

bool AllTablesyncsReady(void)

void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)

#define TransactionIdIsValid(xid)

#define WL_EXIT_ON_PM_DEATH

@ PARALLEL_TRANS_FINISHED

static bool am_parallel_apply_worker(void)

@ WORKERTYPE_PARALLEL_APPLY

@ FS_SERIALIZE_IN_PROGRESS

static bool am_leader_apply_worker(void)

void DefineSavepoint(const char *name)

bool IsTransactionState(void)

void StartTransactionCommand(void)

bool IsTransactionBlock(void)

void BeginTransactionBlock(void)

void CommitTransactionCommand(void)

void RollbackToSavepoint(const char *name)

bool EndTransactionBlock(bool chain)

void AbortCurrentTransaction(void)

#define XLogRecPtrIsInvalid(r)

#define InvalidXLogRecPtr