PostgreSQL Source Code: src/bin/pg_basebackup/receivelog.h File Reference

Go to the source code of this file.

stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

StreamCtl

typedef struct StreamCtl StreamCtl

CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 374 of file receivelog.c.

375{

376 int minServerMajor,

377 maxServerMajor;

378 int serverMajor;

379

380

381

382

383

384

385

386 minServerMajor = 903;

387 maxServerMajor = PG_VERSION_NUM / 100;

388 serverMajor = PQserverVersion(conn) / 100;

389 if (serverMajor < minServerMajor)

390 {

391 const char *serverver = PQparameterStatus(conn, "server_version");

392

393 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",

394 serverver ? serverver : "'unknown'",

395 "9.3");

396 return false;

397 }

398 else if (serverMajor > maxServerMajor)

399 {

400 const char *serverver = PQparameterStatus(conn, "server_version");

401

402 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",

403 serverver ? serverver : "'unknown'",

404 PG_VERSION);

405 return false;

406 }

407 return true;

408}

int PQserverVersion(const PGconn *conn)

const char * PQparameterStatus(const PGconn *conn, const char *paramName)

#define pg_log_error(...)

References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn, StreamCtl * stream )

Definition at line 452 of file receivelog.c.

453{

454 char query[128];

455 char slotcmd[128];

458

459

460

461

462

464 return false;

465

466

467

468

469

470

471

472

473

474

475

476

478 {

481 }

482 else

483 {

486 else

488 slotcmd[0] = 0;

489 }

490

492 {

493 char *sysidentifier = NULL;

495

496

497

498

500 {

502 return false;

503 }

504

505 if (strcmp(stream->sysidentifier, sysidentifier) != 0)

506 {

507 pg_log_error("system identifier does not match between base backup and streaming connection");

509 return false;

510 }

512

513 if (stream->timeline > servertli)

514 {

515 pg_log_error("starting timeline %u is not present in the server",

516 stream->timeline);

517 return false;

518 }

519 }

520

521

522

523

524

526

527 while (1)

528 {

529

530

531

532

533

534

536 {

537 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);

540 {

541

542 pg_log_error("could not send replication command \"%s\": %s",

545 return false;

546 }

547

548

549

550

551

553 {

554 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",

556 }

557

558

562

564 }

565

566

567

568

569

571 return true;

572

573

574 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",

575 slotcmd,

580 {

581 pg_log_error("could not send replication command \"%s\": %s",

584 return false;

585 }

587

588

590 if (res == NULL)

592

593

594

595

596

597

598

599

600

601

602

604 {

605

606

607

608

609

610

611

612

613

615 bool parsed;

616

619 if (!parsed)

621

622

623 if (newtimeline <= stream->timeline)

624 {

625 pg_log_error("server reported unexpected next timeline %u, following timeline %u",

626 newtimeline, stream->timeline);

628 }

629 if (stream->startpos > stoppos)

630 {

631 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",

635 }

636

637

640 {

641 pg_log_error("unexpected termination of replication stream: %s",

645 }

647

648

649

650

651

652 stream->timeline = newtimeline;

655 continue;

656 }

658 {

660

661

662

663

664

665

666

668 return true;

669 else

670 {

671 pg_log_error("replication stream was terminated before stop point");

673 }

674 }

675 else

676 {

677

678 pg_log_error("unexpected termination of replication stream: %s",

682 }

683 }

684

687 pg_log_error("could not close file \"%s\": %s",

690 return false;

691}

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

PGresult * PQgetResult(PGconn *conn)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQntuples(const PGresult *res)

char * PQresultErrorMessage(const PGresult *res)

PGresult * PQexec(PGconn *conn, const char *query)

int PQnfields(const PGresult *res)

#define pg_log_warning(...)

static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)

static bool reportFlushPosition

static bool existsTimeLineHistoryFile(StreamCtl *stream)

static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)

bool CheckServerVersionForStreaming(PGconn *conn)

static XLogRecPtr lastFlushPosition

static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)

bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)

stream_stop_callback stream_stop

WalWriteMethod * walmethod

int(* close)(Walfile *f, WalCloseMethod method)

const WalWriteMethodOps * ops

const char * GetLastWalMethodError(WalWriteMethod *wwmethod)

#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)

#define LSN_FORMAT_ARGS(lsn)

References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().