[issue5863] bz2.BZ2File should accept other file-like objects. - Code Review (original) (raw)
OLD
NEW
(Empty)
1 """Interface to the libbzip2 compression library.
2
3 This module provides a file interface, classes for incremental
4 (de)compression, and functions for one-shot (de)compression.
5 """
6
7 __all__ = ["BZ2File", "BZ2Compressor", "BZ2Decompressor", "compress",
8 "decompress"]
9
10 __author__ = "Nadeem Vawda nadeem.vawda@gmail.com"
11
12 import _bz2
13 import threading
14 import warnings
15
16
17 _MODE_CLOSED = 0
18 _MODE_READ = 1
19 _MODE_READ_EOF = 2
20 _MODE_WRITE = 3
21
22 _BUFFER_SIZE = 8192
23
24
class BZ2File:

    """A file object providing transparent bzip2 (de)compression.

    A BZ2File can act as a wrapper for an existing file object, or refer
    directly to a named file on disk.

    Note that BZ2File provides a *binary* file interface - data read is
    returned as bytes, and data to be written should be given as bytes.
    """

    def __init__(self, filename=None, mode="r", buffering=None,
                 compresslevel=9, fileobj=None):
        """Open a bzip2-compressed file.

        If filename is given, open the named file. Otherwise, operate on
        the file object given by fileobj. Exactly one of these two
        parameters should be provided.

        mode can be 'r' for reading (default), or 'w' for writing.

        buffering is ignored. Its use is deprecated.

        If mode is 'w', compresslevel can be a number between 1 and 9
        specifying the level of compression: 1 produces the least
        compression, and 9 (default) produces the most compression.

        Raises ValueError if compresslevel is out of range, if mode is
        invalid, or if neither (or both) of filename and fileobj is given.
        """
        # All public methods are serialized on this lock, so a BZ2File can
        # safely be shared between threads.
        self._lock = threading.Lock()
        self._fp = None
        self._closefp = False  # True only if we opened self._fp ourselves.
        self._mode = _MODE_CLOSED
        self._pos = 0          # Current position in the uncompressed stream.
        self._size = -1        # Uncompressed size; -1 until EOF is reached.

        if buffering is not None:
            warnings.warn("Use of 'buffering' argument is deprecated",
                          DeprecationWarning)

        if not (1 <= compresslevel <= 9):
            raise ValueError("compresslevel must be between 1 and 9")

        if mode in ("", "r", "rb"):
            mode = "rb"
            mode_code = _MODE_READ
            self._decompressor = _bz2.Decompressor()
            self._buffer = None
        elif mode in ("w", "wb"):
            mode = "wb"
            mode_code = _MODE_WRITE
            # Pass compresslevel through to the compressor. Previously it
            # was validated above but then silently ignored here.
            self._compressor = _bz2.Compressor(compresslevel)
        else:
            raise ValueError("Invalid mode: {!r}".format(mode))

        if filename is not None and fileobj is None:
            self._fp = open(filename, mode)
            self._closefp = True
            self._mode = mode_code
        elif fileobj is not None and filename is None:
            # Wrapping an existing file object: the caller retains
            # ownership, so we never close it.
            self._fp = fileobj
            self._mode = mode_code
        else:
            raise ValueError("Must give exactly one of filename and fileobj")

    def __del__(self):
        self._close()

    # Internal close helper; assumes the caller owns the lock (or that no
    # other thread can be using the object, as in __del__).
    def _close(self):
        if self._mode == _MODE_CLOSED:
            return
        try:
            if self._mode in (_MODE_READ, _MODE_READ_EOF):
                self._decompressor = None
            elif self._mode == _MODE_WRITE:
                # Flush any data buffered inside the compressor before
                # releasing it.
                self._fp.write(self._compressor.flush())
                self._compressor = None
        finally:
            try:
                if self._closefp:
                    self._fp.close()
            finally:
                # Always reset state, even if closing the underlying file
                # raised an exception.
                self._fp = None
                self._closefp = False
                self._mode = _MODE_CLOSED
                self._buffer = None

    # Context management protocol:

    def __enter__(self):
        self._check_not_closed()
        return self

    def __exit__(self, *args):
        self.close()

    # Iterator protocol for line iteration:

    def __iter__(self):
        self._check_not_closed()
        return self

    def __next__(self):
        with self._lock:
            self._check_can_read()
            line = self._read_line(-1)
            if not line:
                raise StopIteration()
            return line

    # Public I/O methods:

    def read(self, size=-1):
        """Read up to size uncompressed bytes from the file.

        If size is negative or omitted, read until EOF is reached.
        Returns b'' if the file is already at EOF.
        """
        with self._lock:
            self._check_can_read()
            if self._mode == _MODE_READ_EOF or size == 0:
                return b""
            elif size < 0:
                return self._read_all()
            else:
                return self._read_block(size)

    def peek(self, n=0):
        """Return buffered data without advancing the file position.

        At least one byte of data will be returned (unless at EOF).
        The exact number of bytes returned is unspecified.
        """
        with self._lock:
            self._check_can_read()
            if self._mode == _MODE_READ_EOF:
                return b""
            self._fill_buffer()
            return self._buffer

    def readline(self, size=-1):
        """Read a line of uncompressed bytes from the file.

        The terminating newline (if present) is retained. If size is
        non-negative, no more than size bytes will be read (in which
        case the line may be incomplete). Returns b'' if already at EOF.
        """
        with self._lock:
            self._check_can_read()
            return self._read_line(size)

    def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        size can be specified to control the number of lines read: no
        further lines will be read once the total size of the lines read
        so far equals or exceeds size.
        """
        with self._lock:
            self._check_can_read()
            lines = []
            nread = 0
            while (size < 0 or nread < size) and self._mode != _MODE_READ_EOF:
                line = self._read_line(-1)
                if not line:
                    break
                lines.append(line)
                nread += len(line)
            return lines

    def write(self, data):
        """Write a byte string to the file.

        Returns the number of uncompressed bytes written, which is
        always len(data). Note that due to buffering, the file on disk
        may not reflect the data written until close() is called.
        """
        with self._lock:
            self._check_can_write()
            return self._write(data)

    def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Returns the number of uncompressed bytes written.
        seq can be any iterable yielding byte strings.

        Line separators are not added between the written byte strings.
        """
        with self._lock:
            self._check_can_write()
            nwritten = 0
            for block in seq:
                nwritten += self._write(block)
            return nwritten

    def seek(self, offset, whence=0):
        """Change the file position.

        The new position is specified by offset, relative to the
        position indicated by whence. Values for whence are:

            0: start of stream (default); offset must not be negative
            1: current stream position
            2: end of stream; offset must not be positive

        Returns the new file position.

        Note that seeking is emulated, so depending on the parameters,
        this operation may be extremely slow.
        """
        with self._lock:
            self._check_can_seek()

            # Recalculate offset as an absolute file position.
            if whence == 0:
                pass
            elif whence == 1:
                offset = self._pos + offset
            elif whence == 2:
                # Seeking relative to EOF - we need to know the file's size.
                if self._size < 0:
                    self._read_all(return_data=False)
                offset = self._size + offset
            else:
                raise ValueError("Invalid value for whence: {}".format(whence))

            # Make it so that offset is the number of bytes to skip forward.
            if offset < self._pos:
                # Can't seek backward in a compressed stream; restart from
                # the beginning and skip forward instead.
                self._rewind()
            else:
                offset -= self._pos

            # Read and discard data until we reach the desired position.
            if self._mode != _MODE_READ_EOF:
                self._read_block(offset, return_data=False)

            return self._pos

    def tell(self):
        """Return the current file position."""
        with self._lock:
            self._check_not_closed()
            return self._pos

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        with self._lock:
            self._close()

    @property
    def closed(self):
        """True if this file is closed."""
        return self._mode == _MODE_CLOSED

    # Helper methods. All methods below here (except for _check_*())
    # assume that the caller owns the object's lock.

    def _check_not_closed(self):
        if self._mode == _MODE_CLOSED:
            raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise IOError("File not open for reading")

    def _check_can_write(self):
        if self._mode != _MODE_WRITE:
            self._check_not_closed()
            raise IOError("File not open for writing")

    def _check_can_seek(self):
        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
            self._check_not_closed()
            raise IOError("Seeking only works on files open for reading")

    # Fill the readahead buffer if it is empty. Returns False on EOF.
    def _fill_buffer(self):
        if self._buffer:
            return True
        if self._decompressor.eof:
            self._mode = _MODE_READ_EOF
            self._size = self._pos
            return False
        rawblock = self._fp.read(_BUFFER_SIZE)
        if not rawblock:
            # The underlying file ran out of data before bzip2's
            # end-of-stream marker: the file is truncated or corrupt.
            raise EOFError("Compressed file ended before the "
                           "end-of-stream marker was reached")
        self._buffer = self._decompressor.decompress(rawblock)
        return True

    # Read data until EOF.
    # If return_data is false, consume the data without returning it.
    def _read_all(self, return_data=True):
        blocks = []
        while self._fill_buffer():
            if return_data:
                blocks.append(self._buffer)
            self._pos += len(self._buffer)
            self._buffer = None
        if return_data:
            return b"".join(blocks)

    # Read a block of up to size bytes.
    # If return_data is false, consume the data without returning it.
    def _read_block(self, size, return_data=True):
        blocks = []
        nread = 0
        while nread < size and self._fill_buffer():
            nbuf = len(self._buffer)
            ct = min(nbuf, size - nread)
            if return_data:
                # Avoid copying when the whole buffer is consumed.
                blocks.append(self._buffer[:ct] if ct < nbuf else self._buffer)
            nread += ct
            self._pos += ct
            self._buffer = self._buffer[ct:] if ct < nbuf else None
        if return_data:
            return b"".join(blocks)

    # Read a single line of up to size bytes. Negative size -> no size limit.
    def _read_line(self, size):
        blocks = []
        nread = 0
        while (size < 0 or nread < size) and self._fill_buffer():
            nbuf = len(self._buffer)
            if size < 0 or nbuf <= size - nread:
                limit = nbuf
            else:
                limit = size - nread
            try:
                # Include the newline itself in the returned line.
                ct = self._buffer.index(b'\n', 0, limit) + 1
                found_eol = True
            except ValueError:
                ct = limit
                found_eol = False
            blocks.append(self._buffer[:ct] if ct < nbuf else self._buffer)
            self._pos += ct
            self._buffer = self._buffer[ct:] if ct < nbuf else None
            if found_eol:
                break
        return b"".join(blocks)

    # Compress and write a block of data.
    def _write(self, data):
        compressed = self._compressor.compress(data)
        self._fp.write(compressed)
        self._pos += len(data)
        return len(data)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        self._fp.seek(0, 0)
        self._mode = _MODE_READ
        self._pos = 0
        # Decompressors are single-use; start a fresh one for the re-read.
        self._decompressor = _bz2.Decompressor()
        self._buffer = None
384
385
class BZ2Compressor:

    """A compressor object for compressing data incrementally.

    For one-shot compression, use the compress() function instead.
    """

    def __init__(self, compresslevel=9):
        """Create a new compressor object.

        compresslevel, if given, must be a number between 1 and 9:
        1 produces the least compression, 9 (default) the most.

        Raises ValueError if compresslevel is out of range.
        """
        if not (1 <= compresslevel <= 9):
            raise ValueError("compresslevel must be between 1 and 9")
        # Serialize access so the object is safe to share between threads.
        self._lock = threading.Lock()
        self._compressor = _bz2.Compressor(compresslevel)

    def compress(self, data):
        """Provide data to the compressor object.

        Returns a chunk of compressed data if possible, or b''
        otherwise.

        When you have finished providing data to the compressor, call
        the flush() method to finish the compression process.
        """
        with self._lock:
            return self._compressor.compress(data)

    def flush(self):
        """Flush the internal buffers of the compressor object.

        Returns any compressed data that was not returned by earlier
        calls to compress().

        Once this method has been called, the compressor object may no
        longer be used.
        """
        with self._lock:
            return self._compressor.flush()
426
427
class BZ2Decompressor:

    """Incremental decompressor for bzip2-compressed data.

    Use the module-level decompress() function when all of the data is
    available up front.
    """

    def __init__(self):
        """Create a new decompressor object."""
        # One underlying decompressor per object; a lock serializes
        # access so the object can be shared between threads.
        self._decompressor = _bz2.Decompressor()
        self._lock = threading.Lock()

    @property
    def eof(self):
        """True if the end-of-stream marker has been reached."""
        return self._decompressor.eof

    @property
    def unused_data(self):
        """Data found after the end of the compressed stream."""
        return self._decompressor.unused_data

    def decompress(self, data):
        """Provide data to the decompressor object.

        Returns a chunk of decompressed data if possible, or b''
        otherwise.

        Attempting to decompress data after the end of stream is reached
        raises an EOFError. Any data found after the end of the stream
        is ignored and saved in the unused_data attribute.
        """
        with self._lock:
            return self._decompressor.decompress(data)
462
463
def compress(data, compresslevel=9):
    """Compress a block of data.

    compresslevel, if given, must be a number between 1 and 9.

    For incremental compression, use a BZ2Compressor object instead.
    """
    compressor = _bz2.Compressor(compresslevel)
    # flush() must be called to emit the final part of the stream.
    result = compressor.compress(data)
    result += compressor.flush()
    return result
473
474
def decompress(data):
    """Decompress a block of data.

    Returns the decompressed bytes. Raises ValueError if data is
    non-empty but does not contain a complete bzip2 stream.

    For incremental decompression, use a BZ2Decompressor object instead.
    """
    # An empty input trivially decompresses to an empty output.
    if not data:
        return b""
    decomp = _bz2.Decompressor()
    result = decomp.decompress(data)
    if not decomp.eof:
        # The end-of-stream marker never appeared: truncated/corrupt input.
        raise ValueError("Compressed data ended before the "
                         "end-of-stream marker was reached")
    return result
OLD
NEW