Committing Py3k version of changelist 64080 and 64257, along with upd… · python/cpython@d74900e (original) (raw)
`@@ -45,12 +45,23 @@
`
45
45
`method) up to the terminator, and then control will be returned to
`
46
46
`you - by calling your self.found_terminator() method.
`
47
47
`"""
`
48
``
-
49
``
`-
import sys
`
50
48
`import socket
`
51
49
`import asyncore
`
52
50
`from collections import deque
`
53
51
``
``
52
`+
def buffer(obj, start=None, stop=None):
    """Return the slice ``obj[start:stop]`` of a buffer-protocol object.

    ``start`` defaults to 0 and ``stop`` to ``len(obj)``.

    Raises TypeError when ``obj`` does not support the buffer protocol
    (for example a str or an arbitrary producer object).
    """
    # if memoryview objects gain slicing semantics,
    # this function will change for the better
    # memoryview used for the TypeError
    memoryview(obj)
    if start is None:
        start = 0
    if stop is None:
        stop = len(obj)
    return obj[start:stop]
`
``
64
+
54
65
`class async_chat (asyncore.dispatcher):
`
55
66
`"""This is an abstract class. You must derive from this class, and add
`
56
67
` the two methods collect_incoming_data() and found_terminator()"""
`
`@@ -60,20 +71,47 @@ class async_chat (asyncore.dispatcher):
`
60
71
`ac_in_buffer_size = 4096
`
61
72
`ac_out_buffer_size = 4096
`
62
73
``
``
74
`+
# we don't want to enable the use of encoding by default, because that is a
`
``
75
`+
# sign of an application bug that we don't want to pass silently
`
``
76
+
``
77
`+
use_encoding = 0
`
``
78
`+
encoding = 'latin1'
`
``
79
+
63
80
def __init__(self, conn=None):
    """Set up the input buffer, the incoming-data list and the outgoing
    producer queue, then run asyncore.dispatcher.__init__ with ``conn``."""
    # for string terminator matching
    self.ac_in_buffer = b''

    # we use a list here rather than cStringIO for a few reasons...
    # del lst[:] is faster than sio.truncate(0)
    # lst = [] is faster than sio.truncate(0)
    # cStringIO will be gaining unicode support in py3k, which
    # will negatively affect the performance of bytes compared to
    # a ''.join() equivalent
    self.incoming = []

    # we toss the use of the "simple producer" and replace it with
    # a pure deque, which the original fifo was a wrapping of
    self.producer_fifo = deque()
    asyncore.dispatcher.__init__(self, conn)
68
96
``
69
97
def collect_incoming_data(self, data):
    """Abstract hook that receives a chunk of incoming ``data``.

    Raises NotImplementedError unless overridden in a subclass.
    """
    raise NotImplementedError("must be implemented in subclass")
71
99
``
``
100
`+
def _collect_incoming_data(self, data):
    """Append ``data`` to the ``self.incoming`` list."""
    self.incoming.append(data)
`
``
102
+
``
103
`+
def _get_data(self):
    """Return everything in ``self.incoming`` joined into one bytes object
    and empty the list in place."""
    d = b''.join(self.incoming)
    # clear in place so the same list object keeps being reused
    del self.incoming[:]
    return d
`
``
107
+
72
108
def found_terminator(self):
    """Abstract hook with no arguments; the input delimiter is set with
    set_terminator().

    Raises NotImplementedError unless overridden in a subclass.
    """
    raise NotImplementedError("must be implemented in subclass")
74
110
``
75
111
def set_terminator(self, term):
    """Set the input delimiter.

    Can be a fixed string of any length, an integer, or None.  A str
    terminator is converted to bytes with ``self.encoding`` when
    ``self.use_encoding`` is true; otherwise ``term`` is stored unchanged.
    """
    if isinstance(term, str) and self.use_encoding:
        term = bytes(term, self.encoding)
    self.terminator = term
78
116
``
79
117
`def get_terminator (self):
`
`@@ -92,14 +130,14 @@ def handle_read (self):
`
92
130
`self.handle_error()
`
93
131
`return
`
94
132
``
95
``
`-
if isinstance(data, str):
`
96
``
`-
data = data.encode('ascii')
`
97
``
`-
self.ac_in_buffer = self.ac_in_buffer + bytes(data)
`
``
133
`+
if isinstance(data, str) and self.use_encoding:
`
``
134
`+
data = bytes(str, self.encoding)
`
``
135
`+
self.ac_in_buffer = self.ac_in_buffer + data
`
98
136
``
99
137
`# Continue to search for self.terminator in self.ac_in_buffer,
`
100
138
`# while calling self.collect_incoming_data. The while loop
`
101
139
`# is necessary because we might read several data+terminator
`
102
``
`-
combos with a single recv(1024).
`
``
140
`+
combos with a single recv(4096).
`
103
141
``
104
142
`while self.ac_in_buffer:
`
105
143
`lb = len(self.ac_in_buffer)
`
`@@ -108,7 +146,7 @@ def handle_read (self):
`
108
146
`# no terminator, collect it all
`
109
147
`self.collect_incoming_data (self.ac_in_buffer)
`
110
148
`self.ac_in_buffer = b''
`
111
``
`-
elif isinstance(terminator, int) or isinstance(terminator, int):
`
``
149
`+
elif isinstance(terminator, int):
`
112
150
`# numeric terminator
`
113
151
`n = terminator
`
114
152
`if lb < n:
`
`@@ -129,8 +167,6 @@ def handle_read (self):
`
129
167
`# 3) end of buffer does not match any prefix:
`
130
168
`# collect data
`
131
169
`terminator_len = len(terminator)
`
132
``
`-
if isinstance(terminator, str):
`
133
``
`-
terminator = terminator.encode('ascii')
`
134
170
`index = self.ac_in_buffer.find(terminator)
`
135
171
`if index != -1:
`
136
172
`# we found the terminator
`
`@@ -155,91 +191,87 @@ def handle_read (self):
`
155
191
`self.ac_in_buffer = b''
`
156
192
``
157
193
def handle_write(self):
    """Write-event handler: delegate to initiate_send()."""
    self.initiate_send()
159
195
``
160
196
def handle_close(self):
    """Close-event handler: close the channel via self.close()."""
    self.close()
162
198
``
163
199
def push(self, data):
    """Queue ``data`` for sending and start sending immediately.

    Data longer than ``self.ac_out_buffer_size`` is queued as consecutive
    slices of at most that size; shorter data (including empty data) is
    queued as a single entry.
    """
    sabs = self.ac_out_buffer_size
    if len(data) > sabs:
        for i in range(0, len(data), sabs):
            self.producer_fifo.append(data[i:i+sabs])
    else:
        self.producer_fifo.append(data)
    self.initiate_send()
166
207
``
167
208
def push_with_producer(self, producer):
    """Append ``producer`` to the outgoing queue and call initiate_send()."""
    self.producer_fifo.append(producer)
    self.initiate_send()
170
211
``
171
212
def readable(self):
    """Predicate for inclusion in the readable set for select().

    Always returns 1.
    """
    # cannot use the old predicate,
    #     len(self.ac_in_buffer) <= self.ac_in_buffer_size
    # it violates the claim of the set_terminator method.
    return 1
174
219
``
175
220
def writable(self):
    """Predicate for inclusion in the writable set for select().

    The result is truthy when ``self.producer_fifo`` is non-empty or the
    channel is not connected.  Only the truth value is meaningful: a
    non-empty queue is returned as the queue object itself.
    """
    return self.producer_fifo or (not self.connected)
184
223
``
185
224
def close_when_done(self):
    """Automatically close this channel once the outgoing queue is empty.

    Appends None to ``self.producer_fifo``; initiate_send() calls
    handle_close() when it finds that None at the head of the queue.
    """
    self.producer_fifo.append(None)
``
227
+
``
228
`+
def initiate_send(self):
    """Try to send the entry at the head of ``self.producer_fifo``.

    Runs while the queue is non-empty and the channel is connected:

    * a None entry is removed and handle_close() is called;
    * an entry that buffer() rejects with TypeError is treated as a
      producer: the result of its more() is put at the head of the
      queue, or the producer is removed when more() returns nothing;
    * otherwise up to ``self.ac_out_buffer_size`` items of the entry are
      passed to self.send() and the unsent remainder, if any, stays at
      the head of the queue.

    Returns after the first send attempt.  A socket.error from send() is
    reported through handle_error().
    """
    while self.producer_fifo and self.connected:
        first = self.producer_fifo[0]
        # handle empty string/buffer or None entry
        if not first:
            del self.producer_fifo[0]
            if first is None:
                # None is the entry queued by close_when_done()
                self.handle_close()
                return

        # handle classic producer behavior
        obs = self.ac_out_buffer_size
        try:
            data = buffer(first, 0, obs)
        except TypeError:
            data = first.more()
            if data:
                self.producer_fifo.appendleft(data)
            else:
                del self.producer_fifo[0]
            continue

        if isinstance(data, str) and self.use_encoding:
            data = bytes(data, self.encoding)

        # send the data
        try:
            num_sent = self.send(data)
        except socket.error:
            self.handle_error()
            return

        if num_sent:
            if num_sent < len(data) or obs < len(first):
                # part of the entry is still unsent; keep the remainder
                self.producer_fifo[0] = first[num_sent:]
            else:
                del self.producer_fifo[0]
        # we tried to send some actual data
        return
`
``
269
+
236
270
def discard_buffers(self):
    """Emergencies only!  Drop all buffered input and queued output.

    Resets ``self.ac_in_buffer`` and empties ``self.incoming`` and
    ``self.producer_fifo`` in place.
    """
    self.ac_in_buffer = b''
    del self.incoming[:]
    self.producer_fifo.clear()
243
275
``
244
276
`class simple_producer:
`
245
277
``