PostgreSQL Source Code: src/bin/pg_basebackup/receivelog.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

18#include <sys/stat.h>

20

26

27

31

32static bool still_sending = true;

33

38 char **buffer);

48

51

/*
 * Create an archive_status/<fname>.done marker for a file name and report
 * whether that succeeded.
 *
 * Returns true on success.  Returns false after logging an error when the
 * marker could not be created (f == NULL) or, judging by the second error
 * message, could not be closed.
 *
 * NOTE(review): this listing is an extract with lines missing -- the
 * name/parameter line and the calls that open and close the marker are not
 * visible.  Presumably this is mark_file_as_archived(stream, fname); confirm
 * against the complete file.
 */
52static bool

54{

57

/* Marker path is built relative to the archive_status directory. */
58 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",

59 fname);

60

62 NULL, 0);

63 if (f == NULL)

64 {

65 pg_log_error("could not create archive status file \"%s\": %s",

67 return false;

68 }

69

71 {

72 pg_log_error("could not close archive status file \"%s\": %s",

74 return false;

75 }

76

77 return true;

78}

79

80

81

82

83

84

85

86

87

/*
 * Open a write-ahead log file for writing, reusing an existing file when one
 * is found.
 *
 * Visible outcomes:
 *   - returns false after logging when the size of an existing file cannot
 *     be obtained (size < 0), when an existing file cannot be opened, when
 *     an existing file has a non-zero unexpected size, or when the new file
 *     cannot be opened;
 *   - terminates the process with exit(1) when fsync of an existing file
 *     fails;
 *   - returns true otherwise.
 *
 * NOTE(review): this listing is an extract with lines missing (the
 * name/parameter line, the conditions guarding several braces, and most
 * calls).  Presumably this is open_walfile(stream, startpoint); confirm
 * against the complete file.
 */
88static bool

90{

92 char *fn;

93 ssize_t size;

96

99

100

102 walfile_name,

104

105

106

107

108

109

110

111

112

113

114

117 {

119 if (size < 0)

120 {

121 pg_log_error("could not get size of write-ahead log file \"%s\": %s",

124 return false;

125 }

127 {

128

130 if (f == NULL)

131 {

132 pg_log_error("could not open existing write-ahead log file \"%s\": %s",

135 return false;

136 }

137

138

140 {

141 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",

/* Unlike the other failure paths, this one does not return to the caller. */
144 exit(1);

145 }

146

149 return true;

150 }

/* An existing file that is neither empty nor the expected size is rejected. */
151 if (size != 0)

152 {

153

/*
 * errno is defaulted to ENOSPC when it is zero.
 * NOTE(review): the visible part of the message below has no errno-based
 * placeholder; check whether this assignment still serves a purpose.
 */
154 if (errno == 0)

155 errno = ENOSPC;

/* ngettext selects the singular or plural wording based on size. */
156 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",

157 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",

158 size),

161 return false;

162 }

163

164 }

165

166

167

169 walfile_name,

172 if (f == NULL)

173 {

174 pg_log_error("could not open write-ahead log file \"%s\": %s",

177 return false;

178 }

179

182 return true;

183}

184

185

186

187

188

189

/*
 * Close the currently open write-ahead log file.
 *
 * Visible outcomes: an early return of true (its guarding condition is not
 * shown), an informational message when a segment is left un-renamed because
 * it is not complete, false after logging when the close result r is
 * non-zero, false from one further failure path whose condition is not
 * shown, and true otherwise.
 *
 * NOTE(review): this listing is an extract with lines missing (the
 * name/parameter line and the close calls themselves).  Presumably this is
 * close_walfile(stream, pos); confirm against the complete file.
 */
190static bool

192{

193 char *fn;

195 int r;

197

199 return true;

200

203

204

206 walfile_name,

208

210 {

213 else

214 {

/* Incomplete segments are reported but not treated as an error here. */
215 pg_log_info("not renaming \"%s\", segment is not complete", fn);

217 }

218 }

219 else

221

223

/* r holds the result of the close step; any non-zero value is a failure. */
224 if (r != 0)

225 {

226 pg_log_error("could not close file \"%s\": %s",

228

230 return false;

231 }

232

234

235

236

237

238

239

240

242 {

243

245 return false;

246 }

247

249 return true;

250}

251

252

253

254

255

/*
 * Boolean predicate with an unconditional-looking early return of true.
 *
 * NOTE(review): almost the whole body is missing from this extract (the
 * name/parameter line, the condition guarding the return below, and the
 * final return).  Presumably this is existsTimeLineHistoryFile(stream);
 * nothing more can be stated from the visible lines -- consult the complete
 * file.
 */
256static bool

258{

260

261

262

263

264

266 return true;

267

269

271}

272

/*
 * Write timeline history content received from the server to a file.
 *
 * Visible steps: the server-supplied filename is compared with a locally
 * derived name (histfname) and rejected on mismatch; the file is opened with
 * a .tmp temporary suffix; content is written in a single call and the
 * number of bytes written must equal strlen(content); the file is then
 * closed.  Each failure logs an error and returns false; success returns
 * true.
 *
 * NOTE(review): this listing is an extract with lines missing (the
 * name/parameter line, how histfname is derived, and the open/close calls).
 * Presumably this is writeTimeLineHistoryFile(stream, filename, content);
 * confirm against the complete file.
 */
273static bool

275{

/* content is treated as a NUL-terminated string; its length is the write size. */
276 int size = strlen(content);

279

280

281

282

283

/* Refuse a history file whose name differs from the one expected locally. */
285 if (strcmp(histfname, filename) != 0)

286 {

287 pg_log_error("server reported unexpected history file name for timeline %u: %s",

289 return false;

290 }

291

293 histfname, ".tmp", 0);

294 if (f == NULL)

295 {

296 pg_log_error("could not create timeline history file \"%s\": %s",

298 return false;

299 }

300

/* A short write counts as failure: the result must equal size exactly. */
301 if ((int) stream->walmethod->ops->write(f, content, size) != size)

302 {

303 pg_log_error("could not write timeline history file \"%s\": %s",

305

306

307

308

310

311 return false;

312 }

313

315 {

316 pg_log_error("could not close file \"%s\": %s",

318 return false;

319 }

320

321

323 {

324

326 return false;

327 }

328

329 return true;

330}

331

332

333

334

/*
 * Assemble a feedback message in a local buffer and send it.
 *
 * Layout as established by the visible length arithmetic (34 bytes total):
 *   1 byte   message type, the letter r
 *   8 bytes  field 1
 *   8 bytes  field 2
 *   8 bytes  field 3
 *   8 bytes  field 4
 *   1 byte   1 when replyRequested is set, otherwise 0
 *
 * Returns false after logging when the send step fails, true otherwise.
 *
 * NOTE(review): the lines that fill the four 8-byte fields and the send call
 * are missing from this extract, so the field contents cannot be documented
 * from here.  Presumably this is sendFeedback(conn, blockpos, now,
 * replyRequested); confirm against the complete file.
 */
335static bool

337{

338 char replybuf[1 + 8 + 8 + 8 + 8 + 1];

339 int len = 0;

340

341 replybuf[len] = 'r';

342 len += 1;

344 len += 8;

/* The second field is filled by one of two alternatives (conditions not shown). */
347 else

349 len += 8;

351 len += 8;

353 len += 8;

/* The flag is normalised to exactly 0 or 1 before being stored. */
354 replybuf[len] = replyRequested ? 1 : 0;

355 len += 1;

356

358 {

359 pg_log_error("could not send feedback packet: %s",

361 return false;

362 }

363

364 return true;

365}

366

367

368

369

370

371

372

/*
 * Check that the server major version lies within the range this client can
 * stream from.
 *
 * The lower bound is 903, matching the 9.3 named in the first error message;
 * the upper bound is PG_VERSION_NUM / 100, i.e. derived from the version this
 * client was built as.  Returns false after logging when the server is older
 * or newer than that range, true otherwise.
 *
 * NOTE(review): the name/parameter line and the assignments of serverMajor
 * and serverver are missing from this extract.  Presumably this is
 * CheckServerVersionForStreaming(conn); confirm against the complete file.
 */
373bool

375{

376 int minServerMajor,

377 maxServerMajor;

378 int serverMajor;

379

380

381

382

383

384

385

386 minServerMajor = 903;

387 maxServerMajor = PG_VERSION_NUM / 100;

389 if (serverMajor < minServerMajor)

390 {

392

/* serverver may be NULL; a placeholder is printed in that case. */
393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",

394 serverver ? serverver : "'unknown'",

395 "9.3");

396 return false;

397 }

398 else if (serverMajor > maxServerMajor)

399 {

401

402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",

403 serverver ? serverver : "'unknown'",

404 PG_VERSION);

405 return false;

406 }

407 return true;

408}

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

/*
 * Top-level driver for receiving a write-ahead log stream.
 *
 * What the visible lines establish:
 *   - a slot-related command prefix is prepared in slotcmd (it is set to the
 *     empty string in one branch);
 *   - in one branch the system identifier reported by the server is compared
 *     with stream->sysidentifier, and stream->timeline must not exceed the
 *     server timeline (servertli);
 *   - the main loop sends a TIMELINE_HISTORY command in some cases, then a
 *     START_REPLICATION command built from slotcmd, a position and
 *     stream->timeline;
 *   - when the server reports a next timeline, it must be greater than the
 *     current one and stream->startpos must not be greater than stoppos;
 *     stream->timeline is then advanced and the loop continues;
 *   - true is returned from two visible success points inside the loop; the
 *     code after the loop logs a close failure and returns false.
 *
 * NOTE(review): this listing is an extract with a large share of lines
 * missing (the name/parameter line, most conditions, all calls into the
 * client library, and the targets of several error branches).  Presumably
 * this is ReceiveXlogStream(conn, stream); confirm every detail against the
 * complete file before relying on this summary.
 */
451bool

453{

/* Fixed-size buffers for the replication commands built with snprintf below. */
454 char query[128];

455 char slotcmd[128];

458

459

460

461

462

464 return false;

465

466

467

468

469

470

471

472

473

474

475

476

478 {

481 }

482 else

483 {

486 else

/* Empty prefix: the START_REPLICATION command is then built without it. */
488 slotcmd[0] = 0;

489 }

490

492 {

493 char *sysidentifier = NULL;

495

496

497

498

500 {

502 return false;

503 }

504

505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)

506 {

507 pg_log_error("system identifier does not match between base backup and streaming connection");

509 return false;

510 }

512

/* The requested starting timeline must already exist on the server. */
513 if (stream->timeline > servertli)

514 {

515 pg_log_error("starting timeline %u is not present in the server",

517 return false;

518 }

519 }

520

521

522

523

524

526

/* One iteration per timeline streamed; see the continue further down. */
527 while (1)

528 {

529

530

531

532

533

534

536 {

537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);

540 {

541

542 pg_log_error("could not send replication command \"%s\": %s",

545 return false;

546 }

547

548

549

550

551

553 {

/* Logged as a warning only; what follows in this branch is not shown. */
554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",

556 }

557

558

562

564 }

565

566

567

568

569

571 return true;

572

573

574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",

575 slotcmd,

580 {

581 pg_log_error("could not send replication command \"%s\": %s",

584 return false;

585 }

587

588

590 if (res == NULL)

592

593

594

595

596

597

598

599

600

601

602

604 {

605

606

607

608

609

610

611

612

613

615 bool parsed;

616

619 if (!parsed)

621

622

/* Sanity check: the next timeline must be strictly newer than the current one. */
623 if (newtimeline <= stream->timeline)

624 {

625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",

626 newtimeline, stream->timeline);

628 }

/* Sanity check: the next start position must not lie beyond the stop position. */
629 if (stream->startpos > stoppos)

630 {

631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",

635 }

636

637

640 {

641 pg_log_error("unexpected termination of replication stream: %s",

645 }

647

648

649

650

651

/* Switch to the next timeline and restart the loop from there. */
652 stream->timeline = newtimeline;

655 continue;

656 }

658 {

660

661

662

663

664

665

666

668 return true;

669 else

670 {

671 pg_log_error("replication stream was terminated before stop point");

673 }

674 }

675 else

676 {

677

678 pg_log_error("unexpected termination of replication stream: %s",

682 }

683 }

684

/* Reached only by leaving the loop abnormally; this path always returns false. */
687 pg_log_error("could not close file \"%s\": %s",

690 return false;

691}

692

693

694

695

696

/*
 * Extract the next timeline and its starting position from a result set.
 *
 * Row 0, column 0 is converted to the timeline number; row 0, column 1 is
 * parsed as two hexadecimal halves separated by a slash and combined into a
 * 64-bit position (first half in the upper 32 bits).  Returns false after
 * logging when the result set has an unexpected shape or the position does
 * not parse; returns true with *timeline and *startpos set otherwise.
 *
 * NOTE(review): the name/parameter line and the shape check condition are
 * missing from this extract.  Presumably this is
 * ReadEndOfStreamingResult(res, startpos, timeline); confirm against the
 * complete file.
 */
697static bool

699{

700 uint32 startpos_xlogid,

701 startpos_xrecoff;

702

703

704

705

706

707

708

709

710

711

712

713

714

716 {

717 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",

719 return false;

720 }

721

/*
 * NOTE(review): atoi gives no error indication, so a non-numeric value here
 * silently becomes 0; *timeline is also assigned before the position below
 * is validated.
 */
722 *timeline = atoi(PQgetvalue(res, 0, 0));

/* Both halves must parse; anything else is reported as a parse failure. */
723 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,

724 &startpos_xrecoff) != 2)

725 {

726 pg_log_error("could not parse next timeline's starting point \"%s\"",

728 return false;

729 }

/* Widen before shifting so the upper half is not lost in 32-bit arithmetic. */
730 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;

731

732 return true;

733}

734

735

736

737

738

739

740

741

742

/*
 * Main receive loop for the copy stream.
 *
 * What the visible lines establish:
 *   - each outer iteration may fsync (a failure there is fatal) and records
 *     last_status = now in two places;
 *   - a receive result r drives the inner loop: the loop runs while r is
 *     non-zero, r == -1 and r == -2 are handled specially, and the r == -2
 *     branch can end the function by returning res;
 *   - received messages are dispatched on their first byte; the letter w
 *     selects one handler, another (condition not shown) selects a handler
 *     that receives &last_status, and any other byte is logged as an
 *     unrecognized streaming header;
 *   - the code after the loops returns NULL.
 *
 * NOTE(review): the return type, name and parameter lines are absent from
 * this extract, as are most conditions and all handler calls.  Presumably
 * this is HandleCopyStream(conn, stream, stoppos) returning a result
 * pointer; confirm against the complete file.
 */
746{

750

752

753 while (1)

754 {

755 int r;

757 long sleeptime;

758

759

760

761

764

766

767

768

769

770

772 {

774 pg_fatal("could not fsync file \"%s\": %s",

777

778

779

780

781

784 last_status = now;

785 }

786

787

788

789

793 {

794

797 last_status = now;

798 }

799

800

801

802

804 last_status);

805

806

809

/* r == 0 means nothing (more) to process in this round. */
811 while (r != 0)

812 {

813 if (r == -1)

815 if (r == -2)

816 {

818

819 if (res == NULL)

822 return res;

823 }

824

825

827 {

829 &last_status))

831 }

832 else if (copybuf[0] == 'w')

833 {

836

837

838

839

840

843 }

844 else

845 {

846 pg_log_error("unrecognized streaming header: \"%c\"",

849 }

850

851

854

855

856

857

858

860 }

861 }

862

865 return NULL;

866}

867

868

869

870

871

872

873

874

875

/*
 * Wait with select() until the connection socket is readable, a timeout
 * expires, or the wait is interrupted.
 *
 * timeout_ms is in milliseconds; a negative value waits without a timeout
 * (a NULL timeout pointer is passed to select).
 *
 * Returns:
 *    1  the connection socket is readable;
 *    0  timeout, interruption by a signal (EINTR), or select reported
 *       activity that was not on the connection socket (for example only
 *       on stop_socket);
 *   -1  the connection socket is invalid (negative) or select failed with
 *       an error other than EINTR.
 *
 * NOTE(review): the name/parameter line, the assignment of connsocket, the
 * condition guarding the stop_socket block, and the error logging calls are
 * missing from this extract.  Presumably this is CopyStreamPoll(conn,
 * timeout_ms, stop_socket); confirm against the complete file.
 */
876static int

878{

879 int ret;

880 fd_set input_mask;

881 int connsocket;

882 int maxfd;

883 struct timeval timeout;

884 struct timeval *timeoutptr;

885

887 if (connsocket < 0)

888 {

890 return -1;

891 }

892

893 FD_ZERO(&input_mask);

894 FD_SET(connsocket, &input_mask);

895 maxfd = connsocket;

897 {

/* Also watch the stop socket; select needs the highest descriptor number. */
898 FD_SET(stop_socket, &input_mask);

899 maxfd = Max(maxfd, stop_socket);

900 }

901

902 if (timeout_ms < 0)

903 timeoutptr = NULL;

904 else

905 {

/* Split milliseconds into whole seconds plus remaining microseconds. */
906 timeout.tv_sec = timeout_ms / 1000L;

907 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;

908 timeoutptr = &timeout;

909 }

910

911 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);

912

913 if (ret < 0)

914 {

/* An interrupted wait is reported like a timeout, not as an error. */
915 if (errno == EINTR)

916 return 0;

918 return -1;

919 }

/* Only readiness of the connection socket itself yields 1. */
920 if (ret > 0 && FD_ISSET(connsocket, &input_mask))

921 return 1;

922

923 return 0;

924}

925

926

927

928

929

930

931

932

933

934

935

936

937

/*
 * Receive one message from the copy stream into *buffer, waiting first when
 * nothing is immediately available.
 *
 * *buffer must be NULL on entry (asserted).
 *
 * Return value, as established by the visible branches:
 *   > 0  length of the received message (rawlen);
 *     0  nothing received: either the wait step returned 0 or the second
 *        receive attempt still produced no data;
 *    -1  failure: the wait step returned a negative value, receiving data
 *        failed, or rawlen was -2;
 *    -2  rawlen was -1.
 *
 * NOTE(review): the name line, the calls that set rawlen and ret, and the
 * logging in the rawlen == -2 branch are missing from this extract, so the
 * meaning of the raw codes -1 and -2 cannot be stated from here.  Presumably
 * this is CopyStreamReceive(conn, timeout, stop_socket, buffer); confirm
 * against the complete file.
 */
938static int

940 char **buffer)

941{

943 int rawlen;

944

945

946 Assert(*buffer == NULL);

947

948

/* Nothing available yet: wait, then try once more. */
950 if (rawlen == 0)

951 {

952 int ret;

953

954

955

956

957

958

/* Both the no-data result (0) and a wait failure (negative) pass straight through. */
960 if (ret <= 0)

961 return ret;

962

963

965 {

966 pg_log_error("could not receive data from WAL stream: %s",

968 return -1;

969 }

970

971

973 if (rawlen == 0)

974 return 0;

975 }

/* The two negative raw codes are swapped on the way out: -1 becomes -2, -2 becomes -1. */
976 if (rawlen == -1)

977 return -2;

978 if (rawlen == -2)

979 {

981 return -1;

982 }

983

984

986 return rawlen;

987}

988

989

990

991

/*
 * Handle one keepalive message held in copybuf.
 *
 * The reply-requested flag is read at byte offset 17, after a 1-byte
 * message type and two 8-byte fields that are skipped; the message must
 * therefore be at least 18 bytes long, otherwise false is returned.  When
 * the guarded block below is entered, an fsync failure is fatal, one further
 * step can fail with false, and *last_status is set to now on success.
 * Returns true otherwise.
 *
 * NOTE(review): the name/parameter lines, the condition guarding the reply
 * block, and the calls inside it are missing from this extract.  Presumably
 * this is ProcessKeepaliveMsg(conn, stream, copybuf, len, blockpos,
 * last_status); confirm against the complete file.
 */
992static bool

995{

996 int pos;

997 bool replyRequested;

999

1000

1001

1002

1003

/* Skip the message type byte and two 8-byte fields. */
1004 pos = 1;

1005 pos += 8;

1006 pos += 8;

1007

/* Guard the read of copybuf[pos] below against a truncated message. */
1008 if (len < pos + 1)

1009 {

1011 return false;

1012 }

1013 replyRequested = copybuf[pos];

1014

1015

1017 {

1020 {

1021

1022

1023

1024

1025

1026

1027

1029 pg_fatal("could not fsync file \"%s\": %s",

1032 }

1033

1036 return false;

/* Reached only when the preceding step did not fail. */
1037 *last_status = now;

1038 }

1039

1040 return true;

1041}

1042

1043

1044

1045

/*
 * Handle one write-ahead log data message held in copybuf and write its
 * payload out, crossing segment boundaries as needed.
 *
 * What the visible lines establish:
 *   - the header is 25 bytes (1-byte type plus three 8-byte fields); a
 *     shorter message returns false;
 *   - the payload (len - hdr_len bytes) is written in pieces, each clamped
 *     so that xlogoff never goes past WalSegSz;
 *   - after each piece *blockpos and xlogoff advance by the bytes written;
 *   - in the block entered after a piece is written (condition not shown),
 *     xlogoff is reset to 0 and the function may send a copy-end packet and
 *     return true early;
 *   - when no file is open, data is only accepted at offset 0; otherwise
 *     an offset mismatch is reported.  Each failure logs and returns false.
 *
 * NOTE(review): the name/parameter lines, the derivation of xlogoff, most
 * conditions, and the open/write/close calls are missing from this extract.
 * Presumably this is ProcessXLogDataMsg(conn, stream, copybuf, len,
 * blockpos); confirm against the complete file.
 */
1046static bool

1049{

1050 int xlogoff;

1051 int bytes_left;

1052 int bytes_written;

1053 int hdr_len;

1054

1055

1056

1057

1058

1060 return true;

1061

1062

1063

1064

1065

1066

/* Header size: message type byte plus three 8-byte fields. */
1067 hdr_len = 1;

1068 hdr_len += 8;

1069 hdr_len += 8;

1070 hdr_len += 8;

1071 if (len < hdr_len)

1072 {

1074 return false;

1075 }

1077

1078

1080

1081

1082

1083

1084

1086 {

1087

1088 if (xlogoff != 0)

1089 {

/* NOTE(review): xlogoff is declared int but printed with an unsigned conversion. */
1090 pg_log_error("received write-ahead log record for offset %u with no file open",

1091 xlogoff);

1092 return false;

1093 }

1094 }

1095 else

1096 {

1097

1099 {

1100 pg_log_error("got WAL data offset %08x, expected %08x",

1102 return false;

1103 }

1104 }

1105

1106 bytes_left = len - hdr_len;

1107 bytes_written = 0;

1108

1109 while (bytes_left)

1110 {

1111 int bytes_to_write;

1112

1113

1114

1115

1116

/* Never write past the end of the current segment; the rest waits for the next pass. */
1117 if (xlogoff + bytes_left > WalSegSz)

1118 bytes_to_write = WalSegSz - xlogoff;

1119 else

1120 bytes_to_write = bytes_left;

1121

1123 {

1125 {

1126

1127 return false;

1128 }

1129 }

1130

1132 copybuf + hdr_len + bytes_written,

1133 bytes_to_write) != bytes_to_write)

1134 {

1135 pg_log_error("could not write %d bytes to WAL file \"%s\": %s",

1138 return false;

1139 }

1140

1141

/* Advance all four cursors together by the amount just written. */
1142 bytes_written += bytes_to_write;

1143 bytes_left -= bytes_to_write;

1144 *blockpos += bytes_to_write;

1145 xlogoff += bytes_to_write;

1146

1147

1149 {

1151

1152 return false;

1153

/* Start the next piece at the beginning of a fresh segment. */
1154 xlogoff = 0;

1155

1157 {

1159 {

1160 pg_log_error("could not send copy-end packet: %s",

1162 return false;

1163 }

1165 return true;

1166 }

1167 }

1168 }

1169

1170

1171 return true;

1172}

1173

1174

1175

1176

/*
 * Finish the copy stream and hand back the final result.
 *
 * What the visible lines establish: inside a guarded block, one failure
 * path returns NULL without a visible message and another returns NULL
 * after logging that the copy-end packet could not be sent; on the normal
 * path *stoppos is set to blockpos and res is returned.
 *
 * NOTE(review): the return type, name and parameter lines are absent from
 * this extract, as are all conditions and calls.  Presumably this is
 * HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos) returning
 * a result pointer; confirm against the complete file.
 */
1180{

1182

1183

1184

1185

1186

1187

1189 {

1191 {

1192

1194 return NULL;

1195 }

1197 {

1199 {

1200 pg_log_error("could not send copy-end packet: %s",

1203 return NULL;

1204 }

1206 }

1208 }

/* Report where streaming stopped to the caller. */
1209 *stoppos = blockpos;

1210 return res;

1211}

1212

1213

1214

1215

/*
 * Stop-check step for the copy stream.
 *
 * What the visible lines establish: inside a guarded block, one failure
 * path returns false without a visible message and another returns false
 * after logging that the copy-end packet could not be sent; otherwise the
 * function returns true.
 *
 * NOTE(review): the name/parameter line and every condition and call are
 * missing from this extract.  Presumably this is CheckCopyStreamStop(conn,
 * stream, blockpos); confirm against the complete file.
 */
1216static bool

1218{

1220 {

1222 {

1223

1224 return false;

1225 }

1227 {

1228 pg_log_error("could not send copy-end packet: %s",

1230 return false;

1231 }

1233 }

1234

1235 return true;

1236}

1237

1238

1239

1240

/*
 * Compute how long the receive loop may sleep before the next status
 * deadline.
 *
 * Returns the sleep time in milliseconds (seconds * 1000 plus
 * microseconds / 1000), or -1 when there is no positive target time.  When
 * the computed difference is zero or negative seconds, it is replaced by
 * exactly one second.
 *
 * NOTE(review): the name/parameter lines, the declaration and condition
 * around status_targettime, the second operand of its assignment, and the
 * time-difference call are missing from this extract.  Presumably this is
 * CalculateCopyStreamSleeptime(now, standby_message_timeout, last_status);
 * confirm against the complete file.
 */
1241static long

1244{

1246 long sleeptime;

1247

1249 status_targettime = last_status +

1251

1252 if (status_targettime > 0)

1253 {

1254 long secs;

1255 int usecs;

1256

1258 status_targettime,

1259 &secs,

1260 &usecs);

1261

/* Enforce a minimum sleep of one second. */
1262 if (secs <= 0)

1263 {

1264 secs = 1;

1265 usecs = 0;

1266 }

1267

1268 sleeptime = secs * 1000 + usecs / 1000;

1269 }

1270 else

/* No deadline: -1 signals that no timeout applies. */
1271 sleeptime = -1;

1272

1273 return sleeptime;

1274}

Datum now(PG_FUNCTION_ARGS)

#define ngettext(s, p, n)

int PQserverVersion(const PGconn *conn)

const char * PQparameterStatus(const PGconn *conn, const char *paramName)

char * PQerrorMessage(const PGconn *conn)

int PQsocket(const PGconn *conn)

int PQflush(PGconn *conn)

void PQfreemem(void *ptr)

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

PGresult * PQgetResult(PGconn *conn)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQputCopyEnd(PGconn *conn, const char *errormsg)

int PQntuples(const PGresult *res)

int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)

char * PQresultErrorMessage(const PGresult *res)

int PQconsumeInput(PGconn *conn)

PGresult * PQexec(PGconn *conn, const char *query)

int PQnfields(const PGresult *res)

int PQgetCopyData(PGconn *conn, char **buffer, int async)

Assert(PointerIsAligned(start, uint64))

#define pg_log_error(...)

static int standby_message_timeout

static XLogRecPtr startpos

#define pg_log_warning(...)

size_t strlcpy(char *dst, const char *src, size_t siz)

static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)

static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)

static bool mark_file_as_archived(StreamCtl *stream, const char *fname)

static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)

static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)

static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)

static bool reportFlushPosition

static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)

static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)

static bool existsTimeLineHistoryFile(StreamCtl *stream)

static bool still_sending

static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)

bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)

static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)

static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)

bool CheckServerVersionForStreaming(PGconn *conn)

static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)

static XLogRecPtr lastFlushPosition

static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)

static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)

int64 fe_recvint64(char *buf)

TimestampTz feGetCurrentTimestamp(void)

void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)

void fe_sendint64(int64 i, char *buf)

bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)

bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)

stream_stop_callback stream_stop

int standby_message_timeout

WalWriteMethod * walmethod

bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)

ssize_t(* write)(Walfile *f, const void *buf, size_t count)

ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)

int(* close)(Walfile *f, WalCloseMethod method)

char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)

Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)

const WalWriteMethodOps * ops

pg_compress_algorithm compression_algorithm

static StringInfo copybuf

static void * fn(void *arg)

const char * GetLastWalMethodError(WalWriteMethod *wwmethod)

#define select(n, r, w, e, timeout)

#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)

static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

static void TLHistoryFileName(char *fname, TimeLineID tli)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr