[Core] Remove delete_creating_tasks (ray-project#12962)
kfstorm authored Dec 21, 2020
1 parent 6e35469 commit 5a6801d
Showing 36 changed files with 33 additions and 313 deletions.
3 changes: 1 addition & 2 deletions java/api/src/main/java/io/ray/api/runtime/RayRuntime.java
@@ -72,9 +72,8 @@ public interface RayRuntime {
*
* @param objectRefs The object references to free.
* @param localOnly Whether only free objects for local object store or not.
* @param deleteCreatingTasks Whether also delete objects' creating tasks from GCS.
*/
void free(List<ObjectRef<?>> objectRefs, boolean localOnly, boolean deleteCreatingTasks);
void free(List<ObjectRef<?>> objectRefs, boolean localOnly);

/**
* Set the resource for the specific node.
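On the caller side this is a straightforward signature narrowing: the trailing deleteCreatingTasks flag is removed and free now drops only the objects themselves. A minimal migration sketch follows, assuming an illustrative driver class and hello task (neither is part of this commit):

import com.google.common.collect.ImmutableList;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;

public class FreeMigrationSketch {

  // Illustrative task; any remote task that produces an object works the same way.
  private static String hello() {
    return "hello";
  }

  public static void main(String[] args) {
    Ray.init();
    ObjectRef<String> ref = Ray.task(FreeMigrationSketch::hello).remote();
    ref.get();

    // Before this commit (three-argument form):
    //   Ray.internal().free(ImmutableList.of(ref), /* localOnly */ true, /* deleteCreatingTasks */ false);
    // After this commit the flag is gone: free only drops the object data,
    // and the objects' creating tasks in GCS are left alone.
    Ray.internal().free(ImmutableList.of(ref), /* localOnly */ true);

    Ray.shutdown();
  }
}

The same two-argument shape recurs below in the core worker (Delete(object_ids, local_only)) and in the Python wrapper (ray.internal.free(object_refs, local_only=False)).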
@@ -100,9 +100,9 @@ public <T> List<T> get(List<ObjectRef<T>> objectRefs) {
}

@Override
public void free(List<ObjectRef<?>> objectRefs, boolean localOnly, boolean deleteCreatingTasks) {
public void free(List<ObjectRef<?>> objectRefs, boolean localOnly) {
objectStore.delete(objectRefs.stream().map(ref -> ((ObjectRefImpl<?>) ref).getId()).collect(
Collectors.toList()), localOnly, deleteCreatingTasks);
Collectors.toList()), localOnly);
}

@Override
35 changes: 0 additions & 35 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
@@ -3,23 +3,19 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.JobId;
import io.ray.api.id.PlacementGroupId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
import io.ray.runtime.generated.Gcs.TablePrefix;
import io.ray.runtime.placementgroup.PlacementGroupUtils;
import io.ray.runtime.util.IdUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,25 +27,10 @@ public class GcsClient {
private static Logger LOGGER = LoggerFactory.getLogger(GcsClient.class);
private RedisClient primary;

private List<RedisClient> shards;
private GlobalStateAccessor globalStateAccessor;

public GcsClient(String redisAddress, String redisPassword) {
primary = new RedisClient(redisAddress, redisPassword);
int numShards = 0;
try {
numShards = Integer.valueOf(primary.get("NumRedisShards", null));
Preconditions.checkState(numShards > 0,
String.format("Expected at least one Redis shards, found %d.", numShards));
} catch (NumberFormatException e) {
throw new RuntimeException("Failed to get number of redis shards.", e);
}

List<byte[]> shardAddresses = primary.lrange("RedisShards".getBytes(), 0, -1);
Preconditions.checkState(shardAddresses.size() == numShards);
shards = shardAddresses.stream().map((byte[] address) -> {
return new RedisClient(new String(address), redisPassword);
}).collect(Collectors.toList());
globalStateAccessor = GlobalStateAccessor.getInstance(redisAddress, redisPassword);
}

@@ -163,16 +144,6 @@ public boolean wasCurrentActorRestarted(ActorId actorId) {
return actorTableData.getNumRestarts() != 0;
}

/**
* Query whether the raylet task exists in Gcs.
*/
public boolean rayletTaskExistsInGcs(TaskId taskId) {
byte[] key = ArrayUtils.addAll(TablePrefix.RAYLET_TASK.toString().getBytes(),
taskId.getBytes());
RedisClient client = getShardClient(taskId);
return client.exists(key);
}

public JobId nextJobId() {
int jobCounter = (int) primary.incr("JobCounter".getBytes());
return JobId.fromInt(jobCounter);
@@ -186,10 +157,4 @@ public void destroy() {
LOGGER.debug("Destroying global state accessor.");
GlobalStateAccessor.destroyInstance();
}

private RedisClient getShardClient(BaseId key) {
return shards.get((int) Long.remainderUnsigned(IdUtil.murmurHashCode(key),
shards.size()));
}

}
@@ -93,7 +93,7 @@ private void waitInternal(List<ObjectId> objectIds, int numObjects, long timeout
}

@Override
public void delete(List<ObjectId> objectIds, boolean localOnly, boolean deleteCreatingTasks) {
public void delete(List<ObjectId> objectIds, boolean localOnly) {
for (ObjectId objectId : objectIds) {
pool.remove(objectId);
}
@@ -50,8 +50,8 @@ public List<Boolean> wait(List<ObjectId> objectIds, int numObjects, long timeout
}

@Override
public void delete(List<ObjectId> objectIds, boolean localOnly, boolean deleteCreatingTasks) {
nativeDelete(toBinaryList(objectIds), localOnly, deleteCreatingTasks);
public void delete(List<ObjectId> objectIds, boolean localOnly) {
nativeDelete(toBinaryList(objectIds), localOnly);
}

@Override
@@ -116,8 +115,7 @@ private static List<byte[]> toBinaryList(List<ObjectId> ids) {
private static native List<Boolean> nativeWait(List<byte[]> objectIds, int numObjects,
long timeoutMs);

private static native void nativeDelete(List<byte[]> objectIds, boolean localOnly,
boolean deleteCreatingTasks);
private static native void nativeDelete(List<byte[]> objectIds, boolean localOnly);

private static native void nativeAddLocalReference(byte[] workerId, byte[] objectId);

@@ -167,10 +167,8 @@ public <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns, int t
*
* @param objectIds IDs of the objects to delete.
* @param localOnly Whether only delete the objects in local node, or all nodes in the cluster.
* @param deleteCreatingTasks Whether also delete the tasks that created these objects.
*/
public abstract void delete(List<ObjectId> objectIds, boolean localOnly,
boolean deleteCreatingTasks);
public abstract void delete(List<ObjectId> objectIds, boolean localOnly);

/**
* Increase the local reference count for this object ID.
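For orientation, a toy sketch of the narrowed delete(objectIds, localOnly) contract, modeled on the local-mode store shown above; the class name and in-memory map are illustrative only, not Ray's actual object store:

import io.ray.api.id.ObjectId;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy single-node store; the real native store above instead forwards the
// request to the core worker via nativeDelete(objectIds, localOnly).
public class ToyObjectStore {

  private final Map<ObjectId, byte[]> pool = new ConcurrentHashMap<>();

  public void put(ObjectId id, byte[] data) {
    pool.put(id, data);
  }

  public void delete(List<ObjectId> objectIds, boolean localOnly) {
    // Only object data is removed; with deleteCreatingTasks gone, task
    // metadata in GCS is never touched by delete.
    for (ObjectId objectId : objectIds) {
      pool.remove(objectId);
    }
    // A distributed implementation would also propagate the request to other
    // nodes when localOnly is false; this toy store has a single node.
  }
}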
69 changes: 0 additions & 69 deletions java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java
@@ -1,7 +1,6 @@
package io.ray.runtime.util;

import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.TaskId;

@@ -11,74 +10,6 @@
*/
public class IdUtil {

/**
* Compute the murmur hash code of this ID.
*/
public static long murmurHashCode(BaseId id) {
return murmurHash64A(id.getBytes(), id.size(), 0);
}

/**
* This method is the same as `Hash()` method of `ID` class in ray/src/ray/common/id.h
*/
private static long murmurHash64A(byte[] data, int length, int seed) {
final long m = 0xc6a4a7935bd1e995L;
final int r = 47;

long h = (seed & 0xFFFFFFFFL) ^ (length * m);

int length8 = length / 8;

for (int i = 0; i < length8; i++) {
final int i8 = i * 8;
long k = ((long) data[i8] & 0xff)
+ (((long) data[i8 + 1] & 0xff) << 8)
+ (((long) data[i8 + 2] & 0xff) << 16)
+ (((long) data[i8 + 3] & 0xff) << 24)
+ (((long) data[i8 + 4] & 0xff) << 32)
+ (((long) data[i8 + 5] & 0xff) << 40)
+ (((long) data[i8 + 6] & 0xff) << 48)
+ (((long) data[i8 + 7] & 0xff) << 56);

k *= m;
k ^= k >>> r;
k *= m;

h ^= k;
h *= m;
}

final int remaining = length % 8;
if (remaining >= 7) {
h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
}
if (remaining >= 6) {
h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
}
if (remaining >= 5) {
h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
}
if (remaining >= 4) {
h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
}
if (remaining >= 3) {
h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
}
if (remaining >= 2) {
h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
}
if (remaining >= 1) {
h ^= (long) (data[length & ~7] & 0xff);
h *= m;
}

h ^= h >>> r;
h *= m;
h ^= h >>> r;

return h;
}

/**
* Compute the actor ID of the task which created this object.
* @return The actor ID of the task which created this object.
8 changes: 0 additions & 8 deletions java/runtime/src/test/java/io/ray/runtime/UniqueIdTest.java
@@ -1,7 +1,6 @@
package io.ray.runtime;

import io.ray.api.id.UniqueId;
import io.ray.runtime.util.IdUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.xml.bind.DatatypeConverter;
@@ -46,11 +45,4 @@ public void testConstructUniqueId() {
Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString());
Assert.assertTrue(id6.isNil());
}

@Test
void testMurmurHash() {
UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232");
long remainder = Long.remainderUnsigned(IdUtil.murmurHashCode(id), 1000000000);
Assert.assertEquals(remainder, 787616861);
}
}
4 changes: 2 additions & 2 deletions java/test/src/main/java/io/ray/test/ActorTest.java
@@ -128,7 +128,7 @@ public void testUnreconstructableActorObject() throws InterruptedException {
ObjectRef value = counter.task(Counter::getValue).remote();
Assert.assertEquals(100, value.get());
// Delete the object from the object store.
Ray.internal().free(ImmutableList.of(value), false, false);
Ray.internal().free(ImmutableList.of(value), false);
// Wait for delete RPC to propagate
TimeUnit.SECONDS.sleep(1);
// Free deletes from in-memory store.
@@ -138,7 +138,7 @@ public void testUnreconstructableActorObject() throws InterruptedException {
ObjectRef<TestUtils.LargeObject> largeValue = counter.task(Counter::createLargeObject).remote();
Assert.assertTrue(largeValue.get() instanceof TestUtils.LargeObject);
// Delete the object from the object store.
Ray.internal().free(ImmutableList.of(largeValue), false, false);
Ray.internal().free(ImmutableList.of(largeValue), false);
// Wait for delete RPC to propagate
TimeUnit.SECONDS.sleep(1);
// Free deletes big objects from plasma store.
19 changes: 1 addition & 18 deletions java/test/src/main/java/io/ray/test/PlasmaFreeTest.java
@@ -3,9 +3,7 @@
import com.google.common.collect.ImmutableList;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.TaskId;
import io.ray.runtime.object.ObjectRefImpl;
import java.util.Arrays;
import org.testng.Assert;
import org.testng.annotations.Test;

@@ -20,7 +18,7 @@ public void testDeleteObjects() {
ObjectRef<String> helloId = Ray.task(PlasmaFreeTest::hello).remote();
String helloString = helloId.get();
Assert.assertEquals("hello", helloString);
Ray.internal().free(ImmutableList.of(helloId), true, false);
Ray.internal().free(ImmutableList.of(helloId), true);

final boolean result = TestUtils.waitForCondition(() ->
!TestUtils.getRuntime().getObjectStore()
@@ -32,19 +30,4 @@
Assert.assertFalse(result);
}
}

@Test(groups = {"cluster"})
public void testDeleteCreatingTasks() {
ObjectRef<String> helloId = Ray.task(PlasmaFreeTest::hello).remote();
Assert.assertEquals("hello", helloId.get());
Ray.internal().free(ImmutableList.of(helloId), true, true);

TaskId taskId = TaskId.fromBytes(
Arrays.copyOf(((ObjectRefImpl<String>) helloId).getId().getBytes(), TaskId.LENGTH));
final boolean result = TestUtils.waitForCondition(
() -> !TestUtils.getRuntime().getGcsClient()
.rayletTaskExistsInGcs(taskId), 50);
Assert.assertTrue(result);
}

}
5 changes: 2 additions & 3 deletions python/ray/_raylet.pyx
@@ -1031,14 +1031,13 @@ cdef class CoreWorker:

return ready, not_ready

def free_objects(self, object_refs, c_bool local_only,
c_bool delete_creating_tasks):
def free_objects(self, object_refs, c_bool local_only):
cdef:
c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs)

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Delete(
free_ids, local_only, delete_creating_tasks))
free_ids, local_only))

def global_gc(self):
with nogil:
2 changes: 1 addition & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -182,7 +182,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t timeout_ms, c_vector[c_bool] *results,
c_bool fetch_local)
CRayStatus Delete(const c_vector[CObjectID] &object_ids,
c_bool local_only, c_bool delete_creating_tasks)
c_bool local_only)
CRayStatus TriggerGlobalGC()
c_string MemoryUsageString()

7 changes: 2 additions & 5 deletions python/ray/internal/internal_api.py
@@ -37,7 +37,7 @@ def memory_summary():
return reply.memory_summary


def free(object_refs, local_only=False, delete_creating_tasks=False):
def free(object_refs, local_only=False):
"""Free a list of IDs from the in-process and plasma object stores.
This function is a low-level API which should be used in restricted
@@ -59,8 +59,6 @@ def free(object_refs, local_only=False, delete_creating_tasks=False):
object_refs (List[ObjectRef]): List of object refs to delete.
local_only (bool): Whether only deleting the list of objects in local
object store or all object stores.
delete_creating_tasks (bool): Whether also delete the object creating
tasks.
"""
worker = ray.worker.global_worker

@@ -83,5 +81,4 @@ def free(object_refs, local_only=False, delete_creating_tasks=False):
if len(object_refs) == 0:
return

worker.core_worker.free_objects(object_refs, local_only,
delete_creating_tasks)
worker.core_worker.free_objects(object_refs, local_only)
6 changes: 2 additions & 4 deletions src/ray/core_worker/core_worker.cc
@@ -1111,8 +1111,7 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids, int num_objects,
return Status::OK();
}

Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_only,
bool delete_creating_tasks) {
Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_only) {
// Release the object from plasma. This does not affect the object's ref
// count. If this was called from a non-owning worker, then a warning will be
// logged and the object will not get released.
@@ -1129,8 +1128,7 @@ Status CoreWorker::Delete(const std::vector<ObjectID> &object_ids, bool local_on
// We only delete from plasma, which avoids hangs (issue #7105). In-memory
// objects can only be deleted once the ref count goes to 0.
absl::flat_hash_set<ObjectID> plasma_object_ids(object_ids.begin(), object_ids.end());
return plasma_store_provider_->Delete(plasma_object_ids, local_only,
delete_creating_tasks);
return plasma_store_provider_->Delete(plasma_object_ids, local_only);
}

void CoreWorker::TriggerGlobalGC() {
5 changes: 1 addition & 4 deletions src/ray/core_worker/core_worker.h
@@ -571,11 +571,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] object_ids IDs of the objects to delete.
/// \param[in] local_only Whether only delete the objects in local node, or all nodes in
/// the cluster.
/// \param[in] delete_creating_tasks Whether also delete the tasks that
/// created these objects.
/// \return Status.
Status Delete(const std::vector<ObjectID> &object_ids, bool local_only,
bool delete_creating_tasks);
Status Delete(const std::vector<ObjectID> &object_ids, bool local_only);

/// Trigger garbage collection on each worker in the cluster.
void TriggerGlobalGC();