PostgreSQL Source Code: src/backend/replication/walreceiverfuncs.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
18
19#include <sys/stat.h>
23#include <signal.h>
24
33
35
36
37
38
39
40#define WALRCV_STARTUP_TIMEOUT 10
41
42
45{
46 Size size = 0;
47
49
50 return size;
51}
52
53
54void
56{
57 bool found;
58
61
62 if (!found)
63 {
64
71 }
72}
73
74
75bool
77{
81
83
86
88
89
90
91
92
93
94
96 {
98
100 {
101 bool stopped = false;
102
105 {
107 stopped = true;
108 }
110
111 if (stopped)
113 }
114 }
115
117 return true;
118 else
119 return false;
120}
121
122
123
124
125
126bool
128{
132
134
137
139
140
141
142
143
144
145
147 {
149
151 {
152 bool stopped = false;
153
156 {
158 stopped = true;
159 }
161
162 if (stopped)
164 }
165 }
166
169 return true;
170 else
171 return false;
172}
173
174
175
176
177
178void
180{
182 pid_t walrcvpid = 0;
183 bool stopped = false;
184
185
186
187
188
189
192 {
194 break;
197 stopped = true;
198 break;
199
204
206 walrcvpid = walrcv->pid;
207 break;
208 }
210
211
212 if (stopped)
214
215
216
217
218 if (walrcvpid != 0)
219 kill(walrcvpid, SIGTERM);
220
221
222
223
224
228 WAIT_EVENT_WAL_RECEIVER_EXIT);
230}
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245void
247 const char *slotname, bool create_temp_slot)
248{
250 bool launch = false;
253
254
255
256
257
258
259
262
264
265
268
269 if (conninfo != NULL)
271 else
273
274
275
276
277
278
279
280 if (slotname != NULL && slotname[0] != '\0')
281 {
284 }
285 else
286 {
289 }
290
292 {
293 launch = true;
295 }
296 else
299
300
301
302
303
305 {
309 }
312
313 walrcv_proc = walrcv->procno;
314
316
317 if (launch)
321}
322
323
324
325
326
327
328
329
330
333{
336
339 if (latestChunkStart)
344
345 return recptr;
346}
347
348
349
350
351
354{
356
358}
359
360
361
362
363
364int
366{
371
375
377
378 if (receivePtr == replayPtr)
379 return 0;
380
382
383 if (chunkReplayStartTime == 0)
384 return -1;
385
388}
389
390
391
392
393
394int
396{
400
405
407 lastMsgReceiptTime);
408}
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define MemSet(start, val, len)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
Assert(PointerIsAligned(start, uint64))
void SetLatch(Latch *latch)
void SendPostmasterSignal(PMSignalReason reason)
@ PMSIGNAL_START_WALRECEIVER
size_t strlcpy(char *dst, const char *src, size_t siz)
#define GetPGProcByNumber(n)
#define INVALID_PROC_NUMBER
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
char conninfo[MAXCONNINFO]
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool WalRcvStreaming(void)
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
XLogRecPtr GetWalRcvWriteRecPtr(void)
void ShutdownWalRcv(void)
#define WALRCV_STARTUP_TIMEOUT
int GetReplicationApplyDelay(void)
void WalRcvShmemInit(void)
Size WalRcvShmemSize(void)
int GetReplicationTransferLatency(void)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
static TimeLineID receiveTLI
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)