PostgreSQL Source Code: src/bin/pg_basebackup/walmethods.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

13

14#include <fcntl.h>

15#include <sys/stat.h>

18

19#ifdef USE_LZ4

20#include <lz4frame.h>

21#endif

22#ifdef HAVE_LIBZ

23#include <zlib.h>

24#endif

25

31

32

33#define ZLIB_OUT_SIZE 4096

34

35

36#define LZ4_IN_SIZE 4096

37

38

39

40

41

42

44 const char *pathname,

45 const char *temp_suffix,

46 size_t pad_to_size);

50 const char *pathname);

52 const char *pathname, const char *temp_suffix);

57

68};

69

70

71

72

74{

78

79

80

81

83{

88#ifdef HAVE_LIBZ

89 gzFile gzfp;

90#endif

91#ifdef USE_LZ4

92 LZ4F_compressionContext_t ctx;

93 size_t lz4bufsize;

94 void *lz4buf;

95#endif

97

98#define clear_error(wwmethod) \

99 ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)

100

101static char *

103 const char *pathname, const char *temp_suffix)

104{

106

108 pathname,

111 temp_suffix ? temp_suffix : "");

112

114}

115

118 const char *temp_suffix, size_t pad_to_size)

119{

123 int fd;

125#ifdef HAVE_LIBZ

126 gzFile gzfp = NULL;

127#endif

128#ifdef USE_LZ4

129 LZ4F_compressionContext_t ctx = NULL;

130 size_t lz4bufsize = 0;

131 void *lz4buf = NULL;

132#endif

133

135

137 snprintf(tmppath, sizeof(tmppath), "%s/%s",

140

141

142

143

144

145

146

148 if (fd < 0)

149 {

151 return NULL;

152 }

153

154#ifdef HAVE_LIBZ

156 {

157 gzfp = gzdopen(fd, "wb");

158 if (gzfp == NULL)

159 {

162 return NULL;

163 }

164

166 Z_DEFAULT_STRATEGY) != Z_OK)

167 {

169 gzclose(gzfp);

170 return NULL;

171 }

172 }

173#endif

174#ifdef USE_LZ4

176 {

177 size_t ctx_out;

178 size_t header_size;

179 LZ4F_preferences_t prefs;

180

181 ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);

182 if (LZ4F_isError(ctx_out))

183 {

184 wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);

186 return NULL;

187 }

188

189 lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);

191

192

193 memset(&prefs, 0, sizeof(prefs));

195

196

197 header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);

198 if (LZ4F_isError(header_size))

199 {

200 wwmethod->lasterrstring = LZ4F_getErrorName(header_size);

201 (void) LZ4F_freeCompressionContext(ctx);

204 return NULL;

205 }

206

207 errno = 0;

208 if (write(fd, lz4buf, header_size) != header_size)

209 {

210

211 wwmethod->lasterrno = errno ? errno : ENOSPC;

212 (void) LZ4F_freeCompressionContext(ctx);

215 return NULL;

216 }

217 }

218#endif

219

220

222 {

223 ssize_t rc;

224

226

227 if (rc < 0)

228 {

231 return NULL;

232 }

233

234

235

236

237

238 if (lseek(fd, 0, SEEK_SET) != 0)

239 {

242 return NULL;

243 }

244 }

245

246

247

248

249

250

251

252 if (wwmethod->sync)

253 {

256 {

258#ifdef HAVE_LIBZ

260 gzclose(gzfp);

261 else

262#endif

263#ifdef USE_LZ4

265 {

266 (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);

267 (void) LZ4F_freeCompressionContext(ctx);

270 }

271 else

272#endif

274 return NULL;

275 }

276 }

277

279#ifdef HAVE_LIBZ

281 f->gzfp = gzfp;

282#endif

283#ifdef USE_LZ4

285 {

286 f->ctx = ctx;

287 f->lz4buf = lz4buf;

288 f->lz4bufsize = lz4bufsize;

289 }

290#endif

291

297 if (temp_suffix)

299

300 return &f->base;

301}

302

303static ssize_t

305{

306 ssize_t r;

308

311

312#ifdef HAVE_LIBZ

314 {

315 errno = 0;

316 r = (ssize_t) gzwrite(df->gzfp, buf, count);

317 if (r != count)

318 {

319

321 }

322 }

323 else

324#endif

325#ifdef USE_LZ4

327 {

328 size_t chunk;

330 const void *inbuf = buf;

331

334 {

335 size_t compressed;

336

339 else

341

343 compressed = LZ4F_compressUpdate(df->ctx,

344 df->lz4buf, df->lz4bufsize,

345 inbuf, chunk,

346 NULL);

347

348 if (LZ4F_isError(compressed))

349 {

351 return -1;

352 }

353

354 errno = 0;

355 if (write(df->fd, df->lz4buf, compressed) != compressed)

356 {

357

359 return -1;

360 }

361

362 inbuf = ((char *) inbuf) + chunk;

363 }

364

365

366 r = (ssize_t) count;

367 }

368 else

369#endif

370 {

371 errno = 0;

373 if (r != count)

374 {

375

377 }

378 }

379 if (r > 0)

381 return r;

382}

383

384static int

386{

387 int r;

392

395

396#ifdef HAVE_LIBZ

398 {

399 errno = 0;

400 r = gzclose(df->gzfp);

401 }

402 else

403#endif

404#ifdef USE_LZ4

406 {

407 size_t compressed;

408

409 compressed = LZ4F_compressEnd(df->ctx,

410 df->lz4buf, df->lz4bufsize,

411 NULL);

412

413 if (LZ4F_isError(compressed))

414 {

416 return -1;

417 }

418

419 errno = 0;

420 if (write(df->fd, df->lz4buf, compressed) != compressed)

421 {

422

424 return -1;

425 }

426

428 }

429 else

430#endif

432

433 if (r == 0)

434 {

435

437 {

439 char *filename2;

440

441

442

443

444

447 snprintf(tmppath, sizeof(tmppath), "%s/%s",

450

451

453 snprintf(tmppath2, sizeof(tmppath2), "%s/%s",

454 dir_data->basedir, filename2);

458 else

459 {

460 if (rename(tmppath, tmppath2) != 0)

461 {

462 pg_log_error("could not rename file \"%s\" to \"%s\": %m",

463 tmppath, tmppath2);

464 r = -1;

465 }

466 }

467 }

469 {

471

472

475 snprintf(tmppath, sizeof(tmppath), "%s/%s",

478 r = unlink(tmppath);

479 }

480 else

481 {

482

483

484

485

486

488 {

490 if (r == 0)

492 }

493 }

494 }

495

496 if (r != 0)

498

499#ifdef USE_LZ4

501

502 LZ4F_freeCompressionContext(df->ctx);

503#endif

504

509

510 return r;

511}

512

513static int

515{

516 int r;

517

520

522 return 0;

523

524#ifdef HAVE_LIBZ

526 {

528 {

529 f->wwmethod->lasterrno = errno;

530 return -1;

531 }

532 }

533#endif

534#ifdef USE_LZ4

536 {

538 size_t compressed;

539

540

541 compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);

542 if (LZ4F_isError(compressed))

543 {

545 return -1;

546 }

547

548 errno = 0;

549 if (write(df->fd, df->lz4buf, compressed) != compressed)

550 {

551

553 return -1;

554 }

555 }

556#endif

557

559 if (r < 0)

561 return r;

562}

563

564static ssize_t

566{

568 struct stat statbuf;

570

571 snprintf(tmppath, sizeof(tmppath), "%s/%s",

572 dir_data->basedir, pathname);

573

574 if (stat(tmppath, &statbuf) != 0)

575 {

577 return -1;

578 }

579

581}

582

583static bool

585{

588 int fd;

589

591

592 snprintf(tmppath, sizeof(tmppath), "%s/%s",

593 dir_data->basedir, pathname);

594

595 fd = open(tmppath, O_RDONLY | PG_BINARY, 0);

596 if (fd < 0)

597

598

599

600

601

602 return false;

604 return true;

605}

606

607static bool

609{

611

612 if (wwmethod->sync)

613 {

615

616

617

618

619

621 {

623 return false;

624 }

625 }

626 return true;

627}

628

629static void

631{

633

636}

637

638

642 int compression_level, bool sync)

643{

645

654

655 return &wwmethod->base;

656}

657

658

659

660

661

662

663

665 const char *pathname,

666 const char *temp_suffix,

667 size_t pad_to_size);

671 const char *pathname);

673 const char *pathname, const char *temp_suffix);

678

689};

690

692{

698

700{

705#ifdef HAVE_LIBZ

706 z_streamp zp;

707 void *zlibOut;

708#endif

710

711#ifdef HAVE_LIBZ

712static bool

713tar_write_compressed_data(TarMethodData *tar_data, const void *buf, size_t count,

714 bool flush)

715{

716 tar_data->zp->next_in = buf;

717 tar_data->zp->avail_in = count;

718

719 while (tar_data->zp->avail_in || flush)

720 {

721 int r;

722

723 r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);

724 if (r == Z_STREAM_ERROR)

725 {

727 return false;

728 }

729

731 {

733

734 errno = 0;

735 if (write(tar_data->fd, tar_data->zlibOut, len) != len)

736 {

737

738 tar_data->base.lasterrno = errno ? errno : ENOSPC;

739 return false;

740 }

741

742 tar_data->zp->next_out = tar_data->zlibOut;

744 }

745

746 if (r == Z_STREAM_END)

747 break;

748 }

749

750 if (flush)

751 {

752

753 if (deflateReset(tar_data->zp) != Z_OK)

754 {

755 tar_data->base.lasterrstring = _("could not reset compression stream");

756 return false;

757 }

758 }

759

760 return true;

761}

762#endif

763

764static ssize_t

766{

768 ssize_t r;

769

772

773

775 {

776 errno = 0;

778 if (r != count)

779 {

780

782 return -1;

783 }

785 return r;

786 }

787#ifdef HAVE_LIBZ

789 {

790 if (!tar_write_compressed_data(tar_data, buf, count, false))

791 return -1;

793 return count;

794 }

795#endif

796 else

797 {

798

800 return -1;

801 }

802}

803

804static bool

806{

808 size_t bytesleft = bytes;

809

810 memset(zerobuf.data, 0, XLOG_BLCKSZ);

811 while (bytesleft)

812 {

813 size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);

815

816 if (r < 0)

817 return false;

818 bytesleft -= r;

819 }

820

821 return true;

822}

823

824static char *

826 const char *temp_suffix)

827{

829

831 pathname, temp_suffix ? temp_suffix : "");

832

834}

835

838 const char *temp_suffix, size_t pad_to_size)

839{

841 char *tmppath;

842

844

845 if (tar_data->fd < 0)

846 {

847

848

849

853 if (tar_data->fd < 0)

854 {

856 return NULL;

857 }

858

859#ifdef HAVE_LIBZ

861 {

862 tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));

863 tar_data->zp->zalloc = Z_NULL;

864 tar_data->zp->zfree = Z_NULL;

865 tar_data->zp->opaque = Z_NULL;

866 tar_data->zp->next_out = tar_data->zlibOut;

868

869

870

871

872

873

875 Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)

876 {

878 tar_data->zp = NULL;

880 _("could not initialize compression library");

881 return NULL;

882 }

883 }

884#endif

885

886

887 }

888

890 {

892 _("implementation error: tar files can't have more than one open file");

893 return NULL;

894 }

895

898

900

901

903 {

907 wwmethod->lasterrstring = _("could not create tar header");

908 return NULL;

909 }

910

912

913#ifdef HAVE_LIBZ

915 {

916

917 if (!tar_write_compressed_data(tar_data, NULL, 0, true))

918 return NULL;

919

920

921 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)

922 {

924 _("could not change compression parameters");

925 return NULL;

926 }

927 }

928#endif

929

932 {

936 return NULL;

937 }

939

941 {

942 errno = 0;

945 {

946

947 wwmethod->lasterrno = errno ? errno : ENOSPC;

950 return NULL;

951 }

952 }

953#ifdef HAVE_LIBZ

955 {

956

957 if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,

959 return NULL;

960

961

963 Z_DEFAULT_STRATEGY) != Z_OK)

964 {

965 wwmethod->lasterrstring = _("could not change compression parameters");

966 return NULL;

967 }

968 }

969#endif

970 else

971 {

972

974 }

975

977

978

979

980

981

982 if (pad_to_size)

983 {

986 {

987

989 return NULL;

990

991 if (lseek(tar_data->fd,

994 {

996 return NULL;

997 }

998

1000 }

1001 }

1002

1004}

1005

1006static ssize_t

1008{

1010

1011

1013 return -1;

1014}

1015

1016static int

1018{

1020 int r;

1021

1024

1026 return 0;

1027

1028

1029

1030

1031

1033 return 0;

1034

1035 r = fsync(tar_data->fd);

1036 if (r < 0)

1038 return r;

1039}

1040

1041static int

1043{

1044 ssize_t filesize;

1045 int padding;

1048

1051

1053 {

1055 {

1057 return -1;

1058 }

1059

1060

1061

1062

1063

1064

1065 if (ftruncate(tar_data->fd, tf->ofs_start) != 0)

1066 {

1068 return -1;

1069 }

1070

1071 pg_free(tf->base.pathname);

1074

1075 return 0;

1076 }

1077

1078

1079

1080

1081

1082

1083 if (tf->pad_to_size)

1084 {

1086 {

1087

1088

1089

1090

1091 size_t sizeleft = tf->pad_to_size - tf->base.currpos;

1092

1093 if (sizeleft)

1094 {

1096 return -1;

1097 }

1098 }

1099 else

1100 {

1101

1102

1103

1104

1105 tf->base.currpos = tf->pad_to_size;

1106 }

1107 }

1108

1109

1110

1111

1112

1115 if (padding)

1116 {

1118

1119 if (tar_write(f, zerobuf, padding) != padding)

1120 return -1;

1121 }

1122

1123

1124#ifdef HAVE_LIBZ

1126 {

1127

1128 if (!tar_write_compressed_data(tar_data, NULL, 0, true))

1129 return -1;

1130 }

1131#endif

1132

1133

1134

1135

1136

1137

1139

1141

1142

1143

1144

1145

1147

1150 if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)

1151 {

1153 return -1;

1154 }

1156 {

1157 errno = 0;

1159 {

1160

1162 return -1;

1163 }

1164 }

1165#ifdef HAVE_LIBZ

1167 {

1168

1169 if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)

1170 {

1172 return -1;

1173 }

1174

1175

1176 if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,

1178 return -1;

1179

1180

1182 Z_DEFAULT_STRATEGY) != Z_OK)

1183 {

1185 return -1;

1186 }

1187 }

1188#endif

1189 else

1190 {

1191

1193 }

1194

1195

1196 if (lseek(tar_data->fd, 0, SEEK_END) < 0)

1197 {

1199 return -1;

1200 }

1201

1202

1204 {

1205

1206 pg_fatal("could not fsync file \"%s\": %s",

1208 }

1209

1210

1211 pg_free(tf->base.pathname);

1214

1215 return 0;

1216}

1217

1218static bool

1220{

1222

1223 return false;

1224}

1225

1226static bool

1228{

1230 char zerobuf[1024] = {0};

1231

1233

1235 {

1237 return false;

1238 }

1239

1240

1242 {

1243 errno = 0;

1244 if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))

1245 {

1246

1247 wwmethod->lasterrno = errno ? errno : ENOSPC;

1248 return false;

1249 }

1250 }

1251#ifdef HAVE_LIBZ

1253 {

1254 if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),

1255 false))

1256 return false;

1257

1258

1259 tar_data->zp->next_in = NULL;

1260 tar_data->zp->avail_in = 0;

1261 while (true)

1262 {

1263 int r;

1264

1265 r = deflate(tar_data->zp, Z_FINISH);

1266

1267 if (r == Z_STREAM_ERROR)

1268 {

1269 wwmethod->lasterrstring = _("could not compress data");

1270 return false;

1271 }

1273 {

1275

1276 errno = 0;

1277 if (write(tar_data->fd, tar_data->zlibOut, len) != len)

1278 {

1279

1280

1281

1282

1283 wwmethod->lasterrno = errno ? errno : ENOSPC;

1284 return false;

1285 }

1286 }

1287 if (r == Z_STREAM_END)

1288 break;

1289 }

1290

1291 if (deflateEnd(tar_data->zp) != Z_OK)

1292 {

1293 wwmethod->lasterrstring = _("could not close compression stream");

1294 return false;

1295 }

1296 }

1297#endif

1298 else

1299 {

1300

1302 }

1303

1304

1305 if (wwmethod->sync)

1306 {

1307 if (fsync(tar_data->fd) != 0)

1308 {

1310 return false;

1311 }

1312 }

1313

1314 if (close(tar_data->fd) != 0)

1315 {

1317 return false;

1318 }

1319

1320 tar_data->fd = -1;

1321

1322 if (wwmethod->sync)

1323 {

1326 {

1328 return false;

1329 }

1330 }

1331

1332 return true;

1333}

1334

1335static void

1337{

1339

1341#ifdef HAVE_LIBZ

1343 pg_free(tar_data->zlibOut);

1344#endif

1346}

1347

1348

1349

1350

1351

1352

1353

1357 int compression_level, bool sync)

1358{

1361 ".tar.gz" : ".tar";

1362

1370

1373 wwmethod->fd = -1;

1374#ifdef HAVE_LIBZ

1377#endif

1378

1379 return &wwmethod->base;

1380}

1381

1382const char *

1384{

1388}

int durable_rename(const char *oldfile, const char *newfile, int elevel)

void fsync_fname(const char *fname, bool isdir)

static int fsync_parent_path(const char *fname, int elevel)

void * pg_malloc(size_t size)

char * pg_strdup(const char *in)

void * pg_malloc0(size_t size)

ssize_t pg_pwrite_zeros(int fd, size_t size, off_t offset)

Assert(PointerIsAligned(start, uint64))

#define pg_log_error(...)

static pg_compress_algorithm compression_algorithm

static size_t tarPaddingBytesRequired(size_t len)

int tarChecksum(char *header)

enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)

void print_tar_number(char *s, int len, uint64 val)

size_t strlcpy(char *dst, const char *src, size_t siz)

static int fd(const char *x, int i)

TarMethodFile * currentfile

char header[TAR_BLOCK_SIZE]

Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)

const char * lasterrstring

const WalWriteMethodOps * ops

pg_compress_algorithm compression_algorithm

WalWriteMethod * wwmethod

WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)

static char * dir_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)

static Walfile * dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)

struct DirectoryMethodData DirectoryMethodData

static Walfile * tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)

static ssize_t tar_write(Walfile *f, const void *buf, size_t count)

static void tar_free(WalWriteMethod *wwmethod)

static const WalWriteMethodOps WalTarMethodOps

struct TarMethodData TarMethodData

static bool dir_finish(WalWriteMethod *wwmethod)

#define clear_error(wwmethod)

static const WalWriteMethodOps WalDirectoryMethodOps

WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)

struct DirectoryMethodFile DirectoryMethodFile

static void dir_free(WalWriteMethod *wwmethod)

static int dir_sync(Walfile *f)

static int tar_sync(Walfile *f)

static bool tar_finish(WalWriteMethod *wwmethod)

static int dir_close(Walfile *f, WalCloseMethod method)

static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)

static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)

static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)

struct TarMethodFile TarMethodFile

static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)

static int tar_close(Walfile *f, WalCloseMethod method)

static ssize_t dir_write(Walfile *f, const void *buf, size_t count)

const char * GetLastWalMethodError(WalWriteMethod *wwmethod)

static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)

static char * tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)