PostgreSQL Source Code: src/backend/postmaster/pgarch.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
27
29#include <sys/stat.h>
31
56
57
58
59
60
61
62#define PGARCH_AUTOWAKE_INTERVAL 60
64#define PGARCH_RESTART_INTERVAL 10
65
66
67
68
70
71#define NUM_ARCHIVE_RETRIES 3
72
73
74
76
77#define NUM_ORPHAN_CLEANUP_RETRIES 3
78
79
81
82#define NUM_FILES_PER_DIRECTORY_SCAN 64
84
86{
87 int pgprocno;
88
89
97
98
99
100
108
109
110
111
112
113
114
115
116
117
118
119
120
121
123
127 int arch_files_size;
129
132
134
135
137
138static volatile sig_atomic_t ready_to_stop = false;
139
140
141
142
143
155
159{
160 Size size = 0;
161
163
164 return size;
165}
166
168void
170{
171 bool found;
172
175
176 if (!found)
177 {
178
182 }
183}
184
185
186
187
188
189
190
191
192
193
194
195
197bool
199{
200 static time_t last_pgarch_start_time = 0;
201 time_t curtime = time(NULL);
202
203
204
205
206
207 if ((unsigned int) (curtime - last_pgarch_start_time) <
209 return false;
210
211 last_pgarch_start_time = curtime;
212 return true;
213}
214
215
217void
218PgArchiverMain(const void *startup_data, size_t startup_data_len)
219{
220 Assert(startup_data_len == 0);
221
224
225
226
227
228
232
237
238
240
241
242 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
243
244
246
247
249
250
251
252
253
255
256
259
260
263
264
266 "archiver",
268
269
271
273
275}
276
277
278
280void
282{
284
285
286
287
288
289
290
293}
294
295
297static void
299{
300
303}
304
305
306
307
308
310static void
312{
314
315
316
317
318
319
320 do
321 {
323
324
326
327
329
330
331
332
333
334
335
336
338 {
339 time_t curtime = time(NULL);
340
344 (unsigned int) 60)
345 break;
346 }
347
348
350
351
352
353
354
355 if ()
356 {
357 int rc;
358
362 WAIT_EVENT_ARCHIVER_MAIN);
365 }
366
367
368
369
370
371
373}
374
375
376
377
378
380static void
382{
384
385
387
388
389
390
391
392
393
395 {
396 int failures = 0;
397 int failures_orphan = 0;
398
399 for (;;)
400 {
401 struct stat stat_buf;
403
404
405
406
407
408
409
410
412 return;
413
414
415
416
417
418
420
421
423
424
427 {
429 (errmsg("\"archive_mode\" enabled, yet archiving is not configured"),
432 return;
433 }
434
435
436
437
438
439
440
441
442
443
445 if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
446 {
448
450 if (unlink(xlogready) == 0)
451 {
453 (errmsg("removed orphan archive status file \"%s\"",
454 xlogready)));
455
456
457 break;
458 }
459
461 {
463 (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
464 xlogready)));
465
466
467 return;
468 }
469
470
472 continue;
473 }
474
476 {
477
479
480
481
482
483
485
486 break;
487 }
488 else
489 {
490
491
492
493
495
497 {
499 (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
500 xlog)));
501 return;
502 }
503 pg_usleep(1000000L);
504 }
505 }
506 }
507}
508
509
510
511
512
513
514
516static bool
518{
519 sigjmp_buf local_sigjmp_buf;
523 bool ret;
524
526
527
528 snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
530
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
549 {
550
552
553
555
556
558
559
560
561
562
563
564
565
566
567
576
577
578
579
580
583
584
586
587
589
590
592
593
594 ret = false;
595 }
596 else
597 {
598
600
601
603 xlog, pathname);
604
605
607
608
611 }
612
613 if (ret)
614 snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
615 else
616 snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
618
619 return ret;
620}
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
644static bool
646{
647 char XLogArchiveStatusDir[MAXPGPATH];
648 DIR *rldir;
650
651
652
653
654
657
658
659
660
661
662
663
665 {
666 struct stat st;
668 char *arch_file;
669
673
674 if (stat(status_file, &st) == 0)
675 {
676 strcpy(xlog, arch_file);
677 return true;
678 }
679 else if (errno != ENOENT)
682 errmsg("could not stat file \"%s\": %m", status_file)));
683 }
684
685
687
688
689
690
691
693 rldir = AllocateDir(XLogArchiveStatusDir);
694
695 while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
696 {
697 int basenamelen = (int) strlen(rlde->d_name) - 6;
699 char *arch_file;
700
701
704 continue;
705
706
708 continue;
709
710
711 if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
712 continue;
713
714
715 memcpy(basename, rlde->d_name, basenamelen);
716 basename[basenamelen] = '\0';
717
718
719
720
722 {
723
725 strcpy(arch_file, basename);
727
728
731 }
734 {
735
736
737
738
740 strcpy(arch_file, basename);
742 }
743 }
745
746
748 return false;
749
750
751
752
753
756
757
758
759
760
764
765
768
769 return true;
770}
771
772
773
774
775
776
777
778
780static int
782{
787
788
789 if (a_history != b_history)
790 return a_history ? -1 : 1;
791
792
793 return strcmp(a_str, b_str);
794}
795
796
797
798
799
800
801
803void
805{
807}
808
809
810
811
812
813
814
815
817static void
819{
822
825
826
827
828
829
830
831
832
833 if (rename(rlogready, rlogdone) < 0)
836 errmsg("could not rename file \"%s\" to \"%s\": %m",
837 rlogready, rlogdone)));
838}
839
840
841
842
843
844
846static void
848{
850}
851
852
853
854
855
856
857
858
860static void
862{
865
866
869
870
873
875 {
877 bool archiveLibChanged;
878
881
884 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
885 errmsg("both \"archive_command\" and \"archive_library\" set"),
886 errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
887
889 pfree(archiveLib);
890
891 if (archiveLibChanged)
892 {
893
894
895
896
897
898
899
900
901
903 (errmsg("restarting archiver process because value of "
904 "\"archive_library\" was changed")));
905
907 }
908 }
909}
910
911
912
913
914
916static void
918{
920
923 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
924 errmsg("both \"archive_command\" and \"archive_library\" set"),
925 errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
926
927
928
929
930
933 else
936 "_PG_archive_module_init", false, NULL);
937
938 if (archive_init == NULL)
940 (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
941
943
946 (errmsg("archive modules must register an archive callback")));
947
951
953}
954
955
956
958static void
960{
963}
void pgaio_error_cleanup(void)
const ArchiveModuleCallbacks *(* ArchiveModuleInit)(void)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static void pg_atomic_write_membarrier_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_exchange_u32(volatile pg_atomic_uint32 *ptr, uint32 newval)
void AuxiliaryProcessMainCommon(void)
void binaryheap_build(binaryheap *heap)
void binaryheap_reset(binaryheap *heap)
bh_node_type binaryheap_first(binaryheap *heap)
void binaryheap_add(binaryheap *heap, bh_node_type d)
bh_node_type binaryheap_remove_first(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
#define MemSet(start, val, len)
bool ConditionVariableCancelSleep(void)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void AtEOXact_HashTables(bool isCommit)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void AtEOXact_Files(bool isCommit)
DIR * AllocateDir(const char *dirname)
struct dirent * ReadDir(DIR *dir, const char *dirname)
volatile sig_atomic_t LogMemoryContextPending
volatile sig_atomic_t ProcSignalBarrierPending
volatile sig_atomic_t PublishMemoryContextPending
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void LWLockReleaseAll(void)
void MemoryContextReset(MemoryContext context)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
void ProcessGetMemoryContextInterrupt(void)
MemoryContext TopMemoryContext
void ProcessLogMemoryContextInterrupt(void)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define HOLD_INTERRUPTS()
BackendType MyBackendType
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static volatile sig_atomic_t time_to_stop
static bool pgarch_archiveXlog(char *xlog)
static void pgarch_die(int code, Datum arg)
#define PGARCH_RESTART_INTERVAL
bool PgArchCanRestart(void)
static bool pgarch_readyXlog(char *xlog)
static PgArchData * PgArch
char * arch_module_check_errdetail_string
static void pgarch_MainLoop(void)
#define NUM_ARCHIVE_RETRIES
struct PgArchData PgArchData
static void pgarch_waken_stop(SIGNAL_ARGS)
static volatile sig_atomic_t ready_to_stop
static struct arch_files_state * arch_files
char * XLogArchiveLibrary
Size PgArchShmemSize(void)
static void LoadArchiveLibrary(void)
static void pgarch_ArchiverCopyLoop(void)
static int ready_file_comparator(Datum a, Datum b, void *arg)
void PgArchiverMain(const void *startup_data, size_t startup_data_len)
#define NUM_FILES_PER_DIRECTORY_SCAN
void PgArchForceDirScan(void)
static void ProcessPgArchInterrupts(void)
static const ArchiveModuleCallbacks * ArchiveCallbacks
static ArchiveModuleState * archive_module_state
void PgArchShmemInit(void)
static void pgarch_call_module_shutdown_cb(int code, Datum arg)
static void pgarch_archiveDone(char *xlog)
#define NUM_ORPHAN_CLEANUP_RETRIES
static MemoryContext archive_context
#define PGARCH_AUTOWAKE_INTERVAL
static time_t last_sigterm_time
void pgstat_report_archiver(const char *xlog, bool failed)
#define PostmasterIsAlive()
static char * DatumGetCString(Datum X)
static Datum CStringGetDatum(const char *X)
#define INVALID_PROC_NUMBER
void ProcessProcSignalBarrier(void)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
void ReleaseAuxProcessResources(bool isCommit)
const ArchiveModuleCallbacks * shell_archive_init(void)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void pg_usleep(long microsec)
ArchiveFileCB archive_file_cb
ArchiveShutdownCB shutdown_cb
ArchiveCheckConfiguredCB check_configured_cb
ArchiveStartupCB startup_cb
pg_atomic_uint32 force_dir_scan
char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS+1]
char * arch_files[NUM_FILES_PER_DIRECTORY_SCAN]
void disable_all_timeouts(bool keep_indicators)
static void pgstat_report_wait_end(void)
#define WL_POSTMASTER_DEATH
char * XLogArchiveCommand
#define XLogArchivingActive()
static bool IsTLHistoryFileName(const char *fname)
static void StatusFilePath(char *path, const char *xlog, const char *suffix)