[issue5863] bz2.BZ2File should accept other file-like objects — Code Review of the proposed Lib/bz2.py rewrite

OLD

NEW

(Empty)

1 """Interface to the libbzip2 compression library.

2

3 This module provides a file interface, classes for incremental

4 (de)compression, and functions for one-shot (de)compression.

5 """

6

7 __all__ = ["BZ2File", "BZ2Compressor", "BZ2Decompressor", "compress",

8 "decompress"]

9

10 __author__ = "Nadeem Vawda nadeem.vawda@gmail.com"

11

12 import _bz2

13 import threading

14 import warnings

15

16

# Internal states of a BZ2File object. _MODE_READ_EOF is entered once the
# decompressor reports end-of-stream; further reads then return b"".
_MODE_CLOSED = 0
_MODE_READ = 1
_MODE_READ_EOF = 2
_MODE_WRITE = 3

# Number of compressed bytes read from the underlying file each time the
# readahead buffer is refilled (see BZ2File._fill_buffer).
_BUFFER_SIZE = 8192

23

24

class BZ2File:

    """A file object providing transparent bzip2 (de)compression.

    A BZ2File can act as a wrapper for an existing file object, or refer
    directly to a named file on disk.

    Note that BZ2File provides a *binary* file interface - data read is
    returned as bytes, and data to be written should be given as bytes.
    """

    def __init__(self, filename=None, mode="r", buffering=None,
                 compresslevel=9, fileobj=None):
        """Open a bzip2-compressed file.

        If filename is given, open the named file. Otherwise, operate on
        the file object given by fileobj. Exactly one of these two
        parameters should be provided.

        mode can be 'r' for reading (default), or 'w' for writing.

        buffering is ignored. Its use is deprecated.

        If mode is 'w', compresslevel can be a number between 1 and 9
        specifying the level of compression: 1 produces the least
        compression, and 9 (default) produces the most compression.

        Raises ValueError for an invalid mode, an out-of-range
        compresslevel, or if neither/both of filename and fileobj are
        given.
        """
        self._lock = threading.Lock()
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0
        # Total uncompressed size; -1 until EOF has been reached once.
        self._size = -1

        if buffering is not None:
            warnings.warn("Use of 'buffering' argument is deprecated",
                          DeprecationWarning)

        if not (1 <= compresslevel <= 9):
            raise ValueError("compresslevel must be between 1 and 9")

        if mode in ("", "r", "rb"):
            mode = "rb"
            mode_code = _MODE_READ
            self._decompressor = _bz2.Decompressor()
            self._buffer = None
        elif mode in ("w", "wb"):
            mode = "wb"
            mode_code = _MODE_WRITE
            # Bug fix: pass compresslevel through to the compressor.
            # Previously it was validated above but then silently ignored,
            # so every file was written at the default level.
            self._compressor = _bz2.Compressor(compresslevel)
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        # Exactly one of filename/fileobj must be provided. We only take
        # responsibility for closing the fp if we opened it ourselves.
        if filename is not None and fileobj is None:
            self._fp = open(filename, mode)
            self._closefp = True
            self._mode = mode_code
        elif fileobj is not None and filename is None:
            self._fp = fileobj
            self._mode = mode_code
        else:
            raise ValueError("Must give exactly one of filename and fileobj")

    def __del__(self):
        # Best-effort cleanup; _close() is a no-op if already closed.
        self._close()

    def _close(self):
        # Internal close helper; assumes the caller holds the lock (or is
        # the finalizer). Flushes pending compressed data in write mode,
        # and closes the underlying fp only if this object opened it.
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode in (_MODE_READ, _MODE_READ_EOF):
                self._decompressor = None
            elif self._mode == _MODE_WRITE:
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                # Reset state even if fp.close() raised.
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED
                self._buffer = None

    # Context management protocol:

    def __enter__(self):
        self._check_not_closed()
        return self

    def __exit__(self, *args):
        self.close()

    # Iterator protocol for line iteration:

    def __iter__(self):
        self._check_not_closed()
        return self

    def __next__(self):
        with self._lock:
            self._check_can_read()
            line = self._read_line(-1)
            if not line:
                raise StopIteration()
            return line

    # Public I/O methods:

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b'' if the file is already at EOF.
        """
        with self._lock:
            self._check_can_read()
            if self._mode == _MODE_READ_EOF or size == 0:
                return b""
            elif size < 0:
                return self._read_all()
            else:
                return self._read_block(size)

    def peek(self, n=0):
        """Return buffered data without advancing the file position.

        At least one byte of data will be returned (unless at EOF).
        The exact number of bytes returned is unspecified.
        """
        with self._lock:
            self._check_can_read()
            if self._mode == _MODE_READ_EOF:
                return b""
            self._fill_buffer()
            return self._buffer

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        with self._lock:
            self._check_can_read()
            return self._read_line(size)

    def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        size can be specified to control the number of lines read: no
        further lines will be read once the total size of the lines read
        so far equals or exceeds size.
        """
        with self._lock:
            self._check_can_read()
            lines = []
            nread = 0
            while (size < 0 or nread < size) and self._mode != _MODE_READ_EOF:
                line = self._read_line(-1)
                if not line:
                    break
                lines.append(line)
                nread += len(line)
            return lines

    def write(self, data):
        """Write a byte string to the file.

        Returns the number of uncompressed bytes written, which is
        always len(data). Note that due to buffering, the file on disk
        may not reflect the data written until close() is called.
        """
        with self._lock:
            self._check_can_write()
            return self._write(data)

    def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Returns the number of uncompressed bytes written.
        seq can be any iterable yielding byte strings.

        Line separators are not added between the written byte strings.
        """
        with self._lock:
            self._check_can_write()
            nwritten = 0
            for block in seq:
                nwritten += self._write(block)
            return nwritten

    def seek(self, offset, whence=0):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Values for whence are:

            0: start of stream (default); offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        with self._lock:
            self._check_can_seek()

            # Recalculate offset as an absolute file position.
            if whence == 0:
                pass
            elif whence == 1:
                offset = self._pos + offset
            elif whence == 2:
                # Seeking relative to EOF - we need to know the file's size.
                if self._size < 0:
                    self._read_all(return_data=False)
                offset = self._size + offset
            else:
                raise ValueError("Invalid value for whence: {}".format(whence))

            # Make it so that offset is the number of bytes to skip forward.
            # Seeking backwards requires rewinding and re-decompressing.
            if offset < self._pos:
                self._rewind()
            else:
                offset -= self._pos

            # Read and discard data until we reach the desired position.
            if self._mode != _MODE_READ_EOF:
                self._read_block(offset, return_data=False)

            return self._pos

    def tell(self):
        """Return the current file position."""
        with self._lock:
            self._check_not_closed()
            return self._pos

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        with self._lock:
            self._close()

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    # Helper methods. All methods below here (except for _check_*())
    # assume that the caller owns the object's lock.

    def _check_not_closed(self):
        if self._mode == _MODE_CLOSED:
            raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise IOError("File not open for reading")

    def _check_can_write(self):
        if self._mode != _MODE_WRITE:
            self._check_not_closed()
            raise IOError("File not open for writing")

    def _check_can_seek(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            # Message fixed: was "files opening for reading".
            raise IOError("Seeking only works on files open for reading")

    # Fill the readahead buffer if it is empty. Returns False on EOF.
    def _fill_buffer(self):
        if self._buffer:
            return True
        if self._decompressor.eof:
            # End-of-stream marker seen; record the uncompressed size.
            self._mode = _MODE_READ_EOF
            self._size = self._pos
            return False
        rawblock = self._fp.read(_BUFFER_SIZE)
        if not rawblock:
            # Raw stream ran out before bzip2's end-of-stream marker:
            # the compressed data is truncated.
            raise EOFError("Compressed file ended before the "
                           "end-of-stream marker was reached")
        self._buffer = self._decompressor.decompress(rawblock)
        return True

    # Read data until EOF.
    # If return_data is false, consume the data without returning it.
    def _read_all(self, return_data=True):
        blocks = []
        while self._fill_buffer():
            if return_data:
                blocks.append(self._buffer)
            self._pos += len(self._buffer)
            self._buffer = None
        if return_data:
            return b"".join(blocks)

    # Read a block of up to size bytes.
    # If return_data is false, consume the data without returning it.
    def _read_block(self, size, return_data=True):
        blocks = []
        nread = 0
        while nread < size and self._fill_buffer():
            nbuf = len(self._buffer)
            ct = min(nbuf, size - nread)
            if return_data:
                # Avoid copying when the whole buffer is consumed.
                blocks.append(self._buffer[:ct] if ct < nbuf else self._buffer)
            nread += ct
            self._pos += ct
            self._buffer = self._buffer[ct:] if ct < nbuf else None
        if return_data:
            return b"".join(blocks)

    # Read a single line of up to size bytes. Negative size -> no size limit.
    def _read_line(self, size):
        blocks = []
        nread = 0
        while (size < 0 or nread < size) and self._fill_buffer():
            nbuf = len(self._buffer)
            if size < 0 or nbuf <= size - nread:
                limit = nbuf
            else:
                limit = size - nread
            try:
                # Include the newline itself in the returned line.
                ct = self._buffer.index(b'\n', 0, limit) + 1
                found_eol = True
            except ValueError:
                ct = limit
                found_eol = False
            blocks.append(self._buffer[:ct] if ct < nbuf else self._buffer)
            self._pos += ct
            self._buffer = self._buffer[ct:] if ct < nbuf else None
            if found_eol:
                break
        return b"".join(blocks)

    # Compress and write a block of data.
    def _write(self, data):
        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += len(data)
        return len(data)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        self._fp.seek(0, 0)
        self._mode = _MODE_READ
        self._pos = 0
        # A fresh decompressor is needed to re-read from the start.
        self._decompressor = _bz2.Decompressor()
        self._buffer = None

384

385

class BZ2Compressor:

    """A compressor object for compressing data incrementally.

    For one-shot compression, use the compress() function instead.
    """

    def __init__(self, compresslevel=9):
        """Create a new compressor object.

        compresslevel, if given, must be a number between 1 and 9.

        Raises ValueError if compresslevel is out of range.
        """
        if not (1 <= compresslevel <= 9):
            raise ValueError("compresslevel must be between 1 and 9")
        # The lock serializes access to the underlying _bz2 compressor so
        # this object can safely be shared between threads.
        self._lock = threading.Lock()
        self._compressor = _bz2.Compressor(compresslevel)

    def compress(self, data):
        """Provide data to the compressor object.

        Returns a chunk of compressed data if possible, or b''
        otherwise.

        When you have finished providing data to the compressor, call
        the flush() method to finish the compression process.
        """
        with self._lock:
            return self._compressor.compress(data)

    def flush(self):
        """Flush the internal buffers of the compressor object.

        Returns any compressed data that was not returned by earlier
        calls to compress().

        Once this method has been called, the compressor object may no
        longer be used.
        """
        with self._lock:
            return self._compressor.flush()

426

427

class BZ2Decompressor:

    """A decompressor object for decompressing data incrementally.

    For one-shot decompression, use the decompress() function instead.
    """

    def __init__(self):
        """Create a new decompressor object."""
        # The lock makes the wrapper safe to share between threads.
        self._lock = threading.Lock()
        self._decompressor = _bz2.Decompressor()

    def decompress(self, data):
        """Provide data to the decompressor object.

        Returns a chunk of decompressed data if possible, or b''
        otherwise.

        Attempting to decompress data after the end of stream is reached
        raises an EOFError. Any data found after the end of the stream
        is ignored and saved in the unused_data attribute.
        """
        with self._lock:
            result = self._decompressor.decompress(data)
        return result

    @property
    def eof(self):
        """True if the end-of-stream marker has been reached."""
        return self._decompressor.eof

    @property
    def unused_data(self):
        """Data found after the end of the compressed stream."""
        return self._decompressor.unused_data

462

463

def compress(data, compresslevel=9):
    """Compress a block of data.

    compresslevel, if given, must be a number between 1 and 9.

    For incremental compression, use a BZ2Compressor object instead.
    """
    compressor = _bz2.Compressor(compresslevel)
    # Concatenate the compressed body with the flushed stream trailer.
    return b"".join([compressor.compress(data), compressor.flush()])

473

474

def decompress(data):
    """Decompress a block of data.

    For incremental decompression, use a BZ2Decompressor object instead.
    """
    # Empty input is returned unchanged rather than treated as truncated.
    if not data:
        return b""
    decompressor = _bz2.Decompressor()
    result = decompressor.decompress(data)
    if not decompressor.eof:
        raise ValueError("Compressed data ended before the "
                         "end-of-stream marker was reached")
    return result

OLD

NEW