PostgreSQL Source Code: src/backend/replication/logical/logicalfuncs.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

17

19

38

39

41{

47

48

49

50

51static void

53 bool last_write)

54{

56}

57

58

59

60

61static void

63 bool last_write)

64{

66 bool nulls[3];

68

69

71 elog(ERROR, "too much output for sql interface");

72

74

75 memset(nulls, 0, sizeof(nulls));

78

79

80

81

82

86 false));

87

88

90

93}

94

95

96

97

100{

103 int32 upto_nchanges;

115

117

119

122 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),

123 errmsg("slot name must not be null")));

125

128 else

130

133 else

135

138 (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),

139 errmsg("options array must not be null")));

141

142

144

145 p->binary_output = binary;

146

149

150

152 if (ndim > 1)

153 {

155 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

156 errmsg("array must be one-dimensional")));

157 }

159 {

161 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

162 errmsg("array must not contain nulls")));

163 }

164 else if (ndim == 1)

165 {

166 int nelems;

167 Datum *datum_opts;

168 int i;

169

171

173

174 if (nelems % 2 != 0)

176 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

177 errmsg("array must have even number of elements")));

178

179 for (i = 0; i < nelems; i += 2)

180 {

183

185 }

186 }

187

190 p->tupdesc = rsinfo->setDesc;

191

192

193

194

197 else

199

201

203 {

204

207 false,

213

215

216

217

218

219

220 if (!binary &&

223 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

224 errmsg("logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",

227

228

229

230

231

233 wait_for_wal_lsn = end_of_wal;

234 else

235 wait_for_wal_lsn = Min(upto_lsn, end_of_wal);

236

238

239 ctx->output_writer_private = p;

240

241

242

243

244

245

247

248

250

251

252 while (ctx->reader->EndRecPtr < end_of_wal)

253 {

255 char *errm = NULL;

256

258 if (errm)

259 elog(ERROR, "could not find record for logical decoding: %s", errm);

260

261

262

263

264

265 if (record != NULL)

267

268

270 upto_lsn <= ctx->reader->EndRecPtr)

271 break;

272 if (upto_nchanges != 0 &&

273 upto_nchanges <= p->returned_rows)

274 break;

276 }

277

278

279

280

281

282

284

285

286

287

288

290 {

292

293

294

295

296

297

298

299

300

301

302

303

304

305

307 }

308

309

311

314 }

316 {

317

319

321 }

323

324 return (Datum) 0;

325}

326

327

328

329

332{

334}

335

336

337

338

341{

343}

344

345

346

347

350{

352}

353

354

355

356

359{

361}

362

363

364

365

366

369{

375

377 transactional, flush);

379}

380

383{

384

386}

#define PG_GETARG_ARRAYTYPE_P(n)

bool array_contains_nulls(ArrayType *array)

void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)

static Datum values[MAXATTR]

#define TextDatumGetCString(d)

void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_BYTEA_PP(n)

#define PG_GETARG_TEXT_PP(n)

#define PG_GETARG_NAME(n)

#define PG_GETARG_INT32(n)

#define PG_GETARG_BOOL(n)

void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)

Assert(PointerIsAligned(start, uint64))

void InvalidateSystemCaches(void)

List * lappend(List *list, void *datum)

void LogicalConfirmReceivedLocation(XLogRecPtr lsn)

void FreeDecodingContext(LogicalDecodingContext *ctx)

LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)

void CheckLogicalDecodingRequirements(void)

static void LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)

Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS)

static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)

Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)

Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)

Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS)

Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)

Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)

struct DecodingOutputState DecodingOutputState

static Datum pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)

DefElem * makeDefElem(char *name, Node *arg, int location)

int GetDatabaseEncoding(void)

bool pg_verify_mbstr(int encoding, const char *mbstr, int len, bool noError)

void * palloc0(Size size)

XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush)

#define CHECK_FOR_INTERRUPTS()

@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT

static MemoryContext MemoryContextSwitchTo(MemoryContext context)

static Datum LSNGetDatum(XLogRecPtr X)

static Datum PointerGetDatum(const void *X)

static Datum TransactionIdGetDatum(TransactionId X)

char * format_procedure(Oid procedure_oid)

ResourceOwner CurrentResourceOwner

void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)

void ReplicationSlotMarkDirty(void)

ReplicationSlot * MyReplicationSlot

void CheckSlotPermissions(void)

void ReplicationSlotRelease(void)

void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)

void resetStringInfo(StringInfo str)

Tuplestorestate * tupstore

MemoryContext ecxt_per_query_memory

void * output_writer_private

ReplicationSlotPersistentData data

Tuplestorestate * setResult

void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)

String * makeString(char *str)

#define VARSIZE_ANY_EXHDR(PTR)

text * cstring_to_text_with_len(const char *s, int len)

char * text_to_cstring(const text *t)

bool RecoveryInProgress(void)

XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)

#define XLogRecPtrIsInvalid(r)

#define InvalidXLogRecPtr

XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)

void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)

XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

void wal_segment_close(XLogReaderState *state)

void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)

int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)