Skip to content

Commit

Permalink
Remove use of ObjectID transport flag (ray-project#7699)
Browse files Browse the repository at this point in the history
  • Loading branch information
edoakes authored May 17, 2020
1 parent acffdb2 commit 16f4807
Show file tree
Hide file tree
Showing 40 changed files with 135 additions and 384 deletions.
3 changes: 1 addition & 2 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,

ObjectID AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
ObjectID object_id =
ObjectID::ForPut(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex(),
static_cast<uint8_t>(TaskTransportType::RAYLET));
ObjectID::ForPut(worker_->GetCurrentTaskID(), worker_->GetNextPutIndex());
Put(data, object_id);
return object_id;
}
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
} else if (type == TaskType::ACTOR_TASK) {
const TaskID actor_creation_task_id =
TaskID::ForActorCreationTask(invocation.actor_id);
const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn(
actor_creation_task_id, 1, static_cast<int>(ray::TaskTransportType::RAYLET));
const ObjectID actor_creation_dummy_object_id =
ObjectID::ForTaskReturn(actor_creation_task_id, 1);
builder.SetActorTaskSpec(invocation.actor_id, actor_creation_dummy_object_id,
ObjectID(), invocation.actor_counter);
} else {
Expand All @@ -52,8 +52,7 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy
/// TODO(Guyang Song): Use both 'AddByRefArg' and 'AddByValueArg' to distinguish
builder.AddByValueArg(::ray::RayObject(buffer, nullptr, std::vector<ObjectID>()));
auto task_specification = builder.Build();
ObjectID return_object_id =
task_specification.ReturnId(0, ray::TaskTransportType::RAYLET);
ObjectID return_object_id = task_specification.ReturnId(0);

std::shared_ptr<msgpack::sbuffer> actor;
std::shared_ptr<absl::Mutex> mutex;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void TaskExecutor::Invoke(const TaskSpecification &task_spec,
data = (*exec_function)(dynamic_library_base_addr,
std::stoul(typed_descriptor->FunctionOffset()), args);
}
runtime->Put(std::move(data), task_spec.ReturnId(0, ray::TaskTransportType::RAYLET));
runtime->Put(std::move(data), task_spec.ReturnId(0));
}
} // namespace api
} // namespace ray
} // namespace ray
55 changes: 0 additions & 55 deletions java/test/src/main/java/io/ray/api/test/ClientExceptionTest.java

This file was deleted.

9 changes: 6 additions & 3 deletions java/test/src/main/java/io/ray/api/test/RayCallTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ public void testType() {
TestUtils.LargeObject largeObject = new TestUtils.LargeObject();
Assert.assertNotNull(Ray.call(RayCallTest::testLargeObject, largeObject).get());

ObjectId randomObjectId = ObjectId.fromRandom();
Ray.call(RayCallTest::testNoReturn, randomObjectId);
Assert.assertEquals(((int) Ray.get(randomObjectId, Integer.class)), 1);
// TODO(edoakes): this test doesn't work now that we've switched to direct call
// mode. To make it work, we need to implement the same protocol for resolving
// passed ObjectIDs that we have in Python.
// ObjectId randomObjectId = ObjectId.fromRandom();
// Ray.call(RayCallTest::testNoReturn, randomObjectId);
// Assert.assertEquals(((int) Ray.get(randomObjectId, Integer.class)), 1);
}

private static int testNoParam() {
Expand Down
12 changes: 4 additions & 8 deletions python/ray/async_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,10 @@ def done_callback(future):
# A hack to keep reference to the future so it doesn't get GC.
user_future.retry_plasma_future = retry_plasma_future

if object_id.is_direct_call_type():
inner_future = loop.create_future()
# We must add the done_callback before sending to in_memory_store_get
inner_future.add_done_callback(done_callback)
core_worker.in_memory_store_get_async(object_id, inner_future)
else:
inner_future = as_future(object_id)
inner_future.add_done_callback(done_callback)
inner_future = loop.create_future()
# We must add the done_callback before sending to in_memory_store_get
inner_future.add_done_callback(done_callback)
core_worker.in_memory_store_get_async(object_id, inner_future)
# A hack to keep reference to inner_future so it doesn't get GC.
user_future.inner_future = inner_future
# A hack to keep a reference to the object ID for ref counting.
Expand Down
1 change: 0 additions & 1 deletion python/ray/dashboard/client/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ export type RayletInfoResponse = {
children: RayletInfoResponse["actors"];
// currentTaskFuncDesc: string[];
ipAddress: string;
isDirectCall: boolean;
jobId: string;
nodeId: string;
numExecutedTasks: number;
Expand Down
3 changes: 0 additions & 3 deletions python/ray/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ def __init__(self, redis_address, redis_password=None):
"children": {},
"currentTaskFuncDesc": [],
"ipAddress": "",
"isDirectCall": False,
"jobId": "",
"numExecutedTasks": 0,
"numLocalObjects": 0,
Expand Down Expand Up @@ -782,7 +781,6 @@ def run(self):
self._addr_to_extra_info_dict[addr] = {
"jobId": actor_data["JobID"],
"state": actor_data["State"],
"isDirectCall": actor_data["IsDirectCall"],
"timestamp": actor_data["Timestamp"]
}

Expand Down Expand Up @@ -826,7 +824,6 @@ def run(self):
"jobId": ray.utils.binary_to_hex(
actor_data.job_id),
"state": actor_data.state,
"isDirectCall": True,
"timestamp": actor_data.timestamp
}
else:
Expand Down
1 change: 0 additions & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CJobID CreationJobID() const
CLanguage ActorLanguage() const
CFunctionDescriptor ActorCreationTaskFunctionDescriptor() const
c_bool IsDirectCallActor() const
c_string ExtensionData() const

cdef cppclass CCoreWorker "ray::CoreWorker":
Expand Down
6 changes: 0 additions & 6 deletions python/ray/includes/unique_ids.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,6 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

c_bool is_put()

c_bool IsDirectCallType()

CObjectID WithPlasmaTransportType()

CObjectID WithDirectTransportType()

int64_t ObjectIndex() const

CTaskID TaskId() const
Expand Down
5 changes: 1 addition & 4 deletions python/ray/includes/unique_ids.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ cdef class ObjectID(BaseID):
def hex(self):
return decode(self.data.Hex())

def is_direct_call_type(self):
return self.data.IsDirectCallType()

def is_nil(self):
return self.data.IsNil()

Expand All @@ -186,7 +183,7 @@ cdef class ObjectID(BaseID):

@classmethod
def from_random(cls):
return cls(CObjectID.FromRandom().WithDirectTransportType().Binary())
return cls(CObjectID.FromRandom().Binary())

def __await__(self):
# Delayed import because this can only be imported in py3.
Expand Down
11 changes: 4 additions & 7 deletions python/ray/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,10 @@ def object_id_serializer(obj):
self.add_contained_object_id(obj)
owner_id = ""
owner_address = ""
# TODO(swang): Remove this check. Otherwise, we will not be able to
# handle serialized plasma IDs correctly.
if obj.is_direct_call_type():
worker = ray.worker.global_worker
worker.check_connected()
obj, owner_id, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj))
worker = ray.worker.global_worker
worker.check_connected()
obj, owner_id, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj))
obj = id_serializer(obj)
owner_id = id_serializer(owner_id) if owner_id else owner_id
return (obj, owner_id, owner_address)
Expand Down
1 change: 0 additions & 1 deletion python/ray/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ def _actor_table(self, actor_id):
"IPAddress": actor_table_data.owner_address.ip_address,
"Port": actor_table_data.owner_address.port
},
"IsDirectCall": True,
"State": actor_table_data.state,
"Timestamp": actor_table_data.timestamp,
}
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def expect_exception(objects, exception):
ray.get(signal1.send.remote())

signal2 = SignalActor.remote()
actor = Actor.options(is_direct_call=True, max_concurrency=2).remote()
actor = Actor.options(max_concurrency=2).remote()
expect_exception(
[actor.bad_func2.remote(),
actor.slow_func.remote(signal2)], ray.exceptions.RayActorError)
Expand Down
17 changes: 10 additions & 7 deletions python/ray/tests/test_reference_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@

@pytest.fixture
def one_worker_100MiB(request):
yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)
config = json.dumps({
"object_store_full_max_retries": 2,
"task_retry_delay_ms": 0,
})
yield ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_internal_config=config)
ray.shutdown()


Expand All @@ -33,12 +40,8 @@ def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
if succeed:
ray.get(oid)
else:
if oid.is_direct_call_type():
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)
else:
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(oid)
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)


def _check_refcounts(expected):
Expand Down
9 changes: 2 additions & 7 deletions python/ray/tests/test_reference_counting_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
@pytest.fixture
def one_worker_100MiB(request):
config = json.dumps({
"distributed_ref_counting_enabled": 1,
"object_store_full_max_retries": 2,
"task_retry_delay_ms": 0,
})
Expand All @@ -40,12 +39,8 @@ def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
if succeed:
ray.get(oid)
else:
if oid.is_direct_call_type():
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)
else:
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(oid)
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)


# Test that an object containing object IDs within it pins the inner IDs
Expand Down
1 change: 1 addition & 0 deletions python/ray/tune/tune.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def run(run_or_experiment,
space = {"lr": tune.uniform(0, 1), "momentum": tune.uniform(0, 1)}
tune.run(my_trainable, config=space, stop={"training_iteration": 10})
"""
pass # XXX: force CI
trial_executor = trial_executor or RayTrialExecutor(
queue_trials=queue_trials,
reuse_actors=reuse_actors,
Expand Down
1 change: 1 addition & 0 deletions rllib/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def create_parser(parser_creator=None):


def run(args, parser):
pass # XXX: force CI
if args.config_file:
with open(args.config_file) as f:
experiments = yaml.safe_load(f)
Expand Down
Loading

0 comments on commit 16f4807

Please sign in to comment.