[issue1736190] asyncore/asynchat patches - Code Review (original) (raw)

OLD

NEW

1 # -*- Mode: Python; tab-width: 4 -*-

1 # -*- Mode: Python; tab-width: 4 -*-

2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp

2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp

3 # Author: Sam Rushing rushing@nightmare.com

3 # Author: Sam Rushing rushing@nightmare.com

4

4

5 # ======================================================================

5 # ======================================================================

6 # Copyright 1996 by Sam Rushing

6 # Copyright 1996 by Sam Rushing

7 #

7 #

8 # All Rights Reserved

8 # All Rights Reserved

9 #

9 #

10 # Permission to use, copy, modify, and distribute this software and

10 # Permission to use, copy, modify, and distribute this software and

(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading...

53 class async_chat (asyncore.dispatcher):

53 class async_chat (asyncore.dispatcher):

54 """This is an abstract class. You must derive from this class, and add

54 """This is an abstract class. You must derive from this class, and add

55 the two methods collect_incoming_data() and found_terminator()"""

55 the two methods collect_incoming_data() and found_terminator()"""

56

56

57 # these are overridable defaults

57 # these are overridable defaults

58

58

59 ac_in_buffer_size = 4096

59 ac_in_buffer_size = 4096

60 ac_out_buffer_size = 4096

60 ac_out_buffer_size = 4096

61

61

62 def __init__ (self, conn=None):

62 def __init__ (self, conn=None):

63 # for string terminator matching

63 self.ac_in_buffer = ''

64 self.ac_in_buffer = ''

64 self.ac_out_buffer = ''

65 ········

65 self.producer_fifo = fifo()

66 # we use a list here rather than cStringIO for a few reasons...

67 # del lst[:] is faster than sio.truncate(0)

68 # lst = [] is faster than sio.truncate(0)

69 # cStringIO will be gaining unicode support in py3k, which

70 # will negatively affect the performance of bytes compared to

71 # a ''.join() equivalent

72 self.incoming = []

73 ········

74 # we toss the use of the "simple producer" and replace it with

75 # a pure deque, which the original fifo was a wrapping of

76 self.producer_fifo = deque()

66 asyncore.dispatcher.__init__ (self, conn)

77 asyncore.dispatcher.__init__ (self, conn)

67

78

68 def collect_incoming_data(self, data):

79 def collect_incoming_data(self, data):

69 raise NotImplementedError, "must be implemented in subclass"

80 raise NotImplementedError("must be implemented in subclass")

81 ····

82 def _collect_incoming_data(self, data):

83 self.incoming.append(data)

84 ····

85 def _get_data(self):

86 d = ''.join(self.incoming)

87 del self.incoming[:]

88 return d

70

89

71 def found_terminator(self):

90 def found_terminator(self):

72 raise NotImplementedError, "must be implemented in subclass"

91 raise NotImplementedError("must be implemented in subclass")

73

92

74 def set_terminator (self, term):

93 def set_terminator (self, term):

75 "Set the input delimiter. Can be a fixed string of any length, an integ er, or None"

94 "Set the input delimiter. Can be a fixed string of any length, an integ er, or None"

76 self.terminator = term

95 self.terminator = term

77

96

78 def get_terminator (self):

97 def get_terminator (self):

79 return self.terminator

98 return self.terminator

80

99

81 # grab some more data from the socket,

100 # grab some more data from the socket,

82 # throw it to the collector method,

101 # throw it to the collector method,

83 # check for the terminator,

102 # check for the terminator,

84 # if found, transition to the next state.

103 # if found, transition to the next state.

85

104

86 def handle_read (self):

105 def handle_read (self):

87

106

88 try:

107 try:

89 data = self.recv (self.ac_in_buffer_size)

108 data = self.recv (self.ac_in_buffer_size)

90 except socket.error, why:

109 except socket.error, why:

91 self.handle_error()

110 self.handle_error()

92 return

111 return

93

112

94 self.ac_in_buffer = self.ac_in_buffer + data

113 self.ac_in_buffer = self.ac_in_buffer + data

95

114

96 # Continue to search for self.terminator in self.ac_in_buffer,

115 # Continue to search for self.terminator in self.ac_in_buffer,

97 # while calling self.collect_incoming_data. The while loop

116 # while calling self.collect_incoming_data. The while loop

98 # is necessary because we might read several data+terminator

117 # is necessary because we might read several data+terminator

99 # combos with a single recv(1024).

118 # combos with a single recv(4096).

100

119

101 while self.ac_in_buffer:

120 while self.ac_in_buffer:

102 lb = len(self.ac_in_buffer)

121 lb = len(self.ac_in_buffer)

103 terminator = self.get_terminator()

122 terminator = self.get_terminator()

104 if not terminator:

123 if not terminator:

105 # no terminator, collect it all

124 # no terminator, collect it all

106 self.collect_incoming_data (self.ac_in_buffer)

125 self.collect_incoming_data (self.ac_in_buffer)

107 self.ac_in_buffer = ''

126 self.ac_in_buffer = ''

108 elif isinstance(terminator, int) or isinstance(terminator, long):

127 elif isinstance(terminator, int) or isinstance(terminator, long):

109 # numeric terminator

128 # numeric terminator

(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading...

143 # we found a prefix, collect up to the prefix

162 # we found a prefix, collect up to the prefix

144 self.collect_incoming_data (self.ac_in_buffer[:-inde x])

163 self.collect_incoming_data (self.ac_in_buffer[:-inde x])

145 self.ac_in_buffer = self.ac_in_buffer[-index:]

164 self.ac_in_buffer = self.ac_in_buffer[-index:]

146 break

165 break

147 else:

166 else:

148 # no prefix, collect it all

167 # no prefix, collect it all

149 self.collect_incoming_data (self.ac_in_buffer)

168 self.collect_incoming_data (self.ac_in_buffer)

150 self.ac_in_buffer = ''

169 self.ac_in_buffer = ''

151

170

152 def handle_write (self):

171 def handle_write (self):

153 self.initiate_send ()

172 self.initiate_send()

154

173

155 def handle_close (self):

174 def handle_close (self):

156 self.close()

175 self.close()

157

176

158 def push (self, data):

177 def push (self, data):

159 self.producer_fifo.push (simple_producer (data))

178 sabs = self.ac_out_buffer_size

179 if len(data) > sabs:

180 for i in xrange(0, len(data), sabs):

181 self.producer_fifo.append(data[i:i+sabs])

182 else:

183 self.producer_fifo.append(data)

160 self.initiate_send()

184 self.initiate_send()

161

185

162 def push_with_producer (self, producer):

186 def push_with_producer (self, producer):

163 self.producer_fifo.push (producer)

187 self.producer_fifo.append(producer)

164 self.initiate_send()

188 self.initiate_send()

165

189

166 def readable (self):

190 def readable (self):

167 "predicate for inclusion in the readable for select()"

191 "predicate for inclusion in the readable for select()"

168 return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

192 # cannot use the old predicate, it violates the claim of the

193 # set_terminator method.

194 ········

195 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

196 return 1·

169

197

170 def writable (self):

198 def writable (self):

171 "predicate for inclusion in the writable for select()"

199 "predicate for inclusion in the writable for select()"

172 # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self .connected)

200 return self.producer_fifo or (not self.connected)

173 # this is about twice as fast, though not as clear.

174 return not (

175 (self.ac_out_buffer == '') and

176 self.producer_fifo.is_empty() and

177 self.connected

178 )

179

201

180 def close_when_done (self):

202 def close_when_done (self):

181 "automatically close this channel once the outgoing queue is empty"

203 "automatically close this channel once the outgoing queue is empty"

182 self.producer_fifo.push (None)

204 self.producer_fifo.append(None)

183

205

184 # refill the outgoing buffer by calling the more() method

206 def initiate_send(self):

185 # of the first producer in the queue

207 while self.producer_fifo and self.connected:

186 def refill_buffer (self):

208 first = self.producer_fifo[0]

187 while 1:

209 # handle empty string/buffer or None entry

188 if len(self.producer_fifo):

210 if not first:

189 p = self.producer_fifo.first()

211 del self.producer_fifo[0]

190 # a 'None' in the producer fifo is a sentinel,

212 if first is None:

191 # telling us to close the channel.

213 self.handle_close()

192 if p is None:

193 if not self.ac_out_buffer:

194 self.producer_fifo.pop()

195 self.close()

196 return

214 return

197 elif isinstance(p, str):

215

198 self.producer_fifo.pop()

216 # handle classic producer behavior

199 self.ac_out_buffer = self.ac_out_buffer + p

217 obs = self.ac_out_buffer_size

200 return

218 try:

201 data = p.more()

219 data = buffer(first, 0, obs)

220 except TypeError:

221 data = first.more()

202 if data:

222 if data:

203 self.ac_out_buffer = self.ac_out_buffer + data

223 self.producer_fifo.appendleft(data)

204 return

205 else:

224 else:

206 self.producer_fifo.pop()

225 del self.producer_fifo[0]

207 else:

226 continue

227

228 # send the data

229 try:

230 num_sent = self.send(data)

231 except socket.error:

232 self.handle_error()

208 return

233 return

209

234

210 def initiate_send (self):

235 if num_sent:

211 obs = self.ac_out_buffer_size

236 if num_sent < len(data) or obs < len(first):

212 # try to refill the buffer

237 self.producer_fifo[0] = first[num_sent:]

213 if (len (self.ac_out_buffer) < obs):

238 else:

214 self.refill_buffer()

239 del self.producer_fifo[0]

215

240 # we tried to send some actual data

216 if self.ac_out_buffer and self.connected:

241 return

217 # try to send the buffer

218 try:

219 num_sent = self.send (self.ac_out_buffer[:obs])

220 if num_sent:

221 self.ac_out_buffer = self.ac_out_buffer[num_sent:]

222

223 except socket.error, why:

224 self.handle_error()

225 return

226

242

227 def discard_buffers (self):

243 def discard_buffers (self):

228 # Emergencies only!

244 # Emergencies only!

229 self.ac_in_buffer = ''

245 self.ac_in_buffer = ''

230 self.ac_out_buffer = ''

246 del self.incoming[:]

231 while self.producer_fifo:

247 self.producer_fifo.clear()

232 self.producer_fifo.pop()

233

234

248

235 class simple_producer:

249 class simple_producer:

236

250

237 def __init__ (self, data, buffer_size=512):

251 def __init__ (self, data, buffer_size=512):

238 self.data = data

252 self.data = data

239 self.buffer_size = buffer_size

253 self.buffer_size = buffer_size

240

254

241 def more (self):

255 def more (self):

242 if len (self.data) > self.buffer_size:

256 if len (self.data) > self.buffer_size:

243 result = self.data[:self.buffer_size]

257 result = self.data[:self.buffer_size]

(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading...

286 # new python: 28961/s

300 # new python: 28961/s

287 # old python: 18307/s

301 # old python: 18307/s

288 # re: 12820/s

302 # re: 12820/s

289 # regex: 14035/s

303 # regex: 14035/s

290

304

291 def find_prefix_at_end (haystack, needle):

305 def find_prefix_at_end (haystack, needle):

292 l = len(needle) - 1

306 l = len(needle) - 1

293 while l and not haystack.endswith(needle[:l]):

307 while l and not haystack.endswith(needle[:l]):

294 l -= 1

308 l -= 1

295 return l

309 return l

OLD

NEW