[Python-Dev] futures API
Thomas Nagy tnagyemail-mail at yahoo.fr
Sun Dec 12 03:32:18 CET 2010
- Previous message: [Python-Dev] porting python.org
- Next message: [Python-Dev] use case for bytes.format
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
--- On Sat, 11/12/10, Brian Quinlan wrote:
On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:

> --- On Fri, 10/12/10, Brian Quinlan wrote:
>> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
>>> --- On Fri, 10/12/10, Brian Quinlan wrote:
>>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
>>>>> I have a process running for a long time, and which may use
>>>>> futures of different max_workers count. I think it is not too
>>>>> far-fetched to create a new futures object each time. Yet, the
>>>>> execution becomes slower after each call, for example with
>>>>> http://freehackers.org/~tnagy/futurestest.py:
>>>>>
>>>>> """
>>>>> import concurrent.futures
>>>>> from queue import Queue
>>>>> import datetime
>>>>>
>>>>> class counter(object):
>>>>>     def __init__(self, fut):
>>>>>         self.fut = fut
>>>>>
>>>>>     def run(self):
>>>>>         def lookbusy(num, obj):
>>>>>             tot = 0
>>>>>             for x in range(num):
>>>>>                 tot += x
>>>>>             obj.outq.put(tot)
>>>>>
>>>>>         start = datetime.datetime.utcnow()
>>>>>         self.count = 0
>>>>>         self.outq = Queue(0)
>>>>>         for x in range(1000):
>>>>>             self.count += 1
>>>>>             self.fut.submit(lookbusy, self.count, self)
>>>>>
>>>>>         while self.count:
>>>>>             self.count -= 1
>>>>>             self.outq.get()
>>>>>
>>>>>         delta = datetime.datetime.utcnow() - start
>>>>>         print(delta.total_seconds())
>>>>>
>>>>> fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>> for x in range(100):
>>>>>     # comment the following line
>>>>>     fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>>     c = counter(fut)
>>>>>     c.run()
>>>>> """
>>>>>
>>>>> The runtime grows after each step:
>>>>> 0.216451
>>>>> 0.225186
>>>>> 0.223725
>>>>> 0.222274
>>>>> 0.230964
>>>>> 0.240531
>>>>> 0.24137
>>>>> 0.252393
>>>>> 0.249948
>>>>> 0.257153
>>>>> ...
>>>>>
>>>>> Is there a mistake in this piece of code?
>>>> There is no mistake that I can see but I suspect that the circular
>>>> references that you are building are causing the ThreadPoolExecutor
>>>> to take a long time to be collected. Try adding:
>>>>
>>>>     c = counter(fut)
>>>>     c.run()
>>>> +   fut.shutdown()
>>>>
>>>> Even if that fixes your problem, I still don't fully understand
>>>> this because I would expect the runtime to fall after a while as
>>>> ThreadPoolExecutors are collected.
>>>
>>> The shutdown call is indeed a good fix :-) Here is the time response
>>> of the calls to counter() when shutdown is not called:
>>> http://www.freehackers.org/~tnagy/runtimefutures.png
>>
>> FWIW, I think that you are confusing the term "future" with
>> "executor". A future represents a single work item. An executor
>> creates futures and schedules their underlying work.
>
> Ah yes, sorry. I have also realized that the executor is not the
> killer feature I was expecting; it can only replace a small part of
> the code I have: controlling the exceptions and the workflow is the
> most complicated part.
>
> I have also observed a minor performance degradation with the
> executor replacement (3 seconds for 5000 work items). The amount of
> work items processed per unit of time does not seem to be a straight
> line: http://www.freehackers.org/~tnagy/runtimefutures2.png

That looks pretty linear to me.
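[Editorial sketch, not part of the original thread: the fix Brian suggests is to call shutdown() explicitly instead of letting each abandoned ThreadPoolExecutor wait for garbage collection. A minimal illustration follows; lookbusy here is a simplified stand-in for the benchmark's helper, and the with-statement form is an alternative spelling that Executor supports in Python 3.2+ but that the thread itself does not mention.]

```python
import concurrent.futures

def lookbusy(num):
    # Simplified stand-in for the benchmark's busy-work function.
    return sum(range(num))

# Option 1: shut the executor down explicitly once its work is done,
# so its worker threads exit promptly instead of lingering until the
# executor is garbage collected.
ex = concurrent.futures.ThreadPoolExecutor(max_workers=20)
futures = [ex.submit(lookbusy, n) for n in range(100)]
results = [f.result() for f in futures]
ex.shutdown()

# Option 2 (Python 3.2+): use the executor as a context manager, which
# calls shutdown(wait=True) automatically on exit.
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as ex2:
    results2 = [f.result() for f in [ex2.submit(lookbusy, n) for n in range(100)]]
```

Either way, each executor's threads are joined before the next one is created, which avoids the accumulating slowdown measured above.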
Ok.
> Out of curiosity, what is the "_thread_references" for?
There is a big comment above it in the code:

# Workers are created as daemon threads. This is done to allow the
# interpreter to exit when there are still idle threads in a
# ThreadPoolExecutor's thread pool (i.e. shutdown() was not called).
# However, allowing workers to die with the interpreter has two
# undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which
#     could be bad if the callable being evaluated has external
#     side-effects e.g. writing to a file.
#
# To work around this problem, an exit handler is installed which tells
# the workers to exit when their work queues are empty and then waits
# until the threads finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

Is it still unclear why it is there? Maybe you could propose some
additional documentation.
I was thinking that if exceptions have to be caught - and that is likely to be the case in general - then this scheme would be redundant. Now I see that the threads get their work items from a queue, so it is clear now.
Thanks for all the information,
Thomas