[Python-Dev] futures API

Thomas Nagy tnagyemail-mail at yahoo.fr
Sun Dec 12 03:32:18 CET 2010


--- On Sat, 11/12/10, Brian Quinlan wrote:

On Dec 11, 2010, at 6:44 AM, Thomas Nagy wrote:

> --- On Fri, 10/12/10, Brian Quinlan wrote:
>> On Dec 10, 2010, at 10:51 AM, Thomas Nagy wrote:
>>> --- On Fri, 10/12/10, Brian Quinlan wrote:
>>>> On Dec 10, 2010, at 5:36 AM, Thomas Nagy wrote:
>>>>> I have a process running for a long time, and which may use
>>>>> futures of different max_workers count. I think it is not too
>>>>> far-fetched to create a new futures object each time. Yet, the
>>>>> execution becomes slower after each call, for example with
>>>>> http://freehackers.org/~tnagy/futurestest.py:
>>>>>
>>>>> """
>>>>> import concurrent.futures
>>>>> from queue import Queue
>>>>> import datetime
>>>>>
>>>>> class counter(object):
>>>>>     def __init__(self, fut):
>>>>>         self.fut = fut
>>>>>
>>>>>     def run(self):
>>>>>         def lookbusy(num, obj):
>>>>>             tot = 0
>>>>>             for x in range(num):
>>>>>                 tot += x
>>>>>             obj.outq.put(tot)
>>>>>
>>>>>         start = datetime.datetime.utcnow()
>>>>>         self.count = 0
>>>>>         self.outq = Queue(0)
>>>>>         for x in range(1000):
>>>>>             self.count += 1
>>>>>             self.fut.submit(lookbusy, self.count, self)
>>>>>
>>>>>         while self.count:
>>>>>             self.count -= 1
>>>>>             self.outq.get()
>>>>>
>>>>>         delta = datetime.datetime.utcnow() - start
>>>>>         print(delta.total_seconds())
>>>>>
>>>>> fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>> for x in range(100):
>>>>>     # comment the following line
>>>>>     fut = concurrent.futures.ThreadPoolExecutor(max_workers=20)
>>>>>     c = counter(fut)
>>>>>     c.run()
>>>>> """
>>>>>
>>>>> The runtime grows after each step:
>>>>> 0.216451
>>>>> 0.225186
>>>>> 0.223725
>>>>> 0.222274
>>>>> 0.230964
>>>>> 0.240531
>>>>> 0.24137
>>>>> 0.252393
>>>>> 0.249948
>>>>> 0.257153
>>>>> ...
>>>>>
>>>>> Is there a mistake in this piece of code?
>>>>
>>>> There is no mistake that I can see, but I suspect that the
>>>> circular references that you are building are causing the
>>>> ThreadPoolExecutor to take a long time to be collected. Try
>>>> adding:
>>>>
>>>>      c = counter(fut)
>>>>      c.run()
>>>> +    fut.shutdown()
>>>>
>>>> Even if that fixes your problem, I still don't fully understand
>>>> this because I would expect the runtime to fall after a while as
>>>> ThreadPoolExecutors are collected.
>>>
>>> The shutdown call is indeed a good fix :-) Here is the time
>>> response of the calls to counter() when shutdown is not called:
>>> http://www.freehackers.org/~tnagy/runtimefutures.png
>>
>> FWIW, I think that you are confusing the term "future" with
>> "executor". A future represents a single work item. An executor
>> creates futures and schedules their underlying work.
>
> Ah yes, sorry. I have also realized that the executor is not the
> killer feature I was expecting; it can only replace a small part of
> the code I have: controlling the exceptions and the workflow is the
> most complicated part.
>
> I have also observed a minor performance degradation with the
> executor replacement (3 seconds for 5000 work items). The amount of
> work items processed per unit of time does not seem to be a straight
> line: http://www.freehackers.org/~tnagy/runtimefutures2.png

That looks pretty linear to me.

Ok.
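For archive readers, the shutdown fix discussed above boils down to the following pattern. This is a sketch, not the original benchmark: the work function and pool size are illustrative.

```python
import concurrent.futures

def lookbusy(num):
    # Illustrative CPU-bound work item.
    return sum(range(num))

# Explicitly shutting down each executor releases its worker threads
# promptly instead of waiting for garbage collection to reclaim them.
results = []
for _ in range(3):
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    try:
        futures = [pool.submit(lookbusy, n) for n in range(100)]
        results.append(sum(f.result() for f in futures))
    finally:
        pool.shutdown()  # the fix suggested in the thread above

# Equivalent, using the executor as a context manager, which calls
# shutdown() automatically on exit:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(lookbusy, n) for n in range(100)]
    total = sum(f.result() for f in futures)
```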

> Out of curiosity, what is the "_thread_references" for?

There is a big comment above it in the code:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

Is it still unclear why it is there? Maybe you could propose some
additional documentation.

I was thinking that if exceptions have to be caught - and it is likely to be the case in general - then this scheme is redundant. Now I see that the threads get their work items from a queue, so it is clear.
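On the exception-handling point raised above: a Future captures any exception raised by its callable and hands it back to the caller, so exceptions in worker threads are not lost. A minimal sketch (the failing task is illustrative):

```python
import concurrent.futures

def failing_task():
    # Illustrative work item that raises.
    raise ValueError("boom")

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    f = pool.submit(failing_task)
    # exception() waits for completion and returns the exception
    # without raising it; f.result() would re-raise it instead.
    err = f.exception()
```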

Thanks for all the information,

Thomas


