PostgreSQL Source Code: src/bin/pg_dump/parallel.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

54

55#ifndef WIN32

58#include <signal.h>

60#include <fcntl.h>

61#endif

62

66#ifdef WIN32

68#endif

69

70

71#define PIPE_READ 0

72#define PIPE_WRITE 1

73

74#define NO_SLOT (-1)

75

76

84

85#define WORKER_IS_RUNNING(workerStatus) \

86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

87

88

89

90

91

92

93

94

96{

98

99

102

104

105 int pipeRead;

109

110

111#ifdef WIN32

114#else

116#endif

117};

118

119#ifdef WIN32

120

121

122

123

124

125typedef struct

126{

127 ArchiveHandle *AH;

128 ParallelSlot *slot;

130

131

133#define piperead(a,b,c) recv(a,b,c,0)

134#define pipewrite(a,b,c) send(a,b,c,0)

135

136#else

137

138

139#define pgpipe(a) pipe(a)

140#define piperead(a,b,c) read(a,b,c)

141#define pipewrite(a,b,c) write(a,b,c)

142

143#endif

144

145

146

147

153

155

156

157

158

159

160

161

162

163

164

174

176

177#ifdef WIN32

179#endif

180

181

182

183

184

185

186#define write_stderr(str) \

187 do { \

188 const char *str_ = (str); \

189 int rc_; \

190 rc_ = write(fileno(stderr), str_, strlen(str_)); \

191 (void) rc_; \

192 } while (0)

193

194

195#ifdef WIN32

196

198

199

202#endif

203

204

223 bool do_wait, int *worker);

225 int worker, const char *str);

227

228#define messageStartsWith(msg, prefix) \

229 (strncmp(msg, prefix, strlen(prefix)) == 0)

230

231

232

233

234

235

236

237void

239{

240#ifdef WIN32

242 {

245

246

249

250

252 if (err != 0)

253 pg_fatal("%s() failed: error code %d", "WSAStartup", err);

254

256 }

257#endif

258}

259

260

261

262

263

264

267{

268 int i;

269

271 {

272#ifdef WIN32

274#else

276#endif

278 }

279

281}

282

283

284

285

286

287

288

289#ifdef WIN32

292{

293

294

295

296

297

300

303 else

305

306 if (id_return)

307 {

308

310 }

311 else

312 {

313

317 else

319 }

320

322}

323#endif

324

325

326

327

328

329void

335

336

337

338

339

340

341

342

343

344void

349

350

351

352

353

354static void

356{

358

359 if (si->pstate)

360 {

361

363

364 if (!slot)

365 {

366

367

368

369

371

372 if (si->AHX)

374 }

375 else

376 {

377

378

379

380

381

382

383

384

385 if (slot->AH)

387

388#ifdef WIN32

391#endif

392 }

393 }

394 else

395 {

396

397 if (si->AHX)

399 }

400}

401

402

403

404

405

406

407

408

409

410static void

412{

413 int i;

414

415

416

417

418

419

420

423

424

425

426

427#ifndef WIN32

428

430 {

432

433 if (pid != 0)

435 }

436#else

437

438

439

440

441

444 {

446 char errbuf[1];

447

450 }

452#endif

453

454

456}

457

458

459

460

461static void

463{

465 {

467 int j;

468

469#ifndef WIN32

470

471 int status;

472 pid_t pid = wait(&status);

473

474

476 {

478 if (slot->pid == pid)

479 {

480 slot->pid = 0;

481 break;

482 }

483 }

484#else

485

487 int nrun = 0;

490

492 {

494 {

497 }

498 }

503

504

506 {

508 if (slot->hThread == hThread)

509 {

510

513 break;

514 }

515 }

516#endif

517

518

522 }

523}

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555#ifndef WIN32

556

557

558

559

560static void

562{

563 int i;

564 char errbuf[1];

565

566

567

568

569

570

574

575

576

577

578

579

580

582 {

584 {

586

587 if (pid != 0)

589 }

590 }

591

592

593

594

595

598

599

600

601

602

604 {

606 {

609 }

611 }

612

613

614

615

616

618}

619

620

621

622

623static void

625{

626

627

628

629

631 {

633

637 }

638}

639

640#else

641

642

643

644

645

646

647

648

651{

652 int i;

653 char errbuf[1];

654

657 {

658

660

661

662

663

664

665

666

667

668

669

670

672 {

674 {

678

679

680

681

682

683

686

687 if (AH != NULL && AH->connCancel != NULL)

688 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));

689 }

690 }

691

692

693

694

695

698 errbuf, sizeof(errbuf));

699

701

702

703

704

705

706

707

709 {

712 }

714 }

715

716

718}

719

720

721

722

723static void

725{

727 {

729

731

733 }

734}

735

736#endif

737

738

739

740

741

742

743

744

745void

747{

749

750

751

752

753

754

755

757

758

759

760

761

762

763#ifdef WIN32

765#endif

766

767

769

771

774

775

778

779

780

781

782

783

784

785

786#ifndef WIN32

788#else

791#endif

792

793#ifdef WIN32

795#endif

796}

797

798

799

800

801

802

803

804static void

806{

807#ifdef WIN32

809#endif

810

812

813#ifdef WIN32

815#endif

816}

817

818

819

820

821

822

823

824static void

826{

827#ifdef WIN32

829#endif

830

831 slot->AH = AH;

832

833#ifdef WIN32

835#endif

836}

837

838

839

840

841

842

843

844static void

846{

848

849

852

853

854

855

856

857

858

859

860

861

863

864

866

867

868

869

871

872

873

874

876

877

878

879

883}

884

885

886

887

888#ifdef WIN32

891{

894

895

897

898

900

901

903 return 0;

904}

905#endif

906

907

908

909

910

911

914{

916 int i;

917

919

921

925

927 return pstate;

928

929

930 pstate->te =

934

935#ifdef WIN32

936

938#endif

939

940

941

942

943

944

945

947

948

949

950

951

952

953

954

956

957

959

960

962 {

963#ifdef WIN32

966#else

968#endif

972

973

975 pg_fatal("could not create communication channels: %m");

976

977

980

983

984#ifdef WIN32

985

987

988 wi->AH = AH;

989 wi->slot = slot;

990

992 wi, 0, &(slot->threadId));

993 slot->hThread = handle;

995#else

996 pid = fork();

997 if (pid == 0)

998 {

999

1000 int j;

1001

1002

1004

1005

1007

1008

1010

1012

1013

1014

1015

1016

1017 for (j = 0; j < i; j++)

1018 {

1021 }

1022

1023

1025

1026

1028 }

1029 else if (pid < 0)

1030 {

1031

1032 pg_fatal("could not create worker process: %m");

1033 }

1034

1035

1036 slot->pid = pid;

1038

1039

1041

1043#endif

1044 }

1045

1046

1047

1048

1049

1050

1051#ifndef WIN32

1053#endif

1054

1055

1056

1057

1059

1060

1061

1062

1063

1064

1065

1067

1068 return pstate;

1069}

1070

1071

1072

1073

1074void

1076{

1077 int i;

1078

1079

1081 return;

1082

1083

1085

1086

1088 {

1091 }

1092

1093

1095

1096

1097

1098

1099

1102

1103

1106 free(pstate);

1107}

1108

1109

1110

1111

1112

1113

1114

1115

1116

1117

1118

1119

1120

1121

1122

1123static void

1125 char *buf, int buflen)

1126{

1131 else

1133}

1134

1135

1136

1137

1138static void

1140 const char *msg)

1141{

1144

1146 {

1152 }

1154 {

1156 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);

1160 }

1161 else

1162 pg_fatal("unrecognized command received from leader: \"%s\"",

1163 msg);

1164}

1165

1166

1167

1168

1169

1170

1171static void

1173 char *buf, int buflen)

1174{

1177 status,

1179}

1180

1181

1182

1183

1184

1185

1186static int

1188 const char *msg)

1189{

1192 n_errors;

1193 int status = 0;

1194

1196 {

1197 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);

1198

1201

1203 }

1204 else

1205 pg_fatal("invalid message received from worker: \"%s\"",

1206 msg);

1207

1208 return status;

1209}

1210

1211

1212

1213

1214

1215

1216

1217

1218

1219

1220void

1226 void *callback_data)

1227{

1228 int worker;

1229 char buf[256];

1230

1231

1234

1235

1237

1239

1240

1244 pstate->te[worker] = te;

1245}

1246

1247

1248

1249

1250

1251static int

1253{

1254 int i;

1255

1257 {

1259 return i;

1260 }

1262}

1263

1264

1265

1266

1267static bool

1269{

1270 int i;

1271

1273 {

1275 return false;

1276 }

1277 return true;

1278}

1279

1280

1281

1282

1283bool

1285{

1286 int i;

1287

1289 {

1291 return false;

1292 }

1293 return true;

1294}

1295

1296

1297

1298

1299

1300

1301

1302

1303

1304

1305

1306

1307

1308

1309

1310

1311

1312

1313

1314

1315

1316static void

1318{

1322

1323

1325 return;

1326

1328

1330

1331 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",

1333

1335

1337 pg_fatal("could not obtain lock on relation \"%s\"\n"

1338 "This usually means that someone requested an ACCESS EXCLUSIVE lock "

1339 "on the table after the pg_dump parent process had gotten the "

1340 "initial ACCESS SHARE lock on the table.", qualId);

1341

1344}

1345

1346

1347

1348

1349

1350

1351static void

1353{

1354 char *command;

1357 int status = 0;

1358 char buf[256];

1359

1360 for (;;)

1361 {

1363 {

1364

1365 return;

1366 }

1367

1368

1370

1372 {

1373

1375

1376

1378 }

1380 {

1381

1383 }

1384 else

1386

1387

1389

1391

1392

1393 free(command);

1394 }

1395}

1396

1397

1398

1399

1400

1401

1402

1403

1404

1405

1406

1407

1408

1409

1410

1411

1412

1413static bool

1415{

1416 int worker;

1417 char *msg;

1418

1419

1421

1422 if (!msg)

1423 {

1424

1426 pg_fatal("a worker process died unexpectedly");

1427 return false;

1428 }

1429

1430

1432 {

1435 int status;

1436

1440 pstate->te[worker] = NULL;

1441 }

1442 else

1443 pg_fatal("invalid message received from worker: \"%s\"",

1444 msg);

1445

1446

1448

1449 return true;

1450}

1451

1452

1453

1454

1455

1456

1457

1458

1459

1460

1461

1462

1463

1464

1465

1466void

1468{

1470

1471

1472

1473

1474

1475

1477 {

1478

1481 }

1482

1483 for (;;)

1484 {

1485

1486

1487

1488

1489

1491 {

1492

1493

1494

1495

1496

1498 return;

1499 }

1500

1501

1502 switch (mode)

1503 {

1505 return;

1507 Assert(false);

1508 break;

1511 return;

1512 break;

1515 return;

1516 break;

1517 }

1518

1519

1521 }

1522}

1523

1524

1525

1526

1527

1528

1529

1530

1531static char *

1536

1537

1538

1539

1540

1541

1542static void

1544{

1546

1548 pg_fatal("could not write to the communication channel: %m");

1549}

1550

1551

1552

1553

1554

1555static int

1557{

1558 int i;

1560

1561 for (;;)

1562 {

1565

1566#ifndef WIN32

1568 continue;

1569#else

1571 continue;

1572#endif

1573 break;

1574 }

1575

1576 return i;

1577}

1578

1579

1580

1581

1582

1583

1584

1585

1586

1587

1588

1589

1590

1591

1592

1593

1594static char *

1596{

1597 int i;

1600 struct timeval nowait = {0, 0};

1601

1602

1605 {

1607 continue;

1611 }

1612

1614 {

1617 }

1618 else

1619 {

1621 return NULL;

1622 }

1623

1624 if (i < 0)

1625 pg_fatal("%s() failed: %m", "select");

1626

1628 {

1629 char *msg;

1630

1632 continue;

1634 continue;

1635

1636

1637

1638

1639

1640

1641

1642

1643

1644

1645

1647 *worker = i;

1648 return msg;

1649 }

1651 return NULL;

1652}

1653

1654

1655

1656

1657

1658

1659static void

1661{

1663

1665 {

1666 pg_fatal("could not write to the communication channel: %m");

1667 }

1668}

1669

1670

1671

1672

1673

1674

1675

1676

1677static char *

1679{

1680 char *msg;

1683 int ret;

1684

1685

1686

1687

1688

1689

1690

1691

1692

1693

1694 bufsize = 64;

1697 for (;;)

1698 {

1701 if (ret <= 0)

1702 break;

1703

1705

1706 if (msg[msgsize] == '\0')

1707 return msg;

1708

1710 if (msgsize == bufsize)

1711 {

1712 bufsize += 16;

1714 }

1715 }

1716

1717

1719 return NULL;

1720}

1721

1722#ifdef WIN32

1723

1724

1725

1726

1727

1728

1729

1730

1731

1732

1733

1734static int

1736{

1741

1742

1744

1745

1746

1747

1749 {

1750 pg_log_error("pgpipe: could not create socket: error code %d",

1752 return -1;

1753 }

1754

1760 {

1761 pg_log_error("pgpipe: could not bind: error code %d",

1764 return -1;

1765 }

1767 {

1768 pg_log_error("pgpipe: could not listen: error code %d",

1771 return -1;

1772 }

1774 {

1775 pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",

1778 return -1;

1779 }

1780

1781

1782

1783

1785 {

1786 pg_log_error("pgpipe: could not create second socket: error code %d",

1789 return -1;

1790 }

1792

1794 {

1795 pg_log_error("pgpipe: could not connect socket: error code %d",

1800 return -1;

1801 }

1803 {

1804 pg_log_error("pgpipe: could not accept connection: error code %d",

1809 return -1;

1810 }

1812

1814 return 0;

1815}

1816

1817#endif

struct WorkerInfoData * WorkerInfo

void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)

static void sendMessageToLeader(int pipefd[2], const char *str)

static ParallelSlot * GetMyPSlot(ParallelState *pstate)

static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])

void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)

static bool HasEveryWorkerTerminated(ParallelState *pstate)

void replace_on_exit_close_archive(Archive *AHX)

static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)

static void sigTermHandler(SIGNAL_ARGS)

ParallelState * ParallelBackupStart(ArchiveHandle *AH)

static char * readMessageFromPipe(int fd)

static int select_loop(int maxFd, fd_set *workerset)

static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)

static int GetIdleWorker(ParallelState *pstate)

static void set_cancel_pstate(ParallelState *pstate)

static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)

static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)

static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)

static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)

static void archive_close_connection(int code, void *arg)

static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)

static ShutdownInformation shutdown_info

void on_exit_close_archive(Archive *AHX)

void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)

#define WORKER_IS_RUNNING(workerStatus)

static char * getMessageFromLeader(int pipefd[2])

static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)

#define piperead(a, b, c)

#define pipewrite(a, b, c)

void init_parallel_dump_utils(void)

static void set_cancel_handler(void)

static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)

static volatile DumpSignalInformation signal_info

bool IsEveryWorkerIdle(ParallelState *pstate)

#define write_stderr(str)

static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)

#define messageStartsWith(msg, prefix)

static void ShutdownWorkersHard(ParallelState *pstate)

static void WaitForTerminatingWorkers(ParallelState *pstate)

void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)

void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)

#define Assert(condition)

void err(int eval, const char *fmt,...)

PGcancel * PQgetCancel(PGconn *conn)

int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)

void PQfreeCancel(PGcancel *cancel)

PGresult * PQexec(PGconn *conn, const char *query)

void * pg_malloc(size_t size)

void * pg_realloc(void *ptr, size_t size)

#define pg_malloc_array(type, count)

#define pg_malloc_object(type)

#define pg_malloc0_array(type, count)

#define pg_log_error(...)

void DisconnectDatabase(Archive *AHX)

void DeCloneArchive(ArchiveHandle *AH)

ArchiveHandle * CloneArchive(ArchiveHandle *AH)

TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)

#define WORKER_IGNORED_ERRORS

void on_exit_nicely(on_exit_nicely_callback function, void *arg)

static PgChecksumMode mode

static char buf[DEFAULT_XLOG_SEG_SIZE]

PQExpBuffer createPQExpBuffer(void)

void resetPQExpBuffer(PQExpBuffer str)

void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)

void destroyPQExpBuffer(PQExpBuffer str)

PQExpBufferData * PQExpBuffer

static int fd(const char *x, int i)

const char * fmtQualifiedId(const char *schema, const char *id)

PQExpBuffer(* getLocalPQExpBuffer)(void)

ParallelCompletionPtr callback

T_WorkerStatus workerStatus

ParallelSlot * parallelSlot

WorkerJobDumpPtrType WorkerJobDumpPtr

PGcancel *volatile connCancel

WorkerJobRestorePtrType WorkerJobRestorePtr

SetupWorkerPtrType SetupWorkerPtr

static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)

#define bind(s, addr, addrlen)

#define socket(af, type, protocol)

#define accept(s, addr, addrlen)

#define connect(s, name, namelen)

#define listen(s, backlog)

#define select(n, r, w, e, timeout)