
Commit

Use pickle by default for serialization (ray-project#5978)
pcmoritz authored Nov 11, 2019
1 parent 01aee8d commit decaa65
Showing 10 changed files with 698 additions and 597 deletions.
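
This change makes pickle (cloudpickle with pickle protocol 5, PEP 574) the default object serializer in place of the pyarrow-based path. Protocol 5 moves large binary payloads out-of-band instead of copying them into the pickle stream. A minimal standalone sketch of that mechanism, independent of Ray's internals:

    import pickle  # Python 3.8+; on 3.6/3.7 the pickle5 backport has the same API

    payload = b"x" * (1 << 20)  # a large binary blob
    buffers = []
    inband = pickle.dumps(pickle.PickleBuffer(payload), protocol=5,
                          buffer_callback=buffers.append)

    # The in-band stream stays tiny; the megabyte lives in `buffers` and can
    # be written to shared memory (plasma, in Ray's case) without extra copies.
    restored = pickle.loads(inband, buffers=buffers)
    assert bytes(memoryview(restored)) == payload
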
13 changes: 13 additions & 0 deletions build.sh

@@ -102,6 +102,19 @@ if [ -z "$SKIP_PYARROW_INSTALL" ]; then
       --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html
 fi
 
+PYTHON_VERSION=`"$PYTHON_EXECUTABLE" -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}".format(*version))'`
+if [[ "$PYTHON_VERSION" == "3.6" || "$PYTHON_VERSION" == "3.7" ]]; then
+  WORK_DIR=`mktemp -d`
+  pushd $WORK_DIR
+    git clone https://github.com/pitrou/pickle5-backport
+    pushd pickle5-backport
+      git checkout 5186f9ca4ce55ae530027db196da51e08208a16b
+      "$PYTHON_EXECUTABLE" setup.py bdist_wheel
+      unzip -o dist/*.whl -d "$ROOT_DIR/python/ray/pickle5_files"
+    popd
+  popd
+fi
+
 export PYTHON3_BIN_PATH="$PYTHON_EXECUTABLE"
 export PYTHON2_BIN_PATH="$PYTHON_EXECUTABLE"
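
The block above builds the pickle5-backport wheel only for Python 3.6 and 3.7 (3.8 ships protocol 5 natively) and unpacks it into python/ray/pickle5_files. A quick sanity check that the vendored module is importable (a sketch; assumes it is run from the repository root after build.sh completes):

    import sys
    sys.path.insert(0, "python/ray/pickle5_files")  # where build.sh unzipped the wheel

    import pickle5
    print(pickle5.HIGHEST_PROTOCOL)  # prints 5: the backport exposes protocol 5
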
16 changes: 15 additions & 1 deletion python/ray/__init__.py

@@ -5,12 +5,26 @@
 import os
 import sys
 
+# MUST add pickle5 to the import path because it will be imported by some
+# raylet modules.
+
+if "pickle5" in sys.modules:
+    raise ImportError("Ray must be imported before pickle5 because Ray "
+                      "requires a specific version of pickle5 (which is "
+                      "packaged along with Ray).")
+
+# Add the directory containing pickle5 to the Python path so that we find the
+# pickle5 version packaged with ray and not a pre-existing pickle5.
+pickle5_path = os.path.join(
+    os.path.abspath(os.path.dirname(__file__)), "pickle5_files")
+sys.path.insert(0, pickle5_path)
+
 # MUST import ray._raylet before pyarrow to initialize some global variables.
 # It seems the library related to memory allocation in pyarrow will destroy the
 # initialization of grpc if we import pyarrow at first.
 # NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
 # details.
-import ray._raylet
+import ray._raylet  # noqa: E402
 
 if "pyarrow" in sys.modules:
     raise ImportError("Ray must be imported before pyarrow because Ray "
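
The effect of the path manipulation above, spelled out as a usage rule (an illustrative sketch; the sys.modules check in the diff exists precisely to reject the reversed order):

    import ray      # inserts ray/pickle5_files at the front of sys.path
    import pickle5  # now resolves to the copy packaged with Ray

    # Importing pickle5 before ray would leave a foreign pickle5 in
    # sys.modules, so `import ray` would raise the ImportError shown above.
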
133 changes: 38 additions & 95 deletions python/ray/_raylet.pyx

@@ -654,6 +654,34 @@ cdef void exit_handler() nogil:
     sys.exit(0)
 
 
+cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):
+    cdef shared_ptr[CBuffer] empty_metadata
+    if c_str.size() == 0:
+        return empty_metadata
+    return dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
+        make_shared[LocalMemoryBuffer](<uint8_t*>(c_str.data()),
+                                       c_str.size(), True))
+
+
+cdef write_serialized_object(serialized_object, const shared_ptr[CBuffer]& buf):
+    # avoid initializing pyarrow before raylet
+    from ray.serialization import Pickle5SerializedObject, RawSerializedObject
+
+    if isinstance(serialized_object, RawSerializedObject):
+        buffer = Buffer.make(buf)
+        stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
+        stream.set_memcopy_threads(MEMCOPY_THREADS)
+        stream.write(pyarrow.py_buffer(serialized_object.value))
+    elif isinstance(serialized_object, Pickle5SerializedObject):
+        (<Pickle5Writer>serialized_object.writer).write_to(
+            serialized_object.inband, buf, MEMCOPY_THREADS)
+    else:
+        buffer = Buffer.make(buf)
+        stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
+        stream.set_memcopy_threads(MEMCOPY_THREADS)
+        serialized_object.serialized_object.write_to(stream)
+
+
 cdef class CoreWorker:
     cdef unique_ptr[CCoreWorker] core_worker
 
@@ -753,66 +781,15 @@ cdef class CoreWorker:
             CObjectID c_object_id
             shared_ptr[CBuffer] data
             shared_ptr[CBuffer] metadata
 
-        object_already_exists = self._create_put_buffer(
-            metadata, serialized_object.total_bytes,
-            object_id, &c_object_id, &data)
-        if not object_already_exists:
-            stream = pyarrow.FixedSizeBufferWriter(
-                pyarrow.py_buffer(Buffer.make(data)))
-            stream.set_memcopy_threads(MEMCOPY_THREADS)
-            serialized_object.write_to(stream)
-
-            with nogil:
-                check_status(
-                    self.core_worker.get().Seal(c_object_id))
-
-        return ObjectID(c_object_id.Binary())
-
-    def put_raw_buffer(self, c_string value, ObjectID object_id=None):
-        cdef:
-            c_string metadata_str = RAW_BUFFER_METADATA
-            CObjectID c_object_id
-            shared_ptr[CBuffer] data
-            shared_ptr[CBuffer] metadata = dynamic_pointer_cast[
-                CBuffer, LocalMemoryBuffer](
-                    make_shared[LocalMemoryBuffer](
-                        <uint8_t*>(metadata_str.data()), metadata_str.size()))
-
+        metadata = string_to_buffer(serialized_object.metadata)
+        total_bytes = serialized_object.total_bytes
         object_already_exists = self._create_put_buffer(
-            metadata, value.size(), object_id, &c_object_id, &data)
+            metadata, total_bytes, object_id, &c_object_id, &data)
         if not object_already_exists:
-            stream = pyarrow.FixedSizeBufferWriter(
-                pyarrow.py_buffer(Buffer.make(data)))
-            stream.set_memcopy_threads(MEMCOPY_THREADS)
-            stream.write(pyarrow.py_buffer(value))
-
+            write_serialized_object(serialized_object, data)
             with nogil:
                 check_status(
                     self.core_worker.get().Seal(c_object_id))
 
         return ObjectID(c_object_id.Binary())
 
-    def put_pickle5_buffers(self, c_string inband,
-                            Pickle5Writer writer, ObjectID object_id=None):
-        cdef:
-            CObjectID c_object_id
-            c_string metadata_str = PICKLE5_BUFFER_METADATA
-            shared_ptr[CBuffer] data
-            shared_ptr[CBuffer] metadata = dynamic_pointer_cast[
-                CBuffer, LocalMemoryBuffer](
-                    make_shared[LocalMemoryBuffer](
-                        <uint8_t*>(metadata_str.data()), metadata_str.size()))
-
-        object_already_exists = self._create_put_buffer(
-            metadata, writer.get_total_bytes(inband),
-            object_id, &c_object_id, &data)
-        if not object_already_exists:
-            writer.write_to(inband, data, MEMCOPY_THREADS)
-            with nogil:
-                check_status(
-                    self.core_worker.get().Seal(c_object_id))
-
-        return ObjectID(c_object_id.Binary())
-
     def wait(self, object_ids, int num_returns, int64_t timeout_ms,
@@ -1021,7 +998,6 @@ cdef class CoreWorker:
         cdef:
             c_vector[size_t] data_sizes
             c_string metadata_str
-            shared_ptr[CBuffer] empty_metadata
             c_vector[shared_ptr[CBuffer]] metadatas
 
         if return_ids.size() == 0:
@@ -1036,31 +1012,13 @@ cdef class CoreWorker:
             elif output is NoReturn:
                 serialized_objects.append(output)
                 data_sizes.push_back(0)
-                metadatas.push_back(empty_metadata)
-            elif isinstance(output, bytes):
-                serialized_objects.append(output)
-                data_sizes.push_back(len(output))
-                metadata_str = RAW_BUFFER_METADATA
-                metadatas.push_back(dynamic_pointer_cast[
-                    CBuffer, LocalMemoryBuffer](
-                        make_shared[LocalMemoryBuffer](
-                            <uint8_t*>(metadata_str.data()),
-                            metadata_str.size(), True)))
-            elif worker.use_pickle:
-                inband, writer = worker._serialize_with_pickle5(output)
-                serialized_objects.append((inband, writer))
-                data_sizes.push_back(writer.get_total_bytes(inband))
-                metadata_str = PICKLE5_BUFFER_METADATA
-                metadatas.push_back(dynamic_pointer_cast[
-                    CBuffer, LocalMemoryBuffer](
-                        make_shared[LocalMemoryBuffer](
-                            <uint8_t*>(metadata_str.data()),
-                            metadata_str.size(), True)))
+                metadatas.push_back(string_to_buffer(b''))
             else:
-                serialized_object = worker._serialize_with_pyarrow(output)
-                serialized_objects.append(serialized_object)
+                context = worker.get_serialization_context()
+                serialized_object = context.serialize(output)
                 data_sizes.push_back(serialized_object.total_bytes)
-                metadatas.push_back(empty_metadata)
+                metadatas.push_back(string_to_buffer(serialized_object.metadata))
+                serialized_objects.append(serialized_object)
 
         check_status(self.core_worker.get().AllocateReturnObjects(
             return_ids, data_sizes, metadatas, returns))
@@ -1069,22 +1027,7 @@ cdef class CoreWorker:
                 # A nullptr is returned if the object already exists.
                 if returns[0][i].get() == NULL:
                     continue
-
-                if serialized_object is NoReturn:
-                    returns[0][i].reset()
-                elif isinstance(serialized_object, bytes):
-                    buffer = Buffer.make(returns[0][i].get().GetData())
-                    stream = pyarrow.FixedSizeBufferWriter(
-                        pyarrow.py_buffer(buffer))
-                    stream.set_memcopy_threads(MEMCOPY_THREADS)
-                    stream.write(pyarrow.py_buffer(serialized_object))
-                elif worker.use_pickle:
-                    inband, writer = serialized_object
-                    (<Pickle5Writer>writer).write_to(
-                        inband, returns[0][i].get().GetData(), MEMCOPY_THREADS)
-                else:
-                    buffer = Buffer.make(returns[0][i].get().GetData())
-                    stream = pyarrow.FixedSizeBufferWriter(
-                        pyarrow.py_buffer(buffer))
-                    stream.set_memcopy_threads(MEMCOPY_THREADS)
-                    serialized_object.write_to(stream)
+                write_serialized_object(
+                    serialized_object, returns[0][i].get().GetData())
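
End to end, the write_serialized_object branches correspond to what callers hand to the object store. A hedged sketch of how they are exercised through the public API under the post-commit defaults (exact routing lives in the Python worker, which is not shown in this diff):

    import ray

    ray.init()

    # bytes take the RawSerializedObject branch: the payload is written
    # into plasma as-is, with no pickling overhead.
    raw_id = ray.put(b"\x00" * 1024)

    # Ordinary Python objects now default to the Pickle5SerializedObject
    # branch (cloudpickle with protocol-5 out-of-band buffers).
    obj_id = ray.put({"weights": [1.0, 2.0, 3.0]})

    assert ray.get(raw_id) == b"\x00" * 1024
    assert ray.get(obj_id) == {"weights": [1.0, 2.0, 3.0]}
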
10 changes: 9 additions & 1 deletion python/ray/cloudpickle/__init__.py

@@ -1,7 +1,15 @@
 from __future__ import absolute_import
+import os
 import sys
 
-if sys.version_info[:2] >= (3, 8):
+CLOUDPICKLE_PATH = os.path.dirname(os.path.realpath(__file__))
+
+if os.path.exists(os.path.join(CLOUDPICKLE_PATH, "..", "pickle5_files", "pickle5")):
+    HAS_PICKLE5 = True
+else:
+    HAS_PICKLE5 = False
+
+if sys.version_info[:2] >= (3, 8) or HAS_PICKLE5:
     from ray.cloudpickle.cloudpickle_fast import *
     FAST_CLOUDPICKLE_USED = True
 else:
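
The practical consequence of the HAS_PICKLE5 probe: on 3.6/3.7 the fast pickler is used whenever the vendored backport was built, matching the 3.8+ behavior. A small check using only names defined in this diff:

    from ray import cloudpickle

    # True on Python 3.8+, and on 3.6/3.7 whenever pickle5_files is present.
    print(cloudpickle.FAST_CLOUDPICKLE_USED)

    # Either way, callers go through one interface:
    blob = cloudpickle.dumps({"a": 1})
    assert cloudpickle.loads(blob) == {"a": 1}
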

