PostgreSQL Source Code: src/bin/pg_basebackup/pg_createsubscriber.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
15
16#include <sys/stat.h>
20
30
31#define DEFAULT_SUB_PORT "50432"
32#define OBJECTTYPE_PUBLICATIONS 0x0001
33
34
36{
39 char *socket_dir;
50};
51
52
54{
56 char *pubconninfo;
57 char *subconninfo;
61
64};
65
66
67
68
69
71{
75
76};
77
79static void usage();
86 const char *pub_base_conninfo,
87 const char *sub_base_conninfo);
99 const char *consistent_lsn);
101 const char *lsn);
103 const char *slotname);
108 const char *slot_name);
109static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
111 bool restricted_access,
112 bool restrict_logical_worker);
122 const char *lsn);
129 bool dbnamespecified);
130
131#define USEC_PER_SEC 1000000
132#define WAIT_INTERVAL 1
133
135
138
140
145static int num_replslots = 0;
146
148
151
152
154
157
159{
163
164
165
166
167
168
169
170
171
172
173
174
175
176static void
178{
180 return;
181
182
183
184
185
186
188 {
190 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
191 "You must recreate the physical replica before continuing.");
192 }
193
195 {
197
199 {
201
203 if (conn != NULL)
204 {
211 }
212 else
213 {
214
215
216
217
218
220 {
221 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
225 }
227 {
228 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
231 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
232 }
233 }
234 }
235 }
236
239}
240
241static void
243{
244 printf(_("%s creates a new logical replica from a standby server.\n\n"),
248 printf(_("\nOptions:\n"));
249 printf(_(" -a, --all create subscriptions for all databases except template\n"
250 " databases or databases that don't allow connections\n"));
251 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
252 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
253 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
254 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
255 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
256 printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
257 " databases on the subscriber; accepts: publications\n"));
258 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
259 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
260 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
261 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
262 printf(_(" -v, --verbose output verbose messages\n"));
263 printf(_(" --config-file=FILENAME use specified main server configuration\n"
264 " file when running target cluster\n"));
265 printf(_(" --publication=NAME publication name\n"));
266 printf(_(" --replication-slot=NAME replication slot name\n"));
267 printf(_(" --subscription=NAME subscription name\n"));
268 printf(_(" -V, --version output version information, then exit\n"));
269 printf(_(" -?, --help show this help, then exit\n"));
270 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
271 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
272}
273
274
275
276
277
278static void
280{
281 if (buf->len > 0)
286}
287
288
289
290
291
292
293
294
295
296
297
298
299
300static char *
302{
307 char *ret;
308
310 if (conn_opts == NULL)
311 {
314 return NULL;
315 }
316
318 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
319 {
320 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
321 {
322 if (strcmp(conn_opt->keyword, "dbname") == 0)
323 {
326 continue;
327 }
329 }
330 }
331
333
336
337 return ret;
338}
339
340
341
342
343
344static char *
346{
348 char *ret;
349
351#if !defined(WIN32)
353#endif
357
359
361
362 return ret;
363}
364
365
366
367
368
369
370static char *
372{
373 char *versionstr;
375 int ret;
376
377 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
380
381 if (ret < 0)
382 {
384
387
388 if (ret == -1)
389 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
390 progname, "pg_createsubscriber", full_path);
391 else
392 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
393 progname, full_path, "pg_createsubscriber");
394 }
395
397
399}
400
401
402
403
404
405
406static void
408{
409 struct stat statbuf;
411
412 pg_log_info("checking if directory \"%s\" is a cluster data directory",
414
416 {
417 if (errno == ENOENT)
418 pg_fatal("data directory \"%s\" does not exist", datadir);
419 else
420 pg_fatal("could not access directory \"%s\": %m", datadir);
421 }
422
424 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
425 {
426 pg_fatal("directory \"%s\" is not a database cluster directory",
428 }
429}
430
431
432
433
434
435
436
437
438static char *
440{
442 char *ret;
443
444 Assert(conninfo != NULL);
445
448
451
452 return ret;
453}
454
455
456
457
458
459
460
461
464 const char *pub_base_conninfo,
465 const char *sub_base_conninfo)
466{
471 int i = 0;
472
474
481
483 {
484 char *conninfo;
485
486
489 dbinfo[i].dbname = cell->val;
492 else
496 else
500
505 else
507
508
509 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
513 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
517
519 pubcell = pubcell->next;
521 subcell = subcell->next;
523 replslotcell = replslotcell->next;
524
525 i++;
526 }
527
528 return dbinfo;
529}
530
531
532
533
534
537{
540
543 {
544 pg_log_error("connection to database failed: %s",
547
548 if (exit_on_error)
549 exit(1);
550 return NULL;
551 }
552
553
556 {
557 pg_log_error("could not clear \"search_path\": %s",
561
562 if (exit_on_error)
563 exit(1);
564 return NULL;
565 }
567
569}
570
571
572
573
574
575static void
577{
579
581
582 if (exit_on_error)
583 exit(1);
584}
585
586
587
588
589
592{
596
597 pg_log_info("getting system identifier from publisher");
598
600
601 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
603 {
604 pg_log_error("could not get system identifier: %s",
607 }
609 {
610 pg_log_error("could not get system identifier: got %d rows, expected %d row",
613 }
614
615 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
616
617 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
618
621
622 return sysid;
623}
624
625
626
627
628
629
632{
634 bool crc_ok;
636
637 pg_log_info("getting system identifier from subscriber");
638
640 if (!crc_ok)
641 pg_fatal("control file appears to be corrupt");
642
644
645 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
646
648
649 return sysid;
650}
651
652
653
654
655
656
657static void
659{
661 bool crc_ok;
662 struct timeval tv;
663
664 char *cmd_str;
665
666 pg_log_info("modifying system identifier of subscriber");
667
669 if (!crc_ok)
670 pg_fatal("control file appears to be corrupt");
671
672
673
674
675
676
681
684
685 pg_log_info("system identifier is %" PRIu64 " on subscriber",
687
688 pg_log_info("running pg_resetwal on the subscriber");
689
692
693 pg_log_debug("pg_resetwal command is: %s", cmd_str);
694
696 {
697 int rc = system(cmd_str);
698
699 if (rc == 0)
700 pg_log_info("subscriber successfully changed the system identifier");
701 else
703 }
704
706}
707
708
709
710
711
712
713static char *
715{
719 char *objname;
720
722 "SELECT oid FROM pg_catalog.pg_database "
723 "WHERE datname = pg_catalog.current_database()");
725 {
726 pg_log_error("could not obtain database OID: %s",
729 }
730
732 {
733 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
736 }
737
738
739 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
740
742
743
745
746
747
748
749
750
751 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
752
753 return objname;
754}
755
756
757
758
759
760
761
762static char *
764{
765 char *lsn = NULL;
766
768
770 {
772 char *genname = NULL;
773
775
776
777
778
779
780
781
782
791
792
793
794
795
796
797
799
800
801 if (lsn)
804 if (lsn != NULL || dry_run)
805 pg_log_info("create replication slot \"%s\" on publisher",
806 dbinfo[i].replslotname);
807 else
808 exit(1);
809
810
811
812
813
814
815
816
817
819 {
821
822 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
824 {
825 pg_log_error("could not write an additional WAL record: %s",
828 }
830 }
831
833 }
834
835 return lsn;
836}
837
838
839
840
841static bool
843{
845 int ret;
846
847 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
848
850 {
851 pg_log_error("could not obtain recovery progress: %s",
854 }
855
856
857 ret = strcmp("t", PQgetvalue(res, 0, 0));
858
860
861 return ret == 0;
862}
863
864
865
866
867
868
869static void
871{
874 bool failed = false;
875
877 int max_repslots;
878 int cur_repslots;
879 int max_walsenders;
880 int cur_walsenders;
881 int max_prepared_transactions;
882 char *max_slot_wal_keep_size;
883
884 pg_log_info("checking settings on publisher");
885
887
888
889
890
891
893 {
894 pg_log_error("primary server cannot be in recovery");
896 }
897
898
899
900
901
902
903
904
905
906
907
908
910 "SELECT pg_catalog.current_setting('wal_level'),"
911 " pg_catalog.current_setting('max_replication_slots'),"
912 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
913 " pg_catalog.current_setting('max_wal_senders'),"
914 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
915 " pg_catalog.current_setting('max_prepared_transactions'),"
916 " pg_catalog.current_setting('max_slot_wal_keep_size')");
917
919 {
920 pg_log_error("could not obtain publisher settings: %s",
923 }
924
926 max_repslots = atoi(PQgetvalue(res, 0, 1));
927 cur_repslots = atoi(PQgetvalue(res, 0, 2));
928 max_walsenders = atoi(PQgetvalue(res, 0, 3));
929 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
930 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
932
934
936 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
937 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
938 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
939 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
940 pg_log_debug("publisher: max_prepared_transactions: %d",
941 max_prepared_transactions);
942 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
943 max_slot_wal_keep_size);
944
946
947 if (strcmp(wal_level, "logical") != 0)
948 {
949 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
950 failed = true;
951 }
952
953 if (max_repslots - cur_repslots < num_dbs)
954 {
955 pg_log_error("publisher requires %d replication slots, but only %d remain",
956 num_dbs, max_repslots - cur_repslots);
957 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
958 "max_replication_slots", cur_repslots + num_dbs);
959 failed = true;
960 }
961
962 if (max_walsenders - cur_walsenders < num_dbs)
963 {
964 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
965 num_dbs, max_walsenders - cur_walsenders);
966 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
967 "max_wal_senders", cur_walsenders + num_dbs);
968 failed = true;
969 }
970
972 {
973 pg_log_warning("two_phase option will not be enabled for replication slots");
974 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
975 "Prepared transactions will be replicated at COMMIT PREPARED.");
976 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
977 }
978
979
980
981
982
983
984 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
985 {
986 pg_log_warning("required WAL could be removed from the publisher");
987 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
988 "max_slot_wal_keep_size");
989 }
990
992
993 if (failed)
994 exit(1);
995}
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008static void
1010{
1013 bool failed = false;
1014
1015 int max_lrworkers;
1016 int max_reporigins;
1017 int max_wprocs;
1018
1019 pg_log_info("checking settings on subscriber");
1020
1022
1023
1025 {
1026 pg_log_error("target server must be a standby");
1028 }
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1041 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1042 "'max_logical_replication_workers', "
1043 "'max_active_replication_origins', "
1044 "'max_worker_processes', "
1045 "'primary_slot_name') "
1046 "ORDER BY name");
1047
1049 {
1050 pg_log_error("could not obtain subscriber settings: %s",
1053 }
1054
1055 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1056 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1057 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1058 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1060
1061 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1062 max_lrworkers);
1063 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1064 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1067
1069
1071
1072 if (max_reporigins < num_dbs)
1073 {
1074 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1075 num_dbs, max_reporigins);
1076 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1077 "max_active_replication_origins", num_dbs);
1078 failed = true;
1079 }
1080
1081 if (max_lrworkers < num_dbs)
1082 {
1083 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1084 num_dbs, max_lrworkers);
1085 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1086 "max_logical_replication_workers", num_dbs);
1087 failed = true;
1088 }
1089
1090 if (max_wprocs < num_dbs + 1)
1091 {
1092 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1093 num_dbs + 1, max_wprocs);
1094 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1095 "max_worker_processes", num_dbs + 1);
1096 failed = true;
1097 }
1098
1099 if (failed)
1100 exit(1);
1101}
1102
1103
1104
1105
1106
1107
1108
1109static void
1111{
1114
1116
1117
1118
1119
1120
1123 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1126
1127 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1129
1131 {
1133
1135 {
1136 pg_log_error("could not drop subscription \"%s\": %s",
1139 }
1140
1142 }
1143
1145}
1146
1147
1148
1149
1150static void
1153{
1157
1159
1161
1163 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1164 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1165 "WHERE d.datname = %s",
1168
1170 {
1171 pg_log_error("could not obtain pre-existing subscriptions: %s",
1174 }
1175
1179
1183}
1184
1185
1186
1187
1188
1189
1190static void
1192{
1194 {
1196
1197
1199
1200
1201
1202
1203
1204
1205
1207
1208
1210
1212
1213
1215
1216
1218
1220 }
1221}
1222
1223
1224
1225
1226static void
1228{
1231
1232
1233
1234
1235
1236
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1252 "recovery_target_timeline = 'latest'\n");
1254 "recovery_target_inclusive = true\n");
1256 "recovery_target_action = promote\n");
1260
1262 {
1265 "recovery_target_lsn = '%X/%X'\n",
1267 }
1268 else
1269 {
1271 lsn);
1273 }
1275
1277}
1278
1279
1280
1281
1282
1283
1284
1285
1286static void
1288{
1290
1291
1293 return;
1294
1296 if (conn != NULL)
1297 {
1300 }
1301 else
1302 {
1303 pg_log_warning("could not drop replication slot \"%s\" on primary",
1304 slotname);
1305 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1306 }
1307}
1308
1309
1310
1311
1312
1313
1314
1315
1316static void
1318{
1321
1323 if (conn != NULL)
1324 {
1325
1327 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1328
1330 {
1331
1334 }
1335 else
1336 {
1337 pg_log_warning("could not obtain failover replication slot information: %s",
1339 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1340 }
1341
1344 }
1345 else
1346 {
1347 pg_log_warning("could not drop failover replication slot");
1348 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1349 }
1350}
1351
1352
1353
1354
1355
1356
1357
1358static char *
1360{
1363 const char *slot_name = dbinfo->replslotname;
1364 char *slot_name_esc;
1365 char *lsn = NULL;
1366
1368
1369 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1370 slot_name, dbinfo->dbname);
1371
1373
1375 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1376 slot_name_esc,
1378
1380
1382
1384 {
1387 {
1388 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1389 slot_name, dbinfo->dbname,
1393 return NULL;
1394 }
1395
1398 }
1399
1400
1402
1404
1405 return lsn;
1406}
1407
1408static void
1410 const char *slot_name)
1411{
1413 char *slot_name_esc;
1415
1417
1418 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1419 slot_name, dbinfo->dbname);
1420
1422
1423 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1424
1426
1428
1430 {
1433 {
1434 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1436 dbinfo->made_replslot = false;
1437 }
1438
1440 }
1441
1443}
1444
1445
1446
1447
1448static void
1450{
1451 if (rc != 0)
1452 {
1454 {
1456 }
1458 {
1459#if defined(WIN32)
1460 pg_log_error("pg_ctl was terminated by exception 0x%X",
1462 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1463#else
1464 pg_log_error("pg_ctl was terminated by signal %d: %s",
1466#endif
1467 }
1468 else
1469 {
1470 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1471 }
1472
1474 exit(1);
1475 }
1476}
1477
1478static void
1480 bool restrict_logical_worker)
1481{
1483 int rc;
1484
1487 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1488
1489
1490 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1491
1492 if (restricted_access)
1493 {
1495#if !defined(WIN32)
1496
1497
1498
1499
1500
1501
1502
1503 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1505 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1508#endif
1509 }
1513
1514
1515 if (restrict_logical_worker)
1516 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1517
1519 rc = system(pg_ctl_cmd->data);
1524}
1525
1526static void
1528{
1529 char *pg_ctl_cmd;
1530 int rc;
1531
1534 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1535 rc = system(pg_ctl_cmd);
1539}
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550static void
1552{
1555 int timer = 0;
1556
1557 pg_log_info("waiting for the target server to reach the consistent state");
1558
1560
1561 for (;;)
1562 {
1564
1565
1566
1567
1568
1569 if (!in_recovery || dry_run)
1570 {
1573 break;
1574 }
1575
1576
1578 {
1582 }
1583
1584
1586
1588 }
1589
1591
1593 pg_fatal("server did not end recovery");
1594
1595 pg_log_info("target server reached the consistent state");
1596 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1597}
1598
1599
1600
1601
1602static void
1604{
1607 char *ipubname_esc;
1608 char *spubname_esc;
1609
1611
1614
1615
1617 "SELECT 1 FROM pg_catalog.pg_publication "
1618 "WHERE pubname = %s",
1619 spubname_esc);
1622 {
1623 pg_log_error("could not obtain publication information: %s",
1626 }
1627
1629 {
1630
1631
1632
1633
1634
1635
1636
1638 pg_log_error_hint("Consider renaming this publication before continuing.");
1640 }
1641
1644
1645 pg_log_info("creating publication \"%s\" in database \"%s\"",
1647
1649 ipubname_esc);
1650
1652
1654 {
1657 {
1658 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1661 }
1663 }
1664
1665
1667
1671}
1672
1673
1674
1675
1676static void
1678 bool *made_publication)
1679{
1682 char *pubname_esc;
1683
1685
1687
1688 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1690
1692
1694
1696
1698 {
1701 {
1702 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1704 *made_publication = false;
1705
1706
1707
1708
1709
1710
1711
1712
1713 }
1715 }
1716
1718}
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729static void
1731{
1734
1736
1737 if (drop_all_pubs)
1738 {
1739 pg_log_info("dropping all existing publications in database \"%s\"",
1741
1742
1743 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1745 {
1746 pg_log_error("could not obtain publication information: %s",
1750 }
1751
1752
1756
1758 }
1759
1760
1761
1762
1763
1764 if (!drop_all_pubs || dry_run)
1767}
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780static void
1782{
1785 char *pubname_esc;
1786 char *subname_esc;
1787 char *pubconninfo_esc;
1788 char *replslotname_esc;
1789
1791
1796
1797 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1799
1801 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1802 "WITH (create_slot = false, enabled = false, "
1803 "slot_name = %s, copy_data = false, two_phase = %s)",
1804 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1806
1811
1813
1815 {
1818 {
1819 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1822 }
1824 }
1825
1827}
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839static void
1841{
1844 Oid suboid;
1847 char *originname;
1848 char *lsnstr;
1849
1851
1854
1856 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1857 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1858 "WHERE s.subname = %s AND d.datname = %s",
1860
1863 {
1864 pg_log_error("could not obtain subscription OID: %s",
1867 }
1868
1870 {
1871 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1874 }
1875
1877 {
1880 }
1881 else
1882 {
1883 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1884 lsnstr = psprintf("%s", lsn);
1885 }
1886
1888
1889
1890
1891
1892
1893 originname = psprintf("pg_%u", suboid);
1894
1895 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1896 originname, lsnstr, dbinfo->dbname);
1897
1900 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1901 originname, lsnstr);
1902
1904
1906 {
1909 {
1910 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1913 }
1915 }
1916
1922}
1923
1924
1925
1926
1927
1928
1929
1930static void
1932{
1936
1938
1940
1941 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1943
1945
1947
1949 {
1952 {
1953 pg_log_error("could not enable subscription \"%s\": %s",
1956 }
1957
1959 }
1960
1963}
1964
1965
1966
1967
1968
1969
1970static void
1972 bool dbnamespecified)
1973{
1976
1977
1978 if (dbnamespecified)
1980 else
1981 {
1982
1983 char *conninfo;
1984
1989 {
1993 }
1994 }
1995
1996 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
1998 {
2002 }
2003
2005 {
2007
2009
2010
2012 }
2013
2016}
2017
2018int
2020{
2021 static struct option long_options[] =
2022 {
2032 {"enable-two-phase", no_argument, NULL, 'T'},
2041 {NULL, 0, NULL, 0}
2042 };
2043
2045
2046 int c;
2047 int option_index;
2048
2049 char *pub_base_conninfo;
2050 char *sub_base_conninfo;
2051 char *dbname_conninfo = NULL;
2052
2055 struct stat statbuf;
2056
2057 char *consistent_lsn;
2058
2060
2065
2066 if (argc > 1)
2067 {
2068 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2069 {
2071 exit(0);
2072 }
2073 else if (strcmp(argv[1], "-V") == 0
2074 || strcmp(argv[1], "--version") == 0)
2075 {
2076 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2077 exit(0);
2078 }
2079 }
2080
2081
2090 {
2091 0
2092 };
2095
2096
2097
2098
2099
2100#ifndef WIN32
2101 if (geteuid() == 0)
2102 {
2103 pg_log_error("cannot be executed by \"root\"");
2104 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2106 exit(1);
2107 }
2108#endif
2109
2111
2112 while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",
2113 long_options, &option_index)) != -1)
2114 {
2115 switch (c)
2116 {
2117 case 'a':
2119 break;
2120 case 'd':
2122 {
2125 }
2126 else
2127 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2128 break;
2129 case 'D':
2132 break;
2133 case 'n':
2135 break;
2136 case 'p':
2138 break;
2139 case 'P':
2141 break;
2142 case 'R':
2145 else
2146 pg_fatal("object type \"%s\" is specified more than once for -R/--remove", optarg);
2147 break;
2148 case 's':
2151 break;
2152 case 't':
2154 break;
2155 case 'T':
2157 break;
2158 case 'U':
2160 break;
2161 case 'v':
2163 break;
2164 case 1:
2166 break;
2167 case 2:
2169 {
2172 }
2173 else
2174 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2175 break;
2176 case 3:
2178 {
2181 }
2182 else
2183 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2184 break;
2185 case 4:
2187 {
2190 }
2191 else
2192 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2193 break;
2194 default:
2195
2197 exit(1);
2198 }
2199 }
2200
2201
2203 {
2204 char *bad_switch = NULL;
2205
2207 bad_switch = "--database";
2209 bad_switch = "--publication";
2211 bad_switch = "--replication-slot";
2213 bad_switch = "--subscription";
2214
2215 if (bad_switch)
2216 {
2217 pg_log_error("%s cannot be used with -a/--all", bad_switch);
2219 exit(1);
2220 }
2221 }
2222
2223
2225 {
2226 pg_log_error("too many command-line arguments (first is \"%s\")",
2229 exit(1);
2230 }
2231
2232
2234 {
2235 pg_log_error("no subscriber data directory specified");
2237 exit(1);
2238 }
2239
2240
2242 {
2244
2246 pg_fatal("could not determine current directory");
2249 }
2250
2251
2252
2253
2254
2256 {
2257
2258
2259
2260
2261
2262
2263 pg_log_error("no publisher connection string specified");
2265 exit(1);
2266 }
2267 pg_log_info("validating publisher connection string");
2269 &dbname_conninfo);
2270 if (pub_base_conninfo == NULL)
2271 exit(1);
2272
2273 pg_log_info("validating subscriber connection string");
2275
2276
2277
2278
2279
2280
2282 {
2283 bool dbnamespecified = (dbname_conninfo != NULL);
2284
2286 }
2287
2289 {
2290 pg_log_info("no database was specified");
2291
2292
2293
2294
2295
2296 if (dbname_conninfo)
2297 {
2300
2301 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2302 dbname_conninfo);
2303 }
2304 else
2305 {
2309 exit(1);
2310 }
2311 }
2312
2313
2315 {
2316 pg_log_error("wrong number of publication names specified");
2317 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2319 exit(1);
2320 }
2322 {
2323 pg_log_error("wrong number of subscription names specified");
2324 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2326 exit(1);
2327 }
2329 {
2330 pg_log_error("wrong number of replication slot names specified");
2331 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2333 exit(1);
2334 }
2335
2336
2338 {
2339 if (pg_strcasecmp(cell->val, "publications") == 0)
2341 else
2342 {
2343 pg_log_error("invalid object type \"%s\" specified for -R/--remove", cell->val);
2345 exit(1);
2346 }
2347 }
2348
2349
2352
2353
2355
2357
2358
2359
2360
2361
2362
2364
2365
2367
2368
2369
2370
2371
2374 if (pub_sysid != sub_sysid)
2375 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2376
2377
2379
2380
2381
2382
2383
2384
2385
2386 if (stat(pidfile, &statbuf) == 0)
2387 {
2390 exit(1);
2391 }
2392
2393
2394
2395
2396
2397
2398 pg_log_info("starting the standby server with command-line options");
2400
2401
2403
2404
2406
2407
2408
2409
2410
2411
2412
2413
2416
2417
2419
2420
2422
2423
2424
2425
2426
2427
2430
2431
2433
2434
2435
2436
2437
2438
2439
2441
2442
2444
2445
2447
2448
2451
2452
2454
2456
2458
2459 return 0;
2460}
#define PG_TEXTDOMAIN(domain)
int find_my_exec(const char *argv0, char *retpath)
void set_pglocale_pgservice(const char *argv0, const char *app)
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
int errmsg(const char *fmt,...)
PGconn * PQconnectdb(const char *conninfo)
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
void PQfreemem(void *ptr)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQntuples(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
PGresult * PQexec(PGconn *conn, const char *query)
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
#define pg_malloc_array(type, count)
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
#define required_argument
Assert(PointerIsAligned(start, uint64))
void pg_logging_increase_verbosity(void)
void pg_logging_init(const char *argv0)
void pg_logging_set_level(enum pg_log_level new_level)
#define pg_log_error(...)
#define pg_log_error_hint(...)
#define pg_log_warning_hint(...)
#define pg_log_info_hint(...)
#define pg_log_warning_detail(...)
#define pg_log_error_detail(...)
#define pg_log_debug(...)
static PQExpBuffer recoveryconfcontents
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static uint64 get_standby_sysid(const char *datadir)
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
PGDLLIMPORT char * optarg
uint32 pg_prng_uint32(pg_prng_state *state)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
#define pg_log_warning(...)
int pg_strcasecmp(const char *s1, const char *s2)
const char * pg_strsignal(int signum)
void canonicalize_path(char *path)
const char * get_progname(const char *argv0)
size_t strlcpy(char *dst, const char *src, size_t siz)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * psprintf(const char *fmt,...)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
void get_restricted_token(void)
void pg_usleep(long microsec)
bool simple_string_list_member(SimpleStringList *list, const char *val)
void simple_string_list_append(SimpleStringList *list, const char *val)
struct SimpleStringList SimpleStringList
void appendShellString(PQExpBuffer buf, const char *str)
void appendConnStrVal(PQExpBuffer buf, const char *str)
const char * sub_username
SimpleStringList database_names
SimpleStringList objecttypes_to_remove
SimpleStringList sub_names
SimpleStringList replslot_names
SimpleStringList pub_names
bits32 objecttypes_to_remove
struct LogicalRepInfo * dbinfo
char val[FLEXIBLE_ARRAY_MEMBER]
struct SimpleStringListCell * next
SimpleStringListCell * head
char * wait_result_to_str(int exitstatus)
int gettimeofday(struct timeval *tp, void *tzp)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr