[SPARK-21070][PYSPARK] Attempt to update cloudpickle again by holdenk · Pull Request #18734 · apache/spark (original) (raw)

I am merging this because:

cloudpickle looks initially ported from cloudpipe/cloudpickle@7aebb7e and cloudpipe/cloudpickle@c4f8851 (-> 04e44b3), where I see both are identical.

After 04e44b3, we have diff - e044705, 5520418, ee913e6, d489354, dbfc7aa, 20e6280 and 6297697

[SPARK-9116] [SQL] [PYSPARK] support Python only UDT in main, e044705: I think this part is only what we are worried of. It looks supporting classmethod, staticmethod and property. We have a test:

class PythonOnlyUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePoint.
"""
@classmethod
def sqlType(self):
return ArrayType(DoubleType(), False)
@classmethod
def module(cls):
return '__main__'
def serialize(self, obj):
return [obj.x, obj.y]
def deserialize(self, datum):
return PythonOnlyPoint(datum[0], datum[1])
@staticmethod
def foo():
pass
@property
def props(self):
return {}
class PythonOnlyPoint(ExamplePoint):
"""
An example class to demonstrate UDT in only Python
"""
__UDT__ = PythonOnlyUDT()
def test_udt(self):
from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _make_type_verifier
from pyspark.sql.tests import ExamplePointUDT, ExamplePoint
def check_datatype(datatype):
pickled = pickle.loads(pickle.dumps(datatype))
assert datatype == pickled
scala_datatype = self.spark._jsparkSession.parseDataType(datatype.json())
python_datatype = _parse_datatype_json_string(scala_datatype.json())
assert datatype == python_datatype
check_datatype(ExamplePointUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", ExamplePointUDT(), False)])
check_datatype(structtype_with_udt)
p = ExamplePoint(1.0, 2.0)
self.assertEqual(_infer_type(p), ExamplePointUDT())
_make_type_verifier(ExamplePointUDT())(ExamplePoint(1.0, 2.0))
self.assertRaises(ValueError, lambda: _make_type_verifier(ExamplePointUDT())([1.0, 2.0]))
check_datatype(PythonOnlyUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", PythonOnlyUDT(), False)])
check_datatype(structtype_with_udt)
p = PythonOnlyPoint(1.0, 2.0)
self.assertEqual(_infer_type(p), PythonOnlyUDT())
_make_type_verifier(PythonOnlyUDT())(PythonOnlyPoint(1.0, 2.0))
self.assertRaises(
ValueError,
lambda: _make_type_verifier(PythonOnlyUDT())([1.0, 2.0]))

[SPARK-10542] [PYSPARK] fix serialize namedtuple, 5520418: We keep the changes:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L1090-L1095

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L433-L436

and the related test pass:

P = namedtuple("P", "x y")
p1 = P(1, 3)
p2 = loads(dumps(p1, 2))
self.assertEqual(p1, p2)
from pyspark.cloudpickle import dumps
P2 = loads(dumps(P))
p3 = P2(1, 3)
self.assertEqual(p1, p3)

[SPARK-13697] [PYSPARK] Fix the missing module name of TransformFunctionSerializer.loads, ee913e6: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L528

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L1022-L1029

and the related test pass:

def test_function_module_name(self):
ser = CloudPickleSerializer()
func = lambda x: x
func2 = ser.loads(ser.dumps(func))
self.assertEqual(func.__module__, func2.__module__)

We should probably port this one to cloudpipe/cloudpickle.

[SPARK-16077] [PYSPARK] catch the exception from pickle.whichmodule(), d489354: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L325-L330

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L620-L625

This patch even should be safer as I and @rgbkrk verified this with some tests:

cloudpipe/cloudpickle#112

[SPARK-17472] [PYSPARK] Better error message for serialization failures of large objects in Python, dbfc7aa: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L240-L249

Probably, we should port this change into cloudpipe/cloudpickle.

[SPARK-19019] [PYTHON] Fix hijacked collections.namedtuple and port cloudpickle changes for PySpark to work with Python 3.6.0, 20e6280

This change was ported from cloudpipe/cloudpickle. I tested our PySpark tests pass with Python 3.6.0 in my local manually - #18734 (comment)

[SPARK-19505][PYTHON] AttributeError on Exception.message in Python3, 6297697: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L240-L249