From 5a6801dde7cf016639ec1b06a8b26a69fae9e0c1 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Tue, 22 Dec 2020 00:01:27 +0800 Subject: [PATCH] [Core] Remove `delete_creating_tasks` (#12962) --- .../java/io/ray/api/runtime/RayRuntime.java | 3 +- .../io/ray/runtime/AbstractRayRuntime.java | 4 +- .../java/io/ray/runtime/gcs/GcsClient.java | 35 ---------- .../runtime/object/LocalModeObjectStore.java | 2 +- .../ray/runtime/object/NativeObjectStore.java | 7 +- .../io/ray/runtime/object/ObjectStore.java | 4 +- .../main/java/io/ray/runtime/util/IdUtil.java | 69 ------------------- .../java/io/ray/runtime/UniqueIdTest.java | 8 --- .../src/main/java/io/ray/test/ActorTest.java | 4 +- .../main/java/io/ray/test/PlasmaFreeTest.java | 19 +---- python/ray/_raylet.pyx | 5 +- python/ray/includes/libcoreworker.pxd | 2 +- python/ray/internal/internal_api.py | 7 +- src/ray/core_worker/core_worker.cc | 6 +- src/ray/core_worker/core_worker.h | 5 +- ...io_ray_runtime_object_NativeObjectStore.cc | 7 +- .../io_ray_runtime_object_NativeObjectStore.h | 4 +- .../store_provider/plasma_store_provider.cc | 7 +- .../store_provider/plasma_store_provider.h | 3 +- src/ray/core_worker/test/core_worker_test.cc | 2 +- src/ray/gcs/accessor.h | 10 --- .../gcs/gcs_client/service_based_accessor.cc | 19 ----- .../gcs/gcs_client/service_based_accessor.h | 3 - .../test/service_based_gcs_client_test.cc | 11 --- .../gcs/gcs_server/task_info_handler_impl.cc | 25 ------- .../gcs/gcs_server/task_info_handler_impl.h | 4 -- .../gcs_server/test/gcs_server_rpc_test.cc | 17 ----- src/ray/gcs/redis_accessor.cc | 13 ---- src/ray/gcs/redis_accessor.h | 3 - src/ray/protobuf/gcs_service.proto | 10 --- src/ray/raylet/format/node_manager.fbs | 2 - src/ray/raylet/node_manager.cc | 8 --- src/ray/raylet_client/raylet_client.cc | 6 +- src/ray/raylet_client/raylet_client.h | 4 +- src/ray/rpc/gcs_server/gcs_rpc_client.h | 3 - src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 -- 36 files changed, 33 insertions(+), 313 deletions(-) diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java index 620a400427a9f..8817d5b1b86a3 100644 --- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java @@ -72,9 +72,8 @@ public interface RayRuntime { * * @param objectRefs The object references to free. * @param localOnly Whether only free objects for local object store or not. - * @param deleteCreatingTasks Whether also delete objects' creating tasks from GCS. */ - void free(List> objectRefs, boolean localOnly, boolean deleteCreatingTasks); + void free(List> objectRefs, boolean localOnly); /** * Set the resource for the specific node. diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index ac199fd959b0d..6dabe3c3cc75f 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -100,9 +100,9 @@ public List get(List> objectRefs) { } @Override - public void free(List> objectRefs, boolean localOnly, boolean deleteCreatingTasks) { + public void free(List> objectRefs, boolean localOnly) { objectStore.delete(objectRefs.stream().map(ref -> ((ObjectRefImpl) ref).getId()).collect( - Collectors.toList()), localOnly, deleteCreatingTasks); + Collectors.toList()), localOnly); } @Override diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 9c5d10072ad38..41a82c2d5eba5 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -3,10 +3,8 @@ import com.google.common.base.Preconditions; import com.google.protobuf.InvalidProtocolBufferException; import io.ray.api.id.ActorId; -import io.ray.api.id.BaseId; import io.ray.api.id.JobId; import io.ray.api.id.PlacementGroupId; -import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; import io.ray.api.placementgroup.PlacementGroup; import io.ray.api.runtimecontext.NodeInfo; @@ -14,12 +12,10 @@ import io.ray.runtime.generated.Gcs.GcsNodeInfo; import io.ray.runtime.generated.Gcs.TablePrefix; import io.ray.runtime.placementgroup.PlacementGroupUtils; -import io.ray.runtime.util.IdUtil; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,25 +27,10 @@ public class GcsClient { private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class); private RedisClient primary; - private List shards; private GlobalStateAccessor globalStateAccessor; public GcsClient(String redisAddress, String redisPassword) { primary = new RedisClient(redisAddress, redisPassword); - int numShards = 0; - try { - numShards = Integer.valueOf(primary.get("NumRedisShards", null)); - Preconditions.checkState(numShards > 0, - String.format("Expected at least one Redis shards, found %d.", numShards)); - } catch (NumberFormatException e) { - throw new RuntimeException("Failed to get number of redis shards.", e); - } - - List shardAddresses = primary.lrange("RedisShards".getBytes(), 0, -1); - Preconditions.checkState(shardAddresses.size() == numShards); - shards = shardAddresses.stream().map((byte[] address) -> { - return new RedisClient(new String(address), redisPassword); - }).collect(Collectors.toList()); globalStateAccessor = GlobalStateAccessor.getInstance(redisAddress, redisPassword); } @@ -163,16 +144,6 @@ public boolean wasCurrentActorRestarted(ActorId actorId) { return actorTableData.getNumRestarts() != 0; } - /** - * Query whether the raylet task exists in Gcs. - */ - public boolean rayletTaskExistsInGcs(TaskId taskId) { - byte[] key = ArrayUtils.addAll(TablePrefix.RAYLET_TASK.toString().getBytes(), - taskId.getBytes()); - RedisClient client = getShardClient(taskId); - return client.exists(key); - } - public JobId nextJobId() { int jobCounter = (int) primary.incr("JobCounter".getBytes()); return JobId.fromInt(jobCounter); @@ -186,10 +157,4 @@ public void destroy() { LOGGER.debug("Destroying global state accessor."); GlobalStateAccessor.destroyInstance(); } - - private RedisClient getShardClient(BaseId key) { - return shards.get((int) Long.remainderUnsigned(IdUtil.murmurHashCode(key), - shards.size())); - } - } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java index 87f0adc00317f..4614100ae21a3 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java @@ -93,7 +93,7 @@ private void waitInternal(List objectIds, int numObjects, long timeout } @Override - public void delete(List objectIds, boolean localOnly, boolean deleteCreatingTasks) { + public void delete(List objectIds, boolean localOnly) { for (ObjectId objectId : objectIds) { pool.remove(objectId); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index ef85cf62c61c7..38a6a16c9b59a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -50,8 +50,8 @@ public List wait(List objectIds, int numObjects, long timeout } @Override - public void delete(List objectIds, boolean localOnly, boolean deleteCreatingTasks) { - nativeDelete(toBinaryList(objectIds), localOnly, deleteCreatingTasks); + public void delete(List objectIds, boolean localOnly) { + nativeDelete(toBinaryList(objectIds), localOnly); } @Override @@ -116,8 +116,7 @@ private static List toBinaryList(List ids) { private static native List nativeWait(List objectIds, int numObjects, long timeoutMs); - private static native void nativeDelete(List objectIds, boolean localOnly, - boolean deleteCreatingTasks); + private static native void nativeDelete(List objectIds, boolean localOnly); private static native void nativeAddLocalReference(byte[] workerId, byte[] objectId); diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java index e72bed802a149..bfec229f167fe 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java @@ -167,10 +167,8 @@ public WaitResult wait(List> waitList, int numReturns, int t * * @param objectIds IDs of the objects to delete. * @param localOnly Whether only delete the objects in local node, or all nodes in the cluster. - * @param deleteCreatingTasks Whether also delete the tasks that created these objects. */ - public abstract void delete(List objectIds, boolean localOnly, - boolean deleteCreatingTasks); + public abstract void delete(List objectIds, boolean localOnly); /** * Increase the local reference count for this object ID. diff --git a/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java b/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java index eca2860af1507..23b24728cd29a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java +++ b/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java @@ -1,7 +1,6 @@ package io.ray.runtime.util; import io.ray.api.id.ActorId; -import io.ray.api.id.BaseId; import io.ray.api.id.ObjectId; import io.ray.api.id.TaskId; @@ -11,74 +10,6 @@ */ public class IdUtil { - /** - * Compute the murmur hash code of this ID. - */ - public static long murmurHashCode(BaseId id) { - return murmurHash64A(id.getBytes(), id.size(), 0); - } - - /** - * This method is the same as `Hash()` method of `ID` class in ray/src/ray/common/id.h - */ - private static long murmurHash64A(byte[] data, int length, int seed) { - final long m = 0xc6a4a7935bd1e995L; - final int r = 47; - - long h = (seed & 0xFFFFFFFFL) ^ (length * m); - - int length8 = length / 8; - - for (int i = 0; i < length8; i++) { - final int i8 = i * 8; - long k = ((long) data[i8] & 0xff) - + (((long) data[i8 + 1] & 0xff) << 8) - + (((long) data[i8 + 2] & 0xff) << 16) - + (((long) data[i8 + 3] & 0xff) << 24) - + (((long) data[i8 + 4] & 0xff) << 32) - + (((long) data[i8 + 5] & 0xff) << 40) - + (((long) data[i8 + 6] & 0xff) << 48) - + (((long) data[i8 + 7] & 0xff) << 56); - - k *= m; - k ^= k >>> r; - k *= m; - - h ^= k; - h *= m; - } - - final int remaining = length % 8; - if (remaining >= 7) { - h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; - } - if (remaining >= 6) { - h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; - } - if (remaining >= 5) { - h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; - } - if (remaining >= 4) { - h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; - } - if (remaining >= 3) { - h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; - } - if (remaining >= 2) { - h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; - } - if (remaining >= 1) { - h ^= (long) (data[length & ~7] & 0xff); - h *= m; - } - - h ^= h >>> r; - h *= m; - h ^= h >>> r; - - return h; - } - /** * Compute the actor ID of the task which created this object. * @return The actor ID of the task which created this object. diff --git a/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java b/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java index 25704f3216eb9..ce1b61db1952d 100644 --- a/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java +++ b/java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java @@ -1,7 +1,6 @@ package io.ray.runtime; import io.ray.api.id.UniqueId; -import io.ray.runtime.util.IdUtil; import java.nio.ByteBuffer; import java.util.Arrays; import javax.xml.bind.DatatypeConverter; @@ -46,11 +45,4 @@ public void testConstructUniqueId() { Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString()); Assert.assertTrue(id6.isNil()); } - - @Test - void testMurmurHash() { - UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232"); - long remainder = Long.remainderUnsigned(IdUtil.murmurHashCode(id), 1000000000); - Assert.assertEquals(remainder, 787616861); - } } diff --git a/java/test/src/main/java/io/ray/test/ActorTest.java b/java/test/src/main/java/io/ray/test/ActorTest.java index a7e9c6ac6b31f..78b8f2468203d 100644 --- a/java/test/src/main/java/io/ray/test/ActorTest.java +++ b/java/test/src/main/java/io/ray/test/ActorTest.java @@ -128,7 +128,7 @@ public void testUnreconstructableActorObject() throws InterruptedException { ObjectRef value = counter.task(Counter::getValue).remote(); Assert.assertEquals(100, value.get()); // Delete the object from the object store. - Ray.internal().free(ImmutableList.of(value), false, false); + Ray.internal().free(ImmutableList.of(value), false); // Wait for delete RPC to propagate TimeUnit.SECONDS.sleep(1); // Free deletes from in-memory store. @@ -138,7 +138,7 @@ public void testUnreconstructableActorObject() throws InterruptedException { ObjectRef largeValue = counter.task(Counter::createLargeObject).remote(); Assert.assertTrue(largeValue.get() instanceof TestUtils.LargeObject); // Delete the object from the object store. - Ray.internal().free(ImmutableList.of(largeValue), false, false); + Ray.internal().free(ImmutableList.of(largeValue), false); // Wait for delete RPC to propagate TimeUnit.SECONDS.sleep(1); // Free deletes big objects from plasma store. diff --git a/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java b/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java index 1b924f3c0552c..59fafc0f4d81e 100644 --- a/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java @@ -3,9 +3,7 @@ import com.google.common.collect.ImmutableList; import io.ray.api.ObjectRef; import io.ray.api.Ray; -import io.ray.api.id.TaskId; import io.ray.runtime.object.ObjectRefImpl; -import java.util.Arrays; import org.testng.Assert; import org.testng.annotations.Test; @@ -20,7 +18,7 @@ public void testDeleteObjects() { ObjectRef helloId = Ray.task(PlasmaFreeTest::hello).remote(); String helloString = helloId.get(); Assert.assertEquals("hello", helloString); - Ray.internal().free(ImmutableList.of(helloId), true, false); + Ray.internal().free(ImmutableList.of(helloId), true); final boolean result = TestUtils.waitForCondition(() -> !TestUtils.getRuntime().getObjectStore() @@ -32,19 +30,4 @@ public void testDeleteObjects() { Assert.assertFalse(result); } } - - @Test(groups = {"cluster"}) - public void testDeleteCreatingTasks() { - ObjectRef helloId = Ray.task(PlasmaFreeTest::hello).remote(); - Assert.assertEquals("hello", helloId.get()); - Ray.internal().free(ImmutableList.of(helloId), true, true); - - TaskId taskId = TaskId.fromBytes( - Arrays.copyOf(((ObjectRefImpl) helloId).getId().getBytes(), TaskId.LENGTH)); - final boolean result = TestUtils.waitForCondition( - () -> !TestUtils.getRuntime().getGcsClient() - .rayletTaskExistsInGcs(taskId), 50); - Assert.assertTrue(result); - } - } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 4d5bb8ff9cc2b..356222bb9df5f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1031,14 +1031,13 @@ cdef class CoreWorker: return ready, not_ready - def free_objects(self, object_refs, c_bool local_only, - c_bool delete_creating_tasks): + def free_objects(self, object_refs, c_bool local_only): cdef: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().Delete( - free_ids, local_only, delete_creating_tasks)) + free_ids, local_only)) def global_gc(self): with nogil: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 68c1a95b3de81..9dd63aafee0c2 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -182,7 +182,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int64_t timeout_ms, c_vector[c_bool] *results, c_bool fetch_local) CRayStatus Delete(const c_vector[CObjectID] &object_ids, - c_bool local_only, c_bool delete_creating_tasks) + c_bool local_only) CRayStatus TriggerGlobalGC() c_string MemoryUsageString() diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index d3e25c1ec8935..601b3986a5275 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -37,7 +37,7 @@ def memory_summary(): return reply.memory_summary -def free(object_refs, local_only=False, delete_creating_tasks=False): +def free(object_refs, local_only=False): """Free a list of IDs from the in-process and plasma object stores. This function is a low-level API which should be used in restricted @@ -59,8 +59,6 @@ def free(object_refs, local_only=False, delete_creating_tasks=False): object_refs (List[ObjectRef]): List of object refs to delete. local_only (bool): Whether only deleting the list of objects in local object store or all object stores. - delete_creating_tasks (bool): Whether also delete the object creating - tasks. """ worker = ray.worker.global_worker @@ -83,5 +81,4 @@ def free(object_refs, local_only=False, delete_creating_tasks=False): if len(object_refs) == 0: return - worker.core_worker.free_objects(object_refs, local_only, - delete_creating_tasks) + worker.core_worker.free_objects(object_refs, local_only) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9d70993039e8e..d2ab2c1507bcb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1111,8 +1111,7 @@ Status CoreWorker::Wait(const std::vector &ids, int num_objects, return Status::OK(); } -Status CoreWorker::Delete(const std::vector &object_ids, bool local_only, - bool delete_creating_tasks) { +Status CoreWorker::Delete(const std::vector &object_ids, bool local_only) { // Release the object from plasma. This does not affect the object's ref // count. If this was called from a non-owning worker, then a warning will be // logged and the object will not get released. @@ -1129,8 +1128,7 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on // We only delete from plasma, which avoids hangs (issue #7105). In-memory // objects can only be deleted once the ref count goes to 0. absl::flat_hash_set plasma_object_ids(object_ids.begin(), object_ids.end()); - return plasma_store_provider_->Delete(plasma_object_ids, local_only, - delete_creating_tasks); + return plasma_store_provider_->Delete(plasma_object_ids, local_only); } void CoreWorker::TriggerGlobalGC() { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 171a42d76231b..14136a8958274 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -571,11 +571,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] object_ids IDs of the objects to delete. /// \param[in] local_only Whether only delete the objects in local node, or all nodes in /// the cluster. - /// \param[in] delete_creating_tasks Whether also delete the tasks that - /// created these objects. /// \return Status. - Status Delete(const std::vector &object_ids, bool local_only, - bool delete_creating_tasks); + Status Delete(const std::vector &object_ids, bool local_only); /// Trigger garbage collection on each worker in the cluster. void TriggerGlobalGC(); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index b62b198183854..d66088de1475e 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -120,15 +120,14 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeWai } JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete( - JNIEnv *env, jclass, jobject objectIds, jboolean localOnly, - jboolean deleteCreatingTasks) { + JNIEnv *env, jclass, jobject objectIds, jboolean localOnly) { std::vector object_ids; JavaListToNativeVector( env, objectIds, &object_ids, [](JNIEnv *env, jobject id) { return JavaByteArrayToId(env, static_cast(id)); }); - auto status = ray::CoreWorkerProcess::GetCoreWorker().Delete( - object_ids, (bool)localOnly, (bool)deleteCreatingTasks); + auto status = + ray::CoreWorkerProcess::GetCoreWorker().Delete(object_ids, (bool)localOnly); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h index 4e11c045677a6..b1da06e57068c 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h @@ -60,10 +60,10 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeWai /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeDelete - * Signature: (Ljava/util/List;ZZ)V + * Signature: (Ljava/util/List;Z)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete( - JNIEnv *, jclass, jobject, jboolean, jboolean); + JNIEnv *, jclass, jobject, jboolean); /* * Class: io_ray_runtime_object_NativeObjectStore diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 25007a86343d2..3079b99f57f84 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -361,10 +361,9 @@ Status CoreWorkerPlasmaStoreProvider::Wait( } Status CoreWorkerPlasmaStoreProvider::Delete( - const absl::flat_hash_set &object_ids, bool local_only, - bool delete_creating_tasks) { + const absl::flat_hash_set &object_ids, bool local_only) { std::vector object_id_vector(object_ids.begin(), object_ids.end()); - return raylet_client_->FreeObjects(object_id_vector, local_only, delete_creating_tasks); + return raylet_client_->FreeObjects(object_id_vector, local_only); } std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() { @@ -424,7 +423,7 @@ Status CoreWorkerPlasmaStoreProvider::WarmupStore() { RAY_RETURN_NOT_OK(Create(nullptr, 8, object_id, rpc::Address(), &data)); RAY_RETURN_NOT_OK(Seal(object_id)); RAY_RETURN_NOT_OK(Release(object_id)); - RAY_RETURN_NOT_OK(Delete({object_id}, false, false)); + RAY_RETURN_NOT_OK(Delete({object_id}, false)); return Status::OK(); } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 88bed0428f723..6085a50c1a8bf 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -98,8 +98,7 @@ class CoreWorkerPlasmaStoreProvider { int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_set *ready); - Status Delete(const absl::flat_hash_set &object_ids, bool local_only, - bool delete_creating_tasks); + Status Delete(const absl::flat_hash_set &object_ids, bool local_only); /// Lists objects in used (pinned) by the current client. /// diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index f06e1a7f461cf..0c4d69149ce4e 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -822,7 +822,7 @@ TEST_F(SingleNodeTest, TestObjectInterface) { // Test Delete(). // clear the reference held by PlasmaBuffer. results.clear(); - RAY_CHECK_OK(core_worker.Delete(ids, true, false)); + RAY_CHECK_OK(core_worker.Delete(ids, true)); // Note that Delete() calls RayletClient::FreeObjects and would not // wait for objects being deleted, so wait a while for plasma store diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 655c47aa76ba0..e1e70d5c5c1fc 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -208,16 +208,6 @@ class TaskInfoAccessor { virtual Status AsyncGet(const TaskID &task_id, const OptionalItemCallback &callback) = 0; - /// Delete tasks from GCS asynchronously. - /// - /// \param task_ids The vector of IDs to delete from GCS. - /// \param callback Callback that is called after delete finished. - /// \return Status - // TODO(micafan) Will support callback of batch deletion in the future. - // Currently this callback will never be called. - virtual Status AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) = 0; - /// Subscribe asynchronously to the event that the given task is added in GCS. /// /// \param task_id The ID of the task to be subscribed to. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 2cf1d2cafcb3c..7e7d67d444180 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -886,25 +886,6 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) { - RAY_LOG(DEBUG) << "Deleting tasks, task id list size = " << task_ids.size(); - rpc::DeleteTasksRequest request; - for (auto &task_id : task_ids) { - request.add_task_id_list(task_id.Binary()); - } - client_impl_->GetGcsRpcClient().DeleteTasks( - request, - [task_ids, callback](const Status &status, const rpc::DeleteTasksReply &reply) { - if (callback) { - callback(status); - } - RAY_LOG(DEBUG) << "Finished deleting tasks, status = " << status - << ", task id list size = " << task_ids.size(); - }); - return Status::OK(); -} - Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( const TaskID &task_id, const SubscribeCallback &subscribe, const StatusCallback &done) { diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index f0e1f45bc4c7d..05f2d431628a8 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -278,9 +278,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { Status AsyncGet(const TaskID &task_id, const OptionalItemCallback &callback) override; - Status AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) override; - Status AsyncSubscribe(const TaskID &task_id, const SubscribeCallback &subscribe, const StatusCallback &done) override; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 9df66dfffdd6c..b470598d0dbc6 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -415,13 +415,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return task_table_data; } - bool DeleteTask(const std::vector &task_ids) { - std::promise promise; - RAY_CHECK_OK(gcs_client_->Tasks().AsyncDelete( - task_ids, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); - } - bool SubscribeTaskLease( const TaskID &task_id, const gcs::SubscribeCallback> @@ -875,10 +868,6 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); EXPECT_EQ(task_count, 1); - // Delete tasks from GCS. - std::vector task_ids = {task_id}; - ASSERT_TRUE(DeleteTask(task_ids)); - // Subscribe to the event that the given task lease is added in GCS. std::atomic task_lease_count(0); auto task_lease_subscribe = [&task_lease_count]( diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 7034c87a57ad3..b47ab7cef5518 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -68,30 +68,6 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, ++counts_[CountType::GET_TASK_REQUEST]; } -void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request, - DeleteTasksReply *reply, - SendReplyCallback send_reply_callback) { - std::vector task_ids = IdVectorFromProtobuf(request.task_id_list()); - JobID job_id = task_ids.empty() ? JobID::Nil() : task_ids[0].JobId(); - RAY_LOG(DEBUG) << "Deleting tasks, job id = " << job_id - << ", task id list size = " << task_ids.size(); - auto on_done = [job_id, task_ids, request, reply, send_reply_callback](Status status) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to delete tasks, job id = " << job_id - << ", task id list size = " << task_ids.size(); - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; - - Status status = gcs_table_storage_->TaskTable().BatchDelete(task_ids, on_done); - if (!status.ok()) { - on_done(status); - } - RAY_LOG(DEBUG) << "Finished deleting tasks, job id = " << job_id - << ", task id list size = " << task_ids.size(); - ++counts_[CountType::DELETE_TASKS_REQUEST]; -} - void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply, SendReplyCallback send_reply_callback) { @@ -183,7 +159,6 @@ std::string DefaultTaskInfoHandler::DebugString() const { stream << "DefaultTaskInfoHandler: {AddTask request count: " << counts_[CountType::ADD_TASK_REQUEST] << ", GetTask request count: " << counts_[CountType::GET_TASK_REQUEST] - << ", DeleteTasks request count: " << counts_[CountType::DELETE_TASKS_REQUEST] << ", AddTaskLease request count: " << counts_[CountType::ADD_TASK_LEASE_REQUEST] << ", GetTaskLease request count: " << counts_[CountType::GET_TASK_LEASE_REQUEST] << ", AttemptTaskReconstruction request count: " diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h index 98cd64bdad797..5a7599e8f4752 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h @@ -35,9 +35,6 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { void HandleGetTask(const GetTaskRequest &request, GetTaskReply *reply, SendReplyCallback send_reply_callback) override; - void HandleDeleteTasks(const DeleteTasksRequest &request, DeleteTasksReply *reply, - SendReplyCallback send_reply_callback) override; - void HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply, SendReplyCallback send_reply_callback) override; @@ -58,7 +55,6 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { enum CountType { ADD_TASK_REQUEST = 0, GET_TASK_REQUEST = 1, - DELETE_TASKS_REQUEST = 2, ADD_TASK_LEASE_REQUEST = 3, GET_TASK_LEASE_REQUEST = 4, ATTEMPT_TASK_RECONSTRUCTION_REQUEST = 5, diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index fd2084168c06a..ea8ebc09de80a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -280,16 +280,6 @@ class GcsServerTest : public ::testing::Test { return task_data; } - bool DeleteTasks(const rpc::DeleteTasksRequest &request) { - std::promise promise; - client_->DeleteTasks( - request, [&promise](const Status &status, const rpc::DeleteTasksReply &reply) { - RAY_CHECK_OK(status); - promise.set_value(true); - }); - return WaitReady(promise.get_future(), timeout_ms_); - } - bool AddTaskLease(const rpc::AddTaskLeaseRequest &request) { std::promise promise; client_->AddTaskLease( @@ -574,13 +564,6 @@ TEST_F(GcsServerTest, TestTaskInfo) { rpc::TaskTableData result = GetTask(task_id.Binary()); ASSERT_TRUE(result.task().task_spec().job_id() == job_id.Binary()); - // Delete task - rpc::DeleteTasksRequest delete_tasks_request; - delete_tasks_request.add_task_id_list(task_id.Binary()); - ASSERT_TRUE(DeleteTasks(delete_tasks_request)); - result = GetTask(task_id.Binary()); - ASSERT_TRUE(!result.has_task()); - // Add task lease NodeID node_id = NodeID::FromRandom(); auto task_lease_data = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary()); diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index bd3fe06046ffc..248eb9a89db1e 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -247,19 +247,6 @@ Status RedisTaskInfoAccessor::AsyncGet( return task_table.Lookup(task_id.JobId(), task_id, on_success, on_failure); } -Status RedisTaskInfoAccessor::AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) { - raylet::TaskTable &task_table = client_impl_->raylet_task_table(); - JobID job_id = task_ids.empty() ? JobID::Nil() : task_ids[0].JobId(); - task_table.Delete(job_id, task_ids); - if (callback) { - callback(Status::OK()); - } - // TODO(micafan) Always return OK here. - // Confirm if we need to handle the deletion failure and how to handle it. - return Status::OK(); -} - Status RedisTaskInfoAccessor::AsyncSubscribe( const TaskID &task_id, const SubscribeCallback &subscribe, const StatusCallback &done) { diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index c8263d0c822a9..ec5d389f6e1a8 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -182,9 +182,6 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor { Status AsyncGet(const TaskID &task_id, const OptionalItemCallback &callback) override; - Status AsyncDelete(const std::vector &task_ids, - const StatusCallback &callback) override; - Status AsyncSubscribe(const TaskID &task_id, const SubscribeCallback &subscribe, const StatusCallback &done) override; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index eb730a7cf72ad..8bba86e56e05c 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -348,14 +348,6 @@ message GetTaskReply { TaskTableData task_data = 2; } -message DeleteTasksRequest { - repeated bytes task_id_list = 1; -} - -message DeleteTasksReply { - GcsStatus status = 1; -} - message AddTaskLeaseRequest { TaskLeaseData task_lease_data = 1; } @@ -387,8 +379,6 @@ service TaskInfoGcsService { rpc AddTask(AddTaskRequest) returns (AddTaskReply); // Get task information from GCS Service. rpc GetTask(GetTaskRequest) returns (GetTaskReply); - // Delete tasks from GCS Service. - rpc DeleteTasks(DeleteTasksRequest) returns (DeleteTasksReply); // Add a task lease to GCS Service. rpc AddTaskLease(AddTaskLeaseRequest) returns (AddTaskLeaseReply); // Get task lease information from GCS Service. diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index fb95bbc615faa..1e405b5e36d08 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -249,8 +249,6 @@ table FreeObjectsRequest { // Whether keep this request with local object store // or send it to all the object stores. local_only: bool; - // Whether also delete objects' creating tasks from GCS. - delete_creating_tasks: bool; // List of object ids we'll delete from object store. object_ids: [string]; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e78820d429b1d..b27a0e32c5e9f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1224,14 +1224,6 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & std::vector object_ids = from_flatbuf(*message->object_ids()); // Clean up objects from the object store. object_manager_.FreeObjects(object_ids, message->local_only()); - if (message->delete_creating_tasks()) { - // Clean up their creating tasks from GCS. - std::vector creating_task_ids; - for (const auto &object_id : object_ids) { - creating_task_ids.push_back(object_id.TaskId()); - } - RAY_CHECK_OK(gcs_client_->Tasks().AsyncDelete(creating_task_ids, nullptr)); - } } break; case protocol::MessageType::SubscribePlasmaReady: { ProcessSubscribePlasmaReady(client, message_data); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 1c63657968cc1..5582a68bae070 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -274,10 +274,10 @@ Status raylet::RayletClient::PushProfileEvents(const ProfileTableData &profile_e } Status raylet::RayletClient::FreeObjects(const std::vector &object_ids, - bool local_only, bool delete_creating_tasks) { + bool local_only) { flatbuffers::FlatBufferBuilder fbb; - auto message = protocol::CreateFreeObjectsRequest( - fbb, local_only, delete_creating_tasks, to_flatbuf(fbb, object_ids)); + auto message = + protocol::CreateFreeObjectsRequest(fbb, local_only, to_flatbuf(fbb, object_ids)); fbb.Finish(message); return conn_->WriteMessage(MessageType::FreeObjectsInObjectStoreRequest, &fbb); } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 9fa1b7982f8a3..185ca445ac3b4 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -313,10 +313,8 @@ class RayletClient : public RayletClientInterface { /// \param object_ids A list of ObjectsIDs to be deleted. /// \param local_only Whether keep this request with local object store /// or send it to all the object stores. - /// \param delete_creating_tasks Whether also delete objects' creating tasks from GCS. /// \return ray::Status. - ray::Status FreeObjects(const std::vector &object_ids, bool local_only, - bool deleteCreatingTasks); + ray::Status FreeObjects(const std::vector &object_ids, bool local_only); /// Sets a resource with the specified capacity and client id /// \param resource_name Name of the resource to be set diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 82857123ee7a1..39641358f102d 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -219,9 +219,6 @@ class GcsRpcClient { /// Get task information from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTask, task_info_grpc_client_, ) - /// Delete tasks from GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, DeleteTasks, task_info_grpc_client_, ) - /// Add a task lease to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 248ec983744ed..a39323e40ccd0 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -375,10 +375,6 @@ class TaskInfoGcsServiceHandler { virtual void HandleGetTask(const GetTaskRequest &request, GetTaskReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleDeleteTasks(const DeleteTasksRequest &request, - DeleteTasksReply *reply, - SendReplyCallback send_reply_callback) = 0; - virtual void HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -410,7 +406,6 @@ class TaskInfoGrpcService : public GrpcService { std::vector> *server_call_factories) override { TASK_INFO_SERVICE_RPC_HANDLER(AddTask); TASK_INFO_SERVICE_RPC_HANDLER(GetTask); - TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks); TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease); TASK_INFO_SERVICE_RPC_HANDLER(GetTaskLease); TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction);