bpo-28053: Complete and fix custom reducers in multiprocessing. by pablogsal · Pull Request #9959 · python/cpython (original) (raw)

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Conversation91 Commits13 Checks0 Files changed

Conversation

This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.Learn more about bidirectional Unicode characters

[ Show hidden characters]({{ revealButtonHref }})

pablogsal

This PR tries to complete and fix the implementation of the custom reducer classes in multiprocessing.

Important

I have marked the PR as DO-NOT-MERGE because I have still several doubts about the previous implemented API, regarding the AbstractReducer base class and the methods that the user needs to implement and how the rest of the library interacts with multiprocessing.reducer. For example:

  1. I am not sure multiprocessing.reducer.dumps and multiprocessing.reducer.register are needed outside the ForklingPickler class and how that interacts with the ABC.
  2. I am not sure the AbstractReducer is implemented completely (there is no abstract methods marked).

This PR is a draft implementation of the complete API, tests and documentation so we can discuss how to implement these correctly in a better way.

https://bugs.python.org/issue28053

tirkarthi

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few minor typos I found while reading the PR.

Edit : Sorry, just read at the end that it's a draft implementation. Feel free to ignore them if needed.

matrixise

pitrou

pitrou

pitrou

@pitrou

@1st1 1st1 removed their request for review

November 6, 2018 20:57

@pablogsal

@pitrou Thank you very much for the review!

I have simplified the API. Now setting a custom reducer looks like this:

import multiprocessing from multiprocessing.reduction import AbstractReducer, ForkingPickler

class ForkingPicklerProtocol2(ForkingPickler):
   @classmethod
   def dumps(cls, obj, pickle_protocol=2):
       return super().dumps(obj, protocol=pickle_protocol)

class PickleProtocol2Reducer(AbstractReducer):
   def get_pickler_class(self):
       return ForkingPicklerProtocol2

multiprocessing.set_reducer(PickleProtocol2Reducer)

I am making the interface a bit more strict, so multiprocessing.set_reducer() must be called with a subclass of AbstractReducer and get_pickler_class must return a subclass of pickler.Pickle. This way, the constructor and the rest of the methods needed for the multiprocessing reduction machinery will be there. I have added some new test that check this behaviour.

@pitrou

@pablogsal

@pitrou It took me a while but I have stabilized all tests and fixed some details on Windows. I have also added Listener and Client to the context so they also can benefit from custom reducers. Please, check my previous comment regarding some details.

This patch is already very big and very very complex and when errors happen they are extremely obscure or platform dependent, so I apologize in advance if I miss something obvious, but I have too many spinning plates.

Could you take another look?

@pablogsal

@pablogsal

pitrou

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. It seems there are test failures on all 3 CI platforms...

Defaults to :meth:`pickle.Pickle.dump`
.. classmethod:: loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the optional arguments required? Does multiprocessing ever pass them explicitly?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the method / classmethod asymmetry is weird and doesn't help designing an implementation. Do you think that can be fixed (one way or the other)?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I find this very weird as well. We can make the load() be a method but that would require instantiating a Pickler() object for no reason (AbstractPickler must inherit from Pickler to make the dump work correctly). It will help with the asymmetry, though.

What do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that to instantiate the Pickler class we need to provide a dummy file-like object (probably a StringIO instance). I find that suboptimal as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other possibility is making dump a method. In that case, we would need to create a Pickler instance and copy and update the dispatch table over it every time is called.

.. method:: get_pickler_class():
This method must return an subclass of :class:`pickler.Pickler` to be used by

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not make it very clear the relation ship between pickler.Pickler and AbstractPickler.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be AbstractPickler

@@ -1187,6 +1187,81 @@ For example:
the data in the pipe is likely to become corrupted, because it may become
impossible to be sure where the message boundaries lie.
Custom Reduction
~~~~~~~~~~~~~~~~

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need some versionadded directive at some point.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add this? Technically this PR is fixing the previous implementation, although as the old one was broken, one could argue that we are adding the feature.

@@ -51,14 +51,35 @@ def dumps(cls, obj, protocol=None):
cls(buf, protocol).dump(obj)
return buf.getbuffer()
loads = pickle.loads
@classmethod
def loads(cls, bytes_object, *, fix_imports=True,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh... I hadn't noticed these were class methods...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that loads do not need to instantiate a Pickler class so it was designed here as a class method.

Would you prefer it to be a regular method that does the same (defers the call to pickle.loads)?

def loads(s, *, fix_imports=True, encoding="ASCII", errors="strict"):
return ForkingPickler.loads(s, fix_imports=fix_imports,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I see that sharedctypes is still using _ForkingPickler directly. Should it be fixed as well?

@classmethod
def _put_and_get_in_queue(cls, queue, parent_can_continue):
parent_can_continue.set()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to re-use the event afterwards, you have to clear it at some point. But I'm not sure all this synchronization is necessary (the queue already synchronizes for you).

p = self.custom_ctx.Process(target=self._put_and_get_in_queue,
args=(queue, parent_can_continue))
p.start()
parent_can_continue.wait()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this wait, AFAICT.

parent_can_continue.set()
queue.put("Something")
queue.get(timeout=TIMEOUT2)
close_queue(queue)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it useful to close the queue explicitly?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, there were a bunch of race conditions in the CI related to this process not finishing. It seems is related to the queue thread doing something. I did not dig deeper but as soon as I added the extra synchronization, the failures went away.

I will increase the timeouts and try to remove this to see what happens.

element = queue.get(timeout=TIMEOUT3)
self.assertEqual(element, "Something")
queue.put("Other_Something")
parent_can_continue.wait()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you need this, either (and I don't think you have to close the queue, unless it helps test something?).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the previous comment.

I will try to increase the timeouts and remove the event to see if the failures do not appear.

@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_queue_custom_reducer_over_default_context(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as above about events and queues.

@pablogsal

@pablogsal

@pitrou There is some failures in Windows that I am investigating but I found a problem.

In multiprocessing/context.py there is no way of passing down the current context to the Process class. The Process class is an attribute of DefaultContext. If we change that to a function that injects the context down, any class that is inheriting from multiprocessing.Process will be broken because now it would not be a class (is a function).

This is the only exception as any other class is defined in BaseContext as a method that injects the context down, but not the process. Passing the context to the process is necessary because it calls dump (check for example multiprocessing/popen_spawn_win32.Popen.init).

I don't know how to solve this, but basically tests like test_queue_custom_reducer_over_custom_context are not possible as there is no way for the call to Process to pass down the context unless you do something ugly as:

self.custom_ctx.Process(..., ctx=self.custom_ctx)

Trying to do something like:

class DefaultContext(BaseContext):
    @property
    def Process(self):
        class _Process(Process):
             _ctx = self.get_context()
        return _Process

fails because _Process will not be picklable downstream. So I am out of ideas.

As you are more used to the architecture of the multiprocessing module, do you see a way of solving this?

If you don't see a way, I'm afraid that custom reducers per context cannot be implemented because of the way multiprocessing is architected.

@pablogsal

@pierreglaser

@pablogsal thanks a lot for putting this PR together, this is great work. This feature is very promising.

Regarding the issue concerning the mp.Process class, it is definitely tricky. I think we should leverage the fact that we only want this context injection into the process class when the user explicitly asks for custom reduction behavior.
As a result, we can make Process a property of Context objects, and make the property return a different thing depending on if the user asked for custom reduction, which is a new feature (no backward compat issue), or not.

See proof of concept below

@property
def Process(self):
    if not self._custom_reduction_enabled:
        # Ensure backward compatibility by returning a class when no
        # custom reducer was specified
        return _Process  # ForkProcess for ForkContext, Process for BaseContext etc.
    else:
        return self.process_factory

def process_factory(self, *args, **kwargs):
    p = Process(*args, **kwargs)
    p._ctx = self.get_context()
    return p

More complete implementation here:
https://github.com/pablogsal/cpython/compare/bpo28053...pierreglaser:inject-custom-context-into-process-cls?expand=1

Gist showing a usage example and its behavior:
https://gist.github.com/pierreglaser/ed4f9f9e784a3571cfdc8b969d32085f

What do you think?

EDIT: Another question is whether or not we consider that Context custom classes are part of the public API. The only place I think where Context classes are mentioned in the python docs is there:

Alternatively, you can use :func:`get_context` to obtain a context
object. Context objects have the same API as the multiprocessing
module, and allow one to use multiple start methods in the same

If we want to enable set_reducer with custom Context classes that have a Process attribute, most probably pointing to a Process class, then my above snippet will break -- how to handle this case is still an open question.

@pierreglaser

Another topic that appears many times in this PR is the strange AbstractPickler.loads classmethod and the symmetry between the load and dump for the Pickler subclasses returned by get_pickler_class.

A good way IMO to re-establish the symmetry here would be to use actual Unpickler classes to load pickle strings, instead of using smoke load methods of Pickler subclasses.

Thus, we could add an optional get_unpickler_class to the AbstractReducer API. The returned class would have to implement a load instance method. This way, the symmetry is re-established, and we do not create un-necessary Pickler instances at load time.

Reviewers

@matrixise matrixise matrixise left review comments

@pitrou pitrou pitrou requested changes

@tirkarthi tirkarthi tirkarthi left review comments

@rhettinger rhettinger Awaiting requested review from rhettinger

@vstinner vstinner Awaiting requested review from vstinner