PostgreSQL Source Code: src/bin/pg_basebackup/pg_createsubscriber.c File Reference

Go to the source code of this file.

Data Structures
struct CreateSubscriberOptions
struct LogicalRepInfo
struct LogicalRepInfos
Macros
#define DEFAULT_SUB_PORT "50432"
#define OBJECTTYPE_PUBLICATIONS 0x0001
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */
Functions
static void cleanup_objects_atexit (void)
static void usage ()
static char * get_base_conninfo (const char *conninfo, char **dbname)
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
static char * get_exec_path (const char *argv0, const char *progname)
static void check_data_directory (const char *datadir)
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
static void disconnect_database (PGconn *conn, bool exit_on_error)
static uint64 get_primary_sysid (const char *conninfo)
static uint64 get_standby_sysid (const char *datadir)
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
static bool server_is_in_recovery (PGconn *conn)
static char * generate_object_name (PGconn *conn)
static void check_publisher (const struct LogicalRepInfo *dbinfo)
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void stop_standby_server (const char *datadir)
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_existing_subscriptions (PGconn *conn, const char *subname, const char *dbname)
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
int main (int argc, char **argv)
Variables
static const char * progname
static char * primary_slot_name = NULL
static bool dry_run = false
static bool success = false
static struct LogicalRepInfos dbinfos
static int num_dbs = 0
static int num_pubs = 0
static int num_subs = 0
static int num_replslots = 0
static pg_prng_state prng_state
static char * pg_ctl_path = NULL
static char * pg_resetwal_path = NULL
static char * subscriber_dir = NULL
static bool recovery_ended = false
static bool standby_running = false

DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT "50432"

OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS 0x0001

USEC_PER_SEC

#define USEC_PER_SEC 1000000

WAIT_INTERVAL

#define WAIT_INTERVAL 1 /* 1 second */

WaitPMResult

Enumerator
POSTMASTER_READY
POSTMASTER_STILL_STARTING
POSTMASTER_READY
POSTMASTER_STILL_STARTING
POSTMASTER_SHUTDOWN_IN_RECOVERY
POSTMASTER_FAILED

Definition at line 158 of file pg_createsubscriber.c.

159{

162};

@ POSTMASTER_STILL_STARTING

appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer buf, const char * keyword, const char * val ) static

check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn * conn, const struct LogicalRepInfo * dbinfo ) static

Definition at line 1151 of file pg_createsubscriber.c.

1153{

1157

1159

1161

1163 "SELECT s.subname FROM pg_catalog.pg_subscription s "

1164 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "

1165 "WHERE d.datname = %s",

1168

1170 {

1171 pg_log_error("could not obtain pre-existing subscriptions: %s",

1174 }

1175

1179

1183}

void PQfreemem(void *ptr)

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQntuples(const PGresult *res)

char * PQresultErrorMessage(const PGresult *res)

char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)

PGresult * PQexec(PGconn *conn, const char *query)

Assert(PointerIsAligned(start, uint64))

#define pg_log_error(...)

static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)

static void disconnect_database(PGconn *conn, bool exit_on_error)

PQExpBuffer createPQExpBuffer(void)

void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)

void destroyPQExpBuffer(PQExpBuffer str)

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscriptions(), i, pg_log_error, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by setup_subscriber().

check_and_drop_publications()

Definition at line 1730 of file pg_createsubscriber.c.

1731{

1734

1736

1737 if (drop_all_pubs)

1738 {

1739 pg_log_info("dropping all existing publications in database \"%s\"",

1741

1742

1743 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");

1745 {

1746 pg_log_error("could not obtain publication information: %s",

1750 }

1751

1752

1756

1758 }

1759

1760

1761

1762

1763

1764 if (!drop_all_pubs || dry_run)

1767}

static struct LogicalRepInfos dbinfos

static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)

#define OBJECTTYPE_PUBLICATIONS

bits32 objecttypes_to_remove

References Assert(), conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_remove, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

check_data_directory()

static void check_data_directory ( const char * datadir) static

Definition at line 407 of file pg_createsubscriber.c.

408{

409 struct stat statbuf;

411

412 pg_log_info("checking if directory \"%s\" is a cluster data directory",

414

416 {

417 if (errno == ENOENT)

418 pg_fatal("data directory \"%s\" does not exist", datadir);

419 else

420 pg_fatal("could not access directory \"%s\": %m", datadir);

421 }

422

424 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)

425 {

426 pg_fatal("directory \"%s\" is not a database cluster directory",

428 }

429}

References datadir, MAXPGPATH, pg_fatal, pg_log_info, snprintf, and stat.

Referenced by main().

check_publisher()

static void check_publisher ( const struct LogicalRepInfo * dbinfo) static

Definition at line 870 of file pg_createsubscriber.c.

871{

874 bool failed = false;

875

877 int max_repslots;

878 int cur_repslots;

879 int max_walsenders;

880 int cur_walsenders;

881 int max_prepared_transactions;

882 char *max_slot_wal_keep_size;

883

884 pg_log_info("checking settings on publisher");

885

887

888

889

890

891

893 {

894 pg_log_error("primary server cannot be in recovery");

896 }

897

898

899

900

901

902

903

904

905

906

907

908

910 "SELECT pg_catalog.current_setting('wal_level'),"

911 " pg_catalog.current_setting('max_replication_slots'),"

912 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"

913 " pg_catalog.current_setting('max_wal_senders'),"

914 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"

915 " pg_catalog.current_setting('max_prepared_transactions'),"

916 " pg_catalog.current_setting('max_slot_wal_keep_size')");

917

919 {

920 pg_log_error("could not obtain publisher settings: %s",

923 }

924

926 max_repslots = atoi(PQgetvalue(res, 0, 1));

927 cur_repslots = atoi(PQgetvalue(res, 0, 2));

928 max_walsenders = atoi(PQgetvalue(res, 0, 3));

929 cur_walsenders = atoi(PQgetvalue(res, 0, 4));

930 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));

932

934

936 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);

937 pg_log_debug("publisher: current replication slots: %d", cur_repslots);

938 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);

939 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);

940 pg_log_debug("publisher: max_prepared_transactions: %d",

941 max_prepared_transactions);

942 pg_log_debug("publisher: max_slot_wal_keep_size: %s",

943 max_slot_wal_keep_size);

944

946

947 if (strcmp(wal_level, "logical") != 0)

948 {

949 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");

950 failed = true;

951 }

952

953 if (max_repslots - cur_repslots < num_dbs)

954 {

955 pg_log_error("publisher requires %d replication slots, but only %d remain",

956 num_dbs, max_repslots - cur_repslots);

957 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

958 "max_replication_slots", cur_repslots + num_dbs);

959 failed = true;

960 }

961

962 if (max_walsenders - cur_walsenders < num_dbs)

963 {

964 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",

965 num_dbs, max_walsenders - cur_walsenders);

966 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

967 "max_wal_senders", cur_walsenders + num_dbs);

968 failed = true;

969 }

970

972 {

973 pg_log_warning("two_phase option will not be enabled for replication slots");

974 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "

975 "Prepared transactions will be replicated at COMMIT PREPARED.");

976 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");

977 }

978

979

980

981

982

983

984 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))

985 {

986 pg_log_warning("required WAL could be removed from the publisher");

987 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",

988 "max_slot_wal_keep_size");

989 }

990

992

993 if (failed)

994 exit(1);

995}

char * pg_strdup(const char *in)

#define pg_log_error_hint(...)

#define pg_log_warning_hint(...)

#define pg_log_warning_detail(...)

#define pg_log_debug(...)

static bool server_is_in_recovery(PGconn *conn)

static PGconn * connect_database(const char *conninfo, bool exit_on_error)

#define pg_log_warning(...)

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo * dbinfo) static

Definition at line 1009 of file pg_createsubscriber.c.

1010{

1013 bool failed = false;

1014

1015 int max_lrworkers;

1016 int max_reporigins;

1017 int max_wprocs;

1018

1019 pg_log_info("checking settings on subscriber");

1020

1022

1023

1025 {

1026 pg_log_error("target server must be a standby");

1028 }

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1041 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("

1042 "'max_logical_replication_workers', "

1043 "'max_active_replication_origins', "

1044 "'max_worker_processes', "

1045 "'primary_slot_name') "

1046 "ORDER BY name");

1047

1049 {

1050 pg_log_error("could not obtain subscriber settings: %s",

1053 }

1054

1055 max_reporigins = atoi(PQgetvalue(res, 0, 0));

1056 max_lrworkers = atoi(PQgetvalue(res, 1, 0));

1057 max_wprocs = atoi(PQgetvalue(res, 2, 0));

1058 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)

1060

1061 pg_log_debug("subscriber: max_logical_replication_workers: %d",

1062 max_lrworkers);

1063 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);

1064 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);

1067

1069

1071

1072 if (max_reporigins < num_dbs)

1073 {

1074 pg_log_error("subscriber requires %d active replication origins, but only %d remain",

1075 num_dbs, max_reporigins);

1076 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1077 "max_active_replication_origins", num_dbs);

1078 failed = true;

1079 }

1080

1081 if (max_lrworkers < num_dbs)

1082 {

1083 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",

1084 num_dbs, max_lrworkers);

1085 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1086 "max_logical_replication_workers", num_dbs);

1087 failed = true;

1088 }

1089

1090 if (max_wprocs < num_dbs + 1)

1091 {

1092 pg_log_error("subscriber requires %d worker processes, but only %d remain",

1093 num_dbs + 1, max_wprocs);

1094 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1095 "max_worker_processes", num_dbs + 1);

1096 failed = true;

1097 }

1098

1099 if (failed)

1100 exit(1);

1101}

static char * primary_slot_name

References conn, connect_database(), disconnect_database(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), primary_slot_name, and server_is_in_recovery().

Referenced by main().

cleanup_objects_atexit()

static void cleanup_objects_atexit ( void ) static

Definition at line 177 of file pg_createsubscriber.c.

178{

180 return;

181

182

183

184

185

186

188 {

190 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "

191 "You must recreate the physical replica before continuing.");

192 }

193

195 {

197

199 {

201

203 if (conn != NULL)

204 {

211 }

212 else

213 {

214

215

216

217

218

220 {

221 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",

225 }

227 {

228 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",

231 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");

232 }

233 }

234 }

235 }

236

239}

static void stop_standby_server(const char *datadir)

static char * subscriber_dir

static bool recovery_ended

static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)

static bool standby_running

struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, LogicalRepInfo::replslotname, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char * conninfo, const char * dbname ) static

Definition at line 439 of file pg_createsubscriber.c.

440{

442 char *ret;

443

444 Assert(conninfo != NULL);

445

448

451

452 return ret;

453}

static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

connect_database()

static PGconn * connect_database ( const char * conninfo, bool exit_on_error ) static

Definition at line 536 of file pg_createsubscriber.c.

537{

540

543 {

544 pg_log_error("connection to database failed: %s",

547

548 if (exit_on_error)

549 exit(1);

550 return NULL;

551 }

552

553

556 {

557 pg_log_error("could not clear \"search_path\": %s",

561

562 if (exit_on_error)

563 exit(1);

564 return NULL;

565 }

567

569}

#define ALWAYS_SECURE_SEARCH_PATH_SQL

PGconn * PQconnectdb(const char *conninfo)

ConnStatusType PQstatus(const PGconn *conn)

void PQfinish(PGconn *conn)

char * PQerrorMessage(const PGconn *conn)

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, pg_log_error, PGRES_TUPLES_OK, PQclear(), PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage(), PQresultStatus(), and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn * conn, struct LogicalRepInfo * dbinfo ) static

Definition at line 1359 of file pg_createsubscriber.c.

1360{

1363 const char *slot_name = dbinfo->replslotname;

1364 char *slot_name_esc;

1365 char *lsn = NULL;

1366

1368

1369 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",

1370 slot_name, dbinfo->dbname);

1371

1373

1375 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",

1376 slot_name_esc,

1378

1380

1382

1384 {

1387 {

1388 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",

1389 slot_name, dbinfo->dbname,

1393 return NULL;

1394 }

1395

1398 }

1399

1400

1402

1404

1405 return lsn;

1406}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

create_publication()

Definition at line 1603 of file pg_createsubscriber.c.

1604{

1607 char *ipubname_esc;

1608 char *spubname_esc;

1609

1611

1614

1615

1617 "SELECT 1 FROM pg_catalog.pg_publication "

1618 "WHERE pubname = %s",

1619 spubname_esc);

1622 {

1623 pg_log_error("could not obtain publication information: %s",

1626 }

1627

1629 {

1630

1631

1632

1633

1634

1635

1636

1638 pg_log_error_hint("Consider renaming this publication before continuing.");

1640 }

1641

1644

1645 pg_log_info("creating publication \"%s\" in database \"%s\"",

1647

1649 ipubname_esc);

1650

1652

1654 {

1657 {

1658 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",

1661 }

1663 }

1664

1665

1667

1671}

char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)

void resetPQExpBuffer(PQExpBuffer str)

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

create_subscription()

Definition at line 1781 of file pg_createsubscriber.c.

1782{

1785 char *pubname_esc;

1786 char *subname_esc;

1787 char *pubconninfo_esc;

1788 char *replslotname_esc;

1789

1791

1796

1797 pg_log_info("creating subscription \"%s\" in database \"%s\"",

1799

1801 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "

1802 "WITH (create_slot = false, enabled = false, "

1803 "slot_name = %s, copy_data = false, two_phase = %s)",

1804 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,

1806

1811

1813

1815 {

1818 {

1819 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",

1822 }

1824 }

1825

1827}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

disconnect_database()

static void disconnect_database ( PGconn * conn, bool exit_on_error ) static

Definition at line 576 of file pg_createsubscriber.c.

577{

579

581

582 if (exit_on_error)

583 exit(1);

584}

References Assert(), conn, and PQfinish().

Referenced by check_and_drop_existing_subscriptions(), check_and_drop_publications(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), create_publication(), create_subscription(), drop_existing_subscriptions(), drop_failover_replication_slots(), drop_primary_replication_slot(), enable_subscription(), generate_object_name(), get_primary_sysid(), get_publisher_databases(), server_is_in_recovery(), set_replication_progress(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

drop_existing_subscriptions()

static void drop_existing_subscriptions ( PGconn * conn, const char * subname, const char * dbname ) static

Definition at line 1110 of file pg_createsubscriber.c.

1111{

1114

1116

1117

1118

1119

1120

1123 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",

1126

1127 pg_log_info("dropping subscription \"%s\" in database \"%s\"",

1129

1131 {

1133

1135 {

1136 pg_log_error("could not drop subscription \"%s\": %s",

1139 }

1140

1142 }

1143

1145}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), and subname.

Referenced by check_and_drop_existing_subscriptions().

drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo * dbinfo) static

Definition at line 1317 of file pg_createsubscriber.c.

1318{

1321

1323 if (conn != NULL)

1324 {

1325

1327 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");

1328

1330 {

1331

1334 }

1335 else

1336 {

1337 pg_log_warning("could not obtain failover replication slot information: %s",

1339 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");

1340 }

1341

1344 }

1345 else

1346 {

1347 pg_log_warning("could not drop failover replication slot");

1348 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");

1349 }

1350}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by main().

drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo * dbinfo, const char * slotname ) static

drop_publication()

static void drop_publication ( PGconn * conn, const char * pubname, const char * dbname, bool * made_publication ) static

Definition at line 1677 of file pg_createsubscriber.c.

1679{

1682 char *pubname_esc;

1683

1685

1687

1688 pg_log_info("dropping publication \"%s\" in database \"%s\"",

1690

1692

1694

1696

1698 {

1701 {

1702 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",

1704 *made_publication = false;

1705

1706

1707

1708

1709

1710

1711

1712

1713 }

1715 }

1716

1718}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

drop_replication_slot()

static void drop_replication_slot ( PGconn * conn, struct LogicalRepInfo * dbinfo, const char * slot_name ) static

Definition at line 1409 of file pg_createsubscriber.c.

1411{

1413 char *slot_name_esc;

1415

1417

1418 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",

1419 slot_name, dbinfo->dbname);

1420

1422

1423 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);

1424

1426

1428

1430 {

1433 {

1434 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",

1436 dbinfo->made_replslot = false;

1437 }

1438

1440 }

1441

1443}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

enable_subscription()

Definition at line 1931 of file pg_createsubscriber.c.

1932{

1936

1938

1940

1941 pg_log_info("enabling subscription \"%s\" in database \"%s\"",

1943

1945

1947

1949 {

1952 {

1953 pg_log_error("could not enable subscription \"%s\": %s",

1956 }

1957

1959 }

1960

1963}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

generate_object_name()

static char * generate_object_name ( PGconn * conn) static

Definition at line 714 of file pg_createsubscriber.c.

715{

719 char *objname;

720

722 "SELECT oid FROM pg_catalog.pg_database "

723 "WHERE datname = pg_catalog.current_database()");

725 {

726 pg_log_error("could not obtain database OID: %s",

729 }

730

732 {

733 pg_log_error("could not obtain database OID: got %d rows, expected %d row",

736 }

737

738

739 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

740

742

743

745

746

747

748

749

750

751 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);

752

753 return objname;

754}

static pg_prng_state prng_state

uint32 pg_prng_uint32(pg_prng_state *state)

char * psprintf(const char *fmt,...)

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), prng_state, and psprintf().

Referenced by setup_publisher().

get_base_conninfo()

static char * get_base_conninfo ( const char * conninfo, char ** dbname ) static

Definition at line 301 of file pg_createsubscriber.c.

302{

307 char *ret;

308

310 if (conn_opts == NULL)

311 {

314 return NULL;

315 }

316

318 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)

319 {

320 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')

321 {

322 if (strcmp(conn_opt->keyword, "dbname") == 0)

323 {

326 continue;

327 }

329 }

330 }

331

333

336

337 return ret;

338}

int errmsg(const char *fmt,...)

void PQconninfoFree(PQconninfoOption *connOptions)

PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

get_exec_path()

static char * get_exec_path ( const char * argv0, const char * progname ) static

Definition at line 371 of file pg_createsubscriber.c.

372{

373 char *versionstr;

375 int ret;

376

377 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);

380

381 if (ret < 0)

382 {

384

387

388 if (ret == -1)

389 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",

390 progname, "pg_createsubscriber", full_path);

391 else

392 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",

393 progname, full_path, "pg_createsubscriber");

394 }

395

397

399}

int find_my_exec(const char *argv0, char *retpath)

int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)

void * pg_malloc(size_t size)

static const char * progname

size_t strlcpy(char *dst, const char *src, size_t siz)

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

get_primary_sysid()

static uint64 get_primary_sysid ( const char * conninfo) static

Definition at line 591 of file pg_createsubscriber.c.

592{

596

597 pg_log_info("getting system identifier from publisher");

598

600

601 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");

603 {

604 pg_log_error("could not get system identifier: %s",

607 }

609 {

610 pg_log_error("could not get system identifier: got %d rows, expected %d row",

613 }

614

615 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);

616

617 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);

618

621

622 return sysid;

623}

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by main().

get_publisher_databases()

Definition at line 1971 of file pg_createsubscriber.c.

1973{

1976

1977

1978 if (dbnamespecified)

1980 else

1981 {

1982

1983 char *conninfo;

1984

1989 {

1993 }

1994 }

1995

1996 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");

1998 {

2002 }

2003

2005 {

2007

2009

2010

2012 }

2013

2016}

static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)

void simple_string_list_append(SimpleStringList *list, const char *val)

SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

get_standby_sysid()

static uint64 get_standby_sysid ( const char * datadir) static

get_sub_conninfo()

main()

int main ( int argc,
char ** argv
)

Definition at line 2019 of file pg_createsubscriber.c.

2020{

2021 static struct option long_options[] =

2022 {

2032 {"enable-two-phase", no_argument, NULL, 'T'},

2041 {NULL, 0, NULL, 0}

2042 };

2043

2045

2046 int c;

2047 int option_index;

2048

2049 char *pub_base_conninfo;

2050 char *sub_base_conninfo;

2051 char *dbname_conninfo = NULL;

2052

2055 struct stat statbuf;

2056

2057 char *consistent_lsn;

2058

2060

2065

2066 if (argc > 1)

2067 {

2068 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)

2069 {

2071 exit(0);

2072 }

2073 else if (strcmp(argv[1], "-V") == 0

2074 || strcmp(argv[1], "--version") == 0)

2075 {

2076 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);

2077 exit(0);

2078 }

2079 }

2080

2081

2090 {

2091 0

2092 };

2095

2096

2097

2098

2099

2100#ifndef WIN32

2101 if (geteuid() == 0)

2102 {

2103 pg_log_error("cannot be executed by \"root\"");

2104 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",

2106 exit(1);

2107 }

2108#endif

2109

2111

2112 while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",

2113 long_options, &option_index)) != -1)

2114 {

2115 switch (c)

2116 {

2117 case 'a':

2119 break;

2120 case 'd':

2122 {

2125 }

2126 else

2127 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);

2128 break;

2129 case 'D':

2132 break;

2133 case 'n':

2135 break;

2136 case 'p':

2138 break;

2139 case 'P':

2141 break;

2142 case 'R':

2145 else

2146 pg_fatal("object type \"%s\" is specified more than once for -R/--remove", optarg);

2147 break;

2148 case 's':

2151 break;

2152 case 't':

2154 break;

2155 case 'T':

2157 break;

2158 case 'U':

2160 break;

2161 case 'v':

2163 break;

2164 case 1:

2166 break;

2167 case 2:

2169 {

2172 }

2173 else

2174 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);

2175 break;

2176 case 3:

2178 {

2181 }

2182 else

2183 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);

2184 break;

2185 case 4:

2187 {

2190 }

2191 else

2192 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);

2193 break;

2194 default:

2195

2197 exit(1);

2198 }

2199 }

2200

2201

2203 {

2204 char *bad_switch = NULL;

2205

2207 bad_switch = "--database";

2209 bad_switch = "--publication";

2211 bad_switch = "--replication-slot";

2213 bad_switch = "--subscription";

2214

2215 if (bad_switch)

2216 {

2217 pg_log_error("%s cannot be used with -a/--all", bad_switch);

2219 exit(1);

2220 }

2221 }

2222

2223

2225 {

2226 pg_log_error("too many command-line arguments (first is \"%s\")",

2229 exit(1);

2230 }

2231

2232

2234 {

2235 pg_log_error("no subscriber data directory specified");

2237 exit(1);

2238 }

2239

2240

2242 {

2244

2246 pg_fatal("could not determine current directory");

2249 }

2250

2251

2252

2253

2254

2256 {

2257

2258

2259

2260

2261

2262

2263 pg_log_error("no publisher connection string specified");

2265 exit(1);

2266 }

2267 pg_log_info("validating publisher connection string");

2269 &dbname_conninfo);

2270 if (pub_base_conninfo == NULL)

2271 exit(1);

2272

2273 pg_log_info("validating subscriber connection string");

2275

2276

2277

2278

2279

2280

2282 {

2283 bool dbnamespecified = (dbname_conninfo != NULL);

2284

2286 }

2287

2289 {

2290 pg_log_info("no database was specified");

2291

2292

2293

2294

2295

2296 if (dbname_conninfo)

2297 {

2300

2301 pg_log_info("database name \"%s\" was extracted from the publisher connection string",

2302 dbname_conninfo);

2303 }

2304 else

2305 {

2309 exit(1);

2310 }

2311 }

2312

2313

2315 {

2316 pg_log_error("wrong number of publication names specified");

2317 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",

2319 exit(1);

2320 }

2322 {

2323 pg_log_error("wrong number of subscription names specified");

2324 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",

2326 exit(1);

2327 }

2329 {

2330 pg_log_error("wrong number of replication slot names specified");

2331 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",

2333 exit(1);

2334 }

2335

2336

2338 {

2339 if (pg_strcasecmp(cell->val, "publications") == 0)

2341 else

2342 {

2343 pg_log_error("invalid object type \"%s\" specified for -R/--remove", cell->val);

2345 exit(1);

2346 }

2347 }

2348

2349

2352

2353

2355

2357

2358

2359

2360

2361

2362

2364

2365

2367

2368

2369

2370

2371

2374 if (pub_sysid != sub_sysid)

2375 pg_fatal("subscriber data directory is not a copy of the source database cluster");

2376

2377

2379

2380

2381

2382

2383

2384

2385

2386 if (stat(pidfile, &statbuf) == 0)

2387 {

2390 exit(1);

2391 }

2392

2393

2394

2395

2396

2397

2398 pg_log_info("starting the standby server with command-line options");

2400

2401

2403

2404

2406

2407

2408

2409

2410

2411

2412

2413

2416

2417

2419

2420

2422

2423

2424

2425

2426

2427

2430

2431

2433

2434

2435

2436

2437

2438

2439

2441

2442

2444

2445

2447

2448

2451

2452

2454

2456

2458

2459 return 0;

2460}

#define PG_TEXTDOMAIN(domain)

void set_pglocale_pgservice(const char *argv0, const char *app)

int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)

#define required_argument

void pg_logging_increase_verbosity(void)

void pg_logging_init(const char *argv0)

void pg_logging_set_level(enum pg_log_level new_level)

#define pg_log_error_detail(...)

static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)

static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)

static char * pg_ctl_path

static char * get_exec_path(const char *argv0, const char *progname)

static void check_publisher(const struct LogicalRepInfo *dbinfo)

static void cleanup_objects_atexit(void)

static void check_subscriber(const struct LogicalRepInfo *dbinfo)

static void check_data_directory(const char *datadir)

static char * setup_publisher(struct LogicalRepInfo *dbinfo)

static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)

static char * get_base_conninfo(const char *conninfo, char **dbname)

static uint64 get_standby_sysid(const char *datadir)

static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)

static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)

static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)

static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)

static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)

static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)

static char * pg_resetwal_path

static uint64 get_primary_sysid(const char *conninfo)

static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)

PGDLLIMPORT char * optarg

int pg_strcasecmp(const char *s1, const char *s2)

void canonicalize_path(char *path)

const char * get_progname(const char *argv0)

void get_restricted_token(void)

bool simple_string_list_member(SimpleStringList *list, const char *val)

struct SimpleStringList SimpleStringList

SimpleStringList objecttypes_to_remove

SimpleStringList sub_names

SimpleStringList replslot_names

SimpleStringList pub_names

struct SimpleStringListCell * next

SimpleStringListCell * head

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), SimpleStringListCell::next, no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_remove, LogicalRepInfos::objecttypes_to_remove, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and wait_for_end_recovery().

modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions * opt) static

Definition at line 658 of file pg_createsubscriber.c.

659{

661 bool crc_ok;

662 struct timeval tv;

663

664 char *cmd_str;

665

666 pg_log_info("modifying system identifier of subscriber");

667

669 if (!crc_ok)

670 pg_fatal("control file appears to be corrupt");

671

672

673

674

675

676

681

684

685 pg_log_info("system identifier is %" PRIu64 " on subscriber",

687

688 pg_log_info("running pg_resetwal on the subscriber");

689

692

693 pg_log_debug("pg_resetwal command is: %s", cmd_str);

694

696 {

697 int rc = system(cmd_str);

698

699 if (rc == 0)

700 pg_log_info("subscriber successfully changed the system identifier");

701 else

703 }

704

706}

void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)

char * wait_result_to_str(int exitstatus)

int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, update_controlfile(), and wait_result_to_str().

Referenced by main().

pg_ctl_status()

static void pg_ctl_status ( const char * pg_ctl_cmd, int rc ) static

Definition at line 1449 of file pg_createsubscriber.c.

1450{

1451 if (rc != 0)

1452 {

1454 {

1456 }

1458 {

1459#if defined(WIN32)

1460 pg_log_error("pg_ctl was terminated by exception 0x%X",

1462 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");

1463#else

1464 pg_log_error("pg_ctl was terminated by signal %d: %s",

1466#endif

1467 }

1468 else

1469 {

1470 pg_log_error("pg_ctl exited with unrecognized status %d", rc);

1471 }

1472

1474 exit(1);

1475 }

1476}

const char * pg_strsignal(int signum)

References pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

server_is_in_recovery()

static bool server_is_in_recovery ( PGconn * conn) static

Definition at line 842 of file pg_createsubscriber.c.

843{

845 int ret;

846

847 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");

848

850 {

851 pg_log_error("could not obtain recovery progress: %s",

854 }

855

856

857 ret = strcmp("t", PQgetvalue(res, 0, 0));

858

860

861 return ret == 0;

862}

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), and PQresultStatus().

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

set_replication_progress()

static void set_replication_progress ( PGconn * conn, const struct LogicalRepInfo * dbinfo, const char * lsn ) static

Definition at line 1840 of file pg_createsubscriber.c.

1841{

1844 Oid suboid;

1847 char *originname;

1848 char *lsnstr;

1849

1851

1854

1856 "SELECT s.oid FROM pg_catalog.pg_subscription s "

1857 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "

1858 "WHERE s.subname = %s AND d.datname = %s",

1860

1863 {

1864 pg_log_error("could not obtain subscription OID: %s",

1867 }

1868

1870 {

1871 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",

1874 }

1875

1877 {

1880 }

1881 else

1882 {

1883 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

1884 lsnstr = psprintf("%s", lsn);

1885 }

1886

1888

1889

1890

1891

1892

1893 originname = psprintf("pg_%u", suboid);

1894

1895 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",

1896 originname, lsnstr, dbinfo->dbname);

1897

1900 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",

1901 originname, lsnstr);

1902

1904

1906 {

1909 {

1910 pg_log_error("could not set replication progress for subscription \"%s\": %s",

1913 }

1915 }

1916

1922}

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo * dbinfo) static

Definition at line 763 of file pg_createsubscriber.c.

764{

765 char *lsn = NULL;

766

768

770 {

772 char *genname = NULL;

773

775

776

777

778

779

780

781

782

791

792

793

794

795

796

797

799

800

801 if (lsn)

804 if (lsn != NULL || dry_run)

805 pg_log_info("create replication slot \"%s\" on publisher",

806 dbinfo[i].replslotname);

807 else

808 exit(1);

809

810

811

812

813

814

815

816

817

819 {

821

822 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");

824 {

825 pg_log_error("could not write an additional WAL record: %s",

828 }

830 }

831

833 }

834

835 return lsn;

836}

static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)

static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)

static char * generate_object_name(PGconn *conn)

void pg_prng_seed(pg_prng_state *state, uint64 seed)

References conn, connect_database(), create_logical_replication_slot(), create_publication(), disconnect_database(), dry_run, generate_object_name(), i, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo * dbinfo, const char * datadir, const char * lsn ) static

Definition at line 1227 of file pg_createsubscriber.c.

1228{

1231

1232

1233

1234

1235

1236

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1252 "recovery_target_timeline = 'latest'\n");

1254 "recovery_target_inclusive = true\n");

1256 "recovery_target_action = promote\n");

1260

1262 {

1265 "recovery_target_lsn = '%X/%X'\n",

1267 }

1268 else

1269 {

1271 lsn);

1273 }

1275

1277}

static PQExpBuffer recoveryconfcontents

void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)

PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, GenerateRecoveryConfig(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_log_debug, recoveryconfcontents, and WriteRecoveryConfig().

Referenced by main().

setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo * dbinfo, const char * consistent_lsn ) static

Definition at line 1191 of file pg_createsubscriber.c.

1192{

1194 {

1196

1197

1199

1200

1201

1202

1203

1204

1205

1207

1208

1210

1212

1213

1215

1216

1218

1220 }

1221}

static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)

static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)

static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)

static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions * opt, bool restricted_access, bool restrict_logical_worker ) static

Definition at line 1479 of file pg_createsubscriber.c.

1481{

1483 int rc;

1484

1487 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");

1488

1489

1490 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");

1491

1492 if (restricted_access)

1493 {

1495#if !defined(WIN32)

1496

1497

1498

1499

1500

1501

1502

1503 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");

1505 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",

1508#endif

1509 }

1513

1514

1515 if (restrict_logical_worker)

1516 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");

1517

1519 rc = system(pg_ctl_cmd->data);

1524}

static void pg_ctl_status(const char *pg_ctl_cmd, int rc)

void appendShellString(PQExpBuffer buf, const char *str)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

stop_standby_server()

static void stop_standby_server ( const char * datadir) static

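store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions * opt, const char * pub_base_conninfo, const char * sub_base_conninfo ) static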

Definition at line 463 of file pg_createsubscriber.c.

466{

471 int i = 0;

472

474

481

483 {

484 char *conninfo;

485

486

489 dbinfo[i].dbname = cell->val;

492 else

496 else

500

505 else

507

508

509 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,

513 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,

517

519 pubcell = pubcell->next;

521 subcell = subcell->next;

523 replslotcell = replslotcell->next;

524

525 i++;

526 }

527

528 return dbinfo;

529}

#define pg_malloc_array(type, count)

char val[FLEXIBLE_ARRAY_MEMBER]

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, LogicalRepInfos::two_phase, and SimpleStringListCell::val.

Referenced by main().

usage()

static void usage ( void ) static

Definition at line 242 of file pg_createsubscriber.c.

243{

244 printf(_("%s creates a new logical replica from a standby server.\n\n"),

248 printf(_("\nOptions:\n"));

249 printf(_(" -a, --all create subscriptions for all databases except template\n"

250 " databases or databases that don't allow connections\n"));

251 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));

252 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));

253 printf(_(" -n, --dry-run dry run, just show what would be done\n"));

254 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);

255 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));

256 printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"

257 " databases on the subscriber; accepts: publications\n"));

258 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));

259 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));

260 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));

261 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));

262 printf(_(" -v, --verbose output verbose messages\n"));

263 printf(_(" --config-file=FILENAME use specified main server configuration\n"

264 " file when running target cluster\n"));

265 printf(_(" --publication=NAME publication name\n"));

266 printf(_(" --replication-slot=NAME replication slot name\n"));

267 printf(_(" --subscription=NAME subscription name\n"));

268 printf(_(" -V, --version output version information, then exit\n"));

269 printf(_(" -?, --help show this help, then exit\n"));

270 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);

271 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);

272}

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

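wait_for_end_recovery()

static void wait_for_end_recovery ( const char * conninfo, const struct CreateSubscriberOptions * opt ) static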

Definition at line 1551 of file pg_createsubscriber.c.

1552{

1555 int timer = 0;

1556

1557 pg_log_info("waiting for the target server to reach the consistent state");

1558

1560

1561 for (;;)

1562 {

1564

1565

1566

1567

1568

1569 if (!in_recovery || dry_run)

1570 {

1573 break;

1574 }

1575

1576

1578 {

1582 }

1583

1584

1586

1588 }

1589

1591

1593 pg_fatal("server did not end recovery");

1594

1595 pg_log_info("target server reached the consistent state");

1596 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");

1597}

#define pg_log_info_hint(...)

void pg_usleep(long microsec)

References conn, connect_database(), disconnect_database(), dry_run, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), POSTMASTER_READY, POSTMASTER_STILL_STARTING, recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USEC_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

dbinfos

dry_run

Definition at line 137 of file pg_createsubscriber.c.

Referenced by check_and_drop_publications(), check_publisher(), copy_file(), create_logical_replication_slot(), create_publication(), create_subscription(), create_target_dir(), create_target_symlink(), debug_reconstruction(), drop_existing_subscriptions(), drop_publication(), drop_replication_slot(), enable_subscription(), main(), modify_subscriber_sysid(), open_target_file(), reconstruct_from_incremental_file(), remove_target_dir(), remove_target_file(), remove_target_symlink(), set_replication_progress(), setup_publisher(), setup_recovery(), sync_target_dir(), truncate_target_file(), wait_for_end_recovery(), write_reconstructed_file(), and write_target_range().

num_dbs

num_pubs

num_replslots

num_subs

pg_ctl_path

pg_resetwal_path

char* pg_resetwal_path = NULL static

primary_slot_name

char* primary_slot_name = NULL static

prng_state

progname

recovery_ended

bool recovery_ended = false static

standby_running

bool standby_running = false static

subscriber_dir

char* subscriber_dir = NULL static

success