PostgreSQL Source Code: src/backend/replication/logical/applyparallelworker.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

159

177

178#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

179

180

181

182

183

184

185#define PARALLEL_APPLY_KEY_SHARED 1

186#define PARALLEL_APPLY_KEY_MQ 2

187#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

188

189

190#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

191

192

193

194

195

196

197

198#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

199

200

201

202

203

204

205

206#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

207

208

209

210

211

212#define PARALLEL_APPLY_LOCK_STREAM 0

213#define PARALLEL_APPLY_LOCK_XACT 1

214

215

216

217

223

224

225

226

227

229

230

231

232

233

234

235

236

238

239

240

241

243

244

245

246

247

249

250

251

252

253

254

256

257

259

263

264

265

266

267static bool

269{

270

272 return false;

273

274

275

276

277

278

279

280

281

282

284

285

286

287

288

289

291 return false;

292

293

294

295

296

297

298

299

300

301

302

303

304

306 return false;

307

308

309

310

311

312

313

314

316 return false;

317

318 return true;

319}

320

321

322

323

324

325

326

327

328

329static bool

331{

340

341

342

343

344

345

346

347

348

349

350

355

358

359

361 if (!seg)

362 return false;

363

366

367

370

375

377

378

382

383

385

386

391

392

394

395

397 winfo->shared = shared;

398

399 return true;

400}

401

402

403

404

405

408{

413

414

416 {

418

420 return winfo;

421 }

422

423

424

425

426

427

428

430

432

433

435 {

439 }

440

448 false);

449

451 {

453 }

454 else

455 {

457 winfo = NULL;

458 }

459

461

462 return winfo;

463}

464

465

466

467

468

469

470

471

472

473void

475{

476 bool found;

479

481 return;

482

484 if (!winfo)

485 return;

486

487

489 {

491

496

498 16, &ctl,

500 }

501

502

504 if (found)

505 elog(ERROR, "hash table corrupted");

506

507

512

513 winfo->in_use = true;

515 entry->winfo = winfo;

516}

517

518

519

520

523{

524 bool found;

526

529

532

533

536

537

539 if (found)

540 {

541

543 return entry->winfo;

544 }

545

547}

548

549

550

551

552

553

554

555

556

557

558

559static void

561{

565

567 elog(ERROR, "hash table corrupted");

568

569

570

571

572

573

574

575

576

577

578

579

583 {

586

587 return;

588 }

589

590 winfo->in_use = false;

592}

593

594

595

596

597

598static void

600{

602

605

608

609

612

615

616

618

620}

621

622

623

624

625void

627{

629

631 {

633

635 {

638 }

639 }

640}

641

642

643

644

645static bool

647{

649

651

652 return (fileset_state != FS_EMPTY);

653}

654

655

656

657

658

659

660

661static bool

663{

665

667

668 if (fileset_state == FS_EMPTY)

669 return false;

670

671

672

673

674

675

676

677

678

679

680

681

683 {

686

688 }

689

690

691

692

693

694

695

696

698 {

700 }

701 else if (fileset_state == FS_READY)

702 {

707 }

708

709 return true;

710}

711

712

713

714

715static void

717{

719

721 {

723 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",

725

727 }

728

730 {

733 }

734}

735

736

737static void

739{

743

744

745

746

747

749 "ApplyMessageContext",

751

752

753

754

755

759

760 for (;;)

761 {

764

766

767

769

771

773 {

775 int c;

776

777 if (len == 0)

778 elog(ERROR, "invalid message length");

779

781

782

783

784

785

788 elog(ERROR, "unexpected message \"%c\"", c);

789

790

791

792

793

794

795

796

797

798

800

802 }

804 {

805

807 {

808 int rc;

809

810

813 1000L,

815

818 }

819 }

820 else

821 {

823

826 errmsg("lost connection to the logical replication apply worker")));

827 }

828

831 }

832

833

835

837}

838

839

840

841

842

843

844

845

846

847static void

856

857

858

859

860void

862{

873

875

876

877

878

879

880

881

882

883

887

888

889

890

891

892

893

894

897 if (!seg)

900 errmsg("could not map dynamic shared memory segment")));

901

903 if (!toc)

906 errmsg("invalid magic number in dynamic shared memory segment")));

907

908

911

912

913

914

918

919

920

921

922

923

925

926

927

928

929

930

932

937

938

939

940

944

948

951

953

955

956

961

962

963

964

965

969

970

971

972

973

977

979

981

982

983

984

985

986

987

989}

990

991

992

993

994

995

996

997

998void

1005

1006

1007

1008

1009

1010static void

1012{

1014

1016

1018 {

1020 {

1022

1023

1025

1026

1027

1028

1029

1030

1031

1032 if (edata.context)

1034 _("logical replication parallel apply worker"));

1035 else

1036 edata.context = pstrdup(_("logical replication parallel apply worker"));

1037

1038

1039

1040

1041

1043

1044

1045

1046

1047

1050 errmsg("logical replication parallel apply worker exited due to error"),

1052 }

1053

1054

1055

1056

1057

1058

1061 break;

1062

1063 default:

1064 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",

1066 }

1067}

1068

1069

1070

1071

1072void

1074{

1077

1079

1080

1081

1082

1083

1084

1085

1086

1088

1089

1090

1091

1092

1093

1094 if (hpam\_context)

1096 "ProcessParallelApplyMessages",

1098 else

1100

1102

1104

1106 {

1108 Size nbytes;

1111

1112

1113

1114

1115

1116

1117

1119 continue;

1120

1122

1124 continue;

1126 {

1128

1133 }

1134 else

1137 errmsg("lost connection to the logical replication parallel apply worker")));

1138 }

1139

1141

1142

1144

1146}

1147

1148

1149

1150

1151

1152

1153

1154

1155bool

1157{

1158 int rc;

1161

1164

1165

1166

1167

1168

1170 return false;

1171

1172

1173

1174

1175

1176

1177

1178#define SHM_SEND_RETRY_INTERVAL_MS 1000

1179#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

1180

1181 for (;;)

1182 {

1184

1186 return true;

1190 errmsg("could not send data to shared-memory queue")));

1191

1193

1194

1199

1201 {

1204 }

1205

1206 if (startTime == 0)

1210 return false;

1211 }

1212}

1213

1214

1215

1216

1217

1218

1219

1220void

1223{

1225 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",

1227

1228

1229

1230

1231

1232

1234

1235

1237

1238

1239

1240

1241

1242

1245

1247}

1248

1249

1250

1251

1252

1253static void

1256{

1257 for (;;)

1258 {

1259

1260

1261

1262

1264 break;

1265

1266

1269 10L,

1271

1272

1274

1275

1277 }

1278}

1279

1280

1281

1282

1283static void

1285{

1286

1287

1288

1289

1290

1291

1293

1294

1295

1296

1297

1298

1301

1302

1303

1304

1305

1306

1310 errmsg("lost connection to the logical replication parallel apply worker")));

1311}

1312

1313

1314

1315

1316void

1319{

1321 wshared->xact_state = xact_state;

1323}

1324

1325

1326

1327

1330{

1332

1334 xact_state = wshared->xact_state;

1336

1337 return xact_state;

1338}

1339

1340

1341

1342

1343void

1348

1349

1350

1351

1352

1353

1354

1355

1356

1357static void

1362

1363

1364

1365

1366

1367

1368

1369

1370

1371void

1373{

1376 {

1379

1382

1383 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

1384

1385

1387 {

1390

1393 }

1394

1396

1397

1398

1399

1400

1401

1403

1407 }

1408}

1409

1410

1411void

1413{

1414

1415

1416

1417

1419}

1420

1421

1422

1423

1424

1425void

1427{

1430

1431

1432

1433

1434

1437

1438

1439

1440

1441

1442 if (subxid == xid)

1443 {

1445

1446

1447

1448

1449

1450

1451

1452

1453

1454

1455

1457

1459

1461 {

1464 }

1465

1467

1469 }

1470 else

1471 {

1472

1473 int i;

1475

1477

1478 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

1479

1480

1481

1482

1483

1484

1485

1486

1488 {

1490

1492 {

1496 break;

1497 }

1498 }

1499 }

1500}

1501

1502

1503

1504

1505

1506

1507void

1510{

1512 wshared->fileset_state = fileset_state;

1513

1515 {

1519 }

1520

1522}

1523

1524

1525

1526

1529{

1531

1533

1537

1538 return fileset_state;

1539}

1540

1541

1542

1543

1544

1545

1546

1547

1548

1549void

1555

1556void

1562

1563

1564

1565

1566

1567

1568

1569

1570

1571

1572

1573

1574

1575

1576

1577

1578

1579

1580

1581

1582void

1588

1589void

1595

1596

1597

1598

1599

1600void

1602{

1604

1605

1606

1607

1608

1610 {

1612 return;

1613

1614 elog(ERROR, "invalid pending streaming chunk 0");

1615 }

1616

1618 {

1621 }

1622}

1623

1624

1625

1626

1627void

1629{

1631

1632

1633

1634

1635

1637

1638

1639

1640

1641

1642

1644

1647

1649}

static ParallelApplyWorkerInfo * stream_apply_worker

static List * ParallelApplyWorkerPool

void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)

void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)

static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)

#define DSM_ERROR_QUEUE_SIZE

volatile sig_atomic_t ParallelApplyMessagePending

static bool pa_can_start(void)

void HandleParallelApplyMessageInterrupt(void)

void ProcessParallelApplyMessages(void)

#define SHM_SEND_TIMEOUT_MS

static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)

void pa_stream_abort(LogicalRepStreamAbortData *abort_data)

static void ProcessParallelApplyInterrupts(void)

static void ProcessParallelApplyMessage(StringInfo msg)

static PartialFileSetState pa_get_fileset_state(void)

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)

#define PARALLEL_APPLY_LOCK_XACT

void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)

static List * subxactlist

static void pa_shutdown(int code, Datum arg)

void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)

void pa_reset_subtrans(void)

static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)

#define PARALLEL_APPLY_KEY_SHARED

void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)

ParallelApplyWorkerShared * MyParallelShared

void pa_detach_all_error_mq(void)

static bool pa_has_spooled_message_pending(void)

static void LogicalParallelApplyLoop(shm_mq_handle *mqh)

static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)

void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)

#define PARALLEL_APPLY_KEY_ERROR_QUEUE

void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)

static void pa_free_worker(ParallelApplyWorkerInfo *winfo)

void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)

#define PARALLEL_APPLY_KEY_MQ

static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)

#define SIZE_STATS_MESSAGE

#define SHM_SEND_RETRY_INTERVAL_MS

bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

void pa_allocate_worker(TransactionId xid)

static bool pa_process_spooled_messages_if_required(void)

void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)

static HTAB * ParallelApplyTxnHash

#define PARALLEL_APPLY_LOCK_STREAM

ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)

void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)

static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)

void ParallelApplyWorkerMain(Datum main_arg)

#define PG_LOGICAL_APPLY_SHM_MAGIC

void pa_decr_and_wait_stream_block(void)

static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)

void stream_cleanup_files(Oid subid, TransactionId xid)

MemoryContext ApplyMessageContext

bool InitializingApplyWorker

void apply_dispatch(StringInfo s)

void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)

ErrorContextCallback * apply_error_context_stack

void stream_start_internal(TransactionId xid, bool first_segment)

void set_apply_error_context_origin(char *originname)

MemoryContext ApplyContext

void apply_error_callback(void *arg)

void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)

void maybe_reread_subscription(void)

void InitializeLogRepWorker(void)

void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)

Subscription * MySubscription

bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

TimestampTz GetCurrentTimestamp(void)

void pgstat_report_activity(BackendState state, const char *cmd_str)

void BackgroundWorkerUnblockSignals(void)

#define Assert(condition)

#define MemSet(start, val, len)

dsm_handle dsm_segment_handle(dsm_segment *seg)

void dsm_detach(dsm_segment *seg)

void * dsm_segment_address(dsm_segment *seg)

dsm_segment * dsm_create(Size size, int flags)

dsm_segment * dsm_attach(dsm_handle h)

void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)

HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)

ErrorContextCallback * error_context_stack

int errcode(int sqlerrcode)

#define ereport(elevel,...)

#define palloc0_object(type)

volatile sig_atomic_t InterruptPending

void ProcessConfigFile(GucContext context)

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

volatile sig_atomic_t ShutdownRequestPending

volatile sig_atomic_t ConfigReloadPending

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void CacheRegisterSyscacheCallback(SysCacheIdentifier cacheid, SyscacheCallbackFunction func, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)

void logicalrep_worker_attach(int slot)

void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)

LogicalRepWorker * MyLogicalRepWorker

int max_parallel_apply_workers_per_subscription

List * list_delete_ptr(List *list, void *datum)

List * lappend(List *list, void *datum)

List * lappend_xid(List *list, TransactionId datum)

bool list_member_xid(const List *list, TransactionId datum)

List * list_truncate(List *list, int new_size)

void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)

#define AccessExclusiveLock

void MemoryContextReset(MemoryContext context)

MemoryContext TopTransactionContext

char * pstrdup(const char *in)

void pfree(void *pointer)

MemoryContext TopMemoryContext

MemoryContext CurrentMemoryContext

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define RESUME_INTERRUPTS()

#define CHECK_FOR_INTERRUPTS()

#define HOLD_INTERRUPTS()

ReplOriginXactState replorigin_xact_state

ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)

void replorigin_session_setup(ReplOriginId node, int acquired_by)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static int list_length(const List *l)

static ListCell * list_nth_cell(const List *list, int n)

static Datum PointerGetDatum(const void *X)

static Pointer DatumGetPointer(Datum X)

static int32 DatumGetInt32(Datum X)

BackgroundWorker * MyBgworkerEntry

int pq_getmsgbyte(StringInfo msg)

void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)

void pq_parse_errornotice(StringInfo msg, ErrorData *edata)

void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)

#define INVALID_PROC_NUMBER

int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)

@ PROCSIG_PARALLEL_APPLY_MESSAGE

#define PqReplMsg_WALData

#define PqMsg_NotificationResponse

#define PqMsg_ErrorResponse

#define PqMsg_NoticeResponse

char * psprintf(const char *fmt,...)

int debug_logical_replication_streaming

@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE

void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)

shm_mq * shm_mq_create(void *address, Size size)

void shm_mq_detach(shm_mq_handle *mqh)

void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)

shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)

void * shm_toc_allocate(shm_toc *toc, Size nbytes)

Size shm_toc_estimate(shm_toc_estimator *e)

shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)

void shm_toc_insert(shm_toc *toc, uint64 key, void *address)

void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)

shm_toc * shm_toc_attach(uint64 magic, void *address)

#define shm_toc_estimate_chunk(e, sz)

#define shm_toc_initialize_estimator(e)

#define shm_toc_estimate_keys(e, cnt)

static void SpinLockRelease(volatile slock_t *lock)

static void SpinLockAcquire(volatile slock_t *lock)

static void SpinLockInit(volatile slock_t *lock)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

void initStringInfo(StringInfo str)

static void initReadOnlyStringInfo(StringInfo str, char *data, int len)

char bgw_extra[BGW_EXTRALEN]

struct ErrorContextCallback * previous

void(* callback)(void *arg)

TimestampTz last_recv_time

TimestampTz last_send_time

ParallelApplyWorkerInfo * winfo

shm_mq_handle * error_mq_handle

shm_mq_handle * mq_handle

ParallelApplyWorkerShared * shared

TimestampTz origin_timestamp

void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)

bool AllTablesyncsReady(void)

#define TransactionIdIsValid(xid)

#define WL_EXIT_ON_PM_DEATH

@ PARALLEL_TRANS_FINISHED

static bool am_parallel_apply_worker(void)

@ WORKERTYPE_PARALLEL_APPLY

@ FS_SERIALIZE_IN_PROGRESS

static bool am_leader_apply_worker(void)

void DefineSavepoint(const char *name)

bool IsTransactionState(void)

void StartTransactionCommand(void)

bool IsTransactionBlock(void)

void BeginTransactionBlock(void)

void CommitTransactionCommand(void)

void RollbackToSavepoint(const char *name)

bool EndTransactionBlock(bool chain)

void AbortCurrentTransaction(void)

#define XLogRecPtrIsValid(r)

#define InvalidXLogRecPtr