PostgreSQL Source Code: src/backend/commands/copyto.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

16

17#include <ctype.h>

19#include <sys/stat.h>

20

38

39

40

41

42

44{

45 COPY_FILE,

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

66{

67

69

70

72 FILE *copy_file;

74

75 int file_encoding;

78

79

83 char *filename;

84 bool is_program;

86

89

90

91

92

94

99

100

101typedef struct

102{

107

108

110

111

112

118 bool use_quote);

119

120

126 bool is_csv);

132

133

143

144

145

146

147

148

149

150

151

157};

158

159

165};

166

167

173};

174

175

178{

179 if (opts->csv_mode)

181 else if (opts->binary)

183

184

186}

187

188

189static void

191{

192

193

194

195

200

201

203 {

205 bool hdr_delim = false;

206

208 {

210 char *colname;

211

212 if (hdr_delim)

214 hdr_delim = true;

215

217

220 else

222 }

223

225 }

226}

227

228

229

230

231

232static void

234{

235 Oid func_oid;

236 bool is_varlena;

237

238

241}

242

243

244static void

246{

248}

249

250

251static void

253{

255}

256

257

258

259

260

261

262

266 bool is_csv)

267{

268 bool need_delim = false;

270

272 {

275

276 if (need_delim)

278 need_delim = true;

279

280 if (isnull)

281 {

283 }

284 else

285 {

287

290

291 if (is_csv)

294 else

296 }

297 }

298

300}

301

302

303static void

305{

306

307}

308

309

310

311

312

313static void

315{

317

318

320

321 tmp = 0;

323

324 tmp = 0;

326}

327

328

329

330

331

332static void

334{

335 Oid func_oid;

336 bool is_varlena;

337

338

341}

342

343

344static void

346{

348

349

351

353 {

356

357 if (isnull)

358 {

360 }

361 else

362 {

363 bytea *outputbytes;

364

370 }

371 }

372

374}

375

376

377static void

379{

380

382

384}

385

386

387

388

389

390static void

392{

396 int i;

397

401 for (i = 0; i < natts; i++)

405}

406

407static void

409{

410

412

414}

415

416

417

418

419

420

421

422

423

424

425

426static void

428{

430}

431

432static void

434{

436}

437

438static void

440{

442}

443

444static void

446{

448

450 {

452 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,

455 {

457 {

458 if (errno == EPIPE)

459 {

460

461

462

463

464

465

467

468

469

470

471

472

473 errno = EPIPE;

474 }

477 errmsg("could not write to COPY program: %m")));

478 }

479 else

482 errmsg("could not write to COPY file: %m")));

483 }

484 break;

486

488 break;

491 break;

492 }

493

494

497

499}

500

501

502

503

504

505static inline void

507{

509 {

511

512#ifndef WIN32

514#else

516#endif

517 break;

519

521 break;

522 default:

523 break;

524 }

525

526

528}

529

530

531

532

533

534

535

536

537static inline void

539{

541

544}

545

546

547

548

549static inline void

551{

553

556}

557

558

559

560

561static void

563{

564 int pclose_rc;

565

567

569 if (pclose_rc == -1)

572 errmsg("could not close pipe to external command: %m")));

573 else if (pclose_rc != 0)

574 {

576 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),

577 errmsg("program \"%s\" failed",

580 }

581}

582

583

584

585

586static void

588{

590 {

592 }

593 else

594 {

598 errmsg("could not close file \"%s\": %m",

600 }

601

603

606}

607

608

609

610

611

612

613

614

615

616

617

618

619

620

621

626 Oid queryRelId,

628 bool is_program,

630 List *attnamelist,

632{

634 bool pipe = (filename == NULL && data_dest_cb == NULL);

636 int num_phys_attrs;

638 const int progress_cols[] = {

641 };

642 int64 progress_vals[] = {

644 0

645 };

646

647 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)

648 {

649 if (rel->rd_rel->relkind == RELKIND_VIEW)

651 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

652 errmsg("cannot copy from view \"%s\"",

654 errhint("Try the COPY (SELECT ...) TO variant.")));

655 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)

656 {

659 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

660 errmsg("cannot copy from unpopulated materialized view \"%s\"",

662 errhint("Use the REFRESH MATERIALIZED VIEW command."));

663 }

664 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)

666 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

667 errmsg("cannot copy from foreign table \"%s\"",

669 errhint("Try the COPY (SELECT ...) TO variant.")));

670 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)

672 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

673 errmsg("cannot copy from sequence \"%s\"",

675 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)

677 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

678 errmsg("cannot copy from partitioned table \"%s\"",

680 errhint("Try the COPY (SELECT ...) TO variant.")));

681 else

683 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

684 errmsg("cannot copy from non-table relation \"%s\"",

686 }

687

688

689

691

692

693

694

695

697 "COPY",

699

701

702

704

705

707

708

709 if (rel)

710 {

712

713 cstate->rel = rel;

714

716 }

717 else

718 {

719 List *rewritten;

723

724 cstate->rel = NULL;

725

726

727

728

729

732 NULL);

733

734

735 if (rewritten == NIL)

736 {

738 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

739 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));

740 }

742 {

744

745

746 foreach(lc, rewritten)

747 {

749

752 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

753 errmsg("conditional DO INSTEAD rules are not supported for COPY")));

756 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

757 errmsg("DO ALSO rules are not supported for COPY")));

758 }

759

761 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

762 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));

763 }

764

766

767

771 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

772 errmsg("COPY (SELECT INTO) is not supported")));

773

774

777 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

778 errmsg("COPY query must not be a utility command")));

779

780

781

782

783

786 {

791

793 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

794 errmsg("COPY query must have a RETURNING clause")));

795 }

796

797

800

801

802

803

804

805

806

807

808

809

810

811

813 {

814

815

816

817

818

819

822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

823 errmsg("relation referenced by COPY statement has changed")));

824 }

825

826

827

828

829

832

833

836

837

841 dest, NULL, NULL, 0);

842

843

844

845

846

847

849

851 }

852

853

855

856 num_phys_attrs = tupDesc->natts;

857

858

861 {

863 }

865 {

866 List *attnums;

868

870

871 foreach(cur, attnums)

872 {

875

878 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),

879

880 errmsg("%s column \"%s\" not referenced by COPY",

881 "FORCE_QUOTE", NameStr(attr->attname))));

883 }

884 }

885

886

889 else

891

892

893

894

895

899 else

901

902

904

906

907 if (data_dest_cb)

908 {

912 }

913 else if (pipe)

914 {

916

917 Assert(!is_program);

920 }

921 else

922 {

925

926 if (is_program)

927 {

933 errmsg("could not execute command \"%s\": %m",

935 }

936 else

937 {

938 mode_t oumask;

939 struct stat st;

940

942

943

944

945

946

949 (errcode(ERRCODE_INVALID_NAME),

950 errmsg("relative path not allowed for COPY to file")));

951

954 {

956 }

958 {

959 umask(oumask);

960 }

963 {

964

965 int save_errno = errno;

966

969 errmsg("could not open file \"%s\" for writing: %m",

971 (save_errno == ENOENT || save_errno == EACCES) ?

972 errhint("COPY TO instructs the PostgreSQL server process to write a file. "

973 "You may want a client-side facility such as psql's \\copy.") : 0));

974 }

975

979 errmsg("could not stat file \"%s\": %m",

981

984 (errcode(ERRCODE_WRONG_OBJECT_TYPE),

986 }

987 }

988

989

993

995

997

998 return cstate;

999}

1000

1001

1002

1003

1004void

1006{

1008 {

1009

1014 }

1015

1016

1018}

1019

1020

1021

1022

1023

1024

1027{

1031 int num_phys_attrs;

1034

1035 if (fe_copy)

1037

1038 if (cstate->rel)

1040 else

1042 num_phys_attrs = tupDesc->natts;

1044

1045

1047

1048

1051 {

1054

1057 }

1058

1059

1060

1061

1062

1063

1064

1066 "COPY TO",

1068

1070

1071 if (cstate->rel)

1072 {

1075

1078

1079 processed = 0;

1081 {

1083

1084

1086

1087

1089

1090

1091

1092

1093

1095 ++processed);

1096 }

1097

1100 }

1101 else

1102 {

1103

1106 }

1107

1109

1111

1112 if (fe_copy)

1114

1115 return processed;

1116}

1117

1118

1119

1120

1121static inline void

1123{

1125

1128

1129

1131

1133

1135}

1136

1137

1138

1139

1140#define DUMPSOFAR() \

1141 do { \

1142 if (ptr > start) \

1143 CopySendData(cstate, start, ptr - start); \

1144 } while (0)

1145

1146static void

1148{

1149 const char *ptr;

1150 const char *start;

1151 char c;

1152 char delimc = cstate->opts.delim[0];

1153

1156 else

1158

1159

1160

1161

1162

1163

1164

1165

1166

1167

1168

1169

1170

1171

1172

1174 {

1176 while ((c = *ptr) != '\0')

1177 {

1178 if ((unsigned char) c < (unsigned char) 0x20)

1179 {

1180

1181

1182

1183

1184

1185

1186

1187 switch (c)

1188 {

1189 case '\b':

1190 c = 'b';

1191 break;

1192 case '\f':

1193 c = 'f';

1194 break;

1195 case '\n':

1196 c = 'n';

1197 break;

1198 case '\r':

1199 c = 'r';

1200 break;

1201 case '\t':

1202 c = 't';

1203 break;

1204 case '\v':

1205 c = 'v';

1206 break;

1207 default:

1208

1209 if (c == delimc)

1210 break;

1211

1212 ptr++;

1213 continue;

1214 }

1215

1219 start = ++ptr;

1220 }

1221 else if (c == '\\' || c == delimc)

1222 {

1225 start = ptr++;

1226 }

1229 else

1230 ptr++;

1231 }

1232 }

1233 else

1234 {

1236 while ((c = *ptr) != '\0')

1237 {

1238 if ((unsigned char) c < (unsigned char) 0x20)

1239 {

1240

1241

1242

1243

1244

1245

1246

1247 switch (c)

1248 {

1249 case '\b':

1250 c = 'b';

1251 break;

1252 case '\f':

1253 c = 'f';

1254 break;

1255 case '\n':

1256 c = 'n';

1257 break;

1258 case '\r':

1259 c = 'r';

1260 break;

1261 case '\t':

1262 c = 't';

1263 break;

1264 case '\v':

1265 c = 'v';

1266 break;

1267 default:

1268

1269 if (c == delimc)

1270 break;

1271

1272 ptr++;

1273 continue;

1274 }

1275

1279 start = ++ptr;

1280 }

1281 else if (c == '\\' || c == delimc)

1282 {

1285 start = ptr++;

1286 }

1287 else

1288 ptr++;

1289 }

1290 }

1291

1293}

1294

1295

1296

1297

1298

1299static void

1301 bool use_quote)

1302{

1303 const char *ptr;

1304 const char *start;

1305 char c;

1306 char delimc = cstate->opts.delim[0];

1307 char quotec = cstate->opts.quote[0];

1308 char escapec = cstate->opts.escape[0];

1310

1311

1312 if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)

1313 use_quote = true;

1314

1317 else

1319

1320

1321

1322

1323 if (!use_quote)

1324 {

1325

1326

1327

1328

1329

1330

1331

1332 if (single_attr && strcmp(ptr, "\\.") == 0)

1333 use_quote = true;

1334 else

1335 {

1336 const char *tptr = ptr;

1337

1338 while ((c = *tptr) != '\0')

1339 {

1340 if (c == delimc || c == quotec || c == '\n' || c == '\r')

1341 {

1342 use_quote = true;

1343 break;

1344 }

1347 else

1348 tptr++;

1349 }

1350 }

1351 }

1352

1353 if (use_quote)

1354 {

1356

1357

1358

1359

1361 while ((c = *ptr) != '\0')

1362 {

1363 if (c == quotec || c == escapec)

1364 {

1367 start = ptr;

1368 }

1371 else

1372 ptr++;

1373 }

1375

1377 }

1378 else

1379 {

1380

1382 }

1383}

1384

1385

1386

1387

1388static void

1390{

1391

1392}

1393

1394

1395

1396

1397static bool

1399{

1402

1403

1405

1406

1409

1410 return true;

1411}

1412

1413

1414

1415

1416static void

1418{

1419

1420}

1421

1422

1423

1424

1425static void

1427{

1429}

1430

1431

1432

1433

1436{

1438

1444

1445 self->cstate = NULL;

1447

1449}

List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)

void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options)

void pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)

void pgstat_progress_update_param(int index, int64 val)

void pgstat_progress_update_multi_param(int nparam, const int *index, const int64 *val)

void pgstat_progress_end_command(void)

#define IS_HIGHBIT_SET(ch)

#define pg_attribute_always_inline

#define MemSet(start, val, len)

static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)

static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)

static void CopySendInt32(CopyToState cstate, int32 val)

static void ClosePipeToProgram(CopyToState cstate)

static const CopyToRoutine CopyToRoutineCSV

static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)

static void CopyAttributeOutCSV(CopyToState cstate, const char *string, bool use_quote)

uint64 DoCopyTo(CopyToState cstate)

static void CopyToTextLikeEnd(CopyToState cstate)

static void CopyAttributeOutText(CopyToState cstate, const char *string)

struct CopyToStateData CopyToStateData

static const CopyToRoutine CopyToRoutineText

static void CopySendInt16(CopyToState cstate, int16 val)

static void CopySendData(CopyToState cstate, const void *databuf, int datasize)

static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)

static void CopySendChar(CopyToState cstate, char c)

DestReceiver * CreateCopyDestReceiver(void)

static const CopyToRoutine * CopyToGetRoutine(const CopyFormatOptions *opts)

static void CopySendTextLikeEndOfRow(CopyToState cstate)

static void EndCopy(CopyToState cstate)

static void copy_dest_destroy(DestReceiver *self)

static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)

CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query, Oid queryRelId, const char *filename, bool is_program, copy_data_dest_cb data_dest_cb, List *attnamelist, List *options)

static void copy_dest_shutdown(DestReceiver *self)

static void copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)

static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)

static void SendCopyBegin(CopyToState cstate)

static void SendCopyEnd(CopyToState cstate)

static void CopySendEndOfRow(CopyToState cstate)

static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)

static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot, bool is_csv)

static void CopySendString(CopyToState cstate, const char *str)

static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)

static const char BinarySignature[11]

void EndCopyTo(CopyToState cstate)

static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)

static const CopyToRoutine CopyToRoutineBinary

static void CopyToBinaryEnd(CopyToState cstate)

DestReceiver * CreateDestReceiver(CommandDest dest)

int errdetail_internal(const char *fmt,...)

int errcode_for_file_access(void)

int errhint(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

void ExecutorEnd(QueryDesc *queryDesc)

void ExecutorFinish(QueryDesc *queryDesc)

void ExecutorStart(QueryDesc *queryDesc, int eflags)

void ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)

void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)

FILE * OpenPipeStream(const char *command, const char *mode)

int ClosePipeStream(FILE *file)

FILE * AllocateFile(const char *name, const char *mode)

void fmgr_info(Oid functionId, FmgrInfo *finfo)

bytea * SendFunctionCall(FmgrInfo *flinfo, Datum val)

char * OutputFunctionCall(FmgrInfo *flinfo, Datum val)

Assert(PointerIsAligned(start, uint64))

void(* copy_data_dest_cb)(void *data, int len)

#define pq_putmessage(msgtype, s, len)

bool list_member_int(const List *list, int datum)

bool list_member_oid(const List *list, Oid datum)

void getTypeBinaryOutputInfo(Oid type, Oid *typSend, bool *typIsVarlena)

void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)

int GetDatabaseEncoding(void)

int pg_get_client_encoding(void)

char * pg_server_to_any(const char *s, int len, int encoding)

void MemoryContextReset(MemoryContext context)

char * pstrdup(const char *in)

void pfree(void *pointer)

void * palloc0(Size size)

MemoryContext CurrentMemoryContext

void MemoryContextDelete(MemoryContext context)

#define AllocSetContextCreate

#define ALLOCSET_DEFAULT_SIZES

#define CHECK_FOR_INTERRUPTS()

#define IsA(nodeptr, _type_)

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

#define CURSOR_OPT_PARALLEL_OK

static AmcheckOptions opts

FormData_pg_attribute * Form_pg_attribute

#define lfirst_node(type, lc)

static int list_length(const List *l)

#define linitial_node(type, l)

#define foreach_int(var, lst)

#define PG_ENCODING_IS_CLIENT_ONLY(_enc)

#define is_absolute_path(filename)

PlannedStmt * pg_plan_query(Query *querytree, const char *query_string, int cursorOptions, ParamListInfo boundParams)

CommandDest whereToSendOutput

List * pg_analyze_and_rewrite_fixedparams(RawStmt *parsetree, const char *query_string, const Oid *paramTypes, int numParams, QueryEnvironment *queryEnv)

void pq_putemptymessage(char msgtype)

void pq_endmessage(StringInfo buf)

void pq_beginmessage(StringInfo buf, char msgtype)

static void pq_sendbyte(StringInfo buf, uint8 byt)

static void pq_sendint16(StringInfo buf, uint16 i)

void FreeQueryDesc(QueryDesc *qdesc)

QueryDesc * CreateQueryDesc(PlannedStmt *plannedstmt, const char *sourceText, Snapshot snapshot, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, QueryEnvironment *queryEnv, int instrument_options)

#define PROGRESS_COPY_COMMAND

#define PROGRESS_COPY_TYPE_FILE

#define PROGRESS_COPY_BYTES_PROCESSED

#define PROGRESS_COPY_COMMAND_TO

#define PROGRESS_COPY_TUPLES_PROCESSED

#define PROGRESS_COPY_TYPE

#define PROGRESS_COPY_TYPE_PROGRAM

#define PROGRESS_COPY_TYPE_CALLBACK

#define PROGRESS_COPY_TYPE_PIPE

#define PqMsg_CopyOutResponse

#define RelationGetRelid(relation)

#define RelationGetDescr(relation)

#define RelationGetRelationName(relation)

#define RelationIsPopulated(relation)

void UpdateActiveSnapshotCommandId(void)

void PopActiveSnapshot(void)

void PushCopiedSnapshot(Snapshot snapshot)

Snapshot GetActiveSnapshot(void)

StringInfo makeStringInfo(void)

void resetStringInfo(StringInfo str)

void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)

#define appendStringInfoCharMacro(str, ch)

CopyHeaderChoice header_line

void(* CopyToOutFunc)(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)

void(* CopyToOneRow)(CopyToState cstate, TupleTableSlot *slot)

void(* CopyToEnd)(CopyToState cstate)

void(* CopyToStart)(CopyToState cstate, TupleDesc tupDesc)

MemoryContext copycontext

const CopyToRoutine * routine

copy_data_dest_cb data_dest_cb

bool encoding_embeds_ascii

const char * p_sourcetext

void(* rStartup)(DestReceiver *self, int operation, TupleDesc typeinfo)

void(* rShutdown)(DestReceiver *self)

bool(* receiveSlot)(TupleTableSlot *slot, DestReceiver *self)

void(* rDestroy)(DestReceiver *self)

TupleTableSlot * table_slot_create(Relation relation, List **reglist)

static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)

static void table_endscan(TableScanDesc scan)

static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)

static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)

static void slot_getallattrs(TupleTableSlot *slot)

char * wait_result_to_str(int exitstatus)

int pg_encoding_mblen(int encoding, const char *mbstr)