msg201278 - (view) |
Author: cantor (cantor) |
Date: 2013-10-25 19:20 |
import lzma from functools import partial import multiprocessing def run_lzma(data,c): return c.compress(data) def split_len(seq, length): return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)] def lzma_mp(sequence,threads=3): lzc = lzma.LZMACompressor() blocksize = int(round(len(sequence)/threads)) strings = split_len(sequence, blocksize) lzc_partial = partial(run_lzma,c=lzc) pool=multiprocessing.Pool() lzc_pool = list(pool.map(lzc_partial,strings)) pool.close() pool.join() out_flush = lzc.flush() return b"".join(lzc_pool + [out_flush]) sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG' lzma_mp(sequence,threads=3) |
|
|
msg201280 - (view) |
Author: cantor (cantor) |
Date: 2013-10-25 19:28 |
lzma |
|
|
msg201287 - (view) |
Author: Antoine Pitrou (pitrou) *  |
Date: 2013-10-25 20:22 |
The problem is that using an unpickled LZMACompressor crashes: $ ./python -c "import lzma, pickle; c = pickle.loads(pickle.dumps(lzma.LZMACompressor())); c.compress(b'')" Erreur de segmentation Here is the gdb backtrace: #0 0x00007ffff7bcafc0 in sem_trywait () from /lib/x86_64-linux-gnu/libpthread.so.0 #1 0x0000000000436c15 in PyThread_acquire_lock_timed (lock=0x0, microseconds=0, intr_flag=0) at Python/thread_pthread.h:350 #2 0x0000000000436db8 in PyThread_acquire_lock (lock=0x0, waitflag=0) at Python/thread_pthread.h:556 #3 0x00007ffff64a6538 in Compressor_compress (self=0x7ffff7e129a0, args=0x7ffff7f17468) at /home/antoine/cpython/default/Modules/_lzmamodule.c:533 |
|
|
msg201291 - (view) |
Author: Nadeem Vawda (nadeem.vawda) *  |
Date: 2013-10-25 20:49 |
As far as I can tell, liblzma provides no way to serialize a compressor's state, so the best we can do is raise a TypeError when attempting to pickle the LZMACompressor (and likewise for LZMADecompressor). Also, it's worth pointing out that the provided code wouldn't work even if you could serialize LZMACompressor objects - each call to compress() updates the compressor's internal state with information needed by the final call to flush(), but each compress() call would be made on a *copy* of the compressor rather than the original object. So flush() would end up producing bogus data (and mostly likely all compress() calls after the first would too). If you are trying to do this because LZMA compression is too slow, I'd suggest you try using zlib or bz2 instead - both of these algorithms can compress faster than LZMA (at the expense of your compression ratio). zlib is faster on both compression and decompression, while bz2 is slower than lzma at decompression. Alternatively, you can do parallel compression by calling lzma.compress() on each block (instead of creating an LZMACompressor), and then joining the results. But note that (a) this will give you a worse compression ratio than serial compression (because it can't exploit redundancy shared between blocks), and (b) using multiprocessing has a performance overhead of its own, because you will need to copy the input when sending it to the worker subprocess, and then copy the result when sending it back to the main process. |
|
|
msg201293 - (view) |
Author: cantor (cantor) |
Date: 2013-10-25 21:04 |
just to mention that map() (i.e. the non parallel version) works: import lzma from functools import partial import multiprocessing def run_lzma(data,c): return c.compress(data) def split_len(seq, length): return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)] sequence='AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG' threads=3 blocksize = int(round(len(sequence)/threads)) strings = split_len(sequence, blocksize) #map works lzc = lzma.LZMACompressor() out = list(map(lzc.compress,strings)) out_flush = lzc.flush() result = b"".join(out + [out_flush]) lzma.compress(str.encode(sequence)) lzma.compress(str.encode(sequence)) == result True # map with the use of partial function works as well lzc = lzma.LZMACompressor() lzc_partial = partial(run_lzma,c=lzc) out = list(map(lzc_partial,strings)) out_flush = lzc.flush() result = b"".join(out + [out_flush]) lzma.compress(str.encode(sequence)) == result |
|
|
msg201294 - (view) |
Author: Nadeem Vawda (nadeem.vawda) *  |
Date: 2013-10-25 21:12 |
Yes, that's because the builtin map function doesn't handle each input in a separate process, so it uses the same LZMACompressor object everywhere. Whereas multiprocessing.Pool.map creates a new copy of the compressor object for each input, which is where the problem comes in. |
|
|
msg201295 - (view) |
Author: cantor (cantor) |
Date: 2013-10-25 21:23 |
in python 2.7.3 this kind of works however it is less efficient than the pure lzma.compress() from threading import Thread from backports import lzma from functools import partial import multiprocessing class CompressClass(Thread): def __init__ (self,data,c): Thread.__init__(self) self.exception=False self.data=data self.datacompressed="" self.c=c def getException(self): return self.exception def getOutput(self): return self.datacompressed def run(self): self.datacompressed=(self.c).compress(self.data) def split_len(seq, length): return [seq[i:i+length] for i in range(0, len(seq), length)] def launch_multiple_lzma(data,c): print 'cores' present=CompressClass(data,c) present.start() present.join() return present.getOutput() def threaded_lzma_map(sequence,threads): lzc = lzma.LZMACompressor() blocksize = int(round(len(sequence)/threads)) lzc_partial = partial(launch_multiple_lzma,c=lzc) lzlist = map(lzc_partial,split_len(sequence, blocksize)) #pool=multiprocessing.Pool() #lzclist = pool.map(lzc_partial,split_len(sequence, blocksize)) #pool.close() #pool.join() out_flush = lzc.flush() res = "".join(lzlist + [out_flush]) return res sequence = 'AAAAAJKDDDDDDDDDDDDDDDDDDDDDDDDDDDDGJFKSHFKLHALWEHAIHWEOIAH IOAHIOWEHIOHEIOFEAFEASFEAFWEWWWWWWWWWWWWWWWWWWWWWWWWWWWWWEWFQWEWQWQGEWQFEWFDWEWEGEFGWEG' lzma.compress(sequence) == threaded_lzma_map(sequence,threads=16) Any way this could be imporved? |
|
|
msg201311 - (view) |
Author: cantor (cantor) |
Date: 2013-10-26 03:05 |
python 3.3 version - tried this code and got a sliglty faster processing time then when running lzma.compress() on its own. Could this be improved upon? import lzma from functools import partial from threading import Thread def split_len(seq, length): return [str.encode(seq[i:i+length]) for i in range(0, len(seq), length)] class CompressClass(Thread): def __init__ (self,data,c): Thread.__init__(self) self.exception=False self.data=data self.datacompressed="" self.c=c def getException(self): return self.exception def getOutput(self): return self.datacompressed def run(self): self.datacompressed=(self.c).compress(self.data) def launch_multiple_lzma(data,c): present=CompressClass(data,c) present.start() present.join() return present.getOutput() def threaded_lzma_map(sequence,threads): lzc = lzma.LZMACompressor() blocksize = int(round(len(sequence)/threads)) lzc_partial = partial(launch_multiple_lzma,c=lzc) lzlist = list(map(lzc_partial,split_len(sequence, blocksize))) out_flush = lzc.flush() return b"".join(lzlist + [out_flush]) threaded_lzma_map(sequence,threads=16) |
|
|
msg201312 - (view) |
Author: Tim Peters (tim.peters) *  |
Date: 2013-10-26 03:08 |
@cantor, this is a Python issue tracker, not a help desk. If you want advice about Python programming, please use the Python mailing list or any number of "help desk" web sites (e.g., stackoverflow). |
|
|
msg201373 - (view) |
Author: Antoine Pitrou (pitrou) *  |
Date: 2013-10-26 17:00 |
If it's not possible (or easily doable) to recreate the compressor's internal state, I agree it would be helpful for pickling to raise a TypeError. |
|
|
msg201389 - (view) |
Author: Nadeem Vawda (nadeem.vawda) *  |
Date: 2013-10-26 18:11 |
It looks like there's also a separate problem in the multiprocessing module. The following code hangs after hitting a TypeError trying to pickle one of the TextIOWrapper objects: import multiprocessing def read(f): return f.read() files = [open(path) for path in 3 * ['/dev/null']] pool = multiprocessing.Pool() results = pool.map(read, files) print(results) |
|
|
msg201577 - (view) |
Author: Roundup Robot (python-dev)  |
Date: 2013-10-28 20:46 |
New changeset be363c1e94fe by Nadeem Vawda in branch '3.3': #19395: Raise exception when pickling a (BZ2|LZMA)(Compressor |
Decompressor). http://hg.python.org/cpython/rev/be363c1e94fe New changeset b9df25608ad0 by Nadeem Vawda in branch 'default': #19395: Raise exception when pickling a (BZ2 |
LZMA)(Compressor |
msg201581 - (view) |
Author: Nadeem Vawda (nadeem.vawda) *  |
Date: 2013-10-28 20:53 |
The part of this issue specific to LZMACompressor should now be fixed; I've filed issue 19425 for the issue with Pool.map hanging. |
|
|