PostgreSQL Source Code: src/test/modules/libpq_pipeline/libpq_pipeline.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
17
20
21#include "catalog/pg_type_d.h"
24
25
30 int numsent);
31
32static const char *const progname = "libpq_pipeline";
33
34
35static char *tracefile = NULL;
36
37
/*
 * pg_debug(...): optional diagnostic output.
 *
 * When DEBUG_OUTPUT is defined, the printf-style arguments are written to
 * stderr with fprintf.  Otherwise the macro expands to nothing, so call
 * sites stay in the code but produce no output and evaluate no arguments.
 */
38#ifdef DEBUG_OUTPUT
39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
40#else
41#define pg_debug(...)
42#endif
43
45"DROP TABLE IF EXISTS pq_pipeline_demo";
47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
48"int8filler int8);";
50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
53
54
55#define MAXINTLEN 12
56#define MAXINT8LEN 20
57
58static void
60{
62 exit(1);
63}
64
65
66
67
68
69
70
71
72
/*
 * pg_fatal(...): forward a printf-style message to pg_fatal_impl, passing
 * __LINE__ as the first argument so the implementation receives the line
 * number of the call site rather than its own.
 */
73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
76{
78
80
82 va_start(args, fmt);
85 Assert(fmt[strlen(fmt) - 1] != '\n');
87 exit(1);
88}
89
90
91
92
/*
 * confirm_query_canceled(conn): call confirm_query_canceled_impl with the
 * caller's __LINE__, which the implementation hands on to pg_fatal_impl
 * when one of its checks fails.
 */
93#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
94static void
96{
98
100 if (res == NULL)
101 pg_fatal_impl(line, "PQgetResult returned null: %s",
104 pg_fatal_impl(line, "query did not fail when it was expected");
106 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
109
112}
113
114
115
116
117
118
119static void
121 char *state, char *event)
122{
123 const Oid paramTypes[] = {INT4OID, TEXTOID};
124 const char *paramValues[2];
125 char *pidstr = psprintf("%d", procpid);
126
127 Assert((state == NULL) ^ (event == NULL));
128
129 paramValues[0] = pidstr;
130 paramValues[1] = state ? state : event;
131
132 while (true)
133 {
136
137 if (state != NULL)
139 "SELECT count(*) FROM pg_stat_activity WHERE "
140 "pid = $1 AND state = $2",
141 2, paramTypes, paramValues, NULL, NULL, 0);
142 else
144 "SELECT count(*) FROM pg_stat_activity WHERE "
145 "pid = $1 AND wait_event = $2",
146 2, paramTypes, paramValues, NULL, NULL, 0);
147
155 if (strcmp(value, "0") != 0)
156 {
158 break;
159 }
161
162
164 }
165
167}
168
/*
 * send_cancellable_query(conn, monitorConn): call
 * send_cancellable_query_impl with the caller's __LINE__ prepended to the
 * two connection arguments.
 */
169#define send_cancellable_query(conn, monitorConn) \
170 send_cancellable_query_impl(__LINE__, conn, monitorConn)
171static void
173{
174 const char *env_wait;
175 const Oid paramTypes[1] = {INT4OID};
176
177
178
179
180
181
183
184 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
185 if (env_wait == NULL)
186 env_wait = "180";
187
189 &env_wait, NULL, NULL, 0) != 1)
191
192
193
194
195
197}
198
199
200
201
204{
208 const char **vals;
209 int nopts = 0;
210 int i;
211
213 nopts++;
214 nopts++;
215
217 vals = pg_malloc(sizeof(char *) * nopts);
218
219 i = 0;
221 {
222 if (opt->val)
223 {
225 vals[i] = opt->val;
226 i++;
227 }
228 }
230
232
234 pg_fatal("Connection to database failed: %s",
236
237 return copyConn;
238}
239
240
241
242
243static void
245{
249 char errorbuf[256];
250
251 fprintf(stderr, "test cancellations... ");
252
255
256
257
258
259
262
263
266 if ((cancel, errorbuf, sizeof(errorbuf)))
267 pg_fatal("failed to run PQcancel: %s", errorbuf);
269
270
272 if ((cancel, errorbuf, sizeof(errorbuf)))
273 pg_fatal("failed to run PQcancel: %s", errorbuf);
275
277
278
283
284
291
292
297 while (true)
298 {
299 struct timeval tv;
300 fd_set input_mask;
301 fd_set output_mask;
304
306 break;
307
308 FD_ZERO(&input_mask);
309 FD_ZERO(&output_mask);
310 switch (pollres)
311 {
313 pg_debug("polling for reads\n");
314 FD_SET(sock, &input_mask);
315 break;
317 pg_debug("polling for writes\n");
318 FD_SET(sock, &output_mask);
319 break;
320 default:
322 }
323
324 if (sock < 0)
326
327 tv.tv_sec = 3;
328 tv.tv_usec = 0;
329
330 while (true)
331 {
332 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
333 {
334 if (errno == EINTR)
335 continue;
336 pg_fatal("select() failed: %m");
337 }
338 break;
339 }
340 }
344
345
346
347
348
350
354 while (true)
355 {
356 struct timeval tv;
357 fd_set input_mask;
358 fd_set output_mask;
361
363 break;
364
365 FD_ZERO(&input_mask);
366 FD_ZERO(&output_mask);
367 switch (pollres)
368 {
370 pg_debug("polling for reads\n");
371 FD_SET(sock, &input_mask);
372 break;
374 pg_debug("polling for writes\n");
375 FD_SET(sock, &output_mask);
376 break;
377 default:
379 }
380
381 if (sock < 0)
383
384 tv.tv_sec = 3;
385 tv.tv_usec = 0;
386
387 while (true)
388 {
389 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
390 {
391 if (errno == EINTR)
392 continue;
393 pg_fatal("select() failed: %m");
394 }
395 break;
396 }
397 }
401
403
405}
406
407static void
409{
411
412 fprintf(stderr, "test error cases... ");
413
415 pg_fatal("Expected blocking connection mode");
416
418 pg_fatal("Unable to enter pipeline mode");
419
421 pg_fatal("Pipeline mode not activated properly");
422
423
426 pg_fatal("PQexec should fail in pipeline mode but succeeded");
428 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
429 pg_fatal("did not get expected error message; got: \"%s\"",
431
432
434 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
436 "PQsendQuery not allowed in pipeline mode\n") != 0)
437 pg_fatal("did not get expected error message; got: \"%s\"",
439
440
442 pg_fatal("re-entering pipeline mode should be a no-op but failed");
443
445 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
446
447
449 pg_fatal("couldn't exit idle empty pipeline mode");
450
452 pg_fatal("Pipeline mode not terminated properly");
453
454
456 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
457
458
461 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
463
465}
466
467static void
469{
471 const char *dummy_params[1] = {"1"};
472 Oid dummy_param_oids[1] = {INT4OID};
473
474 fprintf(stderr, "multi pipeline... ");
475
476
477
478
479
482
483
485 dummy_params, NULL, NULL, 0) != 1)
487
490
491
493 dummy_params, NULL, NULL, 0) != 1)
495
496
499
500
502 dummy_params, NULL, NULL, 0) != 1)
504
507
508
509
510
511
513 if (res == NULL)
514 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
516
518 pg_fatal("Unexpected result code %s from first pipeline item",
521 res = NULL;
522
524 pg_fatal("PQgetResult returned something extra after first result");
525
527 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
528
530 if (res == NULL)
531 pg_fatal("PQgetResult returned null when sync result expected: %s",
533
535 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
538
539
540
542 if (res == NULL)
543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
545
547 pg_fatal("Unexpected result code %s from second pipeline item",
550 res = NULL;
551
553 pg_fatal("PQgetResult returned something extra after first result");
554
556 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
557
559 if (res == NULL)
560 pg_fatal("PQgetResult returned null when sync result expected: %s",
562
564 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
567
568
569
571 if (res == NULL)
572 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
574
576 pg_fatal("Unexpected result code %s from third pipeline item",
578
580 if (res != NULL)
581 pg_fatal("Expected null result, got %s",
583
585 if (res == NULL)
586 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
588
590 pg_fatal("Unexpected result code %s from second pipeline sync",
592
593
595 pg_fatal("Fell out of pipeline mode somehow");
596
597
599 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
601
603 pg_fatal("exiting pipeline mode didn't seem to work");
604
606}
607
608
609
610
611
612static void
614{
615 int numqueries = 10;
616 int results = 0;
618
619 fprintf(stderr, "nosync... ");
620
621 if (sock < 0)
623
625 pg_fatal("could not enter pipeline mode");
626 for (int i = 0; i < numqueries; i++)
627 {
628 fd_set input_mask;
629 struct timeval tv;
630
632 0, NULL, NULL, NULL, NULL, 0) != 1)
635
636
637
638
639 FD_ZERO(&input_mask);
640 FD_SET(sock, &input_mask);
641 tv.tv_sec = 0;
642 tv.tv_usec = 0;
643 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
644 {
645 fprintf(stderr, "select() failed: %m\n");
647 }
650 }
651
652
654 pg_fatal("failed to send flush request");
656
657
658 for (;;)
659 {
661
663
664
665 if (res == NULL)
666 pg_fatal("got unexpected NULL result after %d results", results);
667
668
670 {
672
673
675 if (res2 != NULL)
676 pg_fatal("expected NULL, got %s",
679 results++;
680
681
682 if (results == numqueries)
683 break;
684
685 continue;
686 }
687
688
690 }
691
693}
694
695
696
697
698
699
700
701
702
703
704static void
706{
708 const char *dummy_params[1] = {"1"};
709 Oid dummy_param_oids[1] = {INT4OID};
710 int i;
711 int gotrows;
712 bool goterror;
713
714 fprintf(stderr, "aborted pipeline... ");
715
719
723
724
725
726
727
728
731
732 dummy_params[0] = "1";
734 dummy_params, NULL, NULL, 0) != 1)
736
738 1, dummy_param_oids, dummy_params,
739 NULL, NULL, 0) != 1)
741
742 dummy_params[0] = "2";
744 dummy_params, NULL, NULL, 0) != 1)
746
749
750 dummy_params[0] = "3";
752 dummy_params, NULL, NULL, 0) != 1)
753 pg_fatal("dispatching second-pipeline insert failed: %s",
755
758
759
760
761
762
763
764
765
767 if (res == NULL)
770 pg_fatal("Unexpected result status %s: %s",
774
775
777 pg_fatal("Expected null result, got %s",
779
780
782 if (res == NULL)
785 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
788
789
791 pg_fatal("Expected null result, got %s",
793
794
795
796
797
798
799
800
802 pg_fatal("pipeline should be flagged as aborted but isn't");
803
804
806 if (res == NULL)
809 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
812
813
816
818 pg_fatal("pipeline should be flagged as aborted but isn't");
819
820
822 pg_fatal("Fell out of pipeline mode somehow");
823
824
825
826
827
828
829
831 if (res == NULL)
834 pg_fatal("Unexpected result code from first pipeline sync\n"
835 "Expected PGRES_PIPELINE_SYNC, got %s",
838
840 pg_fatal("sync should've cleared the aborted flag but didn't");
841
842
844 pg_fatal("Fell out of pipeline mode somehow");
845
846
848 if (res == NULL)
851 pg_fatal("Unexpected result code %s from first item in second pipeline",
854
855
858
859
863 pg_fatal("Unexpected result code %s from second pipeline sync",
866
868 pg_fatal("Expected null result, got %s: %s",
871
872
873 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
877 goterror = false;
879 {
881 {
884 pg_fatal("expected error about multiple commands, got %s",
887 goterror = true;
888 break;
889 default:
891 break;
892 }
893 }
894 if (!goterror)
895 pg_fatal("did not get cannot-insert-multiple-commands error");
897 if (res == NULL)
900 pg_fatal("Unexpected result code %s from pipeline sync",
903
904
906 0, NULL, NULL, NULL, NULL, 0) != 1)
911 goterror = false;
912 gotrows = 0;
914 {
916 {
919 gotrows++;
920 break;
923 pg_fatal("expected division-by-zero, got: %s (%s)",
926 printf("got expected division-by-zero\n");
927 goterror = true;
928 break;
929 default:
931 }
933 }
934 if (!goterror)
935 pg_fatal("did not get division-by-zero error");
936 if (gotrows != 3)
937 pg_fatal("did not get three rows");
938
942 pg_fatal("Unexpected result code %s from third pipeline sync",
945
946
948 pg_fatal("Fell out of pipeline mode somehow");
949
950
952 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
954
956 pg_fatal("exiting pipeline mode didn't seem to work");
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
974
976 pg_fatal("Expected tuples, got %s: %s",
981 {
983
984 if (strcmp(val, "3") != 0)
985 pg_fatal("expected only insert with value 3, got %s", val);
986 }
987
989
991}
992
993
995{
1004};
1005
1006static void
1008{
1009 Oid insert_param_oids[2] = {INT4OID, INT8OID};
1010 const char *insert_params[2];
1015 int rows_to_send,
1016 rows_to_receive;
1017
1018 insert_params[0] = insert_param_0;
1019 insert_params[1] = insert_param_1;
1020
1021 rows_to_send = rows_to_receive = n_rows;
1022
1023
1024
1025
1028
1030 {
1031 const char *sql;
1032
1033 switch (send_step)
1034 {
1036 sql = "BEGIN TRANSACTION";
1038 break;
1039
1043 break;
1044
1048 break;
1049
1050 default:
1052 sql = NULL;
1053 }
1054
1055 pg_debug("sending: %s\n", sql);
1057 0, NULL, NULL, NULL, NULL, 0) != 1)
1059 }
1060
1066
1067
1068
1069
1070
1071
1072
1073
1074
1077
1078 while (recv_step != BI_DONE)
1079 {
1080 int sock;
1081 fd_set input_mask;
1082 fd_set output_mask;
1083
1085
1086 if (sock < 0)
1087 break;
1088
1089 FD_ZERO(&input_mask);
1090 FD_SET(sock, &input_mask);
1091 FD_ZERO(&output_mask);
1092 FD_SET(sock, &output_mask);
1093
1094 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1095 {
1096 fprintf(stderr, "select() failed: %m\n");
1098 }
1099
1100
1101
1102
1103
1104 if (FD_ISSET(sock, &input_mask))
1105 {
1107
1108
1110 {
1112 const char *cmdtag = "";
1114 int status;
1115
1116
1117
1118
1119
1121 if (res == NULL)
1122 continue;
1123
1125 switch (recv_step)
1126 {
1128 cmdtag = "BEGIN";
1129 recv_step++;
1130 break;
1132 cmdtag = "DROP TABLE";
1133 recv_step++;
1134 break;
1136 cmdtag = "CREATE TABLE";
1137 recv_step++;
1138 break;
1140 cmdtag = "";
1142 recv_step++;
1143 break;
1145 cmdtag = "INSERT";
1146 rows_to_receive--;
1147 if (rows_to_receive == 0)
1148 recv_step++;
1149 break;
1151 cmdtag = "COMMIT";
1152 recv_step++;
1153 break;
1155 cmdtag = "";
1158 recv_step++;
1159 break;
1161
1162 pg_fatal("unreachable state");
1163 }
1164
1166 pg_fatal("%s reported status %s, expected %s\n"
1167 "Error message: \"%s\"",
1170
1171 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1172 pg_fatal("%s expected command tag '%s', got '%s'",
1174
1176
1178 }
1179 }
1180
1181
1182 if (FD_ISSET(sock, &output_mask))
1183 {
1185
1187 {
1189
1191
1193 2, insert_params, NULL, NULL, 0) == 1)
1194 {
1195 pg_debug("sent row %d\n", rows_to_send);
1196
1197 rows_to_send--;
1198 if (rows_to_send == 0)
1199 send_step++;
1200 }
1201 else
1202 {
1203
1204
1205
1206
1207 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1209 }
1210 }
1212 {
1214 0, NULL, NULL, NULL, NULL, 0) == 1)
1215 {
1217 send_step++;
1218 }
1219 else
1220 {
1221 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223 }
1224 }
1225 else if (send_step == BI_SYNC)
1226 {
1228 {
1230 send_step++;
1231 }
1232 else
1233 {
1234 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236 }
1237 }
1238 }
1239 }
1240
1241
1243 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245
1248
1249 fprintf(stderr, "ok\n");
1250}
1251
1252static void
1254{
1256 Oid param_oids[1] = {INT4OID};
1257 Oid expected_oids[4];
1258 Oid typ;
1259
1260 fprintf(stderr, "prepared... ");
1261
1264 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1265 "interval '1 sec'",
1266 1, param_oids) != 1)
1268 expected_oids[0] = INT4OID;
1269 expected_oids[1] = TEXTOID;
1270 expected_oids[2] = NUMERICOID;
1271 expected_oids[3] = INTERVALOID;
1276
1278 if (res == NULL)
1279 pg_fatal("PQgetResult returned null");
1284 if (res != NULL)
1285 pg_fatal("expected NULL result");
1286
1288 if (res == NULL)
1289 pg_fatal("PQgetResult returned NULL");
1293 pg_fatal("expected %zu columns, got %d",
1296 {
1298 if (typ != expected_oids[i])
1299 pg_fatal("field %d: expected type %u, got %u",
1300 i, expected_oids[i], typ);
1301 }
1304 if (res != NULL)
1305 pg_fatal("expected NULL result");
1306
1310
1311 fprintf(stderr, "closing statement..");
1316
1318 if (res == NULL)
1319 pg_fatal("expected non-NULL result");
1324 if (res != NULL)
1325 pg_fatal("expected NULL result");
1329
1332
1333
1337
1338
1339
1340
1341
1345
1346 fprintf(stderr, "creating portal... ");
1348 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1355 if (res == NULL)
1356 pg_fatal("PQgetResult returned null");
1359
1361 if (typ != INT4OID)
1362 pg_fatal("portal: expected type %u, got %u",
1363 INT4OID, typ);
1366 if (res != NULL)
1367 pg_fatal("expected NULL result");
1371
1372 fprintf(stderr, "closing portal... ");
1377
1379 if (res == NULL)
1380 pg_fatal("expected non-NULL result");
1385 if (res != NULL)
1386 pg_fatal("expected NULL result");
1390
1393
1394
1398
1399
1400
1401
1402
1406
1407 fprintf(stderr, "ok\n");
1408}
1409
1410
1411
1412
1413static void
1415{
1417 const char **vals;
1418 int nopts;
1420 int protocol_version;
1421 int max_protocol_version_index;
1422 int i;
1423
1424
1425
1426
1427
1428 nopts = 0;
1430 nopts++;
1431 nopts++;
1432 nopts++;
1433
1435 vals = pg_malloc0(sizeof(char *) * nopts);
1436
1437 i = 0;
1439 {
1440 if (opt->val)
1441 {
1443 vals[i] = opt->val;
1444 i++;
1445 }
1446 }
1447
1448 max_protocol_version_index = i;
1449 keywords[i] = "max_protocol_version";
1450 i++;
1452
1453
1454
1455
1456 vals[max_protocol_version_index] = "3.0";
1458
1460 pg_fatal("Connection to database failed: %s",
1462
1464 if (protocol_version != 30000)
1465 pg_fatal("expected 30000, got %d", protocol_version);
1466
1468
1469
1470
1471
1472
1473 vals[max_protocol_version_index] = "3.1";
1475
1477 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
1478
1480
1481
1482
1483
1484 vals[max_protocol_version_index] = "3.2";
1486
1488 pg_fatal("Connection to database failed: %s",
1490
1492 if (protocol_version != 30002)
1493 pg_fatal("expected 30002, got %d", protocol_version);
1494
1496
1497
1498
1499
1500 vals[max_protocol_version_index] = "latest";
1502
1504 pg_fatal("Connection to database failed: %s",
1506
1508 if (protocol_version != 30002)
1509 pg_fatal("expected 30002, got %d", protocol_version);
1510
1512}
1513
1514
1515static void
1517{
1518 int *n_notices = (int *) arg;
1519
1520 (*n_notices)++;
1521 fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1522}
1523
1524
1525static void
1527{
1529 int n_notices = 0;
1530
1531 fprintf(stderr, "\npipeline idle...\n");
1532
1534
1535
1542 if (res == NULL)
1543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1546 pg_fatal("unexpected result code %s from first pipeline item",
1550 if (res != NULL)
1551 pg_fatal("did not receive terminating NULL");
1555 pg_fatal("exiting pipeline succeeded when it shouldn't");
1557 strlen("cannot exit pipeline mode")) != 0)
1558 pg_fatal("did not get expected error; got: %s",
1563 pg_fatal("unexpected result code %s from second pipeline item",
1567 if (res != NULL)
1568 pg_fatal("did not receive terminating NULL");
1571
1572 if (n_notices > 0)
1573 pg_fatal("got %d notice(s)", n_notices);
1574 fprintf(stderr, "ok - 1\n");
1575
1576
1579 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1583 if (res == NULL)
1584 pg_fatal("unexpected NULL result received");
1589 fprintf(stderr, "ok - 2\n");
1590}
1591
1592static void
1594{
1596 const char *dummy_params[1] = {"1"};
1597 Oid dummy_param_oids[1] = {INT4OID};
1598
1599 fprintf(stderr, "simple pipeline... ");
1600
1601
1602
1603
1604
1605
1606
1607
1608
1610 pg_fatal("Expected blocking connection mode");
1611
1614
1616 1, dummy_param_oids, dummy_params,
1617 NULL, NULL, 0) != 1)
1619
1621 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1622
1625
1627 if (res == NULL)
1628 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1630
1632 pg_fatal("Unexpected result code %s from first pipeline item",
1634
1636 res = NULL;
1637
1639 pg_fatal("PQgetResult returned something extra after first query result.");
1640
1641
1642
1643
1644
1646 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1647
1649 if (res == NULL)
1650 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1652
1654 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1656
1658 res = NULL;
1659
1661 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1663
1664
1666 pg_fatal("Fell out of pipeline mode somehow");
1667
1668
1670 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1672
1674 pg_fatal("Exiting pipeline mode didn't seem to work");
1675
1676 fprintf(stderr, "ok\n");
1677}
1678
1679static void
1681{
1683 int i;
1684 bool pipeline_ended = false;
1685
1687 pg_fatal("failed to enter pipeline mode: %s",
1689
1690
1692 {
1693 char *param[1];
1694
1695 param[0] = psprintf("%d", 44 + i);
1696
1698 "SELECT generate_series(42, $1)",
1699 1,
1700 NULL,
1701 (const char **) param,
1702 NULL,
1703 NULL,
1704 0) != 1)
1705 pg_fatal("failed to send query: %s",
1707 pfree(param[0]);
1708 }
1711
1712 for (i = 0; !pipeline_ended; i++)
1713 {
1714 bool first = true;
1715 bool saw_ending_tuplesok;
1716 bool isSingleTuple = false;
1717
1718
1719 if (i < 2)
1720 {
1722 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1723 }
1724
1725
1726 saw_ending_tuplesok = false;
1728 {
1730
1732 {
1733 fprintf(stderr, "end of pipeline reached\n");
1734 pipeline_ended = true;
1736 if (i != 3)
1737 pg_fatal("Expected three results, got %d", i);
1738 break;
1739 }
1740
1741
1742 if (first)
1743 {
1745 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1748 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1750 first = false;
1751 }
1752
1754 switch (est)
1755 {
1758 saw_ending_tuplesok = true;
1759 if (isSingleTuple)
1760 {
1762 fprintf(stderr, "all tuples received in query %d\n", i);
1763 else
1764 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1765 }
1766 break;
1767
1769 isSingleTuple = true;
1771 break;
1772
1773 default:
1775 }
1777 }
1778 if (!pipeline_ended && !saw_ending_tuplesok)
1779 pg_fatal("didn't get expected terminating TUPLES_OK");
1780 }
1781
1782
1783
1784
1785
1786
1788 0, NULL, NULL, NULL, NULL, 0) != 1)
1789 pg_fatal("failed to send query: %s",
1792 pg_fatal("failed to send flush request");
1794 pg_fatal("PQsetSingleRowMode() failed");
1796 if (res == NULL)
1797 pg_fatal("unexpected NULL");
1799 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1802 if (res == NULL)
1803 pg_fatal("unexpected NULL");
1805 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1808 pg_fatal("expected NULL result");
1809
1811 0, NULL, NULL, NULL, NULL, 0) != 1)
1812 pg_fatal("failed to send query: %s",
1815 pg_fatal("failed to send flush request");
1817 if (res == NULL)
1818 pg_fatal("unexpected NULL");
1820 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1823 pg_fatal("expected NULL result");
1824
1825
1826
1827
1828
1830 0, NULL, NULL, NULL, NULL, 0) != 1)
1831 pg_fatal("failed to send query: %s",
1834 pg_fatal("failed to send flush request");
1836 pg_fatal("PQsetChunkedRowsMode() failed");
1838 if (res == NULL)
1839 pg_fatal("unexpected NULL");
1841 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1847 if (res == NULL)
1848 pg_fatal("unexpected NULL");
1850 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1855 if (res == NULL)
1856 pg_fatal("unexpected NULL");
1858 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1863 pg_fatal("expected NULL result");
1864
1867
1868 fprintf(stderr, "ok\n");
1869}
1870
1871
1872
1873
1874
1875static void
1877{
1879 bool expect_null;
1880 int num_syncs = 0;
1881
1882 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1883 "CREATE TABLE pq_pipeline_tst (id int)");
1885 pg_fatal("failed to create test table: %s",
1888
1890 pg_fatal("failed to enter pipeline mode: %s",
1893 pg_fatal("could not send prepare on pipeline: %s",
1895
1897 "BEGIN",
1898 0, NULL, NULL, NULL, NULL, 0) != 1)
1899 pg_fatal("failed to send query: %s",
1902 "SELECT 0/0",
1903 0, NULL, NULL, NULL, NULL, 0) != 1)
1904 pg_fatal("failed to send query: %s",
1906
1907
1908
1909
1910
1912 pg_fatal("failed to execute prepared: %s",
1914
1915
1917 "INSERT INTO pq_pipeline_tst VALUES (1)",
1918 0, NULL, NULL, NULL, NULL, 0) != 1)
1919 pg_fatal("failed to send query: %s",
1923 num_syncs++;
1924
1925
1926
1927
1928
1930 "INSERT INTO pq_pipeline_tst VALUES (2)",
1931 0, NULL, NULL, NULL, NULL, 0) != 1)
1932 pg_fatal("failed to send query: %s",
1936 num_syncs++;
1937
1938
1939
1940
1941
1943 pg_fatal("failed to execute prepared: %s",
1945
1946
1947
1948
1949
1951 "INSERT INTO pq_pipeline_tst VALUES (3)",
1952 0, NULL, NULL, NULL, NULL, 0) != 1)
1953 pg_fatal("failed to send query: %s",
1955
1958 num_syncs++;
1961 num_syncs++;
1962
1963 expect_null = false;
1965 {
1967
1969 if (res == NULL)
1970 {
1971 printf("%d: got NULL result\n", i);
1972 if (!expect_null)
1973 pg_fatal("did not expect NULL here");
1974 expect_null = false;
1975 continue;
1976 }
1979 if (expect_null)
1984 {
1985 printf(": command didn't run because pipeline aborted\n");
1986 }
1987 else
1990
1992 num_syncs--;
1993 else
1994 expect_null = true;
1995 if (num_syncs <= 0)
1996 break;
1997 }
1999 pg_fatal("returned something extra after all the syncs: %s",
2001
2004
2005
2006 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
2010 pg_fatal("did not get 1 tuple");
2011 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
2012 pg_fatal("did not get expected tuple");
2014
2015 fprintf(stderr, "ok\n");
2016}
2017
2018
2019
2020
2021
2022
2023static void
2025{
2028 Oid paramTypes[2] = {INT8OID, INT8OID};
2029 const char *paramValues[2];
2032 int ctr = 0;
2033 int numsent = 0;
2034 int results = 0;
2035 bool read_done = false;
2036 bool write_done = false;
2037 bool error_sent = false;
2038 bool got_error = false;
2039 int switched = 0;
2040 int socketful = 0;
2041 fd_set in_fds;
2042 fd_set out_fds;
2043
2044 fprintf(stderr, "uniqviol ...");
2045
2047
2048 paramValues[0] = paramValue0;
2049 paramValues[1] = paramValue1;
2050 sprintf(paramValue1, "42");
2051
2052 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
2053 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
2056
2060
2062 "insert into ppln_uniqviol values ($1, $2) returning id",
2063 2, paramTypes);
2066
2068 pg_fatal("failed to enter pipeline mode");
2069
2070 while (!read_done)
2071 {
2072
2073
2074
2075
2076
2077
2078
2080 {
2081 bool new_error;
2082
2083 if (results >= numsent)
2084 {
2085 if (write_done)
2086 read_done = true;
2087 break;
2088 }
2089
2092 if (new_error && got_error)
2094 got_error |= new_error;
2095 if (results++ >= numsent - 1)
2096 {
2097 if (write_done)
2098 read_done = true;
2099 break;
2100 }
2101 }
2102
2103 if (read_done)
2104 break;
2105
2106 FD_ZERO(&out_fds);
2107 FD_SET(sock, &out_fds);
2108
2109 FD_ZERO(&in_fds);
2110 FD_SET(sock, &in_fds);
2111
2112 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2113 {
2114 if (errno == EINTR)
2115 continue;
2116 pg_fatal("select() failed: %m");
2117 }
2118
2121
2122
2123
2124
2125
2126 if (!write_done && FD_ISSET(sock, &out_fds))
2127 {
2128 for (;;)
2129 {
2130 int flush;
2131
2132
2133
2134
2135
2136 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
2137 {
2138 sprintf(paramValue0, "%d", numsent / 2);
2140 error_sent = true;
2141 }
2142 else
2143 {
2145 sprintf(paramValue0, "%d", ctr++);
2146 }
2147
2150 numsent++;
2151
2152
2153 if (socketful != 0 && numsent % socketful == 42 && error_sent)
2154 {
2156 pg_fatal("failed to send flush request");
2157 write_done = true;
2158 fprintf(stderr, "\ndone writing\n");
2160 break;
2161 }
2162
2163
2165 if (flush == -1)
2167 if (flush == 1)
2168 {
2169 if (socketful == 0)
2170 socketful = numsent;
2171 fprintf(stderr, "\nswitch to reading\n");
2172 switched++;
2173 break;
2174 }
2175 }
2176 }
2177 }
2178
2179 if (!got_error)
2180 pg_fatal("did not get expected error");
2181
2182 fprintf(stderr, "ok\n");
2183}
2184
2185
2186
2187
2188
2189
2190
2191static bool
2193{
2195 bool got_error = false;
2196
2197 if (res == NULL)
2198 pg_fatal("got unexpected NULL");
2199
2201 {
2203 got_error = true;
2206
2208 if (res2 != NULL)
2209 pg_fatal("expected NULL, got %s",
2211 break;
2212
2214 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2216
2218 if (res2 != NULL)
2219 pg_fatal("expected NULL, got %s",
2221 break;
2222
2224 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2226 if (res2 != NULL)
2227 pg_fatal("expected NULL, got %s",
2229 break;
2230
2231 default:
2233 }
2234
2235 return got_error;
2236}
2237
2238
2239static void
2241{
2242 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2243 fprintf(stderr, "Usage:\n");
2245 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
2246 fprintf(stderr, "\nOptions:\n");
2247 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
2248 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
2249}
2250
2251static void
2253{
2255 printf("disallowed_in_pipeline\n");
2256 printf("multi_pipelines\n");
2258 printf("pipeline_abort\n");
2259 printf("pipeline_idle\n");
2260 printf("pipelined_insert\n");
2261 printf("prepared\n");
2262 printf("protocol_version\n");
2263 printf("simple_pipeline\n");
2264 printf("singlerow\n");
2265 printf("transaction\n");
2266 printf("uniqviol\n");
2267}
2268
2269int
2271{
2272 const char *conninfo = "";
2274 FILE *trace;
2275 char *testname;
2276 int numrows = 10000;
2278 int c;
2279
2280 while ((c = getopt(argc, argv, "r:t:")) != -1)
2281 {
2282 switch (c)
2283 {
2284 case 'r':
2285 errno = 0;
2286 numrows = strtol(optarg, NULL, 10);
2287 if (errno != 0 || numrows <= 0)
2288 {
2289 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2291 exit(1);
2292 }
2293 break;
2294 case 't':
2296 break;
2297 }
2298 }
2299
2301 {
2304 }
2305 else
2306 {
2308 exit(1);
2309 }
2310
2311 if (strcmp(testname, "tests") == 0)
2312 {
2314 exit(0);
2315 }
2316
2318 {
2321 }
2322
2323
2326 {
2327 fprintf(stderr, "Connection to database failed: %s\n",
2330 }
2331
2332 res = PQexec(conn, "SET lc_messages TO \"C\"");
2335 res = PQexec(conn, "SET debug_parallel_query = off");
2338
2339
2341 {
2342 if (strcmp(tracefile, "-") == 0)
2344 else
2346 if (trace == NULL)
2348
2349
2350 setvbuf(trace, NULL, PG_IOLBF, 0);
2351
2355 }
2356
2357 if (strcmp(testname, "cancel") == 0)
2359 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2361 else if (strcmp(testname, "multi_pipelines") == 0)
2363 else if (strcmp(testname, "nosync") == 0)
2365 else if (strcmp(testname, "pipeline_abort") == 0)
2367 else if (strcmp(testname, "pipeline_idle") == 0)
2369 else if (strcmp(testname, "pipelined_insert") == 0)
2371 else if (strcmp(testname, "prepared") == 0)
2373 else if (strcmp(testname, "protocol_version") == 0)
2375 else if (strcmp(testname, "simple_pipeline") == 0)
2377 else if (strcmp(testname, "singlerow") == 0)
2379 else if (strcmp(testname, "transaction") == 0)
2381 else if (strcmp(testname, "uniqviol") == 0)
2383 else
2384 {
2385 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2386 exit(1);
2387 }
2388
2389
2391 return 0;
2392}
#define pg_attribute_printf(f, a)
static PGcancel *volatile cancelConn
#define fprintf(file, fmt, msg)
PGcancel * PQgetCancel(PGconn *conn)
void PQcancelReset(PGcancelConn *cancelConn)
PGcancelConn * PQcancelCreate(PGconn *conn)
ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
int PQcancelBlocking(PGcancelConn *cancelConn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
void PQcancelFinish(PGcancelConn *cancelConn)
int PQrequestCancel(PGconn *conn)
void PQfreeCancel(PGcancel *cancel)
int PQcancelSocket(const PGcancelConn *cancelConn)
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
int PQcancelStart(PGcancelConn *cancelConn)
int PQfullProtocolVersion(const PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
PQconninfoOption * PQconninfo(PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
int PQbackendPID(const PGconn *conn)
PGpipelineStatus PQpipelineStatus(const PGconn *conn)
PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsetSingleRowMode(PGconn *conn)
int PQflush(PGconn *conn)
Oid PQftype(const PGresult *res, int field_num)
PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQexitPipelineMode(PGconn *conn)
int PQsendClosePortal(PGconn *conn, const char *portal)
int PQenterPipelineMode(PGconn *conn)
PGresult * PQclosePrepared(PGconn *conn, const char *stmt)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
PGresult * PQclosePortal(PGconn *conn, const char *portal)
PGresult * PQgetResult(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQsendClosePrepared(PGconn *conn, const char *stmt)
int PQsendPipelineSync(PGconn *conn)
int PQntuples(const PGresult *res)
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
char * PQresultErrorMessage(const PGresult *res)
int PQsendDescribePrepared(PGconn *conn, const char *stmt)
int PQconsumeInput(PGconn *conn)
int PQsetnonblocking(PGconn *conn, int arg)
int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
PGresult * PQdescribePortal(PGconn *conn, const char *portal)
int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
char * PQresultErrorField(const PGresult *res, int fieldcode)
int PQsendQuery(PGconn *conn, const char *query)
char * PQcmdStatus(PGresult *res)
int PQpipelineSync(PGconn *conn)
int PQsendDescribePortal(PGconn *conn, const char *portal)
char * PQresStatus(ExecStatusType status)
int PQisBusy(PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQsendFlushRequest(PGconn *conn)
int PQisnonblocking(const PGconn *conn)
int PQnfields(const PGresult *res)
void PQtrace(PGconn *conn, FILE *debug_port)
void PQsetTraceFlags(PGconn *conn, int flags)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
void * pg_malloc0(size_t size)
Assert(PointerIsAligned(start, uint64))
static const JsonPathKeyword keywords[]
#define PQTRACE_SUPPRESS_TIMESTAMPS
PostgresPollingStatusType
#define PQTRACE_REGRESS_MODE
static void usage(const char *progname)
static void print_test_list(void)
static const char *const insert_sql2
static void confirm_query_canceled_impl(int line, PGconn *conn)
static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)
static void exit_nicely(PGconn *conn)
int main(int argc, char **argv)
#define confirm_query_canceled(conn)
static void test_uniqviol(PGconn *conn)
static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
static void test_simple_pipeline(PGconn *conn)
static pg_noreturn void static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)
static void test_multi_pipelines(PGconn *conn)
static void test_pipeline_idle(PGconn *conn)
static const char *const create_table_sql
static const char *const insert_sql
static void test_protocol_version(PGconn *conn)
static void test_nosync(PGconn *conn)
static const char *const progname
static void test_pipeline_abort(PGconn *conn)
static pg_noreturn void pg_fatal_impl(int line, const char *fmt,...) pg_attribute_printf(2
static const char *const drop_table_sql
#define send_cancellable_query(conn, monitorConn)
static void notice_processor(void *arg, const char *message)
static void test_transaction(PGconn *conn)
static PGconn * copy_connection(PGconn *conn)
static void test_prepared(PGconn *conn)
static void test_cancel(PGconn *conn)
static void test_singlerowmode(PGconn *conn)
static void test_disallowed_in_pipeline(PGconn *conn)
static void test_pipelined_insert(PGconn *conn, int n_rows)
void pfree(void *pointer)
static AmcheckOptions opts
int getopt(int nargc, char *const *nargv, const char *ostr)
PGDLLIMPORT char * optarg
char * psprintf(const char *fmt,...)
void pg_usleep(long microsec)
#define select(n, r, w, e, timeout)