bpo-28053: Complete and fix custom reducers in multiprocessing. by pablogsal · Pull Request #9959 · python/cpython
This PR tries to complete and fix the implementation of the custom reducer classes in `multiprocessing`.
Important

I have marked the PR as DO-NOT-MERGE because I still have several doubts about the previously implemented API, regarding the `AbstractReducer` base class, the methods that the user needs to implement, and how the rest of the library interacts with `multiprocessing.reducer`. For example:

- I am not sure `multiprocessing.reducer.dumps` and `multiprocessing.reducer.register` are needed outside the `ForkingPickler` class, and how that interacts with the ABC.
- I am not sure the `AbstractReducer` is implemented completely (no abstract methods are marked).

This PR is a draft implementation of the complete API, tests, and documentation, so we can discuss how to implement these correctly.
https://bugs.python.org/issue28053
Just a few minor typos I found while reading the PR.
Edit: Sorry, I just read at the end that it's a draft implementation. Feel free to ignore them if needed.
@pitrou Thank you very much for the review!
I have simplified the API. Now setting a custom reducer looks like this:

```python
import multiprocessing
from multiprocessing.reduction import AbstractReducer, ForkingPickler


class ForkingPicklerProtocol2(ForkingPickler):
    @classmethod
    def dumps(cls, obj, pickle_protocol=2):
        return super().dumps(obj, protocol=pickle_protocol)


class PickleProtocol2Reducer(AbstractReducer):
    def get_pickler_class(self):
        return ForkingPicklerProtocol2


multiprocessing.set_reducer(PickleProtocol2Reducer)
```
I am making the interface a bit more strict: `multiprocessing.set_reducer()` must be called with a subclass of `AbstractReducer`, and `get_pickler_class` must return a subclass of `pickle.Pickler`. This way, the constructor and the rest of the methods needed by the `multiprocessing` reduction machinery will be there. I have added some new tests that check this behaviour.
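To make the discussion concrete, here is a minimal sketch of the kind of validation `set_reducer()` could perform. The names (`AbstractReducer`, `get_pickler_class`) are the ones used in this PR, but the body is illustrative, not the actual patch:

```python
import pickle

from multiprocessing.reduction import AbstractReducer


def set_reducer(reducer_class):
    # Hypothetical validation for the stricter interface described above.
    if not (isinstance(reducer_class, type)
            and issubclass(reducer_class, AbstractReducer)):
        raise TypeError("set_reducer() expects a subclass of AbstractReducer")
    pickler_class = reducer_class().get_pickler_class()
    if not (isinstance(pickler_class, type)
            and issubclass(pickler_class, pickle.Pickler)):
        raise TypeError("get_pickler_class() must return a subclass "
                        "of pickle.Pickler")
    # ...then install the reducer on the context (not shown here).
```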
@pitrou It took me a while, but I have stabilized all tests and fixed some details on Windows. I have also added Listener and Client to the context so they can also benefit from custom reducers. Please check my previous comment regarding some details.
This patch is already very big and very complex, and when errors happen they are extremely obscure or platform-dependent, so I apologize in advance if I miss something obvious; I have too many spinning plates.
Could you take another look?
Thanks for the update. It seems there are test failures on all 3 CI platforms...
```
Defaults to :meth:`pickle.Pickler.dump`

.. classmethod:: loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")
```
Are the optional arguments required? Does `multiprocessing` ever pass them explicitly?
Also, the method / classmethod asymmetry is weird and doesn't help designing an implementation. Do you think that can be fixed (one way or the other)?
Yes, I find this very weird as well. We can make `load()` a regular method, but that would require instantiating a `Pickler()` object for no reason (`AbstractPickler` must inherit from `Pickler` to make the dump work correctly). It would help with the asymmetry, though.
What do you think?
Notice that to instantiate the Pickler class we need to provide a dummy file-like object (probably an `io.BytesIO` instance, since picklers write binary data). I find that suboptimal as well.
The other possibility is making `dump` a method. In that case, we would need to create a `Pickler` instance and copy and update the dispatch table on it every time it is called.
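For illustration, a sketch of what that per-call approach might look like (the class name is hypothetical; the dispatch-table handling mirrors what `ForkingPickler.dumps` already does as a classmethod):

```python
import copyreg
import io
import pickle


class MethodBasedPickler:
    # Hypothetical per-class table of extra reducers registered by the user.
    _extra_reducers = {}

    def dumps(self, obj, protocol=None):
        # A fresh Pickler and a fresh dispatch table on every call:
        # correct, but the setup is repeated for each serialized object.
        buf = io.BytesIO()
        pickler = pickle.Pickler(buf, protocol)
        pickler.dispatch_table = copyreg.dispatch_table.copy()
        pickler.dispatch_table.update(self._extra_reducers)
        pickler.dump(obj)
        return buf.getbuffer()
```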
```
.. method:: get_pickler_class():

   This method must return a subclass of :class:`pickle.Pickler` to be used by
```
This does not make the relationship between `pickle.Pickler` and `AbstractPickler` very clear.
That should be `AbstractPickler`.
```
@@ -1187,6 +1187,81 @@ For example:

 the data in the pipe is likely to become corrupted, because it may become
 impossible to be sure where the message boundaries lie.

Custom Reduction
~~~~~~~~~~~~~~~~
```
You'll need some `versionadded` directive at some point.
Should we add this? Technically this PR is fixing the previous implementation, although since the old one was broken, one could argue that we are adding the feature.
```
@@ -51,14 +51,35 @@ def dumps(cls, obj, protocol=None):
         cls(buf, protocol).dump(obj)
         return buf.getbuffer()

-    loads = pickle.loads
+    @classmethod
+    def loads(cls, bytes_object, *, fix_imports=True,
```
Uh... I hadn't noticed these were class methods...
The problem is that `loads` does not need to instantiate a Pickler class, so it was designed here as a class method.
Would you prefer it to be a regular method that does the same (defers the call to `pickle.loads`)?
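For reference, the regular-method variant being asked about would be essentially this (a sketch; the class name is made up):

```python
import pickle


class SomePickler(pickle.Pickler):
    def loads(self, bytes_object, *, fix_imports=True,
              encoding="ASCII", errors="strict"):
        # Same behaviour as the classmethod: just defer to pickle.loads;
        # the instance (and its dummy buffer) is never actually used.
        return pickle.loads(bytes_object, fix_imports=fix_imports,
                            encoding=encoding, errors=errors)
```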
```
def loads(s, *, fix_imports=True, encoding="ASCII", errors="strict"):
    return ForkingPickler.loads(s, fix_imports=fix_imports,
```
By the way, I see that `sharedctypes` is still using `_ForkingPickler` directly. Should it be fixed as well?
```
@classmethod
def _put_and_get_in_queue(cls, queue, parent_can_continue):
    parent_can_continue.set()
```
If you want to re-use the event afterwards, you have to clear it at some point. But I'm not sure all this synchronization is necessary (the queue already synchronizes for you).
```
p = self.custom_ctx.Process(target=self._put_and_get_in_queue,
                            args=(queue, parent_can_continue))
p.start()
parent_can_continue.wait()
```
You shouldn't need this wait, AFAICT.
```
parent_can_continue.set()
queue.put("Something")
queue.get(timeout=TIMEOUT2)
close_queue(queue)
```
Is it useful to close the queue explicitly?
Yep, there were a bunch of race conditions in the CI related to this process not finishing. It seems to be related to the queue's feeder thread doing something. I did not dig deeper, but as soon as I added the extra synchronization, the failures went away.
I will increase the timeouts and try to remove this to see what happens.
```
element = queue.get(timeout=TIMEOUT3)
self.assertEqual(element, "Something")
queue.put("Other_Something")
parent_can_continue.wait()
```
Not sure you need this, either (and I don't think you have to close the queue, unless it helps test something?).
Check the previous comment.
I will try to increase the timeouts and remove the event to see if the failures go away.
```
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_queue_custom_reducer_over_default_context(self):
```
Same comments as above about events and queues.
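To make the suggested simplification concrete, a sketch of a test body without the `Event`, relying only on the queue's own blocking semantics (`custom_ctx`, `TIMEOUT2`, and `TIMEOUT3` are the names from the tests above; the method names are made up):

```python
@classmethod
def _echo(cls, inqueue, outqueue):
    # The blocking get() already orders parent and child, so no Event
    # is needed; two queues keep a process from reading back its own
    # message.
    outqueue.put(inqueue.get(timeout=TIMEOUT2))

def test_queue_roundtrip(self):
    inqueue = self.custom_ctx.Queue()
    outqueue = self.custom_ctx.Queue()
    p = self.custom_ctx.Process(target=self._echo,
                                args=(inqueue, outqueue))
    p.start()
    inqueue.put("Something")
    self.assertEqual(outqueue.get(timeout=TIMEOUT3), "Something")
    p.join()
```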
@pitrou There are some failures on Windows that I am investigating, but I found a problem.
In multiprocessing/context.py there is no way of passing down the current context to the Process class. The `Process` class is an attribute of `DefaultContext`. If we change that to a function that injects the context down, any class that inherits from `multiprocessing.Process` will be broken, because it would no longer be a class (it would be a function).
This is the only exception: every other class is defined in `BaseContext` as a method that injects the context down, but not the process. Passing the context to the process is necessary because it calls dump (check, for example, `multiprocessing/popen_spawn_win32.Popen.__init__`).
I don't know how to solve this, but basically tests like `test_queue_custom_reducer_over_custom_context` are not possible, as there is no way for the call to `Process` to pass down the context unless you do something ugly such as:

```python
self.custom_ctx.Process(..., ctx=self.custom_ctx)
```
Trying to do something like:

```python
class DefaultContext(BaseContext):
    @property
    def Process(self):
        class _Process(Process):
            _ctx = self.get_context()
        return _Process
```

fails because `_Process` will not be picklable downstream. So I am out of ideas.
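For reference, this failure can be reproduced independently of multiprocessing: pickle serializes classes by reference, looking them up by module and qualified name, and a class created inside a property body cannot be found that way in a child process. A minimal illustration (hypothetical names):

```python
import pickle


def make_class():
    class _Process:  # defined at function scope, not module scope
        pass
    return _Process


cls = make_class()
# Fails: the qualified name is 'make_class.<locals>._Process', which an
# unpickler in another process cannot look up by import.
pickle.dumps(cls)
```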
As you are more used to the architecture of the multiprocessing module, do you see a way of solving this?
If you don't, I'm afraid that custom reducers per context cannot be implemented, because of the way multiprocessing is architected.
@pablogsal thanks a lot for putting this PR together, this is great work. This feature is very promising.
Regarding the issue concerning the `mp.Process` class, it is definitely tricky. I think we should leverage the fact that we only want this context injection into the process class when the user explicitly asks for custom reduction behavior.
As a result, we can make `Process` a `property` of `Context` objects, and make the property return a different thing depending on whether the user asked for custom reduction, which is a new feature (no backward-compatibility issue), or not.
See proof of concept below:

```python
@property
def Process(self):
    if not self._custom_reduction_enabled:
        # Ensure backward compatibility by returning a class when no
        # custom reducer was specified
        return _Process  # ForkProcess for ForkContext, Process for BaseContext, etc.
    else:
        return self.process_factory


def process_factory(self, *args, **kwargs):
    p = Process(*args, **kwargs)
    p._ctx = self.get_context()
    return p
```
More complete implementation here:
https://github.com/pablogsal/cpython/compare/bpo28053...pierreglaser:inject-custom-context-into-process-cls?expand=1
Gist showing a usage example and its behavior:
https://gist.github.com/pierreglaser/ed4f9f9e784a3571cfdc8b969d32085f
What do you think?
EDIT: Another question is whether or not we consider custom `Context` classes to be part of the public API. The only place I can think of where `Context` classes are mentioned in the Python docs is here:

```
Alternatively, you can use :func:`get_context` to obtain a context
object. Context objects have the same API as the multiprocessing
module, and allow one to use multiple start methods in the same
```
If we want to enable `set_reducer` with custom `Context` classes that have a `Process` attribute, most probably pointing to a `Process` class, then my above snippet will break -- how to handle this case is still an open question.
Another topic that appears many times in this PR is the strange `AbstractPickler.loads` classmethod and the asymmetry between `load` and `dump` for the `Pickler` subclasses returned by `get_pickler_class`.
A good way IMO to re-establish the symmetry here would be to use actual `Unpickler` classes to `load` pickle strings, instead of using smoke `load` methods of `Pickler` subclasses.
Thus, we could add an optional `get_unpickler_class` to the `AbstractReducer` API. The returned class would have to implement a `load` instance method. This way, the symmetry is re-established, and we do not create unnecessary `Pickler` instances at load time.
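A rough sketch of that symmetric design (all class names besides `AbstractReducer` and `ForkingPickler` are hypothetical; this illustrates the proposal, not the PR's code):

```python
import io
import pickle

from multiprocessing.reduction import AbstractReducer, ForkingPickler


class MyUnpickler(pickle.Unpickler):
    # load() is inherited as a regular instance method, mirroring
    # Pickler.dump() on the serialization side.
    pass


class MyReducer(AbstractReducer):
    def get_pickler_class(self):
        return ForkingPickler

    def get_unpickler_class(self):  # the optional addition proposed above
        return MyUnpickler


# Load side: one short-lived Unpickler per message, so no Pickler
# instance (or dummy buffer) is ever needed.
def loads(reducer, data):
    return reducer.get_unpickler_class()(io.BytesIO(data)).load()
```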