Skip to content

Commit

Permalink
[Core]Remove checkpoint table (ray-project#12235)
Browse files Browse the repository at this point in the history
* Delete an actor entry from node manager.

* Remove checkpoint table

* remote checkpoint interface

* remove checkpoint interface

* fix ExitActorTest

Co-authored-by: chaokunyang <[email protected]>
  • Loading branch information
rkooo567 and chaokunyang authored Dec 1, 2020
1 parent 9021f15 commit f6f3cc9
Show file tree
Hide file tree
Showing 52 changed files with 6 additions and 2,012 deletions.
100 changes: 0 additions & 100 deletions java/api/src/main/java/io/ray/api/Checkpointable.java

This file was deleted.

29 changes: 0 additions & 29 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.api.Checkpointable.Checkpoint;
import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.JobId;
Expand All @@ -12,7 +11,6 @@
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.ActorCheckpointIdData;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
import io.ray.runtime.generated.Gcs.TablePrefix;
import io.ray.runtime.placementgroup.PlacementGroupUtils;
Expand Down Expand Up @@ -175,33 +173,6 @@ public boolean rayletTaskExistsInGcs(TaskId taskId) {
return client.exists(key);
}

/**
 * Get the available checkpoints for the given actor ID.
 *
 * @param actorId ID of the actor whose checkpoints are looked up.
 * @return The actor's checkpoints ordered by timestamp, newest first. The list is empty when
 *     the global state accessor returns {@code null} for this actor.
 * @throws RuntimeException if the returned bytes cannot be parsed as
 *     {@code ActorCheckpointIdData}; the parse failure is attached as the cause.
 */
public List<Checkpoint> getCheckpointsForActor(ActorId actorId) {
  List<Checkpoint> checkpoints = new ArrayList<>();
  byte[] result = globalStateAccessor.getActorCheckpointId(actorId);
  if (result != null) {
    ActorCheckpointIdData data;
    try {
      data = ActorCheckpointIdData.parseFrom(result);
    } catch (InvalidProtocolBufferException e) {
      // Chain the original exception so the actual parse error is not lost.
      throw new RuntimeException("Received invalid protobuf data from GCS.", e);
    }
    // The i-th checkpoint ID is paired with the i-th timestamp.
    int count = data.getCheckpointIdsCount();
    for (int i = 0; i < count; i++) {
      UniqueId checkpointId =
          UniqueId.fromByteBuffer(data.getCheckpointIds(i).asReadOnlyByteBuffer());
      checkpoints.add(new Checkpoint(checkpointId, data.getTimestamps(i)));
    }
  }
  // Descending by timestamp: the most recent checkpoint comes first.
  checkpoints.sort((x, y) -> Long.compare(y.timestamp, x.timestamp));
  return checkpoints;
}

public JobId nextJobId() {
int jobCounter = (int) primary.incr("JobCounter".getBytes());
return JobId.fromInt(jobCounter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,6 @@ public byte[] getActorInfo(ActorId actorId) {
}
}

/**
 * Fetches an actor's checkpoint id data from GCS in protobuf bytes format.
 *
 * @param actorId ID of the actor to query.
 * @return An actor checkpoint id data with ActorCheckpointIdData protobuf schema.
 */
public byte[] getActorCheckpointId(ActorId actorId) {
  synchronized (GlobalStateAccessor.class) {
    // The native pointer is validated under the class-wide lock before it is used.
    validateGlobalStateAccessorPointer();
    final long nativePtr = globalStateAccessorNativePointer;
    final byte[] actorIdBytes = actorId.getBytes();
    final byte[] checkpointIdData = nativeGetActorCheckpointId(nativePtr, actorIdBytes);
    return checkpointIdData;
  }
}

private void destroyGlobalStateAccessor() {
synchronized (GlobalStateAccessor.class) {
if (0 == globalStateAccessorNativePointer) {
Expand Down Expand Up @@ -164,8 +153,6 @@ private void destroyGlobalStateAccessor() {

// Native method taking the accessor's native pointer and an actor ID as bytes.
// NOTE(review): presumably the native side of getActorInfo; its caller is not visible here.
private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);

// Native method invoked by getActorCheckpointId with the accessor's native pointer and the
// actor ID bytes; that caller documents the result as ActorCheckpointIdData protobuf bytes.
private native byte[] nativeGetActorCheckpointId(long nativePtr, byte[] actorId);

// Native method taking the accessor's native pointer and a placement group ID as bytes.
// NOTE(review): result format is not shown in this view; confirm against the caller.
private native byte[] nativeGetPlacementGroupInfo(long nativePtr,
byte[] placementGroupId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.ray.runtime.task;

import io.ray.api.id.ActorId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;

Expand Down Expand Up @@ -34,11 +33,4 @@ protected LocalActorContext createActorContext() {
return new LocalActorContext(runtime.getWorkerContext().getCurrentWorkerId());
}

/**
 * No-op: this executor does nothing when asked to save a checkpoint.
 *
 * @param actor The actor instance (unused).
 * @param actorId ID of the actor (unused).
 */
@Override
protected void maybeSaveCheckpoint(Object actor, ActorId actorId) {
}

/**
 * No-op: this executor does nothing when asked to load a checkpoint.
 *
 * @param actor The actor instance (unused).
 * @param actorId ID of the actor (unused).
 */
@Override
protected void maybeLoadCheckpoint(Object actor, ActorId actorId) {
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,14 @@
package io.ray.runtime.task;

import com.google.common.base.Preconditions;
import io.ray.api.Checkpointable;
import io.ray.api.Checkpointable.Checkpoint;
import io.ray.api.Checkpointable.CheckpointContext;
import io.ray.api.id.ActorId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import java.util.ArrayList;
import java.util.List;

/**
* Task executor for cluster mode.
*/
public class NativeTaskExecutor extends TaskExecutor<NativeTaskExecutor.NativeActorContext> {

// Upper bound on the checkpoint IDs remembered per actor: when saving a new checkpoint pushes
// the list past this size, the oldest ID is reported as expired and removed.
// TODO(hchen): Use the C++ config.
private static final int NUM_ACTOR_CHECKPOINTS_TO_KEEP = 20;

/**
 * Per-actor checkpoint bookkeeping kept alongside the base actor context.
 */
static class NativeActorContext extends TaskExecutor.ActorContext {

  /**
   * IDs of the checkpoints this actor has taken so far, oldest first.
   */
  private List<UniqueId> checkpointIds;

  /**
   * How many tasks have run since the last actor checkpoint. Starts at zero.
   */
  private int numTasksSinceLastCheckpoint;

  /**
   * Time of the last actor checkpoint, as returned by {@code System.currentTimeMillis()}.
   * Starts at zero.
   */
  private long lastCheckpointTimestamp;
}

public NativeTaskExecutor(RayRuntimeInternal runtime) {
Expand All @@ -49,63 +24,4 @@ public void onWorkerShutdown(byte[] workerIdBytes) {
// This is to make sure no memory leak when `Ray.exitActor()` is called.
removeActorContext(new UniqueId(workerIdBytes));
}

/**
 * Takes a checkpoint of the actor when it implements {@link Checkpointable} and its
 * {@code shouldCheckpoint} callback agrees.
 *
 * <p>Every call for a checkpointable actor bumps the "tasks since last checkpoint" counter.
 * When a checkpoint is taken, the counter and timestamp are reset, a new checkpoint ID is
 * prepared natively and recorded, the oldest recorded ID is expired if the list has grown
 * past {@code NUM_ACTOR_CHECKPOINTS_TO_KEEP}, and finally the actor is asked to save.
 *
 * @param actor The actor instance that just executed a task.
 * @param actorId ID of that actor.
 */
@Override
protected void maybeSaveCheckpoint(Object actor, ActorId actorId) {
  if (!(actor instanceof Checkpointable)) {
    return;
  }
  final NativeActorContext ctx = getActorContext();
  final int tasksSinceLast = ++ctx.numTasksSinceLastCheckpoint;
  final long millisSinceLast = System.currentTimeMillis() - ctx.lastCheckpointTimestamp;
  final CheckpointContext decisionInput =
      new CheckpointContext(actorId, tasksSinceLast, millisSinceLast);
  final Checkpointable target = (Checkpointable) actor;
  if (!target.shouldCheckpoint(decisionInput)) {
    return;
  }

  // A checkpoint will be taken: restart the counters first.
  ctx.numTasksSinceLastCheckpoint = 0;
  ctx.lastCheckpointTimestamp = System.currentTimeMillis();

  final UniqueId freshId = new UniqueId(nativePrepareCheckpoint());
  final List<UniqueId> retainedIds = ctx.checkpointIds;
  retainedIds.add(freshId);
  if (retainedIds.size() > NUM_ACTOR_CHECKPOINTS_TO_KEEP) {
    // Notify the actor before dropping the oldest ID from the list.
    target.checkpointExpired(actorId, retainedIds.get(0));
    retainedIds.remove(0);
  }
  target.saveCheckpoint(actorId, freshId);
}

/**
 * Gives a {@link Checkpointable} actor the chance to resume from one of its checkpoints.
 *
 * <p>The actor context's checkpoint bookkeeping is reset first. If the GCS client reports any
 * checkpoints for the actor, they are handed to {@code loadCheckpoint}; a non-null returned ID
 * must be one of the offered checkpoints and is then reported through the native
 * resumed-from-checkpoint call.
 *
 * @param actor The newly created actor instance.
 * @param actorId ID of that actor.
 * @throws IllegalArgumentException if {@code loadCheckpoint} returns an ID that is not among
 *     the available checkpoints.
 */
@Override
protected void maybeLoadCheckpoint(Object actor, ActorId actorId) {
  if (!(actor instanceof Checkpointable)) {
    return;
  }
  final NativeActorContext ctx = getActorContext();
  ctx.numTasksSinceLastCheckpoint = 0;
  ctx.lastCheckpointTimestamp = System.currentTimeMillis();
  ctx.checkpointIds = new ArrayList<>();

  final List<Checkpoint> candidates = runtime.getGcsClient().getCheckpointsForActor(actorId);
  if (candidates.isEmpty()) {
    return;
  }
  final UniqueId chosenId = ((Checkpointable) actor).loadCheckpoint(actorId, candidates);
  if (chosenId == null) {
    // The actor declined to resume from any checkpoint.
    return;
  }
  final boolean chosenIsKnown =
      candidates.stream().anyMatch(candidate -> candidate.checkpointId.equals(chosenId));
  Preconditions.checkArgument(chosenIsKnown,
      "'loadCheckpoint' must return a checkpoint ID that exists in the "
          + "'availableCheckpoints' list, or null.");
  nativeNotifyActorResumedFromCheckpoint(chosenId.getBytes());
}

/**
 * Native call made when a checkpoint is about to be saved.
 *
 * @return Bytes from which the new checkpoint's {@code UniqueId} is constructed.
 */
private static native byte[] nativePrepareCheckpoint();

/**
 * Native call made after the actor has chosen one of its available checkpoints to resume from.
 *
 * @param checkpointId Bytes of the chosen checkpoint ID.
 */
private static native void nativeNotifyActorResumedFromCheckpoint(byte[] checkpointId);
}
10 changes: 0 additions & 10 deletions java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.ray.runtime.task;

import com.google.common.base.Preconditions;
import io.ray.api.id.ActorId;
import io.ray.api.id.JobId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
Expand Down Expand Up @@ -150,16 +149,10 @@ protected List<NativeRayObject> execute(List<String> rayFunctionInfo,
}
// Set result
if (taskType != TaskType.ACTOR_CREATION_TASK) {
if (taskType == TaskType.ACTOR_TASK) {
// TODO (kfstorm): handle checkpoint in core worker.
maybeSaveCheckpoint(actor, runtime.getWorkerContext().getCurrentActorId());
}
if (rayFunction.hasReturn()) {
returnObjects.add(ObjectSerializer.serialize(result));
}
} else {
// TODO (kfstorm): handle checkpoint in core worker.
maybeLoadCheckpoint(result, runtime.getWorkerContext().getCurrentActorId());
actorContext.currentActor = result;
}
LOGGER.debug("Finished executing task {}", taskId);
Expand Down Expand Up @@ -195,7 +188,4 @@ private JavaFunctionDescriptor parseFunctionDescriptor(List<String> rayFunctionI
rayFunctionInfo.get(2));
}

/**
 * Hook called by {@code execute} after an actor task has run, before its result is serialized.
 *
 * @param actor The actor instance the task ran on.
 * @param actorId The worker context's current actor ID.
 */
protected abstract void maybeSaveCheckpoint(Object actor, ActorId actorId);

/**
 * Hook called by {@code execute} after an actor creation task has run, before the result is
 * stored as the current actor.
 *
 * @param actor The object produced by the actor creation task.
 * @param actorId The worker context's current actor ID.
 */
protected abstract void maybeLoadCheckpoint(Object actor, ActorId actorId);
}
Loading

0 comments on commit f6f3cc9

Please sign in to comment.