Committing Py3k version of changelist 64080 and 64257, along with upd… · python/cpython@d74900e (original) (raw)

`@@ -45,12 +45,23 @@

`

45

45

`method) up to the terminator, and then control will be returned to

`

46

46

`you - by calling your self.found_terminator() method.

`

47

47

`"""

`

48

``

-

49

``

`-

import sys

`

50

48

`import socket

`

51

49

`import asyncore

`

52

50

`from collections import deque

`

53

51

``

``

52

`+

def buffer(obj, start=None, stop=None):

`

``

53

`+

if memoryview objects gain slicing semantics,

`

``

54

`+

this function will change for the better

`

``

55

`+

memoryview used for the TypeError

`

``

56

`+

memoryview(obj)

`

``

57

`+

if start == None:

`

``

58

`+

start = 0

`

``

59

`+

if stop == None:

`

``

60

`+

stop = len(obj)

`

``

61

`+

x = obj[start:stop]

`

``

62

`+

print("buffer type is: %s"%(type(x),))

`

``

63

`+

return x

`

``

64

+

54

65

`class async_chat (asyncore.dispatcher):

`

55

66

`"""This is an abstract class. You must derive from this class, and add

`

56

67

` the two methods collect_incoming_data() and found_terminator()"""

`

`@@ -60,20 +71,47 @@ class async_chat (asyncore.dispatcher):

`

60

71

`ac_in_buffer_size = 4096

`

61

72

`ac_out_buffer_size = 4096

`

62

73

``

``

74

`+

we don't want to enable the use of encoding by default, because that is a

`

``

75

`+

sign of an application bug that we don't want to pass silently

`

``

76

+

``

77

`+

use_encoding = 0

`

``

78

`+

encoding = 'latin1'

`

``

79

+

63

80

`def init (self, conn=None):

`

``

81

`+

for string terminator matching

`

64

82

`self.ac_in_buffer = b''

`

65

``

`-

self.ac_out_buffer = b''

`

66

``

`-

self.producer_fifo = fifo()

`

``

83

+

``

84

`+

we use a list here rather than cStringIO for a few reasons...

`

``

85

`+

del lst[:] is faster than sio.truncate(0)

`

``

86

`+

lst = [] is faster than sio.truncate(0)

`

``

87

`+

cStringIO will be gaining unicode support in py3k, which

`

``

88

`+

will negatively affect the performance of bytes compared to

`

``

89

`+

a ''.join() equivalent

`

``

90

`+

self.incoming = []

`

``

91

+

``

92

`+

we toss the use of the "simple producer" and replace it with

`

``

93

`+

a pure deque, which the original fifo was a wrapping of

`

``

94

`+

self.producer_fifo = deque()

`

67

95

`asyncore.dispatcher.init (self, conn)

`

68

96

``

69

97

`def collect_incoming_data(self, data):

`

70

98

`raise NotImplementedError("must be implemented in subclass")

`

71

99

``

``

100

`+

def _collect_incoming_data(self, data):

`

``

101

`+

self.incoming.append(data)

`

``

102

+

``

103

`+

def _get_data(self):

`

``

104

`+

d = b''.join(self.incoming)

`

``

105

`+

del self.incoming[:]

`

``

106

`+

return d

`

``

107

+

72

108

`def found_terminator(self):

`

73

109

`raise NotImplementedError("must be implemented in subclass")

`

74

110

``

75

111

`def set_terminator (self, term):

`

76

112

`"Set the input delimiter. Can be a fixed string of any length, an integer, or None"

`

``

113

`+

if isinstance(term, str) and self.use_encoding:

`

``

114

`+

term = bytes(term, self.encoding)

`

77

115

`self.terminator = term

`

78

116

``

79

117

`def get_terminator (self):

`

`@@ -92,14 +130,14 @@ def handle_read (self):

`

92

130

`self.handle_error()

`

93

131

`return

`

94

132

``

95

``

`-

if isinstance(data, str):

`

96

``

`-

data = data.encode('ascii')

`

97

``

`-

self.ac_in_buffer = self.ac_in_buffer + bytes(data)

`

``

133

`+

if isinstance(data, str) and self.use_encoding:

`

``

134

`+

data = bytes(str, self.encoding)

`

``

135

`+

self.ac_in_buffer = self.ac_in_buffer + data

`

98

136

``

99

137

`# Continue to search for self.terminator in self.ac_in_buffer,

`

100

138

`# while calling self.collect_incoming_data. The while loop

`

101

139

`# is necessary because we might read several data+terminator

`

102

``

`-

combos with a single recv(1024).

`

``

140

`+

combos with a single recv(4096).

`

103

141

``

104

142

`while self.ac_in_buffer:

`

105

143

`lb = len(self.ac_in_buffer)

`

`@@ -108,7 +146,7 @@ def handle_read (self):

`

108

146

`# no terminator, collect it all

`

109

147

`self.collect_incoming_data (self.ac_in_buffer)

`

110

148

`self.ac_in_buffer = b''

`

111

``

`-

elif isinstance(terminator, int) or isinstance(terminator, int):

`

``

149

`+

elif isinstance(terminator, int):

`

112

150

`# numeric terminator

`

113

151

`n = terminator

`

114

152

`if lb < n:

`

`@@ -129,8 +167,6 @@ def handle_read (self):

`

129

167

`# 3) end of buffer does not match any prefix:

`

130

168

`# collect data

`

131

169

`terminator_len = len(terminator)

`

132

``

`-

if isinstance(terminator, str):

`

133

``

`-

terminator = terminator.encode('ascii')

`

134

170

`index = self.ac_in_buffer.find(terminator)

`

135

171

`if index != -1:

`

136

172

`# we found the terminator

`

`@@ -155,91 +191,87 @@ def handle_read (self):

`

155

191

`self.ac_in_buffer = b''

`

156

192

``

157

193

`def handle_write (self):

`

158

``

`-

self.initiate_send ()

`

``

194

`+

self.initiate_send()

`

159

195

``

160

196

`def handle_close (self):

`

161

197

`self.close()

`

162

198

``

163

199

`def push (self, data):

`

164

``

`-

self.producer_fifo.push (simple_producer (data))

`

``

200

`+

sabs = self.ac_out_buffer_size

`

``

201

`+

if len(data) > sabs:

`

``

202

`+

for i in range(0, len(data), sabs):

`

``

203

`+

self.producer_fifo.append(data[i:i+sabs])

`

``

204

`+

else:

`

``

205

`+

self.producer_fifo.append(data)

`

165

206

`self.initiate_send()

`

166

207

``

167

208

`def push_with_producer (self, producer):

`

168

``

`-

self.producer_fifo.push (producer)

`

``

209

`+

self.producer_fifo.append(producer)

`

169

210

`self.initiate_send()

`

170

211

``

171

212

`def readable (self):

`

172

213

`"predicate for inclusion in the readable for select()"

`

173

``

`-

return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

`

``

214

`+

cannot use the old predicate, it violates the claim of the

`

``

215

`+

set_terminator method.

`

``

216

+

``

217

`+

return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

`

``

218

`+

return 1

`

174

219

``

175

220

`def writable (self):

`

176

221

`"predicate for inclusion in the writable for select()"

`

177

``

`-

return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)

`

178

``

`-

this is about twice as fast, though not as clear.

`

179

``

`-

return not (

`

180

``

`-

(self.ac_out_buffer == b'') and

`

181

``

`-

self.producer_fifo.is_empty() and

`

182

``

`-

self.connected

`

183

``

`-

)

`

``

222

`+

return self.producer_fifo or (not self.connected)

`

184

223

``

185

224

`def close_when_done (self):

`

186

225

`"automatically close this channel once the outgoing queue is empty"

`

187

``

`-

self.producer_fifo.push (None)

`

188

``

-

189

``

`-

refill the outgoing buffer by calling the more() method

`

190

``

`-

of the first producer in the queue

`

191

``

`-

def refill_buffer (self):

`

192

``

`-

while 1:

`

193

``

`-

if len(self.producer_fifo):

`

194

``

`-

p = self.producer_fifo.first()

`

195

``

`-

a 'None' in the producer fifo is a sentinel,

`

196

``

`-

telling us to close the channel.

`

197

``

`-

if p is None:

`

198

``

`-

if not self.ac_out_buffer:

`

199

``

`-

self.producer_fifo.pop()

`

200

``

`-

self.close()

`

201

``

`-

return

`

202

``

`-

elif isinstance(p, str) or isinstance(p, bytes):

`

203

``

`-

if isinstance(p, str):

`

204

``

`-

p = p.encode('ascii')

`

205

``

`-

self.producer_fifo.pop()

`

206

``

`-

self.ac_out_buffer = self.ac_out_buffer + p

`

``

226

`+

self.producer_fifo.append(None)

`

``

227

+

``

228

`+

def initiate_send(self):

`

``

229

`+

while self.producer_fifo and self.connected:

`

``

230

`+

first = self.producer_fifo[0]

`

``

231

`+

handle empty string/buffer or None entry

`

``

232

`+

if not first:

`

``

233

`+

del self.producer_fifo[0]

`

``

234

`+

if first is None:

`

``

235

`+

print("first is None")

`

``

236

`+

self.handle_close()

`

207

237

`return

`

208

``

`-

data = p.more()

`

``

238

`+

print("first is not None")

`

``

239

+

``

240

`+

handle classic producer behavior

`

``

241

`+

obs = self.ac_out_buffer_size

`

``

242

`+

try:

`

``

243

`+

data = buffer(first, 0, obs)

`

``

244

`+

except TypeError:

`

``

245

`+

data = first.more()

`

209

246

`if data:

`

210

``

`-

if isinstance(data, str):

`

211

``

`-

data = data.encode('ascii')

`

212

``

`-

self.ac_out_buffer = self.ac_out_buffer + bytes(data)

`

213

``

`-

return

`

``

247

`+

self.producer_fifo.appendleft(data)

`

214

248

`else:

`

215

``

`-

self.producer_fifo.pop()

`

216

``

`-

else:

`

217

``

`-

return

`

``

249

`+

del self.producer_fifo[0]

`

``

250

`+

continue

`

218

251

``

219

``

`-

def initiate_send (self):

`

220

``

`-

obs = self.ac_out_buffer_size

`

221

``

`-

try to refill the buffer

`

222

``

`-

if (len (self.ac_out_buffer) < obs):

`

223

``

`-

self.refill_buffer()

`

``

252

`+

if isinstance(data, str) and self.use_encoding:

`

``

253

`+

data = bytes(data, self.encoding)

`

224

254

``

225

``

`-

if self.ac_out_buffer and self.connected:

`

226

``

`-

try to send the buffer

`

``

255

`+

send the data

`

227

256

`try:

`

228

``

`-

num_sent = self.send (self.ac_out_buffer[:obs])

`

229

``

`-

if num_sent:

`

230

``

`-

self.ac_out_buffer = self.ac_out_buffer[num_sent:]

`

231

``

-

232

``

`-

except socket.error as why:

`

``

257

`+

num_sent = self.send(data)

`

``

258

`+

except socket.error:

`

233

259

`self.handle_error()

`

234

260

`return

`

235

261

``

``

262

`+

if num_sent:

`

``

263

`+

if num_sent < len(data) or obs < len(first):

`

``

264

`+

self.producer_fifo[0] = first[num_sent:]

`

``

265

`+

else:

`

``

266

`+

del self.producer_fifo[0]

`

``

267

`+

we tried to send some actual data

`

``

268

`+

return

`

``

269

+

236

270

`def discard_buffers (self):

`

237

271

`# Emergencies only!

`

238

272

`self.ac_in_buffer = b''

`

239

``

`-

self.ac_out_buffer = b''

`

240

``

`-

while self.producer_fifo:

`

241

``

`-

self.producer_fifo.pop()

`

242

``

-

``

273

`+

del self.incoming[:]

`

``

274

`+

self.producer_fifo.clear()

`

243

275

``

244

276

`class simple_producer:

`

245

277

``