PostgreSQL Source Code: src/bin/pg_basebackup/walmethods.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
13
14#include <fcntl.h>
15#include <sys/stat.h>
18
19#ifdef USE_LZ4
20#include <lz4frame.h>
21#endif
22#ifdef HAVE_LIBZ
23#include <zlib.h>
24#endif
25
31
32
/*
 * Byte size constant for the zlib code paths in this file.
 * NOTE(review): no use of this macro is visible in this listing; presumably
 * it is the size of the zlib output buffer (zlibOut) -- TODO confirm.
 */
33#define ZLIB_OUT_SIZE 4096
34
35
/*
 * Input chunk size, in bytes, for LZ4 compression.  It is the source size
 * passed to LZ4F_compressBound() when computing how large the LZ4 output
 * buffer (lz4bufsize) has to be.
 */
36#define LZ4_IN_SIZE 4096
37
38
39
40
41
42
44 const char *pathname,
45 const char *temp_suffix,
46 size_t pad_to_size);
50 const char *pathname);
52 const char *pathname, const char *temp_suffix);
57
68};
69
70
71
72
74{
78
79
80
81
83{
88#ifdef HAVE_LIBZ
89 gzFile gzfp;
90#endif
91#ifdef USE_LZ4
92 LZ4F_compressionContext_t ctx;
93 size_t lz4bufsize;
94 void *lz4buf;
95#endif
97
/*
 * clear_error(wwmethod)
 *
 * Reset the error state stored in *wwmethod: lasterrstring becomes NULL and
 * lasterrno becomes 0.  The expansion is a single parenthesized comma
 * expression, so it can be used wherever an expression is allowed; note that
 * the argument is evaluated twice.
 */
98#define clear_error(wwmethod) \
99 ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
100
101static char *
103 const char *pathname, const char *temp_suffix)
104{
106
108 pathname,
111 temp_suffix ? temp_suffix : "");
112
114}
115
118 const char *temp_suffix, size_t pad_to_size)
119{
123 int fd;
125#ifdef HAVE_LIBZ
126 gzFile gzfp = NULL;
127#endif
128#ifdef USE_LZ4
129 LZ4F_compressionContext_t ctx = NULL;
130 size_t lz4bufsize = 0;
131 void *lz4buf = NULL;
132#endif
133
135
137 snprintf(tmppath, sizeof(tmppath), "%s/%s",
140
141
142
143
144
145
146
148 if (fd < 0)
149 {
151 return NULL;
152 }
153
154#ifdef HAVE_LIBZ
156 {
157 gzfp = gzdopen(fd, "wb");
158 if (gzfp == NULL)
159 {
162 return NULL;
163 }
164
166 Z_DEFAULT_STRATEGY) != Z_OK)
167 {
169 gzclose(gzfp);
170 return NULL;
171 }
172 }
173#endif
174#ifdef USE_LZ4
176 {
177 size_t ctx_out;
178 size_t header_size;
179 LZ4F_preferences_t prefs;
180
181 ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
182 if (LZ4F_isError(ctx_out))
183 {
184 wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
186 return NULL;
187 }
188
189 lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
191
192
193 memset(&prefs, 0, sizeof(prefs));
195
196
197 header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
198 if (LZ4F_isError(header_size))
199 {
200 wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
201 (void) LZ4F_freeCompressionContext(ctx);
204 return NULL;
205 }
206
207 errno = 0;
208 if (write(fd, lz4buf, header_size) != header_size)
209 {
210
211 wwmethod->lasterrno = errno ? errno : ENOSPC;
212 (void) LZ4F_freeCompressionContext(ctx);
215 return NULL;
216 }
217 }
218#endif
219
220
222 {
223 ssize_t rc;
224
226
227 if (rc < 0)
228 {
231 return NULL;
232 }
233
234
235
236
237
238 if (lseek(fd, 0, SEEK_SET) != 0)
239 {
242 return NULL;
243 }
244 }
245
246
247
248
249
250
251
252 if (wwmethod->sync)
253 {
256 {
258#ifdef HAVE_LIBZ
260 gzclose(gzfp);
261 else
262#endif
263#ifdef USE_LZ4
265 {
266 (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
267 (void) LZ4F_freeCompressionContext(ctx);
270 }
271 else
272#endif
274 return NULL;
275 }
276 }
277
279#ifdef HAVE_LIBZ
281 f->gzfp = gzfp;
282#endif
283#ifdef USE_LZ4
285 {
286 f->ctx = ctx;
287 f->lz4buf = lz4buf;
288 f->lz4bufsize = lz4bufsize;
289 }
290#endif
291
297 if (temp_suffix)
299
300 return &f->base;
301}
302
303static ssize_t
305{
306 ssize_t r;
308
311
312#ifdef HAVE_LIBZ
314 {
315 errno = 0;
316 r = (ssize_t) gzwrite(df->gzfp, buf, count);
317 if (r != count)
318 {
319
321 }
322 }
323 else
324#endif
325#ifdef USE_LZ4
327 {
328 size_t chunk;
330 const void *inbuf = buf;
331
334 {
335 size_t compressed;
336
339 else
341
343 compressed = LZ4F_compressUpdate(df->ctx,
344 df->lz4buf, df->lz4bufsize,
345 inbuf, chunk,
346 NULL);
347
348 if (LZ4F_isError(compressed))
349 {
351 return -1;
352 }
353
354 errno = 0;
355 if (write(df->fd, df->lz4buf, compressed) != compressed)
356 {
357
359 return -1;
360 }
361
362 inbuf = ((char *) inbuf) + chunk;
363 }
364
365
366 r = (ssize_t) count;
367 }
368 else
369#endif
370 {
371 errno = 0;
373 if (r != count)
374 {
375
377 }
378 }
379 if (r > 0)
381 return r;
382}
383
384static int
386{
387 int r;
392
395
396#ifdef HAVE_LIBZ
398 {
399 errno = 0;
400 r = gzclose(df->gzfp);
401 }
402 else
403#endif
404#ifdef USE_LZ4
406 {
407 size_t compressed;
408
409 compressed = LZ4F_compressEnd(df->ctx,
410 df->lz4buf, df->lz4bufsize,
411 NULL);
412
413 if (LZ4F_isError(compressed))
414 {
416 return -1;
417 }
418
419 errno = 0;
420 if (write(df->fd, df->lz4buf, compressed) != compressed)
421 {
422
424 return -1;
425 }
426
428 }
429 else
430#endif
432
433 if (r == 0)
434 {
435
437 {
439 char *filename2;
440
441
442
443
444
447 snprintf(tmppath, sizeof(tmppath), "%s/%s",
450
451
453 snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
454 dir_data->basedir, filename2);
458 else
459 {
460 if (rename(tmppath, tmppath2) != 0)
461 {
462 pg_log_error("could not rename file \"%s\" to \"%s\": %m",
463 tmppath, tmppath2);
464 r = -1;
465 }
466 }
467 }
469 {
471
472
475 snprintf(tmppath, sizeof(tmppath), "%s/%s",
478 r = unlink(tmppath);
479 }
480 else
481 {
482
483
484
485
486
488 {
490 if (r == 0)
492 }
493 }
494 }
495
496 if (r != 0)
498
499#ifdef USE_LZ4
501
502 LZ4F_freeCompressionContext(df->ctx);
503#endif
504
509
510 return r;
511}
512
513static int
515{
516 int r;
517
520
522 return 0;
523
524#ifdef HAVE_LIBZ
526 {
528 {
529 f->wwmethod->lasterrno = errno;
530 return -1;
531 }
532 }
533#endif
534#ifdef USE_LZ4
536 {
538 size_t compressed;
539
540
541 compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542 if (LZ4F_isError(compressed))
543 {
545 return -1;
546 }
547
548 errno = 0;
549 if (write(df->fd, df->lz4buf, compressed) != compressed)
550 {
551
553 return -1;
554 }
555 }
556#endif
557
559 if (r < 0)
561 return r;
562}
563
564static ssize_t
566{
568 struct stat statbuf;
570
571 snprintf(tmppath, sizeof(tmppath), "%s/%s",
572 dir_data->basedir, pathname);
573
574 if (stat(tmppath, &statbuf) != 0)
575 {
577 return -1;
578 }
579
581}
582
583static bool
585{
588 int fd;
589
591
592 snprintf(tmppath, sizeof(tmppath), "%s/%s",
593 dir_data->basedir, pathname);
594
595 fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
596 if (fd < 0)
597
598
599
600
601
602 return false;
604 return true;
605}
606
607static bool
609{
611
612 if (wwmethod->sync)
613 {
615
616
617
618
619
621 {
623 return false;
624 }
625 }
626 return true;
627}
628
629static void
631{
633
636}
637
638
642 int compression_level, bool sync)
643{
645
654
655 return &wwmethod->base;
656}
657
658
659
660
661
662
663
665 const char *pathname,
666 const char *temp_suffix,
667 size_t pad_to_size);
671 const char *pathname);
673 const char *pathname, const char *temp_suffix);
678
689};
690
692{
698
700{
705#ifdef HAVE_LIBZ
706 z_streamp zp;
707 void *zlibOut;
708#endif
710
711#ifdef HAVE_LIBZ
/*
 * tar_write_compressed_data
 *
 * Run "count" bytes starting at "buf" through the zlib stream tar_data->zp
 * and write the compressed output held in tar_data->zlibOut to tar_data->fd.
 *
 * When "flush" is true, deflate() is called with Z_FINISH until it reports
 * Z_STREAM_END, and the stream is then reset with deflateReset().  When
 * "flush" is false, the loop runs only while input remains (avail_in != 0).
 *
 * Returns true on success and false on failure.  A failed write() stores an
 * errno value in tar_data->base.lasterrno; a failed deflateReset() stores a
 * message in tar_data->base.lasterrstring.
 *
 * NOTE(review): this listing omits several lines of the function (the
 * statement inside the Z_STREAM_ERROR branch, the condition guarding the
 * write block, the declaration of "len", and one statement after next_out is
 * reset).  The comments below describe only the statements that are visible.
 */
712static bool
713tar_write_compressed_data(TarMethodData *tar_data, const void *buf, size_t count,
714 bool flush)
715{
/* Point the zlib stream at the caller's input. */
716 tar_data->zp->next_in = buf;
717 tar_data->zp->avail_in = count;
718
/* Keep going while input remains, or until the stream ends when flushing. */
719 while (tar_data->zp->avail_in || flush)
720 {
721 int r;
722
/* Z_FINISH asks zlib to terminate the stream; otherwise compress normally. */
723 r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
724 if (r == Z_STREAM_ERROR)
725 {
727 return false;
728 }
729
/* NOTE(review): the guard for this block and the definition of "len" are not visible here. */
731 {
733
/* Clear errno first so a write() that fails without setting it can be detected below. */
734 errno = 0;
735 if (write(tar_data->fd, tar_data->zlibOut, len) != len)
736 {
737
/* If write() left errno at zero, report the failure as ENOSPC. */
738 tar_data->base.lasterrno = errno ? errno : ENOSPC;
739 return false;
740 }
741
/* Rewind the output pointer to the start of the output buffer. */
742 tar_data->zp->next_out = tar_data->zlibOut;
744 }
745
/* The stream is complete; this is what ends the loop when flush is true. */
746 if (r == Z_STREAM_END)
747 break;
748 }
749
750 if (flush)
751 {
752
/* Reset the finished stream; a failure here is reported via lasterrstring. */
753 if (deflateReset(tar_data->zp) != Z_OK)
754 {
755 tar_data->base.lasterrstring = _("could not reset compression stream");
756 return false;
757 }
758 }
759
760 return true;
761}
762#endif
763
764static ssize_t
766{
768 ssize_t r;
769
772
773
775 {
776 errno = 0;
778 if (r != count)
779 {
780
782 return -1;
783 }
785 return r;
786 }
787#ifdef HAVE_LIBZ
789 {
790 if (!tar_write_compressed_data(tar_data, buf, count, false))
791 return -1;
793 return count;
794 }
795#endif
796 else
797 {
798
800 return -1;
801 }
802}
803
804static bool
806{
808 size_t bytesleft = bytes;
809
810 memset(zerobuf.data, 0, XLOG_BLCKSZ);
811 while (bytesleft)
812 {
813 size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
815
816 if (r < 0)
817 return false;
818 bytesleft -= r;
819 }
820
821 return true;
822}
823
824static char *
826 const char *temp_suffix)
827{
829
831 pathname, temp_suffix ? temp_suffix : "");
832
834}
835
838 const char *temp_suffix, size_t pad_to_size)
839{
841 char *tmppath;
842
844
845 if (tar_data->fd < 0)
846 {
847
848
849
853 if (tar_data->fd < 0)
854 {
856 return NULL;
857 }
858
859#ifdef HAVE_LIBZ
861 {
862 tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
863 tar_data->zp->zalloc = Z_NULL;
864 tar_data->zp->zfree = Z_NULL;
865 tar_data->zp->opaque = Z_NULL;
866 tar_data->zp->next_out = tar_data->zlibOut;
868
869
870
871
872
873
875 Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
876 {
878 tar_data->zp = NULL;
880 _("could not initialize compression library");
881 return NULL;
882 }
883 }
884#endif
885
886
887 }
888
890 {
892 _("implementation error: tar files can't have more than one open file");
893 return NULL;
894 }
895
898
900
901
903 {
907 wwmethod->lasterrstring = _("could not create tar header");
908 return NULL;
909 }
910
912
913#ifdef HAVE_LIBZ
915 {
916
917 if (!tar_write_compressed_data(tar_data, NULL, 0, true))
918 return NULL;
919
920
921 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
922 {
924 _("could not change compression parameters");
925 return NULL;
926 }
927 }
928#endif
929
932 {
936 return NULL;
937 }
939
941 {
942 errno = 0;
945 {
946
947 wwmethod->lasterrno = errno ? errno : ENOSPC;
950 return NULL;
951 }
952 }
953#ifdef HAVE_LIBZ
955 {
956
957 if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
959 return NULL;
960
961
963 Z_DEFAULT_STRATEGY) != Z_OK)
964 {
965 wwmethod->lasterrstring = _("could not change compression parameters");
966 return NULL;
967 }
968 }
969#endif
970 else
971 {
972
974 }
975
977
978
979
980
981
982 if (pad_to_size)
983 {
986 {
987
989 return NULL;
990
991 if (lseek(tar_data->fd,
994 {
996 return NULL;
997 }
998
1000 }
1001 }
1002
1004}
1005
1006static ssize_t
1008{
1010
1011
1013 return -1;
1014}
1015
1016static int
1018{
1020 int r;
1021
1024
1026 return 0;
1027
1028
1029
1030
1031
1033 return 0;
1034
1036 if (r < 0)
1038 return r;
1039}
1040
1041static int
1043{
1044 ssize_t filesize;
1045 int padding;
1048
1051
1053 {
1055 {
1057 return -1;
1058 }
1059
1060
1061
1062
1063
1064
1065 if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1066 {
1068 return -1;
1069 }
1070
1071 pg_free(tf->base.pathname);
1074
1075 return 0;
1076 }
1077
1078
1079
1080
1081
1082
1083 if (tf->pad_to_size)
1084 {
1086 {
1087
1088
1089
1090
1091 size_t sizeleft = tf->pad_to_size - tf->base.currpos;
1092
1093 if (sizeleft)
1094 {
1096 return -1;
1097 }
1098 }
1099 else
1100 {
1101
1102
1103
1104
1105 tf->base.currpos = tf->pad_to_size;
1106 }
1107 }
1108
1109
1110
1111
1112
1115 if (padding)
1116 {
1118
1119 if (tar_write(f, zerobuf, padding) != padding)
1120 return -1;
1121 }
1122
1123
1124#ifdef HAVE_LIBZ
1126 {
1127
1128 if (!tar_write_compressed_data(tar_data, NULL, 0, true))
1129 return -1;
1130 }
1131#endif
1132
1133
1134
1135
1136
1137
1139
1141
1142
1143
1144
1145
1147
1150 if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1151 {
1153 return -1;
1154 }
1156 {
1157 errno = 0;
1159 {
1160
1162 return -1;
1163 }
1164 }
1165#ifdef HAVE_LIBZ
1167 {
1168
1169 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1170 {
1172 return -1;
1173 }
1174
1175
1176 if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1178 return -1;
1179
1180
1182 Z_DEFAULT_STRATEGY) != Z_OK)
1183 {
1185 return -1;
1186 }
1187 }
1188#endif
1189 else
1190 {
1191
1193 }
1194
1195
1196 if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1197 {
1199 return -1;
1200 }
1201
1202
1204 {
1205
1206 pg_fatal("could not fsync file \"%s\": %s",
1208 }
1209
1210
1211 pg_free(tf->base.pathname);
1214
1215 return 0;
1216}
1217
1218static bool
1220{
1222
1223 return false;
1224}
1225
1226static bool
1228{
1230 char zerobuf[1024] = {0};
1231
1233
1235 {
1237 return false;
1238 }
1239
1240
1242 {
1243 errno = 0;
1244 if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1245 {
1246
1247 wwmethod->lasterrno = errno ? errno : ENOSPC;
1248 return false;
1249 }
1250 }
1251#ifdef HAVE_LIBZ
1253 {
1254 if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
1255 false))
1256 return false;
1257
1258
1259 tar_data->zp->next_in = NULL;
1260 tar_data->zp->avail_in = 0;
1261 while (true)
1262 {
1263 int r;
1264
1265 r = deflate(tar_data->zp, Z_FINISH);
1266
1267 if (r == Z_STREAM_ERROR)
1268 {
1269 wwmethod->lasterrstring = _("could not compress data");
1270 return false;
1271 }
1273 {
1275
1276 errno = 0;
1277 if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1278 {
1279
1280
1281
1282
1283 wwmethod->lasterrno = errno ? errno : ENOSPC;
1284 return false;
1285 }
1286 }
1287 if (r == Z_STREAM_END)
1288 break;
1289 }
1290
1291 if (deflateEnd(tar_data->zp) != Z_OK)
1292 {
1293 wwmethod->lasterrstring = _("could not close compression stream");
1294 return false;
1295 }
1296 }
1297#endif
1298 else
1299 {
1300
1302 }
1303
1304
1305 if (wwmethod->sync)
1306 {
1307 if (fsync(tar_data->fd) != 0)
1308 {
1310 return false;
1311 }
1312 }
1313
1314 if (close(tar_data->fd) != 0)
1315 {
1317 return false;
1318 }
1319
1320 tar_data->fd = -1;
1321
1322 if (wwmethod->sync)
1323 {
1326 {
1328 return false;
1329 }
1330 }
1331
1332 return true;
1333}
1334
1335static void
1337{
1339
1341#ifdef HAVE_LIBZ
1343 pg_free(tar_data->zlibOut);
1344#endif
1346}
1347
1348
1349
1350
1351
1352
1353
1357 int compression_level, bool sync)
1358{
1361 ".tar.gz" : ".tar";
1362
1370
1373 wwmethod->fd = -1;
1374#ifdef HAVE_LIBZ
1377#endif
1378
1379 return &wwmethod->base;
1380}
1381
1382const char *
1384{
1388}
int durable_rename(const char *oldfile, const char *newfile, int elevel)
void fsync_fname(const char *fname, bool isdir)
static int fsync_parent_path(const char *fname, int elevel)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
void * pg_malloc0(size_t size)
ssize_t pg_pwrite_zeros(int fd, size_t size, off_t offset)
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
static pg_compress_algorithm compression_algorithm
static size_t tarPaddingBytesRequired(size_t len)
int tarChecksum(char *header)
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
void print_tar_number(char *s, int len, uint64 val)
size_t strlcpy(char *dst, const char *src, size_t siz)
static int fd(const char *x, int i)
TarMethodFile * currentfile
char header[TAR_BLOCK_SIZE]
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
const char * lasterrstring
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
WalWriteMethod * wwmethod
WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
static char * dir_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
static Walfile * dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
struct DirectoryMethodData DirectoryMethodData
static Walfile * tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
static ssize_t tar_write(Walfile *f, const void *buf, size_t count)
static void tar_free(WalWriteMethod *wwmethod)
static const WalWriteMethodOps WalTarMethodOps
struct TarMethodData TarMethodData
static bool dir_finish(WalWriteMethod *wwmethod)
#define clear_error(wwmethod)
static const WalWriteMethodOps WalDirectoryMethodOps
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
struct DirectoryMethodFile DirectoryMethodFile
static void dir_free(WalWriteMethod *wwmethod)
static int dir_sync(Walfile *f)
static int tar_sync(Walfile *f)
static bool tar_finish(WalWriteMethod *wwmethod)
static int dir_close(Walfile *f, WalCloseMethod method)
static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
struct TarMethodFile TarMethodFile
static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
static int tar_close(Walfile *f, WalCloseMethod method)
static ssize_t dir_write(Walfile *f, const void *buf, size_t count)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
static char * tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)