PostgreSQL Source Code: src/bin/pg_basebackup/receivelog.h File Reference (original) (raw)
Go to the source code of this file.
◆ stream_stop_callback
typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
◆ StreamCtl
◆ CheckServerVersionForStreaming()
bool CheckServerVersionForStreaming | ( | PGconn * | conn | ) |
---|
Definition at line 374 of file receivelog.c.
375{
376 int minServerMajor,
377 maxServerMajor;
378 int serverMajor;
379
380
381
382
383
384
385
386 minServerMajor = 903;
387 maxServerMajor = PG_VERSION_NUM / 100;
389 if (serverMajor < minServerMajor)
390 {
392
393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
394 serverver ? serverver : "'unknown'",
395 "9.3");
396 return false;
397 }
398 else if (serverMajor > maxServerMajor)
399 {
401
402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
403 serverver ? serverver : "'unknown'",
404 PG_VERSION);
405 return false;
406 }
407 return true;
408}
int PQserverVersion(const PGconn *conn)
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
#define pg_log_error(...)
References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().
Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().
◆ ReceiveXlogStream()
Definition at line 452 of file receivelog.c.
453{
454 char query[128];
455 char slotcmd[128];
458
459
460
461
462
464 return false;
465
466
467
468
469
470
471
472
473
474
475
476
478 {
481 }
482 else
483 {
486 else
488 slotcmd[0] = 0;
489 }
490
492 {
493 char *sysidentifier = NULL;
495
496
497
498
500 {
502 return false;
503 }
504
505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
506 {
507 pg_log_error("system identifier does not match between base backup and streaming connection");
509 return false;
510 }
512
513 if (stream->timeline > servertli)
514 {
515 pg_log_error("starting timeline %u is not present in the server",
517 return false;
518 }
519 }
520
521
522
523
524
526
527 while (1)
528 {
529
530
531
532
533
534
536 {
537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
540 {
541
542 pg_log_error("could not send replication command \"%s\": %s",
545 return false;
546 }
547
548
549
550
551
553 {
554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 }
557
558
562
564 }
565
566
567
568
569
571 return true;
572
573
574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
575 slotcmd,
580 {
581 pg_log_error("could not send replication command \"%s\": %s",
584 return false;
585 }
587
588
590 if (res == NULL)
592
593
594
595
596
597
598
599
600
601
602
604 {
605
606
607
608
609
610
611
612
613
615 bool parsed;
616
619 if (!parsed)
621
622
623 if (newtimeline <= stream->timeline)
624 {
625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
626 newtimeline, stream->timeline);
628 }
629 if (stream->startpos > stoppos)
630 {
631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
635 }
636
637
640 {
641 pg_log_error("unexpected termination of replication stream: %s",
645 }
647
648
649
650
651
652 stream->timeline = newtimeline;
655 continue;
656 }
658 {
660
661
662
663
664
665
666
668 return true;
669 else
670 {
671 pg_log_error("replication stream was terminated before stop point");
673 }
674 }
675 else
676 {
677
678 pg_log_error("unexpected termination of replication stream: %s",
682 }
683 }
684
687 pg_log_error("could not close file \"%s\": %s",
690 return false;
691}
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
PGresult * PQgetResult(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQntuples(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
int PQnfields(const PGresult *res)
#define pg_log_warning(...)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool reportFlushPosition
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool CheckServerVersionForStreaming(PGconn *conn)
static XLogRecPtr lastFlushPosition
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
WalWriteMethod * walmethod
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.
Referenced by LogStreamerMain(), and StreamLog().