PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

159

174

175#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

176

177

178

179

180

181

182#define PARALLEL_APPLY_KEY_SHARED 1

183#define PARALLEL_APPLY_KEY_MQ 2

184#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

185

186

187#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

188

189

190

191

192

193

194

195#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

196

197

198

199

200

201

202

203#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

204

205

206

207

208

209#define PARALLEL_APPLY_LOCK_STREAM 0

210#define PARALLEL_APPLY_LOCK_XACT 1

211

212

213

214

216{

220

221

222

223

224

226

227

228

229

230

231

232

233

235

236

237

238

240

241

242

243

244

246

247

248

249

250

251

253

254

256

260

261

262

263

264static bool

266{

267

269 return false;

270

271

272

273

274

275

276

277

278

279

281

282

283

284

285

286

288 return false;

289

290

291

292

293

294

295

296

297

298

299

300

301

303 return false;

304

305

306

307

308

309

310

311

313 return false;

314

315 return true;

316}

317

318

319

320

321

322

323

324

325

326static bool

328{

330 Size segsize;

337

338

339

340

341

342

343

344

345

346

347

352

355

356

358 if (!seg)

359 return false;

360

362 segsize);

363

364

367

372

374

375

379

380

382

383

385 error_queue_size);

388

389

391

392

394 winfo->shared = shared;

395

396 return true;

397}

398

399

400

401

402

405{

407 bool launched;

410

411

413 {

415

417 return winfo;

418 }

419

420

421

422

423

424

425

427

429

430

432 {

435 return NULL;

436 }

437

445 false);

446

447 if (launched)

448 {

450 }

451 else

452 {

454 winfo = NULL;

455 }

456

458

459 return winfo;

460}

461

462

463

464

465

466

467

468

469

470void

472{

473 bool found;

476

478 return;

479

481 if (!winfo)

482 return;

483

484

486 {

488

493

495 16, &ctl,

497 }

498

499

501 if (found)

502 elog(ERROR, "hash table corrupted");

503

504

509

510 winfo->in_use = true;

512 entry->winfo = winfo;

513}

514

515

516

517

520{

521 bool found;

523

525 return NULL;

526

528 return NULL;

529

530

533

534

536 if (found)

537 {

538

540 return entry->winfo;

541 }

542

543 return NULL;

544}

545

546

547

548

549

550

551

552

553

554

555

556static void

558{

562

564 elog(ERROR, "hash table corrupted");

565

566

567

568

569

570

571

572

573

574

575

576

580 {

583

584 return;

585 }

586

587 winfo->in_use = false;

589}

590

591

592

593

594

595static void

597{

599

602

605

606

609

612

613

615

617}

618

619

620

621

622void

624{

626

628 {

630

632 {

635 }

636 }

637}

638

639

640

641

642static bool

644{

646

648

649 return (fileset_state != FS_EMPTY);

650}

651

652

653

654

655

656

657

658static bool

660{

662

664

665 if (fileset_state == FS_EMPTY)

666 return false;

667

668

669

670

671

672

673

674

675

676

677

678

680 {

683

685 }

686

687

688

689

690

691

692

693

695 {

697 }

698 else if (fileset_state == FS_READY)

699 {

704 }

705

706 return true;

707}

708

709

710

711

712static void

714{

716

718 {

720 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",

722

724 }

725

727 {

730 }

731}

732

733

734static void

736{

740

741

742

743

744

746 "ApplyMessageContext",

748

749

750

751

752

756

757 for (;;)

758 {

761

763

764

766

768

770 {

772 int c;

773

774 if (len == 0)

775 elog(ERROR, "invalid message length");

776

778

779

780

781

782

785 elog(ERROR, "unexpected message \"%c\"", c);

786

787

788

789

790

791

792

793

794

795

797

799 }

801 {

802

804 {

805 int rc;

806

807

810 1000L,

811 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

812

815 }

816 }

817 else

818 {

820

822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

823 errmsg("lost connection to the logical replication apply worker")));

824 }

825

828 }

829

830

832

834}

835

836

837

838

839

840

841

842

843

844static void

846{

850

852}

853

854

855

856

857void

859{

870

872

873

874

875

876

877

878

879

880

885

886

887

888

889

890

891

892

895 if (!seg)

897 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

898 errmsg("could not map dynamic shared memory segment")));

899

901 if (!toc)

903 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

904 errmsg("invalid magic number in dynamic shared memory segment")));

905

906

909

910

911

912

916

917

918

919

920

921

923

924

925

926

927

928

930

935

936

937

938

942

946

949

951

953

954

957 originname, sizeof(originname));

959

960

961

962

963

967

968

969

970

971

975

977

979

980

981

982

983

984

985

987}

988

989

990

991

992

993

994

995

996void

998{

1002}

1003

1004

1005

1006

1007

1008static void

1010{

1011 char msgtype;

1012

1014

1015 switch (msgtype)

1016 {

1018 {

1020

1021

1023

1024

1025

1026

1027

1028

1029

1032 _("logical replication parallel apply worker"));

1033 else

1034 edata.context = pstrdup(_("logical replication parallel apply worker"));

1035

1036

1037

1038

1039

1041

1042

1043

1044

1045

1047 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1048 errmsg("logical replication parallel apply worker exited due to error"),

1050 }

1051

1052

1053

1054

1055

1056

1059 break;

1060

1061 default:

1062 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",

1063 msgtype, msg->len);

1064 }

1065}

1066

1067

1068

1069

1070void

1072{

1075

1077

1078

1079

1080

1081

1082

1083

1084

1086

1087

1088

1089

1090

1091

1092 if (!hpam_context)

1094 "ProcessParallelApplyMessages",

1096 else

1098

1100

1102

1104 {

1106 Size nbytes;

1109

1110

1111

1112

1113

1114

1115

1117 continue;

1118

1120

1122 continue;

1124 {

1126

1131 }

1132 else

1134 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1135 errmsg("lost connection to the logical replication parallel apply worker")));

1136 }

1137

1139

1140

1142

1144}

1145

1146

1147

1148

1149

1150

1151

1152

1153bool

1155{

1156 int rc;

1159

1162

1163

1164

1165

1166

1168 return false;

1169

1170

1171

1172

1173

1174

1175

1176#define SHM_SEND_RETRY_INTERVAL_MS 1000

1177#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

1178

1179 for (;;)

1180 {

1182

1184 return true;

1187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1188 errmsg("could not send data to shared-memory queue")));

1189

1191

1192

1196 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);

1197

1199 {

1202 }

1203

1204 if (startTime == 0)

1208 return false;

1209 }

1210}

1211

1212

1213

1214

1215

1216

1217

1218void

1220 bool stream_locked)

1221{

1223 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",

1225

1226

1227

1228

1229

1230

1232

1233

1235

1236

1237

1238

1239

1240

1241 if (!stream_locked)

1243

1245}

1246

1247

1248

1249

1250

1251static void

1254{

1255 for (;;)

1256 {

1257

1258

1259

1260

1262 break;

1263

1264

1267 10L,

1268 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

1269

1270

1272

1273

1275 }

1276}

1277

1278

1279

1280

1281static void

1283{

1284

1285

1286

1287

1288

1289

1291

1292

1293

1294

1295

1296

1299

1300

1301

1302

1303

1304

1307 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

1308 errmsg("lost connection to the logical replication parallel apply worker")));

1309}

1310

1311

1312

1313

1314void

1317{

1321}

1322

1323

1324

1325

1328{

1330

1334

1335 return xact_state;

1336}

1337

1338

1339

1340

1341void

1343{

1345}

1346

1347

1348

1349

1350

1351

1352

1353

1354

1355static void

1357{

1358 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);

1359}

1360

1361

1362

1363

1364

1365

1366

1367

1368

1369void

1371{

1372 if (current_xid != top_xid &&

1374 {

1377

1379 spname, sizeof(spname));

1380

1381 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

1382

1383

1385 {

1388

1391 }

1392

1394

1395

1396

1397

1398

1399

1401

1405 }

1406}

1407

1408

1409void

1411{

1412

1413

1414

1415

1417}

1418

1419

1420

1421

1422

1423void

1425{

1428

1429

1430

1431

1432

1435

1436

1437

1438

1439

1440 if (subxid == xid)

1441 {

1443

1444

1445

1446

1447

1448

1449

1450

1451

1452

1453

1455

1457

1459 {

1462 }

1463

1465

1467 }

1468 else

1469 {

1470

1471 int i;

1473

1475

1476 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

1477

1478

1479

1480

1481

1482

1483

1484

1486 {

1488

1489 if (xid_tmp == subxid)

1490 {

1494 break;

1495 }

1496 }

1497 }

1498}

1499

1500

1501

1502

1503

1504

1505void

1508{

1511

1513 {

1517 }

1518

1520}

1521

1522

1523

1524

1527{

1529

1531

1535

1536 return fileset_state;

1537}

1538

1539

1540

1541

1542

1543

1544

1545

1546

1547void

1549{

1552}

1553

1554void

1556{

1559}

1560

1561

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580void

1582{

1585}

1586

1587void

1589{

1592}

1593

1594

1595

1596

1597

1598void

1600{

1602

1603

1604

1605

1606

1608 {

1610 return;

1611

1612 elog(ERROR, "invalid pending streaming chunk 0");

1613 }

1614

1616 {

1619 }

1620}

1621

1622

1623

1624

1625void

1627{

1629

1630

1631

1632

1633

1635

1636

1637

1638

1639

1640

1642

1645

1647}

struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry

static ParallelApplyWorkerInfo * stream_apply_worker

static List * ParallelApplyWorkerPool

void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)

void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)

static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)

#define DSM_ERROR_QUEUE_SIZE

volatile sig_atomic_t ParallelApplyMessagePending

static bool pa_can_start(void)

void HandleParallelApplyMessageInterrupt(void)

void ProcessParallelApplyMessages(void)

#define SHM_SEND_TIMEOUT_MS

static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)

void pa_stream_abort(LogicalRepStreamAbortData *abort_data)

static void ProcessParallelApplyInterrupts(void)

static void ProcessParallelApplyMessage(StringInfo msg)

static PartialFileSetState pa_get_fileset_state(void)

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)

#define PARALLEL_APPLY_LOCK_XACT

void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)

static List * subxactlist

static void pa_shutdown(int code, Datum arg)

void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)

void pa_reset_subtrans(void)

static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)

#define PARALLEL_APPLY_KEY_SHARED

void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)

ParallelApplyWorkerShared * MyParallelShared

void pa_detach_all_error_mq(void)

static bool pa_has_spooled_message_pending(void)

static void LogicalParallelApplyLoop(shm_mq_handle *mqh)

static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)

void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)

#define PARALLEL_APPLY_KEY_ERROR_QUEUE

void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)

static void pa_free_worker(ParallelApplyWorkerInfo *winfo)

void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)

#define PARALLEL_APPLY_KEY_MQ

static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)

#define SIZE_STATS_MESSAGE

#define SHM_SEND_RETRY_INTERVAL_MS

bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

void pa_allocate_worker(TransactionId xid)

static bool pa_process_spooled_messages_if_required(void)

void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)

static HTAB * ParallelApplyTxnHash

#define PARALLEL_APPLY_LOCK_STREAM

ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)

static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)

void ParallelApplyWorkerMain(Datum main_arg)

#define PG_LOGICAL_APPLY_SHM_MAGIC

void pa_decr_and_wait_stream_block(void)

static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)

void stream_cleanup_files(Oid subid, TransactionId xid)

MemoryContext ApplyMessageContext

bool InitializingApplyWorker

void apply_dispatch(StringInfo s)

void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)

ErrorContextCallback * apply_error_context_stack

void stream_start_internal(TransactionId xid, bool first_segment)

void set_apply_error_context_origin(char *originname)

MemoryContext ApplyContext

void apply_error_callback(void *arg)

void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)

void maybe_reread_subscription(void)

void InitializeLogRepWorker(void)

void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)

Subscription * MySubscription

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

void pgstat_report_activity(BackendState state, const char *cmd_str)

void BackgroundWorkerUnblockSignals(void)

#define MemSet(start, val, len)

dsm_handle dsm_segment_handle(dsm_segment *seg)

void dsm_detach(dsm_segment *seg)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

dsm_segment * dsm_attach(dsm_handle h)

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)

ErrorContextCallback * error_context_stack

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define palloc0_object(type)

volatile sig_atomic_t InterruptPending

void ProcessConfigFile(GucContext context)

Assert(PointerIsAligned(start, uint64))

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

volatile sig_atomic_t ShutdownRequestPending

volatile sig_atomic_t ConfigReloadPending

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)

void logicalrep_worker_attach(int slot)

void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)

LogicalRepWorker * MyLogicalRepWorker

int max_parallel_apply_workers_per_subscription

List * list_delete_ptr(List *list, void *datum)

List * lappend(List *list, void *datum)

List * lappend_xid(List *list, TransactionId datum)

bool list_member_xid(const List *list, TransactionId datum)

List * list_truncate(List *list, int new_size)

void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

#define AccessExclusiveLock

void MemoryContextReset(MemoryContext context)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

MemoryContext TopMemoryContext

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define RESUME_INTERRUPTS()

#define CHECK_FOR_INTERRUPTS()

#define HOLD_INTERRUPTS()

TimestampTz replorigin_session_origin_timestamp

RepOriginId replorigin_by_name(const char *roname, bool missing_ok)

RepOriginId replorigin_session_origin

void replorigin_session_setup(RepOriginId node, int acquired_by)

XLogRecPtr replorigin_session_origin_lsn

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

static ListCell * list_nth_cell(const List *list, int n)

static Datum PointerGetDatum(const void *X)

static Pointer DatumGetPointer(Datum X)

static int32 DatumGetInt32(Datum X)

BackgroundWorker * MyBgworkerEntry

int pq_getmsgbyte(StringInfo msg)

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

#define INVALID_PROC_NUMBER

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_PARALLEL_APPLY_MESSAGE

#define PqReplMsg_WALData

#define PqMsg_NotificationResponse

#define PqMsg_ErrorResponse

#define PqMsg_NoticeResponse

char * psprintf(const char *fmt,...)

int debug_logical_replication_streaming

@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_detach(shm_mq_handle *mqh)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_initialize_estimator(e)

#define shm_toc_estimate_keys(e, cnt)

#define SpinLockInit(lock)

#define SpinLockRelease(lock)

#define SpinLockAcquire(lock)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

static void initReadOnlyStringInfo(StringInfo str, char *data, int len)

char bgw_extra[BGW_EXTRALEN]

struct ErrorContextCallback * previous

void(* callback)(void *arg)

TimestampTz last_recv_time

TimestampTz last_send_time

ParallelApplyWorkerInfo * winfo

shm_mq_handle * error_mq_handle

shm_mq_handle * mq_handle

ParallelApplyWorkerShared * shared

void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)

bool AllTablesyncsReady(void)

#define TransactionIdIsValid(xid)

#define WL_EXIT_ON_PM_DEATH

@ PARALLEL_TRANS_FINISHED

static bool am_parallel_apply_worker(void)

@ WORKERTYPE_PARALLEL_APPLY

@ FS_SERIALIZE_IN_PROGRESS

static bool am_leader_apply_worker(void)

void DefineSavepoint(const char *name)

bool IsTransactionState(void)

void StartTransactionCommand(void)

bool IsTransactionBlock(void)

void BeginTransactionBlock(void)

void CommitTransactionCommand(void)

void RollbackToSavepoint(const char *name)

bool EndTransactionBlock(bool chain)

void AbortCurrentTransaction(void)

#define XLogRecPtrIsValid(r)

#define InvalidXLogRecPtr