cpython: fccdcd83708a (original) (raw)
Mercurial > cpython
changeset 76895:fccdcd83708a
Issue #14366: Support lzma compression in zip files. Patch by Serhiy Storchaka. [#14366]
Martin v. Löwis martin@v.loewis.de | |
---|---|
date | Sun, 13 May 2012 10:06:36 +0200 |
parents | 5bd55fb3e091 |
children | 9b1f618b648b |
files | Doc/library/zipfile.rst Lib/test/support.py Lib/test/test_zipfile.py Lib/zipfile.py Misc/NEWS |
diffstat | 5 files changed, 257 insertions(+), 27 deletions(-)[+] [-] Doc/library/zipfile.rst 26 Lib/test/support.py 9 Lib/test/test_zipfile.py 125 Lib/zipfile.py 121 Misc/NEWS 3 |
line wrap: on
line diff
--- a/Doc/library/zipfile.rst +++ b/Doc/library/zipfile.rst @@ -97,12 +97,20 @@ The module defines the following items: .. versionadded:: 3.3 +.. data:: ZIP_LZMA +
- The numeric constant for the LZMA compression method. This requires the
- lzma module. +
- .. versionadded:: 3.3 + .. note:: The ZIP file format specification has included support for bzip2 compression
since 2001. However, some tools (including older Python releases) do not[](#l1.17)
support it, and may either refuse to process the ZIP file altogether, or[](#l1.18)
fail to extract individual files.[](#l1.19)
since 2001, and for LZMA compression since 2006. However, some tools[](#l1.20)
(including older Python releases) do not support these compression[](#l1.21)
methods, and may either refuse to process the ZIP file altogether,[](#l1.22)
or fail to extract individual files.[](#l1.23)
.. seealso::
@@ -133,11 +141,11 @@ ZipFile Objects
adding a ZIP archive to another file (such as :file:python.exe
). If
mode is a
and the file does not exist at all, it is created.
compression is the ZIP compression method to use when writing the archive,
- and should be :const:
ZIP_STORED
, :const:ZIP_DEFLATED
; or - :const:
ZIP_DEFLATED
; unrecognized - values will cause :exc:
RuntimeError
to be raised. If :const:ZIP_DEFLATED
or - :const:
ZIP_BZIP2
is specified but the corresponded module - (:mod:
zlib
or :mod:bz2
) is not available, :exc:RuntimeError
- and should be :const:
ZIP_STORED
, :const:ZIP_DEFLATED
, - :const:
ZIP_BZIP2
or :const:ZIP_LZMA
; unrecognized - values will cause :exc:
RuntimeError
to be raised. If :const:ZIP_DEFLATED
, - :const:
ZIP_BZIP2
or :const:ZIP_LZMA
is specified but the corresponding module - (:mod:
zlib
, :mod:bz2
or :mod:lzma
) is not available, :exc:RuntimeError
is also raised. The default is :const:ZIP_STORED
. If allowZip64 is True
zipfile will create ZIP files that use the ZIP64 extensions when the zipfile is larger than 2 GB. If it is false (the default) :mod:zipfile
@@ -161,7 +169,7 @@ ZipFile Objects Added the ability to use :class:ZipFile
as a context manager. .. versionchanged:: 3.3
Added support for :mod:`bzip2` compression.[](#l1.48)
Added support for :mod:`bz2` and :mod:`lzma` compression.[](#l1.49)
--- a/Lib/test/support.py +++ b/Lib/test/support.py @@ -45,6 +45,11 @@ try: except ImportError: bz2 = None +try:
+ __all__ = [ "Error", "TestFailed", "ResourceDenied", "import_module", "verbose", "use_resources", "max_memuse", "record_original_stdout", @@ -62,7 +67,7 @@ except ImportError: "get_attribute", "swap_item", "swap_attr", "requires_IEEE_754", "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", "import_fresh_module", "requires_zlib", "PIPE_MAX_SIZE", "failfast",
- "anticipate_failure", "run_with_tz", "requires_bz2", "requires_lzma" ] class Error(Exception): @@ -513,6 +518,8 @@ requires_zlib = unittest.skipUnless(zlib requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') +requires_lzma = unittest.skipUnless(lzma, 'requires lzma') + is_jython = sys.platform.startswith('java')
Filename used for testing
--- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -13,7 +13,7 @@ from tempfile import TemporaryFile from random import randint, random from unittest import skipUnless -from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2 +from test.support import TESTFN, run_unittest, findfile, unlink, requires_zlib, requires_bz2, requires_lzma TESTFN2 = TESTFN + "2" TESTFNDIR = TESTFN + "d" @@ -361,6 +361,55 @@ class TestsWithSourceFile(unittest.TestC self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'2')
- @requires_lzma
- def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.18)
self.zip_test(f, zipfile.ZIP_LZMA)[](#l3.19)
- @requires_lzma
- def test_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.23)
self.zip_open_test(f, zipfile.ZIP_LZMA)[](#l3.24)
- @requires_lzma
- def test_random_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.28)
self.zip_random_open_test(f, zipfile.ZIP_LZMA)[](#l3.29)
- @requires_lzma
- def test_readline_read_lzma(self):
# Issue #7610: calls to readline() interleaved with calls to read().[](#l3.33)
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.34)
self.zip_readline_read_test(f, zipfile.ZIP_LZMA)[](#l3.35)
- @requires_lzma
- def test_readline_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.39)
self.zip_readline_test(f, zipfile.ZIP_LZMA)[](#l3.40)
- @requires_lzma
- def test_readlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.44)
self.zip_readlines_test(f, zipfile.ZIP_LZMA)[](#l3.45)
- @requires_lzma
- def test_iterlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.49)
self.zip_iterlines_test(f, zipfile.ZIP_LZMA)[](#l3.50)
- @requires_lzma
- def test_low_compression_lzma(self):
"""Check for cases where compressed data is larger than original."""[](#l3.54)
# Create the ZIP archive[](#l3.55)
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_LZMA) as zipfp:[](#l3.56)
zipfp.writestr("strfile", '12')[](#l3.57)
# Get an open object for strfile[](#l3.59)
with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_LZMA) as zipfp:[](#l3.60)
with zipfp.open("strfile") as openobj:[](#l3.61)
self.assertEqual(openobj.read(1), b'1')[](#l3.62)
self.assertEqual(openobj.read(1), b'2')[](#l3.63)
+ def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: zipfp.write(TESTFN, "/absolute") @@ -508,6 +557,13 @@ class TestsWithSourceFile(unittest.TestC info = zipfp.getinfo('b.txt') self.assertEqual(info.compress_type, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_writestr_compression_lzma(self):
zipfp = zipfile.ZipFile(TESTFN2, "w")[](#l3.74)
zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_LZMA)[](#l3.75)
info = zipfp.getinfo('b.txt')[](#l3.76)
self.assertEqual(info.compress_type, zipfile.ZIP_LZMA)[](#l3.77)
+ def zip_test_writestr_permissions(self, f, compression): # Make sure that writestr creates files with mode 0600, # when it is passed a name rather than a ZipInfo instance. @@ -686,6 +742,11 @@ class TestZip64InSmallFiles(unittest.Tes for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_test(f, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.88)
self.zip_test(f, zipfile.ZIP_LZMA)[](#l3.89)
+ def test_absolute_arcnames(self): with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True) as zipfp: @@ -826,6 +887,16 @@ class OtherTests(unittest.TestCase): b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' b'\x00\x00\x00\x00'),
zipfile.ZIP_LZMA: ([](#l3.98)
b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'[](#l3.99)
b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'[](#l3.100)
b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'[](#l3.101)
b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'[](#l3.102)
b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'[](#l3.103)
b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'[](#l3.104)
b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'[](#l3.105)
b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'[](#l3.106)
b'\x00>\x00\x00\x00\x00\x00'),[](#l3.107)
} def test_unsupported_version(self): @@ -1104,6 +1175,10 @@ class OtherTests(unittest.TestCase): def test_testzip_with_bad_crc_bzip2(self): self.check_testzip_with_bad_crc(zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_testzip_with_bad_crc_lzma(self):
self.check_testzip_with_bad_crc(zipfile.ZIP_LZMA)[](#l3.117)
+ def check_read_with_bad_crc(self, compression): """Tests that files with bad CRCs raise a BadZipFile exception when read.""" zipdata = self.zips_with_bad_crc[compression] @@ -1136,6 +1211,10 @@ class OtherTests(unittest.TestCase): def test_read_with_bad_crc_bzip2(self): self.check_read_with_bad_crc(zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_read_with_bad_crc_lzma(self):
self.check_read_with_bad_crc(zipfile.ZIP_LZMA)[](#l3.128)
+ def check_read_return_size(self, compression): # Issue #9837: ZipExtFile.read() shouldn't return more bytes # than requested. @@ -1160,6 +1239,10 @@ class OtherTests(unittest.TestCase): def test_read_return_size_bzip2(self): self.check_read_return_size(zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_read_return_size_lzma(self):
self.check_read_return_size(zipfile.ZIP_LZMA)[](#l3.139)
+ def test_empty_zipfile(self): # Check that creating a file in 'w' or 'a' mode and closing without # adding any files to the archives creates a valid empty ZIP file @@ -1306,6 +1389,11 @@ class TestsWithRandomBinaryFiles(unittes for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_test(f, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.150)
self.zip_test(f, zipfile.ZIP_LZMA)[](#l3.151)
+ def zip_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1351,6 +1439,11 @@ class TestsWithRandomBinaryFiles(unittes for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_open_test(f, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.162)
self.zip_open_test(f, zipfile.ZIP_LZMA)[](#l3.163)
+ def zip_random_open_test(self, f, compression): self.make_test_archive(f, compression) @@ -1384,6 +1477,11 @@ class TestsWithRandomBinaryFiles(unittes for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.zip_random_open_test(f, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_random_open_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.174)
self.zip_random_open_test(f, zipfile.ZIP_LZMA)[](#l3.175)
+ @requires_zlib class TestsWithMultipleOpens(unittest.TestCase): @@ -1628,6 +1726,31 @@ class UniversalNewlineTests(unittest.Tes for f in (TESTFN2, TemporaryFile(), io.BytesIO()): self.iterlines_test(f, zipfile.ZIP_BZIP2)
- @requires_lzma
- def test_read_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.186)
self.read_test(f, zipfile.ZIP_LZMA)[](#l3.187)
- @requires_lzma
- def test_readline_read_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.191)
self.readline_read_test(f, zipfile.ZIP_LZMA)[](#l3.192)
- @requires_lzma
- def test_readline_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.196)
self.readline_test(f, zipfile.ZIP_LZMA)[](#l3.197)
- @requires_lzma
- def test_readlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.201)
self.readlines_test(f, zipfile.ZIP_LZMA)[](#l3.202)
- @requires_lzma
- def test_iterlines_lzma(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()):[](#l3.206)
self.iterlines_test(f, zipfile.ZIP_LZMA)[](#l3.207)
+ def tearDown(self): for sep, fn in self.arcfiles.items(): os.remove(fn)
--- a/Lib/zipfile.py +++ b/Lib/zipfile.py @@ -27,8 +27,13 @@ try: except ImportError: bz2 = None +try:
+ __all__ = ["BadZipFile", "BadZipfile", "error",
"ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2",[](#l4.13)
"ZIP_STORED", "ZIP_DEFLATED", "ZIP_BZIP2", "ZIP_LZMA",[](#l4.14) "is_zipfile", "ZipInfo", "ZipFile", "PyZipFile", "LargeZipFile"][](#l4.15)
class BadZipFile(Exception): @@ -52,13 +57,15 @@ ZIP_MAX_COMMENT = (1 << 16) - 1 ZIP_STORED = 0 ZIP_DEFLATED = 8 ZIP_BZIP2 = 12 +ZIP_LZMA = 14
Other ZIP compression methods not supported
DEFAULT_VERSION = 20 ZIP64_VERSION = 45 BZIP2_VERSION = 46 +LZMA_VERSION = 63
we recognize (but not necessarily support) all features up to that version
-MAX_EXTRACT_VERSION = 46 +MAX_EXTRACT_VERSION = 63
Below are some formats and associated data for reading/writing headers using
the struct module. The names and structures of headers/records are those used
@@ -367,6 +374,8 @@ class ZipInfo (object): if self.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version)
elif self.compress_type == ZIP_LZMA:[](#l4.39)
min_version = max(LZMA_VERSION, min_version)[](#l4.40)
self.extract_version = max(min_version, self.extract_version) self.create_version = max(min_version, self.create_version) @@ -480,6 +489,77 @@ class _ZipDecrypter: return c +class LZMACompressor: +
- def _init(self):
props = lzma.encode_filter_properties({'id': lzma.FILTER_LZMA1})[](#l4.54)
self._comp = lzma.LZMACompressor(lzma.FORMAT_RAW, filters=[[](#l4.55)
lzma.decode_filter_properties(lzma.FILTER_LZMA1, props)[](#l4.56)
])[](#l4.57)
return struct.pack('<BBH', 9, 4, len(props)) + props[](#l4.58)
- def compress(self, data):
if self._comp is None:[](#l4.61)
return self._init() + self._comp.compress(data)[](#l4.62)
return self._comp.compress(data)[](#l4.63)
- def flush(self):
if self._comp is None:[](#l4.66)
return self._init() + self._comp.flush()[](#l4.67)
return self._comp.flush()[](#l4.68)
+ + +class LZMADecompressor: +
- def __init__(self):
self._decomp = None[](#l4.74)
self._unconsumed = b''[](#l4.75)
self.eof = False[](#l4.76)
- def decompress(self, data):
if self._decomp is None:[](#l4.79)
self._unconsumed += data[](#l4.80)
if len(self._unconsumed) <= 4:[](#l4.81)
return b''[](#l4.82)
psize, = struct.unpack('<H', self._unconsumed[2:4])[](#l4.83)
if len(self._unconsumed) <= 4 + psize:[](#l4.84)
return b''[](#l4.85)
self._decomp = lzma.LZMADecompressor(lzma.FORMAT_RAW, filters=[[](#l4.87)
lzma.decode_filter_properties(lzma.FILTER_LZMA1,[](#l4.88)
self._unconsumed[4:4 + psize])[](#l4.89)
])[](#l4.90)
data = self._unconsumed[4 + psize:][](#l4.91)
del self._unconsumed[](#l4.92)
result = self._decomp.decompress(data)[](#l4.94)
self.eof = self._decomp.eof[](#l4.95)
return result[](#l4.96)
- compressor_names = {
- 0: 'store',
- 1: 'shrink',
- 2: 'reduce',
- 3: 'reduce',
- 4: 'reduce',
- 5: 'reduce',
- 6: 'implode',
- 7: 'tokenize',
- 8: 'deflate',
- 9: 'deflate64',
- 10: 'implode',
- 12: 'bzip2',
- 14: 'lzma',
- 18: 'terse',
- 19: 'lz77',
- 97: 'wavpack',
- 98: 'ppmd',
+} + def _check_compression(compression): if compression == ZIP_STORED: pass @@ -491,6 +571,10 @@ def _check_compression(compression): if not bz2: raise RuntimeError( "Compression requires the (missing) bz2 module")
- elif compression == ZIP_LZMA:
if not lzma:[](#l4.127)
raise RuntimeError([](#l4.128)
"Compression requires the (missing) lzma module")[](#l4.129)
else: raise RuntimeError("That compression method is not supported")
@@ -501,6 +585,8 @@ def _get_compressor(compress_type): zlib.DEFLATED, -15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Compressor()
@@ -512,19 +598,10 @@ def _get_decompressor(compress_type): return zlib.decompressobj(-15) elif compress_type == ZIP_BZIP2: return bz2.BZ2Decompressor()
unknown_compressors = {[](#l4.149)
1: 'shrink',[](#l4.150)
2: 'reduce',[](#l4.151)
3: 'reduce',[](#l4.152)
4: 'reduce',[](#l4.153)
5: 'reduce',[](#l4.154)
6: 'implode',[](#l4.155)
9: 'enhanced deflate',[](#l4.156)
10: 'implode',[](#l4.157)
14: 'lzma',[](#l4.158)
}[](#l4.159)
descr = unknown_compressors.get(compress_type)[](#l4.160)
descr = compressor_names.get(compress_type)[](#l4.161) if descr:[](#l4.162) raise NotImplementedError("compression type %d (%s)" % (compress_type, descr))[](#l4.163) else:[](#l4.164)
@@ -781,8 +858,8 @@ class ZipFile: file: Either the path to the file, or a file-like object. If it is a path, the file will be opened and closed by ZipFile. mode: The mode can be either read "r", write "w" or append "a".
- compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib) or
ZIP_BZIP2 (requires bz2).[](#l4.170)
- compression: ZIP_STORED (no compression), ZIP_DEFLATED (requires zlib),
ZIP_BZIP2 (requires bz2) or ZIP_LZMA (requires lzma).[](#l4.172)
allowZip64: if True ZipFile will create files with ZIP64 extensions when needed, otherwise it will raise an exception when this would be necessary.
@@ -1062,6 +1139,10 @@ class ZipFile: # Zip 2.7: compressed patched data raise NotImplementedError("compressed patched data (flag bit 5)")
if zinfo.flag_bits & 0x40:[](#l4.180)
# strong encryption[](#l4.181)
raise NotImplementedError("strong encryption (flag bit 6)")[](#l4.182)
+ if zinfo.flag_bits & 0x800: # UTF-8 filename fname_str = fname.decode("utf-8") @@ -1220,6 +1301,9 @@ class ZipFile: zinfo.file_size = st.st_size zinfo.flag_bits = 0x00 zinfo.header_offset = self.fp.tell() # Start of header bytes
if zinfo.compress_type == ZIP_LZMA:[](#l4.191)
# Compressed data includes an end-of-stream (EOS) marker[](#l4.192)
zinfo.flag_bits |= 0x02[](#l4.193)
self._writecheck(zinfo) self._didModify = True @@ -1292,6 +1376,9 @@ class ZipFile: zinfo.header_offset = self.fp.tell() # Start of header data if compress_type is not None: zinfo.compress_type = compress_type
if zinfo.compress_type == ZIP_LZMA:[](#l4.201)
# Compressed data includes an end-of-stream (EOS) marker[](#l4.202)
zinfo.flag_bits |= 0x02[](#l4.203)
self._writecheck(zinfo) self._didModify = True @@ -1360,6 +1447,8 @@ class ZipFile: if zinfo.compress_type == ZIP_BZIP2: min_version = max(BZIP2_VERSION, min_version)
elif zinfo.compress_type == ZIP_LZMA:[](#l4.211)
min_version = max(LZMA_VERSION, min_version)[](#l4.212)
extract_version = max(min_version, zinfo.extract_version) create_version = max(min_version, zinfo.create_version)
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -23,6 +23,9 @@ Core and Builtins Library ------- +- Issue #14366: Support lzma compression in zip files.