PostgreSQL Source Code: src/test/modules/test_shm_mq/test.c Source File (original) (raw)
1
2
3
4
5
6
7
8
9
10
11
12
13
15
20
22
24
27
29 char *newdata);
30
31
33
34
35
36
37
38
39
40
41
44{
47 char *message_contents = VARDATA_ANY(message);
57
58
59 if (loop_count < 0)
61 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62 errmsg("repeat count size must be an integer value greater than or equal to zero")));
63
64
65
66
67
68
69 if (nworkers <= 0)
71 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72 errmsg("number of workers must be an integer value greater than zero")));
73
74
76
77
78 res = shm_mq_send(outqh, message_size, message_contents, false, true);
81 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82 errmsg("could not send message")));
83
84
85
86
87
88 for (;;)
89 {
90
94 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95 errmsg("could not receive message")));
96
97
98 if (--loop_count <= 0)
99 break;
100
101
105 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106 errmsg("could not send message")));
107 }
108
109
110
111
112
114
115
117
119}
120
121
122
123
124
125
126
127
128
129
130
133{
136 char *message_contents = VARDATA_ANY(message);
141 int32 send_count = 0;
142 int32 receive_count = 0;
149
150
151 if (loop_count < 0)
153 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 errmsg("repeat count size must be an integer value greater than or equal to zero")));
155
156
157
158
159
160 if (nworkers < 0)
162 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 errmsg("number of workers must be an integer value greater than or equal to zero")));
164
165
167
168
169 for (;;)
170 {
171 bool wait = true;
172
173
174
175
176
177
178
179
180 if (send_count < loop_count)
181 {
182 res = shm_mq_send(outqh, message_size, message_contents, true,
183 true);
185 {
186 ++send_count;
187 wait = false;
188 }
191 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 errmsg("could not send message")));
193 }
194
195
196
197
198
199 if (receive_count < loop_count)
200 {
203 {
204 ++receive_count;
205
206 if (verify)
208 wait = false;
209 }
212 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 errmsg("could not receive message")));
214 }
215 else
216 {
217
218
219
220
221 if (send_count != receive_count)
223 (errcode(ERRCODE_INTERNAL_ERROR),
224 errmsg("message sent %d times, but received %d times",
225 send_count, receive_count)));
226 break;
227 }
228
229 if (wait)
230 {
231
234
235
236
237
238
239
240
245 }
246 }
247
248
250
252}
253
254
255
256
257static void
259{
261
262 if (origlen != newlen)
264 (errmsg("message corrupted"),
265 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
266 origlen, newlen)));
267
268 for (i = 0; i < origlen; ++i)
269 if (origdata[i] != newdata[i])
271 (errmsg("message corrupted"),
272 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
273}
void dsm_detach(dsm_segment *seg)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_INT64(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define CHECK_FOR_INTERRUPTS()
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
static uint32 we_message_queue
#define VARSIZE_ANY_EXHDR(PTR)
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_EXIT_ON_PM_DEATH