PostgreSQL Source Code: src/bin/pg_basebackup/receivelog.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
18#include <sys/stat.h>
20
26
27
31
32static bool still_sending = true;
33
38 char **buffer);
48
51
52static bool
54{
57
58 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
59 fname);
60
62 NULL, 0);
63 if (f == NULL)
64 {
65 pg_log_error("could not create archive status file \"%s\": %s",
67 return false;
68 }
69
71 {
72 pg_log_error("could not close archive status file \"%s\": %s",
74 return false;
75 }
76
77 return true;
78}
79
80
81
82
83
84
85
86
87
88static bool
90{
92 char *fn;
93 ssize_t size;
96
99
100
102 walfile_name,
104
105
106
107
108
109
110
111
112
113
114
117 {
119 if (size < 0)
120 {
121 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
124 return false;
125 }
127 {
128
130 if (f == NULL)
131 {
132 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
135 return false;
136 }
137
138
140 {
141 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
144 exit(1);
145 }
146
149 return true;
150 }
151 if (size != 0)
152 {
153
154 if (errno == 0)
155 errno = ENOSPC;
156 pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
157 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
158 size),
161 return false;
162 }
163
164 }
165
166
167
169 walfile_name,
172 if (f == NULL)
173 {
174 pg_log_error("could not open write-ahead log file \"%s\": %s",
177 return false;
178 }
179
182 return true;
183}
184
185
186
187
188
189
190static bool
192{
193 char *fn;
195 int r;
197
199 return true;
200
203
204
206 walfile_name,
208
210 {
213 else
214 {
215 pg_log_info("not renaming \"%s\", segment is not complete", fn);
217 }
218 }
219 else
221
223
224 if (r != 0)
225 {
226 pg_log_error("could not close file \"%s\": %s",
228
230 return false;
231 }
232
234
235
236
237
238
239
240
242 {
243
245 return false;
246 }
247
249 return true;
250}
251
252
253
254
255
256static bool
258{
260
261
262
263
264
266 return true;
267
269
271}
272
273static bool
275{
276 int size = strlen(content);
279
280
281
282
283
285 if (strcmp(histfname, filename) != 0)
286 {
287 pg_log_error("server reported unexpected history file name for timeline %u: %s",
289 return false;
290 }
291
293 histfname, ".tmp", 0);
294 if (f == NULL)
295 {
296 pg_log_error("could not create timeline history file \"%s\": %s",
298 return false;
299 }
300
301 if ((int) stream->walmethod->ops->write(f, content, size) != size)
302 {
303 pg_log_error("could not write timeline history file \"%s\": %s",
305
306
307
308
310
311 return false;
312 }
313
315 {
316 pg_log_error("could not close file \"%s\": %s",
318 return false;
319 }
320
321
323 {
324
326 return false;
327 }
328
329 return true;
330}
331
332
333
334
335static bool
337{
338 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339 int len = 0;
340
341 replybuf[len] = 'r';
342 len += 1;
344 len += 8;
347 else
349 len += 8;
351 len += 8;
353 len += 8;
354 replybuf[len] = replyRequested ? 1 : 0;
355 len += 1;
356
358 {
359 pg_log_error("could not send feedback packet: %s",
361 return false;
362 }
363
364 return true;
365}
366
367
368
369
370
371
372
373bool
375{
376 int minServerMajor,
377 maxServerMajor;
378 int serverMajor;
379
380
381
382
383
384
385
386 minServerMajor = 903;
387 maxServerMajor = PG_VERSION_NUM / 100;
389 if (serverMajor < minServerMajor)
390 {
392
393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394 serverver ? serverver : "'unknown'",
395 "9.3");
396 return false;
397 }
398 else if (serverMajor > maxServerMajor)
399 {
401
402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403 serverver ? serverver : "'unknown'",
404 PG_VERSION);
405 return false;
406 }
407 return true;
408}
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451bool
453{
454 char query[128];
455 char slotcmd[128];
458
459
460
461
462
464 return false;
465
466
467
468
469
470
471
472
473
474
475
476
478 {
481 }
482 else
483 {
486 else
488 slotcmd[0] = 0;
489 }
490
492 {
493 char *sysidentifier = NULL;
495
496
497
498
500 {
502 return false;
503 }
504
505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506 {
507 pg_log_error("system identifier does not match between base backup and streaming connection");
509 return false;
510 }
512
513 if (stream->timeline > servertli)
514 {
515 pg_log_error("starting timeline %u is not present in the server",
517 return false;
518 }
519 }
520
521
522
523
524
526
527 while (1)
528 {
529
530
531
532
533
534
536 {
537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
540 {
541
542 pg_log_error("could not send replication command \"%s\": %s",
545 return false;
546 }
547
548
549
550
551
553 {
554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 }
557
558
562
564 }
565
566
567
568
569
571 return true;
572
573
574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575 slotcmd,
580 {
581 pg_log_error("could not send replication command \"%s\": %s",
584 return false;
585 }
587
588
590 if (res == NULL)
592
593
594
595
596
597
598
599
600
601
602
604 {
605
606
607
608
609
610
611
612
613
615 bool parsed;
616
619 if (!parsed)
621
622
623 if (newtimeline <= stream->timeline)
624 {
625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626 newtimeline, stream->timeline);
628 }
629 if (stream->startpos > stoppos)
630 {
631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
635 }
636
637
640 {
641 pg_log_error("unexpected termination of replication stream: %s",
645 }
647
648
649
650
651
652 stream->timeline = newtimeline;
655 continue;
656 }
658 {
660
661
662
663
664
665
666
668 return true;
669 else
670 {
671 pg_log_error("replication stream was terminated before stop point");
673 }
674 }
675 else
676 {
677
678 pg_log_error("unexpected termination of replication stream: %s",
682 }
683 }
684
687 pg_log_error("could not close file \"%s\": %s",
690 return false;
691}
692
693
694
695
696
697static bool
699{
700 uint32 startpos_xlogid,
701 startpos_xrecoff;
702
703
704
705
706
707
708
709
710
711
712
713
714
716 {
717 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
719 return false;
720 }
721
722 *timeline = atoi(PQgetvalue(res, 0, 0));
723 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
724 &startpos_xrecoff) != 2)
725 {
726 pg_log_error("could not parse next timeline's starting point \"%s\"",
728 return false;
729 }
730 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
731
732 return true;
733}
734
735
736
737
738
739
740
741
742
746{
750
752
753 while (1)
754 {
755 int r;
757 long sleeptime;
758
759
760
761
764
766
767
768
769
770
772 {
774 pg_fatal("could not fsync file \"%s\": %s",
777
778
779
780
781
784 last_status = now;
785 }
786
787
788
789
793 {
794
797 last_status = now;
798 }
799
800
801
802
804 last_status);
805
806
809
811 while (r != 0)
812 {
813 if (r == -1)
815 if (r == -2)
816 {
818
819 if (res == NULL)
822 return res;
823 }
824
825
827 {
829 &last_status))
831 }
832 else if (copybuf[0] == 'w')
833 {
836
837
838
839
840
843 }
844 else
845 {
846 pg_log_error("unrecognized streaming header: \"%c\"",
849 }
850
851
854
855
856
857
858
860 }
861 }
862
865 return NULL;
866}
867
868
869
870
871
872
873
874
875
876static int
878{
879 int ret;
880 fd_set input_mask;
881 int connsocket;
882 int maxfd;
883 struct timeval timeout;
884 struct timeval *timeoutptr;
885
887 if (connsocket < 0)
888 {
890 return -1;
891 }
892
893 FD_ZERO(&input_mask);
894 FD_SET(connsocket, &input_mask);
895 maxfd = connsocket;
897 {
898 FD_SET(stop_socket, &input_mask);
899 maxfd = Max(maxfd, stop_socket);
900 }
901
902 if (timeout_ms < 0)
903 timeoutptr = NULL;
904 else
905 {
906 timeout.tv_sec = timeout_ms / 1000L;
907 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
908 timeoutptr = &timeout;
909 }
910
911 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
912
913 if (ret < 0)
914 {
915 if (errno == EINTR)
916 return 0;
918 return -1;
919 }
920 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
921 return 1;
922
923 return 0;
924}
925
926
927
928
929
930
931
932
933
934
935
936
937
938static int
940 char **buffer)
941{
943 int rawlen;
944
945
946 Assert(*buffer == NULL);
947
948
950 if (rawlen == 0)
951 {
952 int ret;
953
954
955
956
957
958
960 if (ret <= 0)
961 return ret;
962
963
965 {
966 pg_log_error("could not receive data from WAL stream: %s",
968 return -1;
969 }
970
971
973 if (rawlen == 0)
974 return 0;
975 }
976 if (rawlen == -1)
977 return -2;
978 if (rawlen == -2)
979 {
981 return -1;
982 }
983
984
986 return rawlen;
987}
988
989
990
991
992static bool
995{
996 int pos;
997 bool replyRequested;
999
1000
1001
1002
1003
1004 pos = 1;
1005 pos += 8;
1006 pos += 8;
1007
1008 if (len < pos + 1)
1009 {
1011 return false;
1012 }
1013 replyRequested = copybuf[pos];
1014
1015
1017 {
1020 {
1021
1022
1023
1024
1025
1026
1027
1029 pg_fatal("could not fsync file \"%s\": %s",
1032 }
1033
1036 return false;
1037 *last_status = now;
1038 }
1039
1040 return true;
1041}
1042
1043
1044
1045
1046static bool
1049{
1050 int xlogoff;
1051 int bytes_left;
1052 int bytes_written;
1053 int hdr_len;
1054
1055
1056
1057
1058
1060 return true;
1061
1062
1063
1064
1065
1066
1067 hdr_len = 1;
1068 hdr_len += 8;
1069 hdr_len += 8;
1070 hdr_len += 8;
1071 if (len < hdr_len)
1072 {
1074 return false;
1075 }
1077
1078
1080
1081
1082
1083
1084
1086 {
1087
1088 if (xlogoff != 0)
1089 {
1090 pg_log_error("received write-ahead log record for offset %u with no file open",
1091 xlogoff);
1092 return false;
1093 }
1094 }
1095 else
1096 {
1097
1099 {
1100 pg_log_error("got WAL data offset %08x, expected %08x",
1102 return false;
1103 }
1104 }
1105
1106 bytes_left = len - hdr_len;
1107 bytes_written = 0;
1108
1109 while (bytes_left)
1110 {
1111 int bytes_to_write;
1112
1113
1114
1115
1116
1117 if (xlogoff + bytes_left > WalSegSz)
1118 bytes_to_write = WalSegSz - xlogoff;
1119 else
1120 bytes_to_write = bytes_left;
1121
1123 {
1125 {
1126
1127 return false;
1128 }
1129 }
1130
1132 copybuf + hdr_len + bytes_written,
1133 bytes_to_write) != bytes_to_write)
1134 {
1135 pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1138 return false;
1139 }
1140
1141
1142 bytes_written += bytes_to_write;
1143 bytes_left -= bytes_to_write;
1144 *blockpos += bytes_to_write;
1145 xlogoff += bytes_to_write;
1146
1147
1149 {
1151
1152 return false;
1153
1154 xlogoff = 0;
1155
1157 {
1159 {
1160 pg_log_error("could not send copy-end packet: %s",
1162 return false;
1163 }
1165 return true;
1166 }
1167 }
1168 }
1169
1170
1171 return true;
1172}
1173
1174
1175
1176
1180{
1182
1183
1184
1185
1186
1187
1189 {
1191 {
1192
1194 return NULL;
1195 }
1197 {
1199 {
1200 pg_log_error("could not send copy-end packet: %s",
1203 return NULL;
1204 }
1206 }
1208 }
1209 *stoppos = blockpos;
1210 return res;
1211}
1212
1213
1214
1215
1216static bool
1218{
1220 {
1222 {
1223
1224 return false;
1225 }
1227 {
1228 pg_log_error("could not send copy-end packet: %s",
1230 return false;
1231 }
1233 }
1234
1235 return true;
1236}
1237
1238
1239
1240
1241static long
1244{
1246 long sleeptime;
1247
1249 status_targettime = last_status +
1251
1252 if (status_targettime > 0)
1253 {
1254 long secs;
1255 int usecs;
1256
1258 status_targettime,
1259 &secs,
1260 &usecs);
1261
1262 if (secs <= 0)
1263 {
1264 secs = 1;
1265 usecs = 0;
1266 }
1267
1268 sleeptime = secs * 1000 + usecs / 1000;
1269 }
1270 else
1271 sleeptime = -1;
1272
1273 return sleeptime;
1274}
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
int PQserverVersion(const PGconn *conn)
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
PGresult * PQgetResult(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQntuples(const PGresult *res)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
char * PQresultErrorMessage(const PGresult *res)
int PQconsumeInput(PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
int PQnfields(const PGresult *res)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
static int standby_message_timeout
static XLogRecPtr startpos
#define pg_log_warning(...)
size_t strlcpy(char *dst, const char *src, size_t siz)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
static bool reportFlushPosition
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool still_sending
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
bool CheckServerVersionForStreaming(PGconn *conn)
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
static XLogRecPtr lastFlushPosition
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
int standby_message_timeout
WalWriteMethod * walmethod
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
int(* close)(Walfile *f, WalCloseMethod method)
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
static StringInfo copybuf
static void * fn(void *arg)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define select(n, r, w, e, timeout)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr