PostgreSQL Source Code: src/test/modules/test_shm_mq/test.c Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

13

15

20

22

24

27

29 char *newdata);

30

31

33

34

35

36

37

38

39

40

41

44{

47 char *message_contents = VARDATA_ANY(message);

57

58

59 if (loop_count < 0)

61 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

62 errmsg("repeat count size must be an integer value greater than or equal to zero")));

63

64

65

66

67

68

69 if (nworkers <= 0)

71 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

72 errmsg("number of workers must be an integer value greater than zero")));

73

74

76

77

78 res = shm_mq_send(outqh, message_size, message_contents, false, true);

81 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

82 errmsg("could not send message")));

83

84

85

86

87

88 for (;;)

89 {

90

94 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

95 errmsg("could not receive message")));

96

97

98 if (--loop_count <= 0)

99 break;

100

101

105 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

106 errmsg("could not send message")));

107 }

108

109

110

111

112

114

115

117

119}

120

121

122

123

124

125

126

127

128

129

130

133{

136 char *message_contents = VARDATA_ANY(message);

141 int32 send_count = 0;

142 int32 receive_count = 0;

149

150

151 if (loop_count < 0)

153 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

154 errmsg("repeat count size must be an integer value greater than or equal to zero")));

155

156

157

158

159

160 if (nworkers < 0)

162 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

163 errmsg("number of workers must be an integer value greater than or equal to zero")));

164

165

167

168

169 for (;;)

170 {

171 bool wait = true;

172

173

174

175

176

177

178

179

180 if (send_count < loop_count)

181 {

182 res = shm_mq_send(outqh, message_size, message_contents, true,

183 true);

185 {

186 ++send_count;

187 wait = false;

188 }

191 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

192 errmsg("could not send message")));

193 }

194

195

196

197

198

199 if (receive_count < loop_count)

200 {

203 {

204 ++receive_count;

205

206 if (verify)

208 wait = false;

209 }

212 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

213 errmsg("could not receive message")));

214 }

215 else

216 {

217

218

219

220

221 if (send_count != receive_count)

223 (errcode(ERRCODE_INTERNAL_ERROR),

224 errmsg("message sent %d times, but received %d times",

225 send_count, receive_count)));

226 break;

227 }

228

229 if (wait)

230 {

231

234

235

236

237

238

239

240

245 }

246 }

247

248

250

252}

253

254

255

256

257static void

259{

261

262 if (origlen != newlen)

264 (errmsg("message corrupted"),

265 errdetail("The original message was %zu bytes but the final message is %zu bytes.",

266 origlen, newlen)));

267

268 for (i = 0; i < origlen; ++i)

269 if (origdata[i] != newdata[i])

271 (errmsg("message corrupted"),

272 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));

273}

void dsm_detach(dsm_segment *seg)

int errdetail(const char *fmt,...)

int errcode(int sqlerrcode)

int errmsg(const char *fmt,...)

#define ereport(elevel,...)

#define PG_GETARG_TEXT_PP(n)

#define PG_GETARG_INT64(n)

#define PG_GETARG_INT32(n)

#define PG_GETARG_BOOL(n)

void ResetLatch(Latch *latch)

int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)

#define CHECK_FOR_INTERRUPTS()

void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)

shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)

shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)

Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)

static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)

PG_FUNCTION_INFO_V1(test_shm_mq)

Datum test_shm_mq(PG_FUNCTION_ARGS)

static uint32 we_message_queue

#define VARSIZE_ANY_EXHDR(PTR)

uint32 WaitEventExtensionNew(const char *wait_event_name)

#define WL_EXIT_ON_PM_DEATH