PostgreSQL Source Code: src/bin/pg_basebackup/pg_createsubscriber.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

15

16#include <sys/stat.h>

20

30

31#define DEFAULT_SUB_PORT "50432"

32#define OBJECTTYPE_PUBLICATIONS 0x0001

33

34

36{

39 char *socket_dir;

40 char *sub_port;

42 bool two_phase;

50};

51

52

54{

55 char *dbname;

56 char *pubconninfo;

57 char *subconninfo;

58 char *pubname;

59 char *subname;

61

64};

65

66

67

68

69

71{

73 bool two_phase;

75

76};

77

79static void usage();

86 const char *pub_base_conninfo,

87 const char *sub_base_conninfo);

99 const char *consistent_lsn);

101 const char *lsn);

103 const char *slotname);

108 const char *slot_name);

109static void pg_ctl_status(const char *pg_ctl_cmd, int rc);

111 bool restricted_access,

112 bool restrict_logical_worker);

122 const char *lsn);

129 bool dbnamespecified);

130

131#define USEC_PER_SEC 1000000

132#define WAIT_INTERVAL 1

133

135

138

140

142static int num_dbs = 0;

143static int num_pubs = 0;

144static int num_subs = 0;

145static int num_replslots = 0;

146

148

151

152

154

157

159{

163

164

165

166

167

168

169

170

171

172

173

174

175

176static void

178{

180 return;

181

182

183

184

185

186

188 {

190 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "

191 "You must recreate the physical replica before continuing.");

192 }

193

195 {

197

199 {

201

203 if (conn != NULL)

204 {

211 }

212 else

213 {

214

215

216

217

218

220 {

221 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",

225 }

227 {

228 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",

231 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");

232 }

233 }

234 }

235 }

236

239}

240

241static void

243{

244 printf(_("%s creates a new logical replica from a standby server.\n\n"),

248 printf(_("\nOptions:\n"));

249 printf(_(" -a, --all create subscriptions for all databases except template\n"

250 " databases or databases that don't allow connections\n"));

251 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));

252 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));

253 printf(_(" -n, --dry-run dry run, just show what would be done\n"));

254 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);

255 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));

256 printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"

257 " databases on the subscriber; accepts: publications\n"));

258 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));

259 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));

260 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));

261 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));

262 printf(_(" -v, --verbose output verbose messages\n"));

263 printf(_(" --config-file=FILENAME use specified main server configuration\n"

264 " file when running target cluster\n"));

265 printf(_(" --publication=NAME publication name\n"));

266 printf(_(" --replication-slot=NAME replication slot name\n"));

267 printf(_(" --subscription=NAME subscription name\n"));

268 printf(_(" -V, --version output version information, then exit\n"));

269 printf(_(" -?, --help show this help, then exit\n"));

270 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);

271 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);

272}

273

274

275

276

277

278static void

280{

281 if (buf->len > 0)

286}

287

288

289

290

291

292

293

294

295

296

297

298

299

300static char *

302{

307 char *ret;

308

310 if (conn_opts == NULL)

311 {

314 return NULL;

315 }

316

318 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)

319 {

320 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')

321 {

322 if (strcmp(conn_opt->keyword, "dbname") == 0)

323 {

326 continue;

327 }

329 }

330 }

331

333

336

337 return ret;

338}

339

340

341

342

343

344static char *

346{

348 char *ret;

349

351#if !defined(WIN32)

353#endif

357

359

361

362 return ret;

363}

364

365

366

367

368

369

370static char *

372{

373 char *versionstr;

375 int ret;

376

377 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);

380

381 if (ret < 0)

382 {

384

387

388 if (ret == -1)

389 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",

390 progname, "pg_createsubscriber", full_path);

391 else

392 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",

393 progname, full_path, "pg_createsubscriber");

394 }

395

397

399}

400

401

402

403

404

405

406static void

408{

409 struct stat statbuf;

411

412 pg_log_info("checking if directory \"%s\" is a cluster data directory",

414

416 {

417 if (errno == ENOENT)

418 pg_fatal("data directory \"%s\" does not exist", datadir);

419 else

420 pg_fatal("could not access directory \"%s\": %m", datadir);

421 }

422

424 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)

425 {

426 pg_fatal("directory \"%s\" is not a database cluster directory",

428 }

429}

430

431

432

433

434

435

436

437

438static char *

440{

442 char *ret;

443

444 Assert(conninfo != NULL);

445

448

451

452 return ret;

453}

454

455

456

457

458

459

460

461

464 const char *pub_base_conninfo,

465 const char *sub_base_conninfo)

466{

471 int i = 0;

472

474

481

483 {

484 char *conninfo;

485

486

489 dbinfo[i].dbname = cell->val;

492 else

496 else

500

505 else

507

508

509 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,

513 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,

517

519 pubcell = pubcell->next;

521 subcell = subcell->next;

523 replslotcell = replslotcell->next;

524

525 i++;

526 }

527

528 return dbinfo;

529}

530

531

532

533

534

537{

540

543 {

544 pg_log_error("connection to database failed: %s",

547

548 if (exit_on_error)

549 exit(1);

550 return NULL;

551 }

552

553

556 {

557 pg_log_error("could not clear \"search_path\": %s",

561

562 if (exit_on_error)

563 exit(1);

564 return NULL;

565 }

567

569}

570

571

572

573

574

575static void

577{

579

581

582 if (exit_on_error)

583 exit(1);

584}

585

586

587

588

589

592{

596

597 pg_log_info("getting system identifier from publisher");

598

600

601 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");

603 {

604 pg_log_error("could not get system identifier: %s",

607 }

609 {

610 pg_log_error("could not get system identifier: got %d rows, expected %d row",

613 }

614

615 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);

616

617 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);

618

621

622 return sysid;

623}

624

625

626

627

628

629

632{

634 bool crc_ok;

636

637 pg_log_info("getting system identifier from subscriber");

638

640 if (!crc_ok)

641 pg_fatal("control file appears to be corrupt");

642

644

645 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);

646

648

649 return sysid;

650}

651

652

653

654

655

656

657static void

659{

661 bool crc_ok;

662 struct timeval tv;

663

664 char *cmd_str;

665

666 pg_log_info("modifying system identifier of subscriber");

667

669 if (!crc_ok)

670 pg_fatal("control file appears to be corrupt");

671

672

673

674

675

676

681

684

685 pg_log_info("system identifier is %" PRIu64 " on subscriber",

687

688 pg_log_info("running pg_resetwal on the subscriber");

689

692

693 pg_log_debug("pg_resetwal command is: %s", cmd_str);

694

696 {

697 int rc = system(cmd_str);

698

699 if (rc == 0)

700 pg_log_info("subscriber successfully changed the system identifier");

701 else

703 }

704

706}

707

708

709

710

711

712

713static char *

715{

719 char *objname;

720

722 "SELECT oid FROM pg_catalog.pg_database "

723 "WHERE datname = pg_catalog.current_database()");

725 {

726 pg_log_error("could not obtain database OID: %s",

729 }

730

732 {

733 pg_log_error("could not obtain database OID: got %d rows, expected %d row",

736 }

737

738

739 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

740

742

743

745

746

747

748

749

750

751 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);

752

753 return objname;

754}

755

756

757

758

759

760

761

762static char *

764{

765 char *lsn = NULL;

766

768

770 {

772 char *genname = NULL;

773

775

776

777

778

779

780

781

782

791

792

793

794

795

796

797

799

800

801 if (lsn)

804 if (lsn != NULL || dry_run)

805 pg_log_info("create replication slot \"%s\" on publisher",

806 dbinfo[i].replslotname);

807 else

808 exit(1);

809

810

811

812

813

814

815

816

817

819 {

821

822 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");

824 {

825 pg_log_error("could not write an additional WAL record: %s",

828 }

830 }

831

833 }

834

835 return lsn;

836}

837

838

839

840

841static bool

843{

845 int ret;

846

847 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");

848

850 {

851 pg_log_error("could not obtain recovery progress: %s",

854 }

855

856

857 ret = strcmp("t", PQgetvalue(res, 0, 0));

858

860

861 return ret == 0;

862}

863

864

865

866

867

868

869static void

871{

874 bool failed = false;

875

877 int max_repslots;

878 int cur_repslots;

879 int max_walsenders;

880 int cur_walsenders;

881 int max_prepared_transactions;

882 char *max_slot_wal_keep_size;

883

884 pg_log_info("checking settings on publisher");

885

887

888

889

890

891

893 {

894 pg_log_error("primary server cannot be in recovery");

896 }

897

898

899

900

901

902

903

904

905

906

907

908

910 "SELECT pg_catalog.current_setting('wal_level'),"

911 " pg_catalog.current_setting('max_replication_slots'),"

912 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"

913 " pg_catalog.current_setting('max_wal_senders'),"

914 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"

915 " pg_catalog.current_setting('max_prepared_transactions'),"

916 " pg_catalog.current_setting('max_slot_wal_keep_size')");

917

919 {

920 pg_log_error("could not obtain publisher settings: %s",

923 }

924

926 max_repslots = atoi(PQgetvalue(res, 0, 1));

927 cur_repslots = atoi(PQgetvalue(res, 0, 2));

928 max_walsenders = atoi(PQgetvalue(res, 0, 3));

929 cur_walsenders = atoi(PQgetvalue(res, 0, 4));

930 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));

932

934

936 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);

937 pg_log_debug("publisher: current replication slots: %d", cur_repslots);

938 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);

939 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);

940 pg_log_debug("publisher: max_prepared_transactions: %d",

941 max_prepared_transactions);

942 pg_log_debug("publisher: max_slot_wal_keep_size: %s",

943 max_slot_wal_keep_size);

944

946

947 if (strcmp(wal_level, "logical") != 0)

948 {

949 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");

950 failed = true;

951 }

952

953 if (max_repslots - cur_repslots < num_dbs)

954 {

955 pg_log_error("publisher requires %d replication slots, but only %d remain",

956 num_dbs, max_repslots - cur_repslots);

957 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

958 "max_replication_slots", cur_repslots + num_dbs);

959 failed = true;

960 }

961

962 if (max_walsenders - cur_walsenders < num_dbs)

963 {

964 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",

965 num_dbs, max_walsenders - cur_walsenders);

966 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

967 "max_wal_senders", cur_walsenders + num_dbs);

968 failed = true;

969 }

970

972 {

973 pg_log_warning("two_phase option will not be enabled for replication slots");

974 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "

975 "Prepared transactions will be replicated at COMMIT PREPARED.");

976 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");

977 }

978

979

980

981

982

983

984 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))

985 {

986 pg_log_warning("required WAL could be removed from the publisher");

987 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",

988 "max_slot_wal_keep_size");

989 }

990

992

993 if (failed)

994 exit(1);

995}

996

997

998

999

1000

1001

1002

1003

1004

1005

1006

1007

1008static void

1010{

1013 bool failed = false;

1014

1015 int max_lrworkers;

1016 int max_reporigins;

1017 int max_wprocs;

1018

1019 pg_log_info("checking settings on subscriber");

1020

1022

1023

1025 {

1026 pg_log_error("target server must be a standby");

1028 }

1029

1030

1031

1032

1033

1034

1035

1036

1037

1038

1039

1041 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("

1042 "'max_logical_replication_workers', "

1043 "'max_active_replication_origins', "

1044 "'max_worker_processes', "

1045 "'primary_slot_name') "

1046 "ORDER BY name");

1047

1049 {

1050 pg_log_error("could not obtain subscriber settings: %s",

1053 }

1054

1055 max_reporigins = atoi(PQgetvalue(res, 0, 0));

1056 max_lrworkers = atoi(PQgetvalue(res, 1, 0));

1057 max_wprocs = atoi(PQgetvalue(res, 2, 0));

1058 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)

1060

1061 pg_log_debug("subscriber: max_logical_replication_workers: %d",

1062 max_lrworkers);

1063 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);

1064 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);

1067

1069

1071

1072 if (max_reporigins < num_dbs)

1073 {

1074 pg_log_error("subscriber requires %d active replication origins, but only %d remain",

1075 num_dbs, max_reporigins);

1076 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1077 "max_active_replication_origins", num_dbs);

1078 failed = true;

1079 }

1080

1081 if (max_lrworkers < num_dbs)

1082 {

1083 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",

1084 num_dbs, max_lrworkers);

1085 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1086 "max_logical_replication_workers", num_dbs);

1087 failed = true;

1088 }

1089

1090 if (max_wprocs < num_dbs + 1)

1091 {

1092 pg_log_error("subscriber requires %d worker processes, but only %d remain",

1093 num_dbs + 1, max_wprocs);

1094 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",

1095 "max_worker_processes", num_dbs + 1);

1096 failed = true;

1097 }

1098

1099 if (failed)

1100 exit(1);

1101}

1102

1103

1104

1105

1106

1107

1108

1109static void

1111{

1114

1116

1117

1118

1119

1120

1123 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",

1126

1127 pg_log_info("dropping subscription \"%s\" in database \"%s\"",

1129

1131 {

1133

1135 {

1136 pg_log_error("could not drop subscription \"%s\": %s",

1139 }

1140

1142 }

1143

1145}

1146

1147

1148

1149

1150static void

1153{

1157

1159

1161

1163 "SELECT s.subname FROM pg_catalog.pg_subscription s "

1164 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "

1165 "WHERE d.datname = %s",

1168

1170 {

1171 pg_log_error("could not obtain pre-existing subscriptions: %s",

1174 }

1175

1179

1183}

1184

1185

1186

1187

1188

1189

1190static void

1192{

1194 {

1196

1197

1199

1200

1201

1202

1203

1204

1205

1207

1208

1210

1212

1213

1215

1216

1218

1220 }

1221}

1222

1223

1224

1225

1226static void

1228{

1231

1232

1233

1234

1235

1236

1238

1239

1240

1241

1242

1243

1244

1245

1246

1247

1248

1252 "recovery_target_timeline = 'latest'\n");

1254 "recovery_target_inclusive = true\n");

1256 "recovery_target_action = promote\n");

1260

1262 {

1265 "recovery_target_lsn = '%X/%X'\n",

1267 }

1268 else

1269 {

1271 lsn);

1273 }

1275

1277}

1278

1279

1280

1281

1282

1283

1284

1285

1286static void

1288{

1290

1291

1293 return;

1294

1296 if (conn != NULL)

1297 {

1300 }

1301 else

1302 {

1303 pg_log_warning("could not drop replication slot \"%s\" on primary",

1304 slotname);

1305 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");

1306 }

1307}

1308

1309

1310

1311

1312

1313

1314

1315

1316static void

1318{

1321

1323 if (conn != NULL)

1324 {

1325

1327 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");

1328

1330 {

1331

1334 }

1335 else

1336 {

1337 pg_log_warning("could not obtain failover replication slot information: %s",

1339 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");

1340 }

1341

1344 }

1345 else

1346 {

1347 pg_log_warning("could not drop failover replication slot");

1348 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");

1349 }

1350}

1351

1352

1353

1354

1355

1356

1357

1358static char *

1360{

1363 const char *slot_name = dbinfo->replslotname;

1364 char *slot_name_esc;

1365 char *lsn = NULL;

1366

1368

1369 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",

1370 slot_name, dbinfo->dbname);

1371

1373

1375 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",

1376 slot_name_esc,

1378

1380

1382

1384 {

1387 {

1388 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",

1389 slot_name, dbinfo->dbname,

1393 return NULL;

1394 }

1395

1398 }

1399

1400

1402

1404

1405 return lsn;

1406}

1407

1408static void

1410 const char *slot_name)

1411{

1413 char *slot_name_esc;

1415

1417

1418 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",

1419 slot_name, dbinfo->dbname);

1420

1422

1423 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);

1424

1426

1428

1430 {

1433 {

1434 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",

1436 dbinfo->made_replslot = false;

1437 }

1438

1440 }

1441

1443}

1444

1445

1446

1447

1448static void

1450{

1451 if (rc != 0)

1452 {

1454 {

1456 }

1458 {

1459#if defined(WIN32)

1460 pg_log_error("pg_ctl was terminated by exception 0x%X",

1462 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");

1463#else

1464 pg_log_error("pg_ctl was terminated by signal %d: %s",

1466#endif

1467 }

1468 else

1469 {

1470 pg_log_error("pg_ctl exited with unrecognized status %d", rc);

1471 }

1472

1474 exit(1);

1475 }

1476}

1477

1478static void

1480 bool restrict_logical_worker)

1481{

1483 int rc;

1484

1487 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");

1488

1489

1490 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");

1491

1492 if (restricted_access)

1493 {

1495#if !defined(WIN32)

1496

1497

1498

1499

1500

1501

1502

1503 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");

1505 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",

1508#endif

1509 }

1513

1514

1515 if (restrict_logical_worker)

1516 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");

1517

1519 rc = system(pg_ctl_cmd->data);

1524}

1525

1526static void

1528{

1529 char *pg_ctl_cmd;

1530 int rc;

1531

1534 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);

1535 rc = system(pg_ctl_cmd);

1539}

1540

1541

1542

1543

1544

1545

1546

1547

1548

1549

1550static void

1552{

1555 int timer = 0;

1556

1557 pg_log_info("waiting for the target server to reach the consistent state");

1558

1560

1561 for (;;)

1562 {

1564

1565

1566

1567

1568

1569 if (!in_recovery || dry_run)

1570 {

1573 break;

1574 }

1575

1576

1578 {

1582 }

1583

1584

1586

1588 }

1589

1591

1593 pg_fatal("server did not end recovery");

1594

1595 pg_log_info("target server reached the consistent state");

1596 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");

1597}

1598

1599

1600

1601

1602static void

1604{

1607 char *ipubname_esc;

1608 char *spubname_esc;

1609

1611

1614

1615

1617 "SELECT 1 FROM pg_catalog.pg_publication "

1618 "WHERE pubname = %s",

1619 spubname_esc);

1622 {

1623 pg_log_error("could not obtain publication information: %s",

1626 }

1627

1629 {

1630

1631

1632

1633

1634

1635

1636

1638 pg_log_error_hint("Consider renaming this publication before continuing.");

1640 }

1641

1644

1645 pg_log_info("creating publication \"%s\" in database \"%s\"",

1647

1649 ipubname_esc);

1650

1652

1654 {

1657 {

1658 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",

1661 }

1663 }

1664

1665

1667

1671}

1672

1673

1674

1675

1676static void

1678 bool *made_publication)

1679{

1682 char *pubname_esc;

1683

1685

1687

1688 pg_log_info("dropping publication \"%s\" in database \"%s\"",

1690

1692

1694

1696

1698 {

1701 {

1702 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",

1704 *made_publication = false;

1705

1706

1707

1708

1709

1710

1711

1712

1713 }

1715 }

1716

1718}

1719

1720

1721

1722

1723

1724

1725

1726

1727

1728

1729static void

1731{

1734

1736

1737 if (drop_all_pubs)

1738 {

1739 pg_log_info("dropping all existing publications in database \"%s\"",

1741

1742

1743 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");

1745 {

1746 pg_log_error("could not obtain publication information: %s",

1750 }

1751

1752

1756

1758 }

1759

1760

1761

1762

1763

1764 if (!drop_all_pubs || dry_run)

1767}

1768

1769

1770

1771

1772

1773

1774

1775

1776

1777

1778

1779

1780static void

1782{

1785 char *pubname_esc;

1786 char *subname_esc;

1787 char *pubconninfo_esc;

1788 char *replslotname_esc;

1789

1791

1796

1797 pg_log_info("creating subscription \"%s\" in database \"%s\"",

1799

1801 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "

1802 "WITH (create_slot = false, enabled = false, "

1803 "slot_name = %s, copy_data = false, two_phase = %s)",

1804 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,

1806

1811

1813

1815 {

1818 {

1819 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",

1822 }

1824 }

1825

1827}

1828

1829

1830

1831

1832

1833

1834

1835

1836

1837

1838

1839static void

1841{

1844 Oid suboid;

1847 char *originname;

1848 char *lsnstr;

1849

1851

1854

1856 "SELECT s.oid FROM pg_catalog.pg_subscription s "

1857 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "

1858 "WHERE s.subname = %s AND d.datname = %s",

1860

1863 {

1864 pg_log_error("could not obtain subscription OID: %s",

1867 }

1868

1870 {

1871 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",

1874 }

1875

1877 {

1880 }

1881 else

1882 {

1883 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

1884 lsnstr = psprintf("%s", lsn);

1885 }

1886

1888

1889

1890

1891

1892

1893 originname = psprintf("pg_%u", suboid);

1894

1895 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",

1896 originname, lsnstr, dbinfo->dbname);

1897

1900 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",

1901 originname, lsnstr);

1902

1904

1906 {

1909 {

1910 pg_log_error("could not set replication progress for subscription \"%s\": %s",

1913 }

1915 }

1916

1922}

1923

1924

1925

1926

1927

1928

1929

1930static void

1932{

1936

1938

1940

1941 pg_log_info("enabling subscription \"%s\" in database \"%s\"",

1943

1945

1947

1949 {

1952 {

1953 pg_log_error("could not enable subscription \"%s\": %s",

1956 }

1957

1959 }

1960

1963}

1964

1965

1966

1967

1968

1969

1970static void

1972 bool dbnamespecified)

1973{

1976

1977

1978 if (dbnamespecified)

1980 else

1981 {

1982

1983 char *conninfo;

1984

1989 {

1993 }

1994 }

1995

1996 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");

1998 {

2002 }

2003

2005 {

2007

2009

2010

2012 }

2013

2016}

2017

2018int

2020{

2021 static struct option long_options[] =

2022 {

2032 {"enable-two-phase", no_argument, NULL, 'T'},

2041 {NULL, 0, NULL, 0}

2042 };

2043

2045

2046 int c;

2047 int option_index;

2048

2049 char *pub_base_conninfo;

2050 char *sub_base_conninfo;

2051 char *dbname_conninfo = NULL;

2052

2055 struct stat statbuf;

2056

2057 char *consistent_lsn;

2058

2060

2065

2066 if (argc > 1)

2067 {

2068 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)

2069 {

2071 exit(0);

2072 }

2073 else if (strcmp(argv[1], "-V") == 0

2074 || strcmp(argv[1], "--version") == 0)

2075 {

2076 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);

2077 exit(0);

2078 }

2079 }

2080

2081

2090 {

2091 0

2092 };

2095

2096

2097

2098

2099

2100#ifndef WIN32

2101 if (geteuid() == 0)

2102 {

2103 pg_log_error("cannot be executed by \"root\"");

2104 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",

2106 exit(1);

2107 }

2108#endif

2109

2111

2112 while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",

2113 long_options, &option_index)) != -1)

2114 {

2115 switch (c)

2116 {

2117 case 'a':

2119 break;

2120 case 'd':

2122 {

2125 }

2126 else

2127 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);

2128 break;

2129 case 'D':

2132 break;

2133 case 'n':

2135 break;

2136 case 'p':

2138 break;

2139 case 'P':

2141 break;

2142 case 'R':

2145 else

2146 pg_fatal("object type \"%s\" is specified more than once for -R/--remove", optarg);

2147 break;

2148 case 's':

2151 break;

2152 case 't':

2154 break;

2155 case 'T':

2157 break;

2158 case 'U':

2160 break;

2161 case 'v':

2163 break;

2164 case 1:

2166 break;

2167 case 2:

2169 {

2172 }

2173 else

2174 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);

2175 break;

2176 case 3:

2178 {

2181 }

2182 else

2183 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);

2184 break;

2185 case 4:

2187 {

2190 }

2191 else

2192 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);

2193 break;

2194 default:

2195

2197 exit(1);

2198 }

2199 }

2200

2201

2203 {

2204 char *bad_switch = NULL;

2205

2207 bad_switch = "--database";

2209 bad_switch = "--publication";

2211 bad_switch = "--replication-slot";

2213 bad_switch = "--subscription";

2214

2215 if (bad_switch)

2216 {

2217 pg_log_error("%s cannot be used with -a/--all", bad_switch);

2219 exit(1);

2220 }

2221 }

2222

2223

2225 {

2226 pg_log_error("too many command-line arguments (first is \"%s\")",

2229 exit(1);

2230 }

2231

2232

2234 {

2235 pg_log_error("no subscriber data directory specified");

2237 exit(1);

2238 }

2239

2240

2242 {

2244

2246 pg_fatal("could not determine current directory");

2249 }

2250

2251

2252

2253

2254

2256 {

2257

2258

2259

2260

2261

2262

2263 pg_log_error("no publisher connection string specified");

2265 exit(1);

2266 }

2267 pg_log_info("validating publisher connection string");

2269 &dbname_conninfo);

2270 if (pub_base_conninfo == NULL)

2271 exit(1);

2272

2273 pg_log_info("validating subscriber connection string");

2275

2276

2277

2278

2279

2280

2282 {

2283 bool dbnamespecified = (dbname_conninfo != NULL);

2284

2286 }

2287

2289 {

2290 pg_log_info("no database was specified");

2291

2292

2293

2294

2295

2296 if (dbname_conninfo)

2297 {

2300

2301 pg_log_info("database name \"%s\" was extracted from the publisher connection string",

2302 dbname_conninfo);

2303 }

2304 else

2305 {

2309 exit(1);

2310 }

2311 }

2312

2313

2315 {

2316 pg_log_error("wrong number of publication names specified");

2317 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",

2319 exit(1);

2320 }

2322 {

2323 pg_log_error("wrong number of subscription names specified");

2324 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",

2326 exit(1);

2327 }

2329 {

2330 pg_log_error("wrong number of replication slot names specified");

2331 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",

2333 exit(1);

2334 }

2335

2336

2338 {

2339 if (pg_strcasecmp(cell->val, "publications") == 0)

2341 else

2342 {

2343 pg_log_error("invalid object type \"%s\" specified for -R/--remove", cell->val);

2345 exit(1);

2346 }

2347 }

2348

2349

2352

2353

2355

2357

2358

2359

2360

2361

2362

2364

2365

2367

2368

2369

2370

2371

2374 if (pub_sysid != sub_sysid)

2375 pg_fatal("subscriber data directory is not a copy of the source database cluster");

2376

2377

2379

2380

2381

2382

2383

2384

2385

2386 if (stat(pidfile, &statbuf) == 0)

2387 {

2390 exit(1);

2391 }

2392

2393

2394

2395

2396

2397

2398 pg_log_info("starting the standby server with command-line options");

2400

2401

2403

2404

2406

2407

2408

2409

2410

2411

2412

2413

2416

2417

2419

2420

2422

2423

2424

2425

2426

2427

2430

2431

2433

2434

2435

2436

2437

2438

2439

2441

2442

2444

2445

2447

2448

2451

2452

2454

2456

2458

2459 return 0;

2460}

#define PG_TEXTDOMAIN(domain)

int find_my_exec(const char *argv0, char *retpath)

void set_pglocale_pgservice(const char *argv0, const char *app)

int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)

#define ALWAYS_SECURE_SEARCH_PATH_SQL

void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)

ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)

int errmsg(const char *fmt,...)

PGconn * PQconnectdb(const char *conninfo)

void PQconninfoFree(PQconninfoOption *connOptions)

PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)

ConnStatusType PQstatus(const PGconn *conn)

void PQfinish(PGconn *conn)

char * PQerrorMessage(const PGconn *conn)

void PQfreemem(void *ptr)

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQntuples(const PGresult *res)

char * PQresultErrorMessage(const PGresult *res)

char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)

PGresult * PQexec(PGconn *conn, const char *query)

char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)

void * pg_malloc(size_t size)

char * pg_strdup(const char *in)

#define pg_malloc_array(type, count)

int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)

#define required_argument

Assert(PointerIsAligned(start, uint64))

void pg_logging_increase_verbosity(void)

void pg_logging_init(const char *argv0)

void pg_logging_set_level(enum pg_log_level new_level)

#define pg_log_error(...)

#define pg_log_error_hint(...)

#define pg_log_warning_hint(...)

#define pg_log_info_hint(...)

#define pg_log_warning_detail(...)

#define pg_log_error_detail(...)

#define pg_log_debug(...)

static PQExpBuffer recoveryconfcontents

static void pg_ctl_status(const char *pg_ctl_cmd, int rc)

static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)

static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)

static void stop_standby_server(const char *datadir)

static char * pg_ctl_path

static bool server_is_in_recovery(PGconn *conn)

static char * get_exec_path(const char *argv0, const char *progname)

static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

static void check_publisher(const struct LogicalRepInfo *dbinfo)

static char * subscriber_dir

static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)

int main(int argc, char **argv)

static char * primary_slot_name

static void cleanup_objects_atexit(void)

static void check_subscriber(const struct LogicalRepInfo *dbinfo)

static pg_prng_state prng_state

static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)

static void check_data_directory(const char *datadir)

static char * setup_publisher(struct LogicalRepInfo *dbinfo)

@ POSTMASTER_STILL_STARTING

static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)

static char * get_base_conninfo(const char *conninfo, char **dbname)

static struct LogicalRepInfos dbinfos

static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)

static uint64 get_standby_sysid(const char *datadir)

static bool recovery_ended

static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)

static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)

static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)

static void disconnect_database(PGconn *conn, bool exit_on_error)

static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)

static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)

static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)

static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)

static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)

static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)

static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)

#define OBJECTTYPE_PUBLICATIONS

static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)

static char * pg_resetwal_path

static char * generate_object_name(PGconn *conn)

static const char * progname

static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)

static bool standby_running

static uint64 get_primary_sysid(const char *conninfo)

static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)

static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

static PGconn * connect_database(const char *conninfo, bool exit_on_error)

PGDLLIMPORT char * optarg

uint32 pg_prng_uint32(pg_prng_state *state)

void pg_prng_seed(pg_prng_state *state, uint64 seed)

#define pg_log_warning(...)

int pg_strcasecmp(const char *s1, const char *s2)

const char * pg_strsignal(int signum)

void canonicalize_path(char *path)

const char * get_progname(const char *argv0)

size_t strlcpy(char *dst, const char *src, size_t siz)

PQExpBuffer createPQExpBuffer(void)

void resetPQExpBuffer(PQExpBuffer str)

void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)

void destroyPQExpBuffer(PQExpBuffer str)

void appendPQExpBufferChar(PQExpBuffer str, char ch)

void appendPQExpBufferStr(PQExpBuffer str, const char *data)

char * psprintf(const char *fmt,...)

void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)

PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)

void get_restricted_token(void)

void pg_usleep(long microsec)

bool simple_string_list_member(SimpleStringList *list, const char *val)

void simple_string_list_append(SimpleStringList *list, const char *val)

struct SimpleStringList SimpleStringList

void appendShellString(PQExpBuffer buf, const char *str)

void appendConnStrVal(PQExpBuffer buf, const char *str)

const char * sub_username

SimpleStringList database_names

SimpleStringList objecttypes_to_remove

SimpleStringList sub_names

SimpleStringList replslot_names

SimpleStringList pub_names

bits32 objecttypes_to_remove

struct LogicalRepInfo * dbinfo

char val[FLEXIBLE_ARRAY_MEMBER]

struct SimpleStringListCell * next

SimpleStringListCell * head

char * wait_result_to_str(int exitstatus)

int gettimeofday(struct timeval *tp, void *tzp)

#define LSN_FORMAT_ARGS(lsn)

#define InvalidXLogRecPtr