PostgreSQL Source Code: src/bin/pg_upgrade/task.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

46

50

51

52

53

54

55

57

58

59

60

61

62

64

65

66

67

68

69

70

72{

74 bool free_result;

75 void *arg;

77

78

79

80

81

83{

87};

88

89

90

91

93{

94 FREE,

95 CONNECTING,

98

99

100

101

103{

105 int db_idx;

106 int step_idx;

107 PGconn *conn;

108 bool ready;

109 bool select_mode;

110 int sock;

112

113

114

115

118{

120

122

123

125

126 return task;

127}

128

129

130

131

132void

134{

138}

139

140

141

142

143

144

145

146

147

148

149

150void

153 void *arg)

154{

156

159

164

166}

167

168

169

170

171

172

173static void

175{

178

179

187 {

190 }

191

193

194 if (!slot->conn)

195 pg_fatal("failed to create connection with connection string: \"%s\"",

196 conn_opts.data);

197

199}

200

201

202

203

204

205static void

208{

213

218

219

220

221

222

223

224 if (process_cb)

225 (*process_cb) (dbinfo, res, steps->arg);

226

229}

230

231

232

233

234static void

236{

238

239 if (!slot->ready)

240 return;

241

242 switch (slot->state)

243 {

245

246

247

248

249

251 return;

252

253

254

255

256

260

261 return;

262

264

265

269

270

272 {

274 return;

275 }

276

277

278

279

281 slot->select_mode = true;

284

285 return;

286

288

289

290

291

292

295

296

297

298

299

301 {

302

304 return;

305

307 }

308

309

310

311

312

313

314

315

319 slot->ready = true;

320

322

323 return;

324 }

325}

326

327

328

329

330static int

332{

333 fd_set save_input = *input;

334 fd_set save_output = *output;

335

336 if (maxFd == 0)

337 return 0;

338

339 for (;;)

340 {

341 int i;

342

343 *input = save_input;

344 *output = save_output;

345

347

348#ifndef WIN32

349 if (i < 0 && errno == EINTR)

350 continue;

351#else

352 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)

353 continue;

354#endif

355 return i;

356 }

357}

358

359

360

361

362

363static void

365{

368 int maxFd = 0;

369

372

373 for (int i = 0; i < numslots; i++)

374 {

375

376

377

378

379

380 slots[i].ready = false;

381

382

383

384

385

386

387

389 continue;

390

391

392

393

395 if (slots[i].sock < 0)

397 FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);

398 maxFd = Max(maxFd, slots[i].sock);

399 }

400

401

402

403

405 pg_fatal("select() failed: %m");

406

407

408

409

410 for (int i = 0; i < numslots; i++)

411 slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||

412 FD_ISSET(slots[i].sock, &output));

413}

414

415

416

417

418

419void

421{

424

427

428

429

430

431 for (int i = 0; i < jobs; i++)

432 slots[i].ready = true;

433

434 while (dbs_complete < cluster->dbarr.ndbs)

435 {

436 for (int i = 0; i < jobs; i++)

438

440 }

441

443}

void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)

#define ALWAYS_SECURE_SEARCH_PATH_SQL

PostgresPollingStatusType PQconnectPoll(PGconn *conn)

ConnStatusType PQstatus(const PGconn *conn)

PGconn * PQconnectStart(const char *conninfo)

void PQfinish(PGconn *conn)

char * PQerrorMessage(const PGconn *conn)

int PQsocket(const PGconn *conn)

PGresult * PQgetResult(PGconn *conn)

ExecStatusType PQresultStatus(const PGresult *res)

void PQclear(PGresult *res)

int PQconsumeInput(PGconn *conn)

int PQsendQuery(PGconn *conn, const char *query)

int PQisBusy(PGconn *conn)

void * pg_malloc0(size_t size)

void * pg_realloc(void *ptr, size_t size)

PostgresPollingStatusType

void(* UpgradeTaskProcessCB)(DbInfo *dbinfo, PGresult *res, void *arg)

PQExpBuffer createPQExpBuffer(void)

void initPQExpBuffer(PQExpBuffer str)

void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)

void destroyPQExpBuffer(PQExpBuffer str)

void appendPQExpBufferStr(PQExpBuffer str, const char *data)

void termPQExpBuffer(PQExpBuffer str)

void appendConnStrVal(PQExpBuffer buf, const char *str)

UpgradeTaskSlotState state

UpgradeTaskProcessCB process_cb

static int dbs_processing

UpgradeTask * upgrade_task_create(void)

static void wait_on_slots(UpgradeTaskSlot *slots, int numslots)

struct UpgradeTaskSlot UpgradeTaskSlot

struct UpgradeTaskStep UpgradeTaskStep

void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)

static void process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)

static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)

static int select_loop(int maxFd, fd_set *input, fd_set *output)

void upgrade_task_free(UpgradeTask *task)

static void process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)

void upgrade_task_add_step(UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)

#define select(n, r, w, e, timeout)