PostgreSQL Source Code: src/backend/postmaster/pgarch.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

27

29#include <sys/stat.h>

31

56

57

58

59

60

61

62#define PGARCH_AUTOWAKE_INTERVAL 60

63

64#define PGARCH_RESTART_INTERVAL 10

65

66

67

68

69

70

71#define NUM_ARCHIVE_RETRIES 3

72

73

74

75

76

77#define NUM_ORPHAN_CLEANUP_RETRIES 3

78

79

80

81

82#define NUM_FILES_PER_DIRECTORY_SCAN 64

84

86{

87 int pgprocno;

88

89

90

97

98

99

100

101

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

127 int arch_files_size;

129

132

134

135

136

137

138static volatile sig_atomic_t ready_to_stop = false;

139

140

141

142

143

155

156

159{

160 Size size = 0;

161

163

164 return size;

165}

166

167

168void

170{

171 bool found;

172

175

176 if (!found)

177 {

178

182 }

183}

184

185

186

187

188

189

190

191

192

193

194

195

197bool

199{

200 static time_t last_pgarch_start_time = 0;

201 time_t curtime = time(NULL);

202

203

204

205

206

207 if ((unsigned int) (curtime - last_pgarch_start_time) <

209 return false;

210

211 last_pgarch_start_time = curtime;

212 return true;

213}

214

215

216

217void

218PgArchiverMain(const void *startup_data, size_t startup_data_len)

219{

220 Assert(startup_data_len == 0);

221

224

225

226

227

228

232

237

238

240

241

242 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

243

244

246

247

249

250

251

252

253

255

256

259

260

263

264

266 "archiver",

268

269

271

273

275}

276

277

278

280void

282{

284

285

286

287

288

289

290

293}

294

295

296

297static void

299{

300

303}

304

305

306

307

308

310static void

312{

314

315

316

317

318

319

320 do

321 {

323

324

326

327

329

330

331

332

333

334

335

336

338 {

339 time_t curtime = time(NULL);

340

344 (unsigned int) 60)

345 break;

346 }

347

348

350

351

352

353

354

355 if (time\_to\_stop)

356 {

357 int rc;

358

362 WAIT_EVENT_ARCHIVER_MAIN);

365 }

366

367

368

369

370

371

373}

374

375

376

377

378

380static void

382{

384

385

387

388

389

390

391

392

393

395 {

396 int failures = 0;

397 int failures_orphan = 0;

398

399 for (;;)

400 {

401 struct stat stat_buf;

403

404

405

406

407

408

409

410

412 return;

413

414

415

416

417

418

420

421

423

424

427 {

429 (errmsg("\"archive_mode\" enabled, yet archiving is not configured"),

432 return;

433 }

434

435

436

437

438

439

440

441

442

443

445 if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)

446 {

448

450 if (unlink(xlogready) == 0)

451 {

453 (errmsg("removed orphan archive status file \"%s\"",

454 xlogready)));

455

456

457 break;

458 }

459

461 {

463 (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",

464 xlogready)));

465

466

467 return;

468 }

469

470

472 continue;

473 }

474

476 {

477

479

480

481

482

483

485

486 break;

487 }

488 else

489 {

490

491

492

493

495

497 {

499 (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",

500 xlog)));

501 return;

502 }

503 pg_usleep(1000000L);

504 }

505 }

506 }

507}

508

509

510

511

512

513

514

516static bool

518{

519 sigjmp_buf local_sigjmp_buf;

523 bool ret;

524

526

527

528 snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);

530

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548 if (sigsetjmp(local_sigjmp_buf, 1) != 0)

549 {

550

552

553

555

556

558

559

560

561

562

563

564

565

566

567

576

577

578

579

580

583

584

586

587

589

590

592

593

594 ret = false;

595 }

596 else

597 {

598

600

601

603 xlog, pathname);

604

605

607

608

611 }

612

613 if (ret)

614 snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);

615 else

616 snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);

618

619 return ret;

620}

621

622

623

624

625

626

627

628

629

630

631

632

633

634

635

636

637

638

639

640

641

642

644static bool

646{

647 char XLogArchiveStatusDir[MAXPGPATH];

648 DIR *rldir;

650

651

652

653

654

657

658

659

660

661

662

663

665 {

666 struct stat st;

668 char *arch_file;

669

673

674 if (stat(status_file, &st) == 0)

675 {

676 strcpy(xlog, arch_file);

677 return true;

678 }

679 else if (errno != ENOENT)

682 errmsg("could not stat file \"%s\": %m", status_file)));

683 }

684

685

687

688

689

690

691

693 rldir = AllocateDir(XLogArchiveStatusDir);

694

695 while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)

696 {

697 int basenamelen = (int) strlen(rlde->d_name) - 6;

699 char *arch_file;

700

701

704 continue;

705

706

708 continue;

709

710

711 if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)

712 continue;

713

714

715 memcpy(basename, rlde->d_name, basenamelen);

716 basename[basenamelen] = '\0';

717

718

719

720

722 {

723

725 strcpy(arch_file, basename);

727

728

731 }

734 {

735

736

737

738

740 strcpy(arch_file, basename);

742 }

743 }

745

746

748 return false;

749

750

751

752

753

756

757

758

759

760

764

765

768

769 return true;

770}

771

772

773

774

775

776

777

778

780static int

782{

787

788

789 if (a_history != b_history)

790 return a_history ? -1 : 1;

791

792

793 return strcmp(a_str, b_str);

794}

795

796

797

798

799

800

801

803void

805{

807}

808

809

810

811

812

813

814

815

817static void

819{

822

825

826

827

828

829

830

831

832

833 if (rename(rlogready, rlogdone) < 0)

836 errmsg("could not rename file \"%s\" to \"%s\": %m",

837 rlogready, rlogdone)));

838}

839

840

841

842

843

844

846static void

848{

850}

851

852

853

854

855

856

857

858

860static void

862{

865

866

869

870

873

875 {

877 bool archiveLibChanged;

878

881

884 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

885 errmsg("both \"archive_command\" and \"archive_library\" set"),

886 errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));

887

889 pfree(archiveLib);

890

891 if (archiveLibChanged)

892 {

893

894

895

896

897

898

899

900

901

903 (errmsg("restarting archiver process because value of "

904 "\"archive_library\" was changed")));

905

907 }

908 }

909}

910

911

912

913

914

916static void

918{

920

923 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

924 errmsg("both \"archive_command\" and \"archive_library\" set"),

925 errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));

926

927

928

929

930

933 else

936 "_PG_archive_module_init", false, NULL);

937

938 if (archive_init == NULL)

940 (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));

941

943

946 (errmsg("archive modules must register an archive callback")));

947

951

953}

954

955

956

958static void

960{

963}

void pgaio_error_cleanup(void)

const ArchiveModuleCallbacks *(* ArchiveModuleInit)(void)

static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static void pg_atomic_write_membarrier_u32(volatile pg_atomic_uint32 *ptr, uint32 val)

static uint32 pg_atomic_exchange_u32(volatile pg_atomic_uint32 *ptr, uint32 newval)

void AuxiliaryProcessMainCommon(void)

void binaryheap_build(binaryheap *heap)

void binaryheap_reset(binaryheap *heap)

bh_node_type binaryheap_first(binaryheap *heap)

void binaryheap_add(binaryheap *heap, bh_node_type d)

bh_node_type binaryheap_remove_first(binaryheap *heap)

void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)

binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)

#define MemSet(start, val, len)

bool ConditionVariableCancelSleep(void)

void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)

void AtEOXact_HashTables(bool isCommit)

void EmitErrorReport(void)

int errdetail_internal(const char *fmt,...)

int errcode_for_file_access(void)

int errdetail(const char *fmt,...)

ErrorContextCallback * error_context_stack

void FlushErrorState(void)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

sigjmp_buf * PG_exception_stack

#define ereport(elevel,...)

void AtEOXact_Files(bool isCommit)

DIR * AllocateDir(const char *dirname)

struct dirent * ReadDir(DIR *dir, const char *dirname)

volatile sig_atomic_t LogMemoryContextPending

volatile sig_atomic_t ProcSignalBarrierPending

volatile sig_atomic_t PublishMemoryContextPending

void ProcessConfigFile(GucContext context)

Assert(PointerIsAligned(start, uint64))

void SignalHandlerForShutdownRequest(SIGNAL_ARGS)

volatile sig_atomic_t ShutdownRequestPending

volatile sig_atomic_t ConfigReloadPending

void SignalHandlerForConfigReload(SIGNAL_ARGS)

void on_shmem_exit(pg_on_exit_callback function, Datum arg)

void before_shmem_exit(pg_on_exit_callback function, Datum arg)

void SetLatch(Latch *latch)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

void LWLockReleaseAll(void)

void MemoryContextReset(MemoryContext context)

char * pstrdup(const char *in)

void pfree(void *pointer)

void * palloc0(Size size)

void ProcessGetMemoryContextInterrupt(void)

MemoryContext TopMemoryContext

void ProcessLogMemoryContextInterrupt(void)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define RESUME_INTERRUPTS()

#define HOLD_INTERRUPTS()

BackendType MyBackendType

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static volatile sig_atomic_t time_to_stop

static bool pgarch_archiveXlog(char *xlog)

static void pgarch_die(int code, Datum arg)

#define PGARCH_RESTART_INTERVAL

bool PgArchCanRestart(void)

static bool pgarch_readyXlog(char *xlog)

static PgArchData * PgArch

char * arch_module_check_errdetail_string

static void pgarch_MainLoop(void)

#define NUM_ARCHIVE_RETRIES

struct PgArchData PgArchData

static void pgarch_waken_stop(SIGNAL_ARGS)

static volatile sig_atomic_t ready_to_stop

static struct arch_files_state * arch_files

char * XLogArchiveLibrary

Size PgArchShmemSize(void)

static void LoadArchiveLibrary(void)

static void pgarch_ArchiverCopyLoop(void)

static int ready_file_comparator(Datum a, Datum b, void *arg)

void PgArchiverMain(const void *startup_data, size_t startup_data_len)

#define NUM_FILES_PER_DIRECTORY_SCAN

void PgArchForceDirScan(void)

static void ProcessPgArchInterrupts(void)

static const ArchiveModuleCallbacks * ArchiveCallbacks

static ArchiveModuleState * archive_module_state

void PgArchShmemInit(void)

static void pgarch_call_module_shutdown_cb(int code, Datum arg)

static void pgarch_archiveDone(char *xlog)

#define NUM_ORPHAN_CLEANUP_RETRIES

static MemoryContext archive_context

#define PGARCH_AUTOWAKE_INTERVAL

static time_t last_sigterm_time

void pgstat_report_archiver(const char *xlog, bool failed)

#define PostmasterIsAlive()

static char * DatumGetCString(Datum X)

static Datum CStringGetDatum(const char *X)

#define INVALID_PROC_NUMBER

void ProcessProcSignalBarrier(void)

void procsignal_sigusr1_handler(SIGNAL_ARGS)

static void set_ps_display(const char *activity)

void ReleaseAuxProcessResources(bool isCommit)

const ArchiveModuleCallbacks * shell_archive_init(void)

Size add_size(Size s1, Size s2)

void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)

void pg_usleep(long microsec)

ArchiveFileCB archive_file_cb

ArchiveShutdownCB shutdown_cb

ArchiveCheckConfiguredCB check_configured_cb

ArchiveStartupCB startup_cb

pg_atomic_uint32 force_dir_scan

char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS+1]

char * arch_files[NUM_FILES_PER_DIRECTORY_SCAN]

void disable_all_timeouts(bool keep_indicators)

static void pgstat_report_wait_end(void)

#define WL_POSTMASTER_DEATH

char * XLogArchiveCommand

#define XLogArchivingActive()

static bool IsTLHistoryFileName(const char *fname)

static void StatusFilePath(char *path, const char *xlog, const char *suffix)