PostgreSQL Source Code: src/test/modules/libpq_pipeline/libpq_pipeline.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

17

20

21#include "catalog/pg_type_d.h"

24

25

30 int numsent);

31

32static const char *const progname = "libpq_pipeline";

33

34

35static char *tracefile = NULL;

36

37

38#ifdef DEBUG_OUTPUT

39#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)

40#else

41#define pg_debug(...)

42#endif

43

45"DROP TABLE IF EXISTS pq_pipeline_demo";

47"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"

48"int8filler int8);";

50"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";

52"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";

53

54

55#define MAXINTLEN 12

56#define MAXINT8LEN 20

57

58static void

60{

62 exit(1);

63}

64

65

66

67

68

69

70

71

72

73#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)

76{

78

80

82 va_start(args, fmt);

85 Assert(fmt[strlen(fmt) - 1] != '\n');

87 exit(1);

88}

89

90

91

92

93#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)

94static void

96{

98

100 if (res == NULL)

101 pg_fatal_impl(line, "PQgetResult returned null: %s",

104 pg_fatal_impl(line, "query did not fail when it was expected");

106 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",

109

112}

113

114

115

116

117

118

119static void

121 char *state, char *event)

122{

123 const Oid paramTypes[] = {INT4OID, TEXTOID};

124 const char *paramValues[2];

125 char *pidstr = psprintf("%d", procpid);

126

127 Assert((state == NULL) ^ (event == NULL));

128

129 paramValues[0] = pidstr;

130 paramValues[1] = state ? state : event;

131

132 while (true)

133 {

136

137 if (state != NULL)

139 "SELECT count(*) FROM pg_stat_activity WHERE "

140 "pid = $1 AND state = $2",

141 2, paramTypes, paramValues, NULL, NULL, 0);

142 else

144 "SELECT count(*) FROM pg_stat_activity WHERE "

145 "pid = $1 AND wait_event = $2",

146 2, paramTypes, paramValues, NULL, NULL, 0);

147

155 if (strcmp(value, "0") != 0)

156 {

158 break;

159 }

161

162

164 }

165

167}

168

169#define send_cancellable_query(conn, monitorConn) \

170 send_cancellable_query_impl(__LINE__, conn, monitorConn)

171static void

173{

174 const char *env_wait;

175 const Oid paramTypes[1] = {INT4OID};

176

177

178

179

180

181

183

184 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");

185 if (env_wait == NULL)

186 env_wait = "180";

187

189 &env_wait, NULL, NULL, 0) != 1)

191

192

193

194

195

197}

198

199

200

201

204{

208 const char **vals;

209 int nopts = 0;

210 int i;

211

213 nopts++;

214 nopts++;

215

217 vals = pg_malloc(sizeof(char *) * nopts);

218

219 i = 0;

221 {

222 if (opt->val)

223 {

225 vals[i] = opt->val;

226 i++;

227 }

228 }

230

232

234 pg_fatal("Connection to database failed: %s",

236

237 return copyConn;

238}

239

240

241

242

243static void

245{

249 char errorbuf[256];

250

251 fprintf(stderr, "test cancellations... ");

252

255

256

257

258

259

262

263

266 if (PQcancel(cancel, errorbuf, sizeof(errorbuf)))

267 pg_fatal("failed to run PQcancel: %s", errorbuf);

269

270

272 if (PQcancel(cancel, errorbuf, sizeof(errorbuf)))

273 pg_fatal("failed to run PQcancel: %s", errorbuf);

275

277

278

283

284

291

292

297 while (true)

298 {

299 struct timeval tv;

300 fd_set input_mask;

301 fd_set output_mask;

304

306 break;

307

308 FD_ZERO(&input_mask);

309 FD_ZERO(&output_mask);

310 switch (pollres)

311 {

313 pg_debug("polling for reads\n");

314 FD_SET(sock, &input_mask);

315 break;

317 pg_debug("polling for writes\n");

318 FD_SET(sock, &output_mask);

319 break;

320 default:

322 }

323

324 if (sock < 0)

326

327 tv.tv_sec = 3;

328 tv.tv_usec = 0;

329

330 while (true)

331 {

332 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)

333 {

334 if (errno == EINTR)

335 continue;

336 pg_fatal("select() failed: %m");

337 }

338 break;

339 }

340 }

344

345

346

347

348

350

354 while (true)

355 {

356 struct timeval tv;

357 fd_set input_mask;

358 fd_set output_mask;

361

363 break;

364

365 FD_ZERO(&input_mask);

366 FD_ZERO(&output_mask);

367 switch (pollres)

368 {

370 pg_debug("polling for reads\n");

371 FD_SET(sock, &input_mask);

372 break;

374 pg_debug("polling for writes\n");

375 FD_SET(sock, &output_mask);

376 break;

377 default:

379 }

380

381 if (sock < 0)

383

384 tv.tv_sec = 3;

385 tv.tv_usec = 0;

386

387 while (true)

388 {

389 if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)

390 {

391 if (errno == EINTR)

392 continue;

393 pg_fatal("select() failed: %m");

394 }

395 break;

396 }

397 }

401

403

405}

406

407static void

409{

411

412 fprintf(stderr, "test error cases... ");

413

415 pg_fatal("Expected blocking connection mode");

416

418 pg_fatal("Unable to enter pipeline mode");

419

421 pg_fatal("Pipeline mode not activated properly");

422

423

426 pg_fatal("PQexec should fail in pipeline mode but succeeded");

428 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)

429 pg_fatal("did not get expected error message; got: \"%s\"",

431

432

434 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");

436 "PQsendQuery not allowed in pipeline mode\n") != 0)

437 pg_fatal("did not get expected error message; got: \"%s\"",

439

440

442 pg_fatal("re-entering pipeline mode should be a no-op but failed");

443

445 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");

446

447

449 pg_fatal("couldn't exit idle empty pipeline mode");

450

452 pg_fatal("Pipeline mode not terminated properly");

453

454

456 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");

457

458

461 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",

463

465}

466

467static void

469{

471 const char *dummy_params[1] = {"1"};

472 Oid dummy_param_oids[1] = {INT4OID};

473

474 fprintf(stderr, "multi pipeline... ");

475

476

477

478

479

482

483

485 dummy_params, NULL, NULL, 0) != 1)

487

490

491

493 dummy_params, NULL, NULL, 0) != 1)

495

496

499

500

502 dummy_params, NULL, NULL, 0) != 1)

504

507

508

509

510

511

513 if (res == NULL)

514 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

516

518 pg_fatal("Unexpected result code %s from first pipeline item",

521 res = NULL;

522

524 pg_fatal("PQgetResult returned something extra after first result");

525

527 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");

528

530 if (res == NULL)

531 pg_fatal("PQgetResult returned null when sync result expected: %s",

533

535 pg_fatal("Unexpected result code %s instead of sync result, error: %s",

538

539

540

542 if (res == NULL)

543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

545

547 pg_fatal("Unexpected result code %s from second pipeline item",

550 res = NULL;

551

553 pg_fatal("PQgetResult returned something extra after first result");

554

556 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");

557

559 if (res == NULL)

560 pg_fatal("PQgetResult returned null when sync result expected: %s",

562

564 pg_fatal("Unexpected result code %s instead of sync result, error: %s",

567

568

569

571 if (res == NULL)

572 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

574

576 pg_fatal("Unexpected result code %s from third pipeline item",

578

580 if (res != NULL)

581 pg_fatal("Expected null result, got %s",

583

585 if (res == NULL)

586 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

588

590 pg_fatal("Unexpected result code %s from second pipeline sync",

592

593

595 pg_fatal("Fell out of pipeline mode somehow");

596

597

599 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",

601

603 pg_fatal("exiting pipeline mode didn't seem to work");

604

606}

607

608

609

610

611

612static void

614{

615 int numqueries = 10;

616 int results = 0;

618

619 fprintf(stderr, "nosync... ");

620

621 if (sock < 0)

623

625 pg_fatal("could not enter pipeline mode");

626 for (int i = 0; i < numqueries; i++)

627 {

628 fd_set input_mask;

629 struct timeval tv;

630

632 0, NULL, NULL, NULL, NULL, 0) != 1)

635

636

637

638

639 FD_ZERO(&input_mask);

640 FD_SET(sock, &input_mask);

641 tv.tv_sec = 0;

642 tv.tv_usec = 0;

643 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)

644 {

645 fprintf(stderr, "select() failed: %m\n");

647 }

650 }

651

652

654 pg_fatal("failed to send flush request");

656

657

658 for (;;)

659 {

661

663

664

665 if (res == NULL)

666 pg_fatal("got unexpected NULL result after %d results", results);

667

668

670 {

672

673

675 if (res2 != NULL)

676 pg_fatal("expected NULL, got %s",

679 results++;

680

681

682 if (results == numqueries)

683 break;

684

685 continue;

686 }

687

688

690 }

691

693}

694

695

696

697

698

699

700

701

702

703

704static void

706{

708 const char *dummy_params[1] = {"1"};

709 Oid dummy_param_oids[1] = {INT4OID};

710 int i;

711 int gotrows;

712 bool goterror;

713

714 fprintf(stderr, "aborted pipeline... ");

715

719

723

724

725

726

727

728

731

732 dummy_params[0] = "1";

734 dummy_params, NULL, NULL, 0) != 1)

736

738 1, dummy_param_oids, dummy_params,

739 NULL, NULL, 0) != 1)

741

742 dummy_params[0] = "2";

744 dummy_params, NULL, NULL, 0) != 1)

746

749

750 dummy_params[0] = "3";

752 dummy_params, NULL, NULL, 0) != 1)

753 pg_fatal("dispatching second-pipeline insert failed: %s",

755

758

759

760

761

762

763

764

765

767 if (res == NULL)

770 pg_fatal("Unexpected result status %s: %s",

774

775

777 pg_fatal("Expected null result, got %s",

779

780

782 if (res == NULL)

785 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",

788

789

791 pg_fatal("Expected null result, got %s",

793

794

795

796

797

798

799

800

802 pg_fatal("pipeline should be flagged as aborted but isn't");

803

804

806 if (res == NULL)

809 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",

812

813

816

818 pg_fatal("pipeline should be flagged as aborted but isn't");

819

820

822 pg_fatal("Fell out of pipeline mode somehow");

823

824

825

826

827

828

829

831 if (res == NULL)

834 pg_fatal("Unexpected result code from first pipeline sync\n"

835 "Expected PGRES_PIPELINE_SYNC, got %s",

838

840 pg_fatal("sync should've cleared the aborted flag but didn't");

841

842

844 pg_fatal("Fell out of pipeline mode somehow");

845

846

848 if (res == NULL)

851 pg_fatal("Unexpected result code %s from first item in second pipeline",

854

855

858

859

863 pg_fatal("Unexpected result code %s from second pipeline sync",

866

868 pg_fatal("Expected null result, got %s: %s",

871

872

873 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)

877 goterror = false;

879 {

881 {

884 pg_fatal("expected error about multiple commands, got %s",

887 goterror = true;

888 break;

889 default:

891 break;

892 }

893 }

894 if (!goterror)

895 pg_fatal("did not get cannot-insert-multiple-commands error");

897 if (res == NULL)

900 pg_fatal("Unexpected result code %s from pipeline sync",

903

904

906 0, NULL, NULL, NULL, NULL, 0) != 1)

911 goterror = false;

912 gotrows = 0;

914 {

916 {

919 gotrows++;

920 break;

923 pg_fatal("expected division-by-zero, got: %s (%s)",

926 printf("got expected division-by-zero\n");

927 goterror = true;

928 break;

929 default:

931 }

933 }

934 if (!goterror)

935 pg_fatal("did not get division-by-zero error");

936 if (gotrows != 3)

937 pg_fatal("did not get three rows");

938

942 pg_fatal("Unexpected result code %s from third pipeline sync",

945

946

948 pg_fatal("Fell out of pipeline mode somehow");

949

950

952 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",

954

956 pg_fatal("exiting pipeline mode didn't seem to work");

957

958

959

960

961

962

963

964

965

966

967

968

969

970

971

972

973 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");

974

976 pg_fatal("Expected tuples, got %s: %s",

981 {

983

984 if (strcmp(val, "3") != 0)

985 pg_fatal("expected only insert with value 3, got %s", val);

986 }

987

989

991}

992

993

995{

1004};

1005

1006static void

1008{

1009 Oid insert_param_oids[2] = {INT4OID, INT8OID};

1010 const char *insert_params[2];

1015 int rows_to_send,

1016 rows_to_receive;

1017

1018 insert_params[0] = insert_param_0;

1019 insert_params[1] = insert_param_1;

1020

1021 rows_to_send = rows_to_receive = n_rows;

1022

1023

1024

1025

1028

1030 {

1031 const char *sql;

1032

1033 switch (send_step)

1034 {

1036 sql = "BEGIN TRANSACTION";

1038 break;

1039

1043 break;

1044

1048 break;

1049

1050 default:

1052 sql = NULL;

1053 }

1054

1055 pg_debug("sending: %s\n", sql);

1057 0, NULL, NULL, NULL, NULL, 0) != 1)

1059 }

1060

1066

1067

1068

1069

1070

1071

1072

1073

1074

1077

1078 while (recv_step != BI_DONE)

1079 {

1080 int sock;

1081 fd_set input_mask;

1082 fd_set output_mask;

1083

1085

1086 if (sock < 0)

1087 break;

1088

1089 FD_ZERO(&input_mask);

1090 FD_SET(sock, &input_mask);

1091 FD_ZERO(&output_mask);

1092 FD_SET(sock, &output_mask);

1093

1094 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)

1095 {

1096 fprintf(stderr, "select() failed: %m\n");

1098 }

1099

1100

1101

1102

1103

1104 if (FD_ISSET(sock, &input_mask))

1105 {

1107

1108

1110 {

1112 const char *cmdtag = "";

1114 int status;

1115

1116

1117

1118

1119

1121 if (res == NULL)

1122 continue;

1123

1125 switch (recv_step)

1126 {

1128 cmdtag = "BEGIN";

1129 recv_step++;

1130 break;

1132 cmdtag = "DROP TABLE";

1133 recv_step++;

1134 break;

1136 cmdtag = "CREATE TABLE";

1137 recv_step++;

1138 break;

1140 cmdtag = "";

1142 recv_step++;

1143 break;

1145 cmdtag = "INSERT";

1146 rows_to_receive--;

1147 if (rows_to_receive == 0)

1148 recv_step++;

1149 break;

1151 cmdtag = "COMMIT";

1152 recv_step++;

1153 break;

1155 cmdtag = "";

1158 recv_step++;

1159 break;

1161

1162 pg_fatal("unreachable state");

1163 }

1164

1166 pg_fatal("%s reported status %s, expected %s\n"

1167 "Error message: \"%s\"",

1170

1171 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)

1172 pg_fatal("%s expected command tag '%s', got '%s'",

1174

1176

1178 }

1179 }

1180

1181

1182 if (FD_ISSET(sock, &output_mask))

1183 {

1185

1187 {

1189

1191

1193 2, insert_params, NULL, NULL, 0) == 1)

1194 {

1195 pg_debug("sent row %d\n", rows_to_send);

1196

1197 rows_to_send--;

1198 if (rows_to_send == 0)

1199 send_step++;

1200 }

1201 else

1202 {

1203

1204

1205

1206

1207 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",

1209 }

1210 }

1212 {

1214 0, NULL, NULL, NULL, NULL, 0) == 1)

1215 {

1217 send_step++;

1218 }

1219 else

1220 {

1221 fprintf(stderr, "WARNING: failed to send commit: %s\n",

1223 }

1224 }

1225 else if (send_step == BI_SYNC)

1226 {

1228 {

1230 send_step++;

1231 }

1232 else

1233 {

1234 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",

1236 }

1237 }

1238 }

1239 }

1240

1241

1243 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",

1245

1248

1249 fprintf(stderr, "ok\n");

1250}

1251

1252static void

1254{

1256 Oid param_oids[1] = {INT4OID};

1257 Oid expected_oids[4];

1258 Oid typ;

1259

1260 fprintf(stderr, "prepared... ");

1261

1264 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "

1265 "interval '1 sec'",

1266 1, param_oids) != 1)

1268 expected_oids[0] = INT4OID;

1269 expected_oids[1] = TEXTOID;

1270 expected_oids[2] = NUMERICOID;

1271 expected_oids[3] = INTERVALOID;

1276

1278 if (res == NULL)

1279 pg_fatal("PQgetResult returned null");

1284 if (res != NULL)

1285 pg_fatal("expected NULL result");

1286

1288 if (res == NULL)

1289 pg_fatal("PQgetResult returned NULL");

1293 pg_fatal("expected %zu columns, got %d",

1296 {

1298 if (typ != expected_oids[i])

1299 pg_fatal("field %d: expected type %u, got %u",

1300 i, expected_oids[i], typ);

1301 }

1304 if (res != NULL)

1305 pg_fatal("expected NULL result");

1306

1310

1311 fprintf(stderr, "closing statement..");

1316

1318 if (res == NULL)

1319 pg_fatal("expected non-NULL result");

1324 if (res != NULL)

1325 pg_fatal("expected NULL result");

1329

1332

1333

1337

1338

1339

1340

1341

1345

1346 fprintf(stderr, "creating portal... ");

1348 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");

1355 if (res == NULL)

1356 pg_fatal("PQgetResult returned null");

1359

1361 if (typ != INT4OID)

1362 pg_fatal("portal: expected type %u, got %u",

1363 INT4OID, typ);

1366 if (res != NULL)

1367 pg_fatal("expected NULL result");

1371

1372 fprintf(stderr, "closing portal... ");

1377

1379 if (res == NULL)

1380 pg_fatal("expected non-NULL result");

1385 if (res != NULL)

1386 pg_fatal("expected NULL result");

1390

1393

1394

1398

1399

1400

1401

1402

1406

1407 fprintf(stderr, "ok\n");

1408}

1409

1410

1411

1412

1413static void

1415{

1417 const char **vals;

1418 int nopts;

1420 int protocol_version;

1421 int max_protocol_version_index;

1422 int i;

1423

1424

1425

1426

1427

1428 nopts = 0;

1430 nopts++;

1431 nopts++;

1432 nopts++;

1433

1435 vals = pg_malloc0(sizeof(char *) * nopts);

1436

1437 i = 0;

1439 {

1440 if (opt->val)

1441 {

1443 vals[i] = opt->val;

1444 i++;

1445 }

1446 }

1447

1448 max_protocol_version_index = i;

1449 keywords[i] = "max_protocol_version";

1450 i++;

1452

1453

1454

1455

1456 vals[max_protocol_version_index] = "3.0";

1458

1460 pg_fatal("Connection to database failed: %s",

1462

1464 if (protocol_version != 30000)

1465 pg_fatal("expected 30000, got %d", protocol_version);

1466

1468

1469

1470

1471

1472

1473 vals[max_protocol_version_index] = "3.1";

1475

1477 pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");

1478

1480

1481

1482

1483

1484 vals[max_protocol_version_index] = "3.2";

1486

1488 pg_fatal("Connection to database failed: %s",

1490

1492 if (protocol_version != 30002)

1493 pg_fatal("expected 30002, got %d", protocol_version);

1494

1496

1497

1498

1499

1500 vals[max_protocol_version_index] = "latest";

1502

1504 pg_fatal("Connection to database failed: %s",

1506

1508 if (protocol_version != 30002)

1509 pg_fatal("expected 30002, got %d", protocol_version);

1510

1512}

1513

1514

1515static void

1517{

1518 int *n_notices = (int *) arg;

1519

1520 (*n_notices)++;

1521 fprintf(stderr, "NOTICE %d: %s", *n_notices, message);

1522}

1523

1524

1525static void

1527{

1529 int n_notices = 0;

1530

1531 fprintf(stderr, "\npipeline idle...\n");

1532

1534

1535

1542 if (res == NULL)

1543 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

1546 pg_fatal("unexpected result code %s from first pipeline item",

1550 if (res != NULL)

1551 pg_fatal("did not receive terminating NULL");

1555 pg_fatal("exiting pipeline succeeded when it shouldn't");

1557 strlen("cannot exit pipeline mode")) != 0)

1558 pg_fatal("did not get expected error; got: %s",

1563 pg_fatal("unexpected result code %s from second pipeline item",

1567 if (res != NULL)

1568 pg_fatal("did not receive terminating NULL");

1571

1572 if (n_notices > 0)

1573 pg_fatal("got %d notice(s)", n_notices);

1574 fprintf(stderr, "ok - 1\n");

1575

1576

1579 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)

1583 if (res == NULL)

1584 pg_fatal("unexpected NULL result received");

1589 fprintf(stderr, "ok - 2\n");

1590}

1591

1592static void

1594{

1596 const char *dummy_params[1] = {"1"};

1597 Oid dummy_param_oids[1] = {INT4OID};

1598

1599 fprintf(stderr, "simple pipeline... ");

1600

1601

1602

1603

1604

1605

1606

1607

1608

1610 pg_fatal("Expected blocking connection mode");

1611

1614

1616 1, dummy_param_oids, dummy_params,

1617 NULL, NULL, 0) != 1)

1619

1621 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");

1622

1625

1627 if (res == NULL)

1628 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",

1630

1632 pg_fatal("Unexpected result code %s from first pipeline item",

1634

1636 res = NULL;

1637

1639 pg_fatal("PQgetResult returned something extra after first query result.");

1640

1641

1642

1643

1644

1646 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");

1647

1649 if (res == NULL)

1650 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",

1652

1654 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",

1656

1658 res = NULL;

1659

1661 pg_fatal("PQgetResult returned something extra after pipeline end: %s",

1663

1664

1666 pg_fatal("Fell out of pipeline mode somehow");

1667

1668

1670 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",

1672

1674 pg_fatal("Exiting pipeline mode didn't seem to work");

1675

1676 fprintf(stderr, "ok\n");

1677}

1678

1679static void

1681{

1683 int i;

1684 bool pipeline_ended = false;

1685

1687 pg_fatal("failed to enter pipeline mode: %s",

1689

1690

1691 for (i = 0; i < 3; i++)

1692 {

1693 char *param[1];

1694

1695 param[0] = psprintf("%d", 44 + i);

1696

1698 "SELECT generate_series(42, $1)",

1699 1,

1700 NULL,

1701 (const char **) param,

1702 NULL,

1703 NULL,

1704 0) != 1)

1705 pg_fatal("failed to send query: %s",

1707 pfree(param[0]);

1708 }

1711

1712 for (i = 0; !pipeline_ended; i++)

1713 {

1714 bool first = true;

1715 bool saw_ending_tuplesok;

1716 bool isSingleTuple = false;

1717

1718

1719 if (i < 2)

1720 {

1722 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);

1723 }

1724

1725

1726 saw_ending_tuplesok = false;

1728 {

1730

1732 {

1733 fprintf(stderr, "end of pipeline reached\n");

1734 pipeline_ended = true;

1736 if (i != 3)

1737 pg_fatal("Expected three results, got %d", i);

1738 break;

1739 }

1740

1741

1742 if (first)

1743 {

1745 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",

1748 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",

1750 first = false;

1751 }

1752

1754 switch (est)

1755 {

1758 saw_ending_tuplesok = true;

1759 if (isSingleTuple)

1760 {

1762 fprintf(stderr, "all tuples received in query %d\n", i);

1763 else

1764 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");

1765 }

1766 break;

1767

1769 isSingleTuple = true;

1771 break;

1772

1773 default:

1775 }

1777 }

1778 if (!pipeline_ended && !saw_ending_tuplesok)

1779 pg_fatal("didn't get expected terminating TUPLES_OK");

1780 }

1781

1782

1783

1784

1785

1786

1788 0, NULL, NULL, NULL, NULL, 0) != 1)

1789 pg_fatal("failed to send query: %s",

1792 pg_fatal("failed to send flush request");

1794 pg_fatal("PQsetSingleRowMode() failed");

1796 if (res == NULL)

1797 pg_fatal("unexpected NULL");

1799 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",

1802 if (res == NULL)

1803 pg_fatal("unexpected NULL");

1805 pg_fatal("Expected PGRES_TUPLES_OK, got %s",

1808 pg_fatal("expected NULL result");

1809

1811 0, NULL, NULL, NULL, NULL, 0) != 1)

1812 pg_fatal("failed to send query: %s",

1815 pg_fatal("failed to send flush request");

1817 if (res == NULL)

1818 pg_fatal("unexpected NULL");

1820 pg_fatal("Expected PGRES_TUPLES_OK, got %s",

1823 pg_fatal("expected NULL result");

1824

1825

1826

1827

1828

1830 0, NULL, NULL, NULL, NULL, 0) != 1)

1831 pg_fatal("failed to send query: %s",

1834 pg_fatal("failed to send flush request");

1836 pg_fatal("PQsetChunkedRowsMode() failed");

1838 if (res == NULL)

1839 pg_fatal("unexpected NULL");

1841 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",

1847 if (res == NULL)

1848 pg_fatal("unexpected NULL");

1850 pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",

1855 if (res == NULL)

1856 pg_fatal("unexpected NULL");

1858 pg_fatal("Expected PGRES_TUPLES_OK, got %s",

1863 pg_fatal("expected NULL result");

1864

1867

1868 fprintf(stderr, "ok\n");

1869}

1870

1871

1872

1873

1874

1875static void

1877{

1879 bool expect_null;

1880 int num_syncs = 0;

1881

1882 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"

1883 "CREATE TABLE pq_pipeline_tst (id int)");

1885 pg_fatal("failed to create test table: %s",

1888

1890 pg_fatal("failed to enter pipeline mode: %s",

1893 pg_fatal("could not send prepare on pipeline: %s",

1895

1897 "BEGIN",

1898 0, NULL, NULL, NULL, NULL, 0) != 1)

1899 pg_fatal("failed to send query: %s",

1902 "SELECT 0/0",

1903 0, NULL, NULL, NULL, NULL, 0) != 1)

1904 pg_fatal("failed to send query: %s",

1906

1907

1908

1909

1910

1912 pg_fatal("failed to execute prepared: %s",

1914

1915

1917 "INSERT INTO pq_pipeline_tst VALUES (1)",

1918 0, NULL, NULL, NULL, NULL, 0) != 1)

1919 pg_fatal("failed to send query: %s",

1923 num_syncs++;

1924

1925

1926

1927

1928

1930 "INSERT INTO pq_pipeline_tst VALUES (2)",

1931 0, NULL, NULL, NULL, NULL, 0) != 1)

1932 pg_fatal("failed to send query: %s",

1936 num_syncs++;

1937

1938

1939

1940

1941

1943 pg_fatal("failed to execute prepared: %s",

1945

1946

1947

1948

1949

1951 "INSERT INTO pq_pipeline_tst VALUES (3)",

1952 0, NULL, NULL, NULL, NULL, 0) != 1)

1953 pg_fatal("failed to send query: %s",

1955

1958 num_syncs++;

1961 num_syncs++;

1962

1963 expect_null = false;

1964 for (int i = 0;; i++)

1965 {

1967

1969 if (res == NULL)

1970 {

1971 printf("%d: got NULL result\n", i);

1972 if (!expect_null)

1973 pg_fatal("did not expect NULL here");

1974 expect_null = false;

1975 continue;

1976 }

1979 if (expect_null)

1984 {

1985 printf(": command didn't run because pipeline aborted\n");

1986 }

1987 else

1990

1992 num_syncs--;

1993 else

1994 expect_null = true;

1995 if (num_syncs <= 0)

1996 break;

1997 }

1999 pg_fatal("returned something extra after all the syncs: %s",

2001

2004

2005

2006 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");

2010 pg_fatal("did not get 1 tuple");

2011 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)

2012 pg_fatal("did not get expected tuple");

2014

2015 fprintf(stderr, "ok\n");

2016}

2017

2018

2019

2020

2021

2022

2023static void

2025{

2028 Oid paramTypes[2] = {INT8OID, INT8OID};

2029 const char *paramValues[2];

2032 int ctr = 0;

2033 int numsent = 0;

2034 int results = 0;

2035 bool read_done = false;

2036 bool write_done = false;

2037 bool error_sent = false;

2038 bool got_error = false;

2039 int switched = 0;

2040 int socketful = 0;

2041 fd_set in_fds;

2042 fd_set out_fds;

2043

2044 fprintf(stderr, "uniqviol ...");

2045

2047

2048 paramValues[0] = paramValue0;

2049 paramValues[1] = paramValue1;

2050 sprintf(paramValue1, "42");

2051

2052 res = PQexec(conn, "drop table if exists ppln_uniqviol;"

2053 "create table ppln_uniqviol(id bigint primary key, idata bigint)");

2056

2060

2062 "insert into ppln_uniqviol values ($1, $2) returning id",

2063 2, paramTypes);

2066

2068 pg_fatal("failed to enter pipeline mode");

2069

2070 while (!read_done)

2071 {

2072

2073

2074

2075

2076

2077

2078

2080 {

2081 bool new_error;

2082

2083 if (results >= numsent)

2084 {

2085 if (write_done)

2086 read_done = true;

2087 break;

2088 }

2089

2092 if (new_error && got_error)

2094 got_error |= new_error;

2095 if (results++ >= numsent - 1)

2096 {

2097 if (write_done)

2098 read_done = true;

2099 break;

2100 }

2101 }

2102

2103 if (read_done)

2104 break;

2105

2106 FD_ZERO(&out_fds);

2107 FD_SET(sock, &out_fds);

2108

2109 FD_ZERO(&in_fds);

2110 FD_SET(sock, &in_fds);

2111

2112 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)

2113 {

2114 if (errno == EINTR)

2115 continue;

2116 pg_fatal("select() failed: %m");

2117 }

2118

2121

2122

2123

2124

2125

2126 if (!write_done && FD_ISSET(sock, &out_fds))

2127 {

2128 for (;;)

2129 {

2130 int flush;

2131

2132

2133

2134

2135

2136 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)

2137 {

2138 sprintf(paramValue0, "%d", numsent / 2);

2140 error_sent = true;

2141 }

2142 else

2143 {

2145 sprintf(paramValue0, "%d", ctr++);

2146 }

2147

2150 numsent++;

2151

2152

2153 if (socketful != 0 && numsent % socketful == 42 && error_sent)

2154 {

2156 pg_fatal("failed to send flush request");

2157 write_done = true;

2158 fprintf(stderr, "\ndone writing\n");

2160 break;

2161 }

2162

2163

2165 if (flush == -1)

2167 if (flush == 1)

2168 {

2169 if (socketful == 0)

2170 socketful = numsent;

2171 fprintf(stderr, "\nswitch to reading\n");

2172 switched++;

2173 break;

2174 }

2175 }

2176 }

2177 }

2178

2179 if (!got_error)

2180 pg_fatal("did not get expected error");

2181

2182 fprintf(stderr, "ok\n");

2183}

2184

2185

2186

2187

2188

2189

2190

2191static bool

2193{

2195 bool got_error = false;

2196

2197 if (res == NULL)

2198 pg_fatal("got unexpected NULL");

2199

2201 {

2203 got_error = true;

2206

2208 if (res2 != NULL)

2209 pg_fatal("expected NULL, got %s",

2211 break;

2212

2214 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));

2216

2218 if (res2 != NULL)

2219 pg_fatal("expected NULL, got %s",

2221 break;

2222

2224 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);

2226 if (res2 != NULL)

2227 pg_fatal("expected NULL, got %s",

2229 break;

2230

2231 default:

2233 }

2234

2235 return got_error;

2236}

2237

2238

2239static void

2241{

2242 fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);

2243 fprintf(stderr, "Usage:\n");

2245 fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

2246 fprintf(stderr, "\nOptions:\n");

2247 fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");

2248 fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");

2249}

2250

2251static void

2253{

2255 printf("disallowed_in_pipeline\n");

2256 printf("multi_pipelines\n");

2258 printf("pipeline_abort\n");

2259 printf("pipeline_idle\n");

2260 printf("pipelined_insert\n");

2261 printf("prepared\n");

2262 printf("protocol_version\n");

2263 printf("simple_pipeline\n");

2264 printf("singlerow\n");

2265 printf("transaction\n");

2266 printf("uniqviol\n");

2267}

2268

2269int

2271{

2272 const char *conninfo = "";

2274 FILE *trace;

2275 char *testname;

2276 int numrows = 10000;

2278 int c;

2279

2280 while ((c = getopt(argc, argv, "r:t:")) != -1)

2281 {

2282 switch (c)

2283 {

2284 case 'r':

2285 errno = 0;

2286 numrows = strtol(optarg, NULL, 10);

2287 if (errno != 0 || numrows <= 0)

2288 {

2289 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",

2291 exit(1);

2292 }

2293 break;

2294 case 't':

2296 break;

2297 }

2298 }

2299

2301 {

2304 }

2305 else

2306 {

2308 exit(1);

2309 }

2310

2311 if (strcmp(testname, "tests") == 0)

2312 {

2314 exit(0);

2315 }

2316

2318 {

2321 }

2322

2323

2326 {

2327 fprintf(stderr, "Connection to database failed: %s\n",

2330 }

2331

2332 res = PQexec(conn, "SET lc_messages TO \"C\"");

2335 res = PQexec(conn, "SET debug_parallel_query = off");

2338

2339

2341 {

2342 if (strcmp(tracefile, "-") == 0)

2344 else

2346 if (trace == NULL)

2348

2349

2350 setvbuf(trace, NULL, PG_IOLBF, 0);

2351

2355 }

2356

2357 if (strcmp(testname, "cancel") == 0)

2359 else if (strcmp(testname, "disallowed_in_pipeline") == 0)

2361 else if (strcmp(testname, "multi_pipelines") == 0)

2363 else if (strcmp(testname, "nosync") == 0)

2365 else if (strcmp(testname, "pipeline_abort") == 0)

2367 else if (strcmp(testname, "pipeline_idle") == 0)

2369 else if (strcmp(testname, "pipelined_insert") == 0)

2371 else if (strcmp(testname, "prepared") == 0)

2373 else if (strcmp(testname, "protocol_version") == 0)

2375 else if (strcmp(testname, "simple_pipeline") == 0)

2377 else if (strcmp(testname, "singlerow") == 0)

2379 else if (strcmp(testname, "transaction") == 0)

2381 else if (strcmp(testname, "uniqviol") == 0)

2383 else

2384 {

2385 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);

2386 exit(1);

2387 }

2388

2389

2391 return 0;

2392}

#define pg_attribute_printf(f, a)

static PGcancel *volatile cancelConn

#define fprintf(file, fmt, msg)

PGcancel * PQgetCancel(PGconn *conn)

void PQcancelReset(PGcancelConn *cancelConn)

PGcancelConn * PQcancelCreate(PGconn *conn)

ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)

int PQcancelBlocking(PGcancelConn *cancelConn)

int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)

PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)

void PQcancelFinish(PGcancelConn *cancelConn)

int PQrequestCancel(PGconn *conn)

void PQfreeCancel(PGcancel *cancel)

int PQcancelSocket(const PGcancelConn *cancelConn)

char * PQcancelErrorMessage(const PGcancelConn *cancelConn)

int PQcancelStart(PGcancelConn *cancelConn)

int PQfullProtocolVersion(const PGconn *conn)

PGconn * PQconnectdb(const char *conninfo)

PQconninfoOption * PQconninfo(PGconn *conn)

ConnStatusType PQstatus(const PGconn *conn)

void PQfinish(PGconn *conn)

int PQbackendPID(const PGconn *conn)

PGpipelineStatus PQpipelineStatus(const PGconn *conn)

PQnoticeProcessor PQsetNoticeProcessor(PGconn *conn, PQnoticeProcessor proc, void *arg)

char * PQerrorMessage(const PGconn *conn)

int PQsocket(const PGconn *conn)

PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)

int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)

int PQsetSingleRowMode(PGconn *conn)

int PQflush(PGconn *conn)

Oid PQftype(const PGresult *res, int field_num)

PGresult * PQdescribePrepared(PGconn *conn, const char *stmt)

PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)

int PQexitPipelineMode(PGconn *conn)

int PQsendClosePortal(PGconn *conn, const char *portal)

int PQenterPipelineMode(PGconn *conn)

PGresult * PQclosePrepared(PGconn *conn, const char *stmt)

char * PQgetvalue(const PGresult *res, int tup_num, int field_num)

PGresult * PQclosePortal(PGconn *conn, const char *portal)

PGresult * PQgetResult(PGconn *conn)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQsendClosePrepared(PGconn *conn, const char *stmt)

int PQsendPipelineSync(PGconn *conn)

int PQntuples(const PGresult *res)

PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)

char * PQresultErrorMessage(const PGresult *res)

int PQsendDescribePrepared(PGconn *conn, const char *stmt)

int PQconsumeInput(PGconn *conn)

int PQsetnonblocking(PGconn *conn, int arg)

int PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)

PGresult * PQdescribePortal(PGconn *conn, const char *portal)

int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)

char * PQresultErrorField(const PGresult *res, int fieldcode)

int PQsendQuery(PGconn *conn, const char *query)

char * PQcmdStatus(PGresult *res)

int PQpipelineSync(PGconn *conn)

int PQsendDescribePortal(PGconn *conn, const char *portal)

char * PQresStatus(ExecStatusType status)

int PQisBusy(PGconn *conn)

PGresult * PQexec(PGconn *conn, const char *query)

int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)

int PQsendFlushRequest(PGconn *conn)

int PQisnonblocking(const PGconn *conn)

int PQnfields(const PGresult *res)

void PQtrace(PGconn *conn, FILE *debug_port)

void PQsetTraceFlags(PGconn *conn, int flags)

void * pg_malloc(size_t size)

char * pg_strdup(const char *in)

void * pg_malloc0(size_t size)

Assert(PointerIsAligned(start, uint64))

static const JsonPathKeyword keywords[]

#define PQTRACE_SUPPRESS_TIMESTAMPS

PostgresPollingStatusType

#define PQTRACE_REGRESS_MODE

static void usage(const char *progname)

static void print_test_list(void)

static const char *const insert_sql2

static void confirm_query_canceled_impl(int line, PGconn *conn)

static void wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state, char *event)

static void exit_nicely(PGconn *conn)

int main(int argc, char **argv)

#define confirm_query_canceled(conn)

static void test_uniqviol(PGconn *conn)

static void send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)

static void test_simple_pipeline(PGconn *conn)

static bool process_result(PGconn *conn, PGresult *res, int results, int numsent)

static void test_multi_pipelines(PGconn *conn)

static void test_pipeline_idle(PGconn *conn)

static const char *const create_table_sql

static const char *const insert_sql

static void test_protocol_version(PGconn *conn)

static void test_nosync(PGconn *conn)

static const char *const progname

static void test_pipeline_abort(PGconn *conn)

static pg_noreturn void pg_fatal_impl(int line, const char *fmt,...) pg_attribute_printf(2

static const char *const drop_table_sql

#define send_cancellable_query(conn, monitorConn)

static void notice_processor(void *arg, const char *message)

static void test_transaction(PGconn *conn)

static PGconn * copy_connection(PGconn *conn)

static void test_prepared(PGconn *conn)

static void test_cancel(PGconn *conn)

static void test_singlerowmode(PGconn *conn)

static void test_disallowed_in_pipeline(PGconn *conn)

static void test_pipelined_insert(PGconn *conn, int n_rows)

void pfree(void *pointer)

static AmcheckOptions opts

int getopt(int nargc, char *const *nargv, const char *ostr)

PGDLLIMPORT char * optarg

char * psprintf(const char *fmt,...)

void pg_usleep(long microsec)

#define select(n, r, w, e, timeout)