Issue 36512: future_factory argument for Thread/ProcessPoolExecutor

This would make it possible to use Futures with an interface customized for a specific domain, e.g. to store not only the result of a task but also some context information, or to provide properties and methods that are specific to the result.

But once Future is subclassed, the built-in ThreadPoolExecutor and ProcessPoolExecutor can no longer be reused, because the class returned by submit() cannot be changed to the Future subclass.

With my change it would be possible to customize the Future object returned by the Executor.

As it is now, the Future class has to be wrapped and the Executor subclassed so that it returns the wrapped Future object. The Future object cannot be extended without wrapping it completely.
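For illustration, the wrapping workaround described above could look roughly like the sketch below. The names WrappedFuture, WrappingExecutor and square are hypothetical and are not taken from the issue or from the standard library; only Future and ThreadPoolExecutor are real.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class WrappedFuture:
    """Hypothetical wrapper: holds a stock Future plus extra context."""

    def __init__(self, future: Future, context):
        self._future = future
        self.context = context

    # Every Future method the caller needs has to be forwarded by hand.
    def result(self, timeout=None):
        return self._future.result(timeout)

    def done(self):
        return self._future.done()


class WrappingExecutor(ThreadPoolExecutor):
    """Hypothetical executor whose submit() returns a WrappedFuture."""

    def submit(self, fn, *args, context=None, **kwargs):
        inner = super().submit(fn, *args, **kwargs)
        return WrappedFuture(inner, context)


def square(n):
    return n * n


with WrappingExecutor(max_workers=2) as executor:
    wrapped = executor.submit(square, 3, context="A")
    value = wrapped.result()
```

The wrapper is not a Future, so code that expects the full Future interface (cancel(), add_done_callback() and so on) only works if each of those methods is forwarded as well.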

This change would make the Executor class more versatile.

It would allow something like this:

class CustomExecutor:
    ...

custom_executor = CustomExecutor()
custom_future = custom_executor.submit(workload, context=context_information)
...
assert custom_future.context_has_some_property()
assert custom_future.result_has_some_property()

Factories are also used in other places in the standard library: logging.setLogRecordFactory and asyncio's loop.set_task_factory.
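As a point of comparison, here is a minimal sketch of the existing factory hook in logging. The attribute name request_id and its value are made up for the example; getLogRecordFactory, setLogRecordFactory and Logger.makeRecord are the real standard-library calls.

```python
import logging

# Keep the default factory so it can be reused and restored afterwards.
old_factory = logging.getLogRecordFactory()


def record_factory(*args, **kwargs):
    # Build the record as usual, then attach a custom attribute.
    record = old_factory(*args, **kwargs)
    record.request_id = "demo-123"  # hypothetical extra field
    return record


logging.setLogRecordFactory(record_factory)
try:
    # makeRecord() goes through the installed factory.
    record = logging.getLogger("demo").makeRecord(
        "demo", logging.INFO, "example.py", 1, "hello", (), None
    )
finally:
    logging.setLogRecordFactory(old_factory)
```

The proposal in this issue follows the same pattern: the library keeps creating the object, but the caller decides which callable creates it.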

Sorry, but in the provided snippet I only see a class with an interface that looks close to an executor's.

There is no clear relationship with the standard executors (e.g. is it inherited from ThreadPoolExecutor?). The proposed future_factory is also absent.

Please provide a real example if you want to convince us that the requested feature is important and desired.

Here is a complete example. Please note that this is a very simple example, meant only to demonstrate what would be possible with my proposed change.

The following example shows two things: a Future that carries context information, and a Future subclass that adds its own methods.

from concurrent.futures import Future, ThreadPoolExecutor


def workload(n):
    # does some heavy calculations
    return n


class Context:
    def __init__(self, name):
        self.name = name


class CustomFactory(Future):
    def __init__(self):
        super().__init__()
        self.context = None

    def is_from_context(self, ctx):
        return self.context.name == ctx

    def processed_result(self):
        # processes the result
        return self.result()


class ContextExecutor(ThreadPoolExecutor):
    def __init__(self):
        # future_factory is the argument proposed in this issue
        super().__init__(future_factory=CustomFactory)

    def submit(self, workload, arg, context):
        future = super().submit(workload, arg)
        future.context = context
        return future


context_executor = ContextExecutor()
futures = [
    context_executor.submit(workload, 0, context=Context("A")),
    context_executor.submit(workload, 1, context=Context("B")),
    context_executor.submit(workload, 2, context=Context("A")),
]

futures_from_context_a = [f for f in futures if f.is_from_context("A")]
if all(f.done() for f in futures_from_context_a):
    print("All Futures in context A are done")
    processed_results = [f.processed_result() for f in futures_from_context_a]
    print("Results:", processed_results)
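To make the executor side of the idea concrete, here is a toy sketch of how a future_factory hook might be honored. It is not the patch attached to this issue and not how ThreadPoolExecutor is implemented: FactoryExecutor and ContextFuture are hypothetical names, and the executor simply starts one thread per task. It uses only the public Executor and Future APIs.

```python
import threading
from concurrent.futures import Executor, Future


class ContextFuture(Future):
    """Hypothetical Future subclass that can carry a context value."""

    def __init__(self):
        super().__init__()
        self.context = None


class FactoryExecutor(Executor):
    """Toy executor: one thread per task, futures built by future_factory."""

    def __init__(self, future_factory=Future):
        self._future_factory = future_factory

    def submit(self, fn, *args, **kwargs):
        # The factory, not the executor, decides which class is instantiated.
        future = self._future_factory()

        def run():
            # Returns False if the future was cancelled before it started.
            if not future.set_running_or_notify_cancel():
                return
            try:
                future.set_result(fn(*args, **kwargs))
            except BaseException as exc:
                future.set_exception(exc)

        threading.Thread(target=run).start()
        return future


executor = FactoryExecutor(future_factory=ContextFuture)
future = executor.submit(pow, 2, 10)
future.context = "A"
result = future.result()
```

Because the returned object is a real Future subclass, it keeps the full Future interface, which is the difference from wrapping.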