PostgreSQL Source Code: src/bin/pg_basebackup/streamutil.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
16
19
29
30#define ERRCODE_DUPLICATE_OBJECT "42710"
31
33
35
36
37#define MINIMUM_VERSION_FOR_SHOW_CMD 100000
38
39
40
41
42#define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
43
53
54
55
56
57
58
61{
63 int argcount = 7;
64
65 int i;
68 const char *tmpparam;
69 bool need_password;
72 char *err_msg = NULL;
73
74
75
76
77
79
80
81
82
83
84 i = 0;
86 {
88 if (conn_opts == NULL)
90
91 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
92 {
93 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
94 argcount++;
95 }
96
99
100
101
102
103
105 values[i] = "replication";
106 i++;
107
108 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
109 {
110 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
111 {
114 i++;
115 }
116 }
117 }
118 else
119 {
124 i++;
125 }
126
128 values[i] = (dbname == NULL) ? "true" : "database";
129 i++;
130 keywords[i] = "fallback_application_name";
132 i++;
133
135 {
138 i++;
139 }
141 {
144 i++;
145 }
147 {
150 i++;
151 }
152
153
155
156 do
157 {
158
159 if (need_password)
160 {
163 need_password = false;
164 }
165
166
168 {
171 }
172 else
173 {
176 }
177
178
179
180
181
183
184
185
186
187
188 if (!tmpconn)
189 pg_fatal("could not connect to server");
190
191
195 {
197 need_password = true;
198 }
199 }
200 while (need_password);
201
203 {
209 return NULL;
210 }
211
212
216
217
218
219
220
221
222
224 {
226
229 {
230 pg_log_error("could not clear \"search_path\": %s",
234 exit(1);
235 }
237 }
238
239
240
241
242
244 if (!tmpparam)
245 {
246 pg_log_error("could not determine server setting for \"integer_datetimes\"");
248 exit(1);
249 }
250
251 if (strcmp(tmpparam, "on") != 0)
252 {
253 pg_log_error("\"integer_datetimes\" compile flag does not match server");
255 exit(1);
256 }
257
258
259
260
261
263 {
265 exit(1);
266 }
267
268 return tmpconn;
269}
270
271
272
273
274
275bool
277{
279 char xlog_unit[3];
280 int xlog_val,
281 multiplier = 1;
282
283
285
286
288 {
290 return true;
291 }
292
293 res = PQexec(conn, "SHOW wal_segment_size");
295 {
296 pg_log_error("could not send replication command \"%s\": %s",
298
300 return false;
301 }
303 {
304 pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
306
308 return false;
309 }
310
311
312 if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
313 {
314 pg_log_error("WAL segment size could not be parsed");
316 return false;
317 }
318
320
321
322 if (strcmp(xlog_unit, "MB") == 0)
323 multiplier = 1024 * 1024;
324 else if (strcmp(xlog_unit, "GB") == 0)
325 multiplier = 1024 * 1024 * 1024;
326
327
328 WalSegSz = xlog_val * multiplier;
329
331 {
332 pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
333 "remote server reported invalid WAL segment size (%d bytes)",
336 pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
337 return false;
338 }
339
340 return true;
341}
342
343
344
345
346
347
348
349
350
351
352
353
354static bool
356{
359
360
362
363
365 return true;
366
367 res = PQexec(conn, "SHOW data_directory_mode");
369 {
370 pg_log_error("could not send replication command \"%s\": %s",
372
374 return false;
375 }
377 {
378 pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
380
382 return false;
383 }
384
386 {
387 pg_log_error("group access flag could not be parsed: %s",
389
391 return false;
392 }
393
395
397 return true;
398}
399
400
401
402
403
404
405
406
407
408bool
411{
414 lo;
415
416
418
419 res = PQexec(conn, "IDENTIFY_SYSTEM");
421 {
422 pg_log_error("could not send replication command \"%s\": %s",
424
426 return false;
427 }
429 {
430 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
432
434 return false;
435 }
436
437
438 if (sysid != NULL)
440
441
442 if (starttli != NULL)
443 *starttli = atoi(PQgetvalue(res, 0, 1));
444
445
447 {
448 if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
449 {
450 pg_log_error("could not parse write-ahead log location \"%s\"",
452
454 return false;
455 }
457 }
458
459
460 if (db_name != NULL)
461 {
462 *db_name = NULL;
464 {
466 {
467 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
469
471 return false;
472 }
475 }
476 }
477
479 return true;
480}
481
482
483
484
485
486
487
488
489bool
492{
497
498 if (restart_lsn)
499 *restart_lsn = lsn_loc;
500 if (restart_tli)
501 *restart_tli = tli_loc;
502
507
509 {
510 pg_log_error("could not send replication command \"%s\": %s",
513 return false;
514 }
515
516
518 {
519 pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
522 return false;
523 }
524
525
526
527
528
530 {
531 pg_log_error("replication slot \"%s\" does not exist", slot_name);
533 return false;
534 }
535
536
537
538
539
540 if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
541 {
542 pg_log_error("expected a physical replication slot, got type \"%s\" instead",
545 return false;
546 }
547
548
550 {
552 lo;
553
554 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
555 {
556 pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
559 return false;
560 }
561 lsn_loc = ((uint64) hi) << 32 | lo;
562 }
563
564
567
569
570
571 if (restart_lsn)
572 *restart_lsn = lsn_loc;
573 if (restart_tli)
574 *restart_tli = tli_loc;
575
576 return true;
577}
578
579
580
581
582
583bool
585 bool is_temporary, bool is_physical, bool reserve_wal,
587{
591
593
595 (!is_physical && plugin != NULL));
598 Assert(slot_name != NULL);
599
600
601 appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
602 if (is_temporary)
604 if (is_physical)
606 else
608
609
610 if (use_new_option_syntax)
612 if (is_physical)
613 {
614 if (reserve_wal)
616 "RESERVE_WAL");
617 }
618 else
619 {
622 "FAILOVER");
623
626 "TWO_PHASE");
627
629 {
630
631 if (use_new_option_syntax)
633 "SNAPSHOT", "nothing");
634 else
636 "NOEXPORT_SNAPSHOT");
637 }
638 }
639 if (use_new_option_syntax)
640 {
641
642 if (query->data[query->len - 1] == '(')
643 {
644 query->len -= 2;
645 query->data[query->len] = '\0';
646 }
647 else
649 }
650
651
654 {
656
658 sqlstate &&
660 {
663 return true;
664 }
665 else
666 {
667 pg_log_error("could not send replication command \"%s\": %s",
669
672 return false;
673 }
674 }
675
677 {
678 pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
679 slot_name,
681
684 return false;
685 }
686
689 return true;
690}
691
692
693
694
695
696bool
698{
701
702 Assert(slot_name != NULL);
703
705
706
708 slot_name);
711 {
712 pg_log_error("could not send replication command \"%s\": %s",
714
717 return false;
718 }
719
721 {
722 pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
723 slot_name,
725
728 return false;
729 }
730
733 return true;
734}
735
736
737
738
739
740
741
742
743
744
745void
747 char *option_name)
748{
749 if (buf->len > 0 && buf->data[buf->len - 1] != '(')
750 {
751 if (use_new_option_syntax)
753 else
755 }
756
758}
759
760
761
762
763
764
765
766void
768 char *option_name, char *option_value)
769{
771
772 if (option_value != NULL)
773 {
774 size_t length = strlen(option_value);
775 char *escaped_value = palloc(1 + 2 * length);
776
779 pfree(escaped_value);
780 }
781}
782
783
784
785
786
787
788
789void
791 char *option_name, int32 option_value)
792{
794
796}
797
798
799
800
801
804{
806 struct timeval tp;
807
809
813
814 return result;
815}
816
817
818
819
820
821void
823 long *secs, int *microsecs)
824{
826
827 if (diff <= 0)
828 {
829 *secs = 0;
830 *microsecs = 0;
831 }
832 else
833 {
836 }
837}
838
839
840
841
842
843bool
846 int msec)
847{
849
850 return (diff >= msec * INT64CONST(1000));
851}
852
853
854
855
856void
858{
860
861 memcpy(buf, &n64, sizeof(n64));
862}
863
864
865
866
869{
871
872 memcpy(&n64, buf, sizeof(n64));
873
875}
static Datum values[MAXATTR]
#define ngettext(s, p, n)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
#define POSTGRES_EPOCH_JDATE
int PQserverVersion(const PGconn *conn)
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
int PQconnectionNeedsPassword(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
size_t PQescapeStringConn(PGconn *conn, char *to, const char *from, size_t length, int *error)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQntuples(const PGresult *res)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
char * PQresultErrorField(const PGresult *res, int fieldcode)
PGresult * PQexec(PGconn *conn, const char *query)
int PQnfields(const PGresult *res)
char * pg_strdup(const char *in)
void * pg_malloc0(size_t size)
void SetDataDirectoryCreatePerm(int dataDirMode)
Assert(PointerIsAligned(start, uint64))
static const JsonPathKeyword keywords[]
#define pg_log_error(...)
#define pg_log_error_detail(...)
void pfree(void *pointer)
#define DEFAULT_XLOG_SEG_SIZE
static bool slot_exists_ok
static const char * plugin
static XLogRecPtr startpos
PQExpBuffer createPQExpBuffer(void)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * simple_prompt(const char *prompt, bool echo)
void AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, int32 option_value)
PGconn * GetConnection(void)
bool RetrieveWalSegSize(PGconn *conn)
#define ERRCODE_DUPLICATE_OBJECT
#define MINIMUM_VERSION_FOR_SHOW_CMD
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, bool slot_exists_ok, bool two_phase, bool failover)
#define MINIMUM_VERSION_FOR_GROUP_ACCESS
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name)
void AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, char *option_value)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
static bool RetrieveDataDirCreatePerm(PGconn *conn)
bool DropReplicationSlot(PGconn *conn, const char *slot_name)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
int gettimeofday(struct timeval *tp, void *tzp)
#define IsValidWalSegSize(size)
#define InvalidXLogRecPtr