[issue1736190] asyncore/asynchat patches - Code Review (original) (raw)
OLD
NEW
1 # -*- Mode: Python; tab-width: 4 -*-
1 # -*- Mode: Python; tab-width: 4 -*-
2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
2 # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
3 # Author: Sam Rushing <rushing@nightmare.com>
4
4
5 # ======================================================================
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
6 # Copyright 1996 by Sam Rushing
7 #
7 #
8 # All Rights Reserved
8 # All Rights Reserved
9 #
9 #
10 # Permission to use, copy, modify, and distribute this software and
10 # Permission to use, copy, modify, and distribute this software and
(...skipping 42 matching lines...)
53 class async_chat (asyncore.dispatcher):
53 class async_chat (asyncore.dispatcher):
54 """This is an abstract class. You must derive from this class, and add
54 """This is an abstract class. You must derive from this class, and add
55 the two methods collect_incoming_data() and found_terminator()"""
55 the two methods collect_incoming_data() and found_terminator()"""
56
56
57 # these are overridable defaults
57 # these are overridable defaults
58
58
59 ac_in_buffer_size = 4096
59 ac_in_buffer_size = 4096
60 ac_out_buffer_size = 4096
60 ac_out_buffer_size = 4096
61
61
62 def __init__ (self, conn=None):
62 def __init__ (self, conn=None):
63 # for string terminator matching
63 self.ac_in_buffer = ''
64 self.ac_in_buffer = ''
64 self.ac_out_buffer = ''
65 ········
65 self.producer_fifo = fifo()
66 # we use a list here rather than cStringIO for a few reasons...
67 # del lst[:] is faster than sio.truncate(0)
68 # lst = [] is faster than sio.truncate(0)
69 # cStringIO will be gaining unicode support in py3k, which
70 # will negatively affect the performance of bytes compared to
71 # a ''.join() equivalent
72 self.incoming = []
73 ········
74 # we toss the use of the "simple producer" and replace it with
75 # a pure deque, which the original fifo was a wrapping of
76 self.producer_fifo = deque()
66 asyncore.dispatcher.__init__ (self, conn)
77 asyncore.dispatcher.__init__ (self, conn)
67
78
68 def collect_incoming_data(self, data):
79 def collect_incoming_data(self, data):
69 raise NotImplementedError, "must be implemented in subclass"
80 raise NotImplementedError("must be implemented in subclass")
81 ····
82 def _collect_incoming_data(self, data):
83 self.incoming.append(data)
84 ····
85 def _get_data(self):
86 d = ''.join(self.incoming)
87 del self.incoming[:]
88 return d
70
89
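Worth noting for reviewers: the new _collect_incoming_data()/_get_data() helpers give subclasses the list-join buffering for free. A minimal sketch of how a subclass might use them (the echo_lines class below is invented for illustration and is not part of the patch):

    # reviewer sketch, not part of the patch -- assumes the new helpers above
    class echo_lines(async_chat):
        "Echoes each CRLF-terminated line back to the peer."

        def __init__(self, conn=None):
            async_chat.__init__(self, conn)
            self.set_terminator('\r\n')

        def collect_incoming_data(self, data):
            # delegate to the new list-based collector
            self._collect_incoming_data(data)

        def found_terminator(self):
            line = self._get_data()      # joins self.incoming and clears it
            self.push(line + '\r\n')     # the terminator itself is never collected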
71 def found_terminator(self):
90 def found_terminator(self):
72 raise NotImplementedError, "must be implemented in subclass"
91 raise NotImplementedError("must be implemented in subclass")
73
92
74 def set_terminator (self, term):
93 def set_terminator (self, term):
75 "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
94 "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
76 self.terminator = term
95 self.terminator = term
77
96
78 def get_terminator (self):
97 def get_terminator (self):
79 return self.terminator
98 return self.terminator
80
99
81 # grab some more data from the socket,
100 # grab some more data from the socket,
82 # throw it to the collector method,
101 # throw it to the collector method,
83 # check for the terminator,
102 # check for the terminator,
84 # if found, transition to the next state.
103 # if found, transition to the next state.
85
104
86 def handle_read (self):
105 def handle_read (self):
87
106
88 try:
107 try:
89 data = self.recv (self.ac_in_buffer_size)
108 data = self.recv (self.ac_in_buffer_size)
90 except socket.error, why:
109 except socket.error, why:
91 self.handle_error()
110 self.handle_error()
92 return
111 return
93
112
94 self.ac_in_buffer = self.ac_in_buffer + data
113 self.ac_in_buffer = self.ac_in_buffer + data
95
114
96 # Continue to search for self.terminator in self.ac_in_buffer,
115 # Continue to search for self.terminator in self.ac_in_buffer,
97 # while calling self.collect_incoming_data. The while loop
116 # while calling self.collect_incoming_data. The while loop
98 # is necessary because we might read several data+terminator
117 # is necessary because we might read several data+terminator
99 # combos with a single recv(1024).
118 # combos with a single recv(4096).
100
119
101 while self.ac_in_buffer:
120 while self.ac_in_buffer:
102 lb = len(self.ac_in_buffer)
121 lb = len(self.ac_in_buffer)
103 terminator = self.get_terminator()
122 terminator = self.get_terminator()
104 if not terminator:
123 if not terminator:
105 # no terminator, collect it all
124 # no terminator, collect it all
106 self.collect_incoming_data (self.ac_in_buffer)
125 self.collect_incoming_data (self.ac_in_buffer)
107 self.ac_in_buffer = ''
126 self.ac_in_buffer = ''
108 elif isinstance(terminator, int) or isinstance(terminator, long):
127 elif isinstance(terminator, int) or isinstance(terminator, long):
109 # numeric terminator
128 # numeric terminator
(...skipping 33 matching lines...)
143 # we found a prefix, collect up to the prefix
162 # we found a prefix, collect up to the prefix
144 self.collect_incoming_data (self.ac_in_buffer[:-index])
163 self.collect_incoming_data (self.ac_in_buffer[:-index])
145 self.ac_in_buffer = self.ac_in_buffer[-index:]
164 self.ac_in_buffer = self.ac_in_buffer[-index:]
146 break
165 break
147 else:
166 else:
148 # no prefix, collect it all
167 # no prefix, collect it all
149 self.collect_incoming_data (self.ac_in_buffer)
168 self.collect_incoming_data (self.ac_in_buffer)
150 self.ac_in_buffer = ''
169 self.ac_in_buffer = ''
151
170
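A concrete trace of the loop above may help reviewers check the string-terminator branch; the input string is the reviewer's own example, not from the patch:

    # reviewer sketch: one pass of handle_read() with terminator '\r\n'
    # and a recv() that returned 'foo\r\nbar\r\nbaz\r'
    #
    #   collect_incoming_data('foo'); found_terminator()
    #   collect_incoming_data('bar'); found_terminator()
    #   collect_incoming_data('baz')     # no full '\r\n' left in the buffer
    #   ac_in_buffer == '\r'             # kept: it may be the start of '\r\n'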
152 def handle_write (self):
171 def handle_write (self):
153 self.initiate_send ()
172 self.initiate_send()
154
173
155 def handle_close (self):
174 def handle_close (self):
156 self.close()
175 self.close()
157
176
158 def push (self, data):
177 def push (self, data):
159 self.producer_fifo.push (simple_producer (data))
178 sabs = self.ac_out_buffer_size
179 if len(data) > sabs:
180 for i in xrange(0, len(data), sabs):
181 self.producer_fifo.append(data[i:i+sabs])
182 else:
183 self.producer_fifo.append(data)
160 self.initiate_send()
184 self.initiate_send()
161
185
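The rewritten push() pre-chunks large strings so the send loop never has to slice a huge string on every pass; a quick sketch of the effect (the helper name and the 10000-byte figure are just for illustration):

    # reviewer sketch of the chunking push() now performs
    def _chunks_as_pushed(data, sabs=4096):
        "Split data the way push() queues it when ac_out_buffer_size is 4096."
        if len(data) > sabs:
            return [data[i:i+sabs] for i in xrange(0, len(data), sabs)]
        return [data]

    # _chunks_as_pushed('x' * 10000) -> pieces of 4096, 4096 and 1808 bytes,
    # so each later send() works on at most one buffer-sized chunk.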
162 def push_with_producer (self, producer):
186 def push_with_producer (self, producer):
163 self.producer_fifo.push (producer)
187 self.producer_fifo.append(producer)
164 self.initiate_send()
188 self.initiate_send()
165
189
166 def readable (self):
190 def readable (self):
167 "predicate for inclusion in the readable for select()"
191 "predicate for inclusion in the readable for select()"
168 return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
192 # cannot use the old predicate, it violates the claim of the
193 # set_terminator method.
194 ········
195 # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
196 return 1·
169
197
170 def writable (self):
198 def writable (self):
171 "predicate for inclusion in the writable for select()"
199 "predicate for inclusion in the writable for select()"
172 # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self .connected)
200 return self.producer_fifo or (not self.connected)
173 # this is about twice as fast, though not as clear.
174 return not (
175 (self.ac_out_buffer == '') and
176 self.producer_fifo.is_empty() and
177 self.connected
178 )
179
201
180 def close_when_done (self):
202 def close_when_done (self):
181 "automatically close this channel once the outgoing queue is empty"
203 "automatically close this channel once the outgoing queue is empty"
182 self.producer_fifo.push (None)
204 self.producer_fifo.append(None)
183
205
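close_when_done() keeps working unchanged with the deque: the None sentinel is now consumed inside the new initiate_send(). A short usage sketch (the reply string is the reviewer's example):

    # reviewer sketch: flush a reply, then let the None sentinel close the channel
    #   channel.push('221 Goodbye\r\n')
    #   channel.close_when_done()
    # once the reply has been fully sent, initiate_send() reaches the None
    # entry and calls handle_close(), which close()s the socket.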
184 # refill the outgoing buffer by calling the more() method
206 def initiate_send(self):
185 # of the first producer in the queue
207 while self.producer_fifo and self.connected:
186 def refill_buffer (self):
208 first = self.producer_fifo[0]
187 while 1:
209 # handle empty string/buffer or None entry
188 if len(self.producer_fifo):
210 if not first:
189 p = self.producer_fifo.first()
211 del self.producer_fifo[0]
190 # a 'None' in the producer fifo is a sentinel,
212 if first is None:
191 # telling us to close the channel.
213 self.handle_close()
192 if p is None:
193 if not self.ac_out_buffer:
194 self.producer_fifo.pop()
195 self.close()
196 return
214 return
197 elif isinstance(p, str):
215
198 self.producer_fifo.pop()
216 # handle classic producer behavior
199 self.ac_out_buffer = self.ac_out_buffer + p
217 obs = self.ac_out_buffer_size
200 return
218 try:
201 data = p.more()
219 data = buffer(first, 0, obs)
220 except TypeError:
221 data = first.more()
202 if data:
222 if data:
203 self.ac_out_buffer = self.ac_out_buffer + data
223 self.producer_fifo.appendleft(data)
204 return
205 else:
224 else:
206 self.producer_fifo.pop()
225 del self.producer_fifo[0]
207 else:
226 continue
227
228 # send the data
229 try:
230 num_sent = self.send(data)
231 except socket.error:
232 self.handle_error()
208 return
233 return
209
234
210 def initiate_send (self):
235 if num_sent:
211 obs = self.ac_out_buffer_size
236 if num_sent < len(data) or obs < len(first):
212 # try to refill the buffer
237 self.producer_fifo[0] = first[num_sent:]
213 if (len (self.ac_out_buffer) < obs):
238 else:
214 self.refill_buffer()
239 del self.producer_fifo[0]
215
240 # we tried to send some actual data
216 if self.ac_out_buffer and self.connected:
241 return
217 # try to send the buffer
218 try:
219 num_sent = self.send (self.ac_out_buffer[:obs])
220 if num_sent:
221 self.ac_out_buffer = self.ac_out_buffer[num_sent:]
222
223 except socket.error, why:
224 self.handle_error()
225 return
226
242
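The buffer()/TypeError handling above lets one loop serve both plain strings and old-style producer objects; a hedged sketch of a classic producer that still drains correctly (counting_producer is made up for illustration):

    # reviewer sketch: an old-style producer under the new initiate_send()
    class counting_producer:
        "Yields a few CRLF-terminated lines from more(), then '' when exhausted."

        def __init__(self, n):
            self.n = n
            self.i = 0

        def more(self):
            if self.i >= self.n:
                return ''                 # empty string: nothing left to produce
            self.i += 1
            return 'line %d\r\n' % self.i

    # channel.push_with_producer(counting_producer(3))
    # buffer(producer, 0, obs) raises TypeError, so initiate_send() falls back
    # to producer.more() and appendleft()s each string for the next iteration.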
227 def discard_buffers (self):
243 def discard_buffers (self):
228 # Emergencies only!
244 # Emergencies only!
229 self.ac_in_buffer = ''
245 self.ac_in_buffer = ''
230 self.ac_out_buffer = ''
246 del self.incoming[:]
231 while self.producer_fifo:
247 self.producer_fifo.clear()
232 self.producer_fifo.pop()
233
234
248
235 class simple_producer:
249 class simple_producer:
236
250
237 def __init__ (self, data, buffer_size=512):
251 def __init__ (self, data, buffer_size=512):
238 self.data = data
252 self.data = data
239 self.buffer_size = buffer_size
253 self.buffer_size = buffer_size
240
254
241 def more (self):
255 def more (self):
242 if len (self.data) > self.buffer_size:
256 if len (self.data) > self.buffer_size:
243 result = self.data[:self.buffer_size]
257 result = self.data[:self.buffer_size]
(...skipping 42 matching lines...)
286 # new python: 28961/s
300 # new python: 28961/s
287 # old python: 18307/s
301 # old python: 18307/s
288 # re: 12820/s
302 # re: 12820/s
289 # regex: 14035/s
303 # regex: 14035/s
290
304
291 def find_prefix_at_end (haystack, needle):
305 def find_prefix_at_end (haystack, needle):
292 l = len(needle) - 1
306 l = len(needle) - 1
293 while l and not haystack.endswith(needle[:l]):
307 while l and not haystack.endswith(needle[:l]):
294 l -= 1
308 l -= 1
295 return l
309 return l
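For reference, find_prefix_at_end() returns how many trailing characters of the buffer could still be the start of the terminator; a few worked values (the inputs are the reviewer's own, not from the patch):

    # reviewer sketch: expected return values
    #   find_prefix_at_end('hello\r', '\r\n')   -> 1   ('\r' may begin '\r\n')
    #   find_prefix_at_end('hello',   '\r\n')   -> 0   (no partial terminator at the end)
    #   find_prefix_at_end('12<CR',   '<CRLF>') -> 3   ('<CR' matches a 3-char prefix)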