Issue 19395: unpickled LZMACompressor is crashy

Created on 2013-10-25 19:20 by cantor, last changed 2022-04-11 14:57 by admin. This issue is now closed.

Messages (13)
msg201278 - Author: cantor (cantor) Date: 2013-10-25 19:20
import lzma
from functools import partial
import multiprocessing

def run_lzma(data, c):
    return c.compress(data)

def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]

def lzma_mp(sequence, threads=3):
    lzc = lzma.LZMACompressor()
    blocksize = int(round(len(sequence)/threads))
    strings = split_len(sequence, blocksize)
    lzc_partial = partial(run_lzma, c=lzc)
    pool = multiprocessing.Pool()
    lzc_pool = list(pool.map(lzc_partial, strings))
    pool.close()
    pool.join()
    out_flush = lzc.flush()
    return b"".join(lzc_pool + [out_flush])

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'

lzma_mp(sequence, threads=3)
msg201280 - Author: cantor (cantor) Date: 2013-10-25 19:28
lzma
msg201287 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-10-25 20:22
The problem is that using an unpickled LZMACompressor crashes:

$ ./python -c "import lzma, pickle; c = pickle.loads(pickle.dumps(lzma.LZMACompressor())); c.compress(b'')"
Erreur de segmentation    [segmentation fault]

Here is the gdb backtrace:

#0  0x00007ffff7bcafc0 in sem_trywait () from /lib/x86_64-linux-gnu/libpthread.so.0
#1  0x0000000000436c15 in PyThread_acquire_lock_timed (lock=0x0, microseconds=0, intr_flag=0) at Python/thread_pthread.h:350
#2  0x0000000000436db8 in PyThread_acquire_lock (lock=0x0, waitflag=0) at Python/thread_pthread.h:556
#3  0x00007ffff64a6538 in Compressor_compress (self=0x7ffff7e129a0, args=0x7ffff7f17468) at /home/antoine/cpython/default/Modules/_lzmamodule.c:533
msg201291 - Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-25 20:49
As far as I can tell, liblzma provides no way to serialize a compressor's state, so the best we can do is raise a TypeError when attempting to pickle the LZMACompressor (and likewise for LZMADecompressor).

Also, it's worth pointing out that the provided code wouldn't work even if you could serialize LZMACompressor objects - each call to compress() updates the compressor's internal state with information needed by the final call to flush(), but each compress() call would be made on a *copy* of the compressor rather than the original object. So flush() would end up producing bogus data (and most likely all compress() calls after the first would too).

If you are trying to do this because LZMA compression is too slow, I'd suggest you try using zlib or bz2 instead - both of these algorithms can compress faster than LZMA (at the expense of your compression ratio). zlib is faster on both compression and decompression, while bz2 is slower than lzma at decompression.

Alternatively, you can do parallel compression by calling lzma.compress() on each block (instead of creating an LZMACompressor), and then joining the results. But note that (a) this will give you a worse compression ratio than serial compression (because it can't exploit redundancy shared between blocks), and (b) using multiprocessing has a performance overhead of its own, because you will need to copy the input when sending it to the worker subprocess, and then copy the result when sending it back to the main process.
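For illustration, the block-wise alternative described in the last paragraph might look like the sketch below. The helper names (compress_block, parallel_lzma) and the ceiling-division block sizing are assumptions for this example, not code from the issue:

import lzma
import multiprocessing

def compress_block(block):
    # Each block becomes an independent .xz stream; only plain bytes
    # cross the process boundary, so nothing unpicklable is shared.
    return lzma.compress(block)

def parallel_lzma(data, threads=3):
    blocksize = -(-len(data) // threads)  # ceiling division, so no tail block is dropped
    blocks = [data[i:i + blocksize] for i in range(0, len(data), blocksize)]
    with multiprocessing.Pool(threads) as pool:
        return b"".join(pool.map(compress_block, blocks))

if __name__ == '__main__':
    data = b'AAAAAJKDDDD' * 1000
    compressed = parallel_lzma(data)

Note that the result is a concatenation of separate .xz streams rather than a single stream, so it may need to be decompressed stream by stream.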
msg201293 - Author: cantor (cantor) Date: 2013-10-25 21:04
just to mention that map() (i.e. the non-parallel version) works:

import lzma
from functools import partial
import multiprocessing

def run_lzma(data, c):
    return c.compress(data)

def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'
threads = 3
blocksize = int(round(len(sequence)/threads))
strings = split_len(sequence, blocksize)

# map works
lzc = lzma.LZMACompressor()
out = list(map(lzc.compress, strings))
out_flush = lzc.flush()
result = b"".join(out + [out_flush])
lzma.compress(str.encode(sequence)) == result   # True

# map with the use of a partial function works as well
lzc = lzma.LZMACompressor()
lzc_partial = partial(run_lzma, c=lzc)
out = list(map(lzc_partial, strings))
out_flush = lzc.flush()
result = b"".join(out + [out_flush])
lzma.compress(str.encode(sequence)) == result   # True
msg201294 - Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-25 21:12
Yes, that's because the builtin map function doesn't handle each input in a separate process, so it uses the same LZMACompressor object everywhere, whereas multiprocessing.Pool.map creates a new copy of the compressor object for each input, which is where the problem comes in.
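The copy is made because Pool.map pickles every argument to send it to a worker process. A small illustration of those copy semantics, using a hypothetical append_one helper in place of the compressor:

import multiprocessing

def append_one(lst):
    # The worker receives an unpickled copy, so this mutation never
    # reaches the list owned by the parent process.
    lst.append(1)
    return len(lst)

if __name__ == '__main__':
    shared = []
    with multiprocessing.Pool() as pool:
        print(pool.map(append_one, [shared] * 3))  # [1, 1, 1]
    print(shared)  # [] -- the parent's object is unchanged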
msg201295 - Author: cantor (cantor) Date: 2013-10-25 21:23
in python 2.7.3 this kind of works, however it is less efficient than the pure lzma.compress():

from threading import Thread
from backports import lzma
from functools import partial
import multiprocessing

class CompressClass(Thread):
    def __init__(self, data, c):
        Thread.__init__(self)
        self.exception = False
        self.data = data
        self.datacompressed = ""
        self.c = c
    def getException(self):
        return self.exception
    def getOutput(self):
        return self.datacompressed
    def run(self):
        self.datacompressed = (self.c).compress(self.data)

def split_len(seq, length):
    return [seq[i:i+length] for i in range(0, len(seq), length)]

def launch_multiple_lzma(data, c):
    print 'cores'
    present = CompressClass(data, c)
    present.start()
    present.join()
    return present.getOutput()

def threaded_lzma_map(sequence, threads):
    lzc = lzma.LZMACompressor()
    blocksize = int(round(len(sequence)/threads))
    lzc_partial = partial(launch_multiple_lzma, c=lzc)
    lzlist = map(lzc_partial, split_len(sequence, blocksize))
    #pool = multiprocessing.Pool()
    #lzclist = pool.map(lzc_partial, split_len(sequence, blocksize))
    #pool.close()
    #pool.join()
    out_flush = lzc.flush()
    res = "".join(lzlist + [out_flush])
    return res

sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG'

lzma.compress(sequence) == threaded_lzma_map(sequence, threads=16)

Any way this could be improved?
msg201311 - Author: cantor (cantor) Date: 2013-10-26 03:05
python 3.3 version - tried this code and got a slightly faster processing time than when running lzma.compress() on its own. Could this be improved upon?

import lzma
from functools import partial
from threading import Thread

def split_len(seq, length):
    return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)]

class CompressClass(Thread):
    def __init__(self, data, c):
        Thread.__init__(self)
        self.exception = False
        self.data = data
        self.datacompressed = ""
        self.c = c
    def getException(self):
        return self.exception
    def getOutput(self):
        return self.datacompressed
    def run(self):
        self.datacompressed = (self.c).compress(self.data)

def launch_multiple_lzma(data, c):
    present = CompressClass(data, c)
    present.start()
    present.join()
    return present.getOutput()

def threaded_lzma_map(sequence, threads):
    lzc = lzma.LZMACompressor()
    blocksize = int(round(len(sequence)/threads))
    lzc_partial = partial(launch_multiple_lzma, c=lzc)
    lzlist = list(map(lzc_partial, split_len(sequence, blocksize)))
    out_flush = lzc.flush()
    return b"".join(lzlist + [out_flush])

# sequence as defined in the earlier messages
threaded_lzma_map(sequence, threads=16)
msg201312 - Author: Tim Peters (tim.peters) * (Python committer) Date: 2013-10-26 03:08
@cantor, this is a Python issue tracker, not a help desk. If you want advice about Python programming, please use the Python mailing list or any number of "help desk" web sites (e.g., stackoverflow).
msg201373 - Author: Antoine Pitrou (pitrou) * (Python committer) Date: 2013-10-26 17:00
If it's not possible (or easily doable) to recreate the compressor's internal state, I agree it would be helpful for pickling to raise a TypeError.
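The eventual fix implements this at the C level in _lzmamodule.c; a rough Python-level sketch of the same idea (the class name here is hypothetical) is to define __getstate__ so that pickling fails under every protocol:

import pickle

class Unpicklable:
    def __getstate__(self):
        # Refuse to serialize state that cannot be meaningfully restored.
        raise TypeError("cannot serialize %r object" % type(self).__name__)

pickle.dumps(Unpicklable())  # raises TypeError instead of producing a broken copy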
msg201389 - Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-26 18:11
It looks like there's also a separate problem in the multiprocessing module. The following code hangs after hitting a TypeError trying to pickle one of the TextIOWrapper objects:

import multiprocessing

def read(f):
    return f.read()

files = [open(path) for path in 3 * ['/dev/null']]
pool = multiprocessing.Pool()
results = pool.map(read, files)
print(results)
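The hang aside, the underlying TypeError can be reproduced directly, since open file objects are not picklable:

import pickle

with open('/dev/null') as f:
    pickle.dumps(f)  # raises TypeError (exact wording varies across Python versions)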
msg201577 - Author: Roundup Robot (python-dev) (Python triager) Date: 2013-10-28 20:46
New changeset be363c1e94fe by Nadeem Vawda in branch '3.3':
#19395: Raise exception when pickling a (BZ2|LZMA)(Compressor|Decompressor).
http://hg.python.org/cpython/rev/be363c1e94fe

New changeset b9df25608ad0 by Nadeem Vawda in branch 'default':
#19395: Raise exception when pickling a (BZ2|LZMA)(Compressor|Decompressor).
http://hg.python.org/cpython/rev/b9df25608ad0
msg201581 - Author: Nadeem Vawda (nadeem.vawda) * (Python committer) Date: 2013-10-28 20:53
The part of this issue specific to LZMACompressor should now be fixed; I've filed issue 19425 for the issue with Pool.map hanging.
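On builds that include the fix, the one-line reproducer from msg201287 now fails cleanly instead of segfaulting:

import lzma, pickle

try:
    pickle.dumps(lzma.LZMACompressor())
except TypeError as e:
    print(e)  # e.g. "cannot serialize '_lzma.LZMACompressor' object"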
History
Date User Action Args
2022-04-11 14:57:52 admin set github: 63594
2013-10-28 20:53:56 nadeem.vawda set status: open -> closed; resolution: fixed; messages: +; stage: needs patch -> resolved
2013-10-28 20:46:06 python-dev set nosy: + python-dev; messages: +
2013-10-26 18:11:41 nadeem.vawda set nosy: + jnoller, sbt; messages: +
2013-10-26 17:00:01 pitrou set messages: +
2013-10-26 03:08:44 tim.peters set nosy: + tim.petersmessages: +
2013-10-26 03:05:46 cantor set messages: +
2013-10-25 21:23:18 cantor set messages: +
2013-10-25 21:12:34 nadeem.vawda set messages: +
2013-10-25 21:04:59 cantor set messages: +
2013-10-25 20:49:19 nadeem.vawda set messages: +
2013-10-25 20:22:07 pitrou set versions: + Python 3.4; type: behavior -> crash; nosy: + pitrou; title: lzma hangs for a very long time when run in parallel using python's muptiprocessing module? -> unpickled LZMACompressor is crashy; messages: +; stage: needs patch
2013-10-25 19:36:01 cantor set components: - ctypes
2013-10-25 19:28:17 cantor set messages: +
2013-10-25 19:27:55 cantor set nosy: + nadeem.vawda
2013-10-25 19:20:43 cantor create