(original) (raw)

import multiprocessing def run_pipe(duplex, datatype): r, w = multiprocessing.Pipe(duplex=duplex) if datatype == 'small': data = 'some data' else: data = [] for i in range(1000): data.append(i) for d in range(10000): w.send(data) res = r.recv() assert res == data def run_pipe_bytes(duplex, datatype): r, w = multiprocessing.Pipe(duplex=duplex) data = b'some data' for d in range(10000): w.send_bytes(data) res = r.recv_bytes() assert res == data if __name__=='__main__': from timeit import Timer t1 = Timer("run_pipe_bytes(False, 'small')", "from __main__ import run_pipe_bytes") t3 = Timer("run_pipe(False, 'small')", "from __main__ import run_pipe") t4 = Timer("run_pipe(False, 'big')", "from __main__ import run_pipe") print ('duplex = False with small bytestring : ', t1.timeit(1)) print ('duplex = False with small obj : ', t3.timeit(1)) print ('duplex = False with huge obj : ', t4.timeit(1)) else: run_pipe(False, 'small')