PostgreSQL Source Code: src/bin/pg_basebackup/streamutil.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

19

29

30#define ERRCODE_DUPLICATE_OBJECT "42710"

31

33

35

36

37#define MINIMUM_VERSION_FOR_SHOW_CMD 100000

38

39

40

41

42#define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000

43

53

54

55

56

57

58

61{

63 int argcount = 7;

64

65 int i;

68 const char *tmpparam;

69 bool need_password;

72 char *err_msg = NULL;

73

74

75

76

77

79

80

81

82

83

84 i = 0;

86 {

88 if (conn_opts == NULL)

90

91 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)

92 {

93 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')

94 argcount++;

95 }

96

99

100

101

102

103

105 values[i] = "replication";

106 i++;

107

108 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)

109 {

110 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')

111 {

114 i++;

115 }

116 }

117 }

118 else

119 {

124 i++;

125 }

126

128 values[i] = (dbname == NULL) ? "true" : "database";

129 i++;

130 keywords[i] = "fallback_application_name";

132 i++;

133

135 {

138 i++;

139 }

141 {

144 i++;

145 }

147 {

150 i++;

151 }

152

153

155

156 do

157 {

158

159 if (need_password)

160 {

163 need_password = false;

164 }

165

166

168 {

171 }

172 else

173 {

176 }

177

178

179

180

181

183

184

185

186

187

188 if (!tmpconn)

189 pg_fatal("could not connect to server");

190

191

195 {

197 need_password = true;

198 }

199 }

200 while (need_password);

201

203 {

209 return NULL;

210 }

211

212

216

217

218

219

220

221

222

224 {

226

229 {

230 pg_log_error("could not clear \"search_path\": %s",

234 exit(1);

235 }

237 }

238

239

240

241

242

244 if (!tmpparam)

245 {

246 pg_log_error("could not determine server setting for \"integer_datetimes\"");

248 exit(1);

249 }

250

251 if (strcmp(tmpparam, "on") != 0)

252 {

253 pg_log_error("\"integer_datetimes\" compile flag does not match server");

255 exit(1);

256 }

257

258

259

260

261

263 {

265 exit(1);

266 }

267

268 return tmpconn;

269}

270

271

272

273

274

275bool

277{

279 char xlog_unit[3];

280 int xlog_val,

281 multiplier = 1;

282

283

285

286

288 {

290 return true;

291 }

292

293 res = PQexec(conn, "SHOW wal_segment_size");

295 {

296 pg_log_error("could not send replication command \"%s\": %s",

298

300 return false;

301 }

303 {

304 pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",

306

308 return false;

309 }

310

311

312 if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)

313 {

314 pg_log_error("WAL segment size could not be parsed");

316 return false;

317 }

318

320

321

322 if (strcmp(xlog_unit, "MB") == 0)

323 multiplier = 1024 * 1024;

324 else if (strcmp(xlog_unit, "GB") == 0)

325 multiplier = 1024 * 1024 * 1024;

326

327

328 WalSegSz = xlog_val * multiplier;

329

331 {

332 pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",

333 "remote server reported invalid WAL segment size (%d bytes)",

336 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");

337 return false;

338 }

339

340 return true;

341}

342

343

344

345

346

347

348

349

350

351

352

353

354static bool

356{

359

360

362

363

365 return true;

366

367 res = PQexec(conn, "SHOW data_directory_mode");

369 {

370 pg_log_error("could not send replication command \"%s\": %s",

372

374 return false;

375 }

377 {

378 pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",

380

382 return false;

383 }

384

386 {

387 pg_log_error("group access flag could not be parsed: %s",

389

391 return false;

392 }

393

395

397 return true;

398}

399

400

401

402

403

404

405

406

407

408bool

411{

414 lo;

415

416

418

419 res = PQexec(conn, "IDENTIFY_SYSTEM");

421 {

422 pg_log_error("could not send replication command \"%s\": %s",

424

426 return false;

427 }

429 {

430 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",

432

434 return false;

435 }

436

437

438 if (sysid != NULL)

440

441

442 if (starttli != NULL)

443 *starttli = atoi(PQgetvalue(res, 0, 1));

444

445

447 {

448 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)

449 {

450 pg_log_error("could not parse write-ahead log location \"%s\"",

452

454 return false;

455 }

457 }

458

459

460 if (db_name != NULL)

461 {

462 *db_name = NULL;

464 {

466 {

467 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",

469

471 return false;

472 }

475 }

476 }

477

479 return true;

480}

481

482

483

484

485

486

487

488

489bool

492{

497

498 if (restart_lsn)

499 *restart_lsn = lsn_loc;

500 if (restart_tli)

501 *restart_tli = tli_loc;

502

507

509 {

510 pg_log_error("could not send replication command \"%s\": %s",

513 return false;

514 }

515

516

518 {

519 pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",

522 return false;

523 }

524

525

526

527

528

530 {

531 pg_log_error("replication slot \"%s\" does not exist", slot_name);

533 return false;

534 }

535

536

537

538

539

540 if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)

541 {

542 pg_log_error("expected a physical replication slot, got type \"%s\" instead",

545 return false;

546 }

547

548

550 {

552 lo;

553

554 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)

555 {

556 pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",

559 return false;

560 }

561 lsn_loc = ((uint64) hi) << 32 | lo;

562 }

563

564

567

569

570

571 if (restart_lsn)

572 *restart_lsn = lsn_loc;

573 if (restart_tli)

574 *restart_tli = tli_loc;

575

576 return true;

577}

578

579

580

581

582

583bool

585 bool is_temporary, bool is_physical, bool reserve_wal,

587{

591

593

595 (!is_physical && plugin != NULL));

598 Assert(slot_name != NULL);

599

600

601 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);

602 if (is_temporary)

604 if (is_physical)

606 else

608

609

610 if (use_new_option_syntax)

612 if (is_physical)

613 {

614 if (reserve_wal)

616 "RESERVE_WAL");

617 }

618 else

619 {

622 "FAILOVER");

623

626 "TWO_PHASE");

627

629 {

630

631 if (use_new_option_syntax)

633 "SNAPSHOT", "nothing");

634 else

636 "NOEXPORT_SNAPSHOT");

637 }

638 }

639 if (use_new_option_syntax)

640 {

641

642 if (query->data[query->len - 1] == '(')

643 {

644 query->len -= 2;

645 query->data[query->len] = '\0';

646 }

647 else

649 }

650

651

654 {

656

658 sqlstate &&

660 {

663 return true;

664 }

665 else

666 {

667 pg_log_error("could not send replication command \"%s\": %s",

669

672 return false;

673 }

674 }

675

677 {

678 pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",

679 slot_name,

681

684 return false;

685 }

686

689 return true;

690}

691

692

693

694

695

696bool

698{

701

702 Assert(slot_name != NULL);

703

705

706

708 slot_name);

711 {

712 pg_log_error("could not send replication command \"%s\": %s",

714

717 return false;

718 }

719

721 {

722 pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",

723 slot_name,

725

728 return false;

729 }

730

733 return true;

734}

735

736

737

738

739

740

741

742

743

744

745void

747 char *option_name)

748{

749 if (buf->len > 0 && buf->data[buf->len - 1] != '(')

750 {

751 if (use_new_option_syntax)

753 else

755 }

756

758}

759

760

761

762

763

764

765

766void

768 char *option_name, char *option_value)

769{

771

772 if (option_value != NULL)

773 {

774 size_t length = strlen(option_value);

775 char *escaped_value = palloc(1 + 2 * length);

776

779 pfree(escaped_value);

780 }

781}

782

783

784

785

786

787

788

789void

791 char *option_name, int32 option_value)

792{

794

796}

797

798

799

800

801

804{

806 struct timeval tp;

807

809

813

814 return result;

815}

816

817

818

819

820

821void

823 long *secs, int *microsecs)

824{

826

827 if (diff <= 0)

828 {

829 *secs = 0;

830 *microsecs = 0;

831 }

832 else

833 {

836 }

837}

838

839

840

841

842

843bool

846 int msec)

847{

849

850 return (diff >= msec * INT64CONST(1000));

851}

852

853

854

855

856void

858{

860

861 memcpy(buf, &n64, sizeof(n64));

862}

863

864

865

866

869{

871

872 memcpy(&n64, buf, sizeof(n64));

873

875}

static Datum values[MAXATTR]

#define ngettext(s, p, n)

#define ALWAYS_SECURE_SEARCH_PATH_SQL

#define POSTGRES_EPOCH_JDATE

int PQserverVersion(const PGconn *conn)

void PQconninfoFree(PQconninfoOption *connOptions)

PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)

const char * PQparameterStatus(const PGconn *conn, const char *paramName)

int PQconnectionNeedsPassword(const PGconn *conn)

ConnStatusType PQstatus(const PGconn *conn)

void PQfinish(PGconn *conn)

char * PQerrorMessage(const PGconn *conn)

PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)

size_t PQescapeStringConn(PGconn *conn, char *to, const char *from, size_t length, int *error)

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQntuples(const PGresult *res)

int PQgetisnull(const PGresult *res, int tup_num, int field_num)

char * PQresultErrorField(const PGresult *res, int fieldcode)

PGresult * PQexec(PGconn *conn, const char *query)

int PQnfields(const PGresult *res)

char * pg_strdup(const char *in)

void * pg_malloc0(size_t size)

void SetDataDirectoryCreatePerm(int dataDirMode)

Assert(PointerIsAligned(start, uint64))

static const JsonPathKeyword keywords[]

#define pg_log_error(...)

#define pg_log_error_detail(...)

void pfree(void *pointer)

#define DEFAULT_XLOG_SEG_SIZE

static bool slot_exists_ok

static const char * plugin

static XLogRecPtr startpos

PQExpBuffer createPQExpBuffer(void)

void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)

void destroyPQExpBuffer(PQExpBuffer str)

void appendPQExpBufferChar(PQExpBuffer str, char ch)

void appendPQExpBufferStr(PQExpBuffer str, const char *data)

char * simple_prompt(const char *prompt, bool echo)

void AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, int32 option_value)

PGconn * GetConnection(void)

bool RetrieveWalSegSize(PGconn *conn)

#define ERRCODE_DUPLICATE_OBJECT

#define MINIMUM_VERSION_FOR_SHOW_CMD

int64 fe_recvint64(char *buf)

TimestampTz feGetCurrentTimestamp(void)

bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, bool slot_exists_ok, bool two_phase, bool failover)

#define MINIMUM_VERSION_FOR_GROUP_ACCESS

void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)

void AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name)

void AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, char *option_value)

void fe_sendint64(int64 i, char *buf)

bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)

static bool RetrieveDataDirCreatePerm(PGconn *conn)

bool DropReplicationSlot(PGconn *conn, const char *slot_name)

bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)

int gettimeofday(struct timeval *tp, void *tzp)

#define IsValidWalSegSize(size)

#define InvalidXLogRecPtr