Skip to content

Commit

Permalink
Fix streaming ci failure (ray-project#12830)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang authored Dec 30, 2020
1 parent 59e9b80 commit 33089c4
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 31 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ matrix:
- RAY_INSTALL_JAVA=1
- PYTHON=3.6 PYTHONWARNINGS=ignore
- RAY_USE_RANDOM_PORTS=1
- RAY_ENABLE_NEW_SCHEDULER=0
language: java
jdk: openjdk8
install:
Expand Down
2 changes: 2 additions & 0 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,8 @@ def init(
driver_object_store_memory=_driver_object_store_memory,
job_id=None,
job_config=job_config)
if job_config and job_config.code_search_path:
global_worker.set_load_code_from_local(True)

for hook in _post_init_hooks:
hook()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ public void submit(JobGraph jobGraph, Map<String, String> jobConfig) {

if (submitResult.get()) {
LOG.info("Finish submitting job: {}.", jobGraph.getJobName());
} else {
throw new RuntimeException("submitting job failed");
}
} catch (Exception e) {
LOG.error("Failed to submit job: {}.", jobGraph.getJobName(), e);
throw new RuntimeException("submitting job failed", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public boolean submitJob(ActorHandle<JobMaster> jobMasterActor, JobGraph jobGrap
scheduler = new JobSchedulerImpl(this);
scheduler.scheduleJob(graphManager.getExecutionGraph());
} catch (Exception e) {
LOG.error("Failed to submit job.", e);
e.printStackTrace();
LOG.error("Failed to submit job {}.", e, e);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ray.streaming.runtime.master.scheduler;

import com.google.common.base.Preconditions;
import io.ray.api.ActorHandle;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
Expand Down Expand Up @@ -82,7 +83,7 @@ private void initAndStart(ExecutionGraph executionGraph) {
Map<ExecutionVertex, JobWorkerContext> vertexToContextMap = buildWorkersContext(executionGraph);

// init workers
initWorkers(vertexToContextMap);
Preconditions.checkState(initWorkers(vertexToContextMap));

// init master
initMaster();
Expand Down Expand Up @@ -119,17 +120,13 @@ public boolean createWorkers(ExecutionGraph executionGraph) {
* @param vertexToContextMap vertex - context map
*/
protected boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap) {
boolean result;
try {
result =
workerLifecycleController.initWorkers(
vertexToContextMap,
jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs());
} catch (Exception e) {
LOG.error("Failed to initiate workers.", e);
return false;
boolean succeed;
int timeoutMs = jobConfig.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs();
succeed = workerLifecycleController.initWorkers(vertexToContextMap, timeoutMs);
if (!succeed) {
LOG.error("Failed to initiate workers in {} milliseconds", timeoutMs);
}
return result;
return succeed;
}

/** Start JobWorkers according to the physical plan. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private static boolean isBasic(Object value) {
public byte[] newInstance(byte[] classNameBytes) {
String className = (String) serializer.deserialize(classNameBytes);
try {
Class<?> clz = Class.forName(className, true, this.getClass().getClassLoader());
Class<?> clz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
Object instance = clz.newInstance();
referenceMap.put(getReferenceId(instance), instance);
return serializer.serialize(getReferenceId(instance));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ class BundleMeta {

// kMessageBundleHeaderSize + kUniqueIDSize:
// magicNum(4b) + bundleTs(8b) + lastMessageId(8b) + messageListSize(4b)
// + bundleType(4b) + rawBundleSize(4b) + channelID(20b)
static final int LENGTH = 4 + 8 + 8 + 4 + 4 + 4 + 20;
// + bundleType(4b) + rawBundleSize(4b) + channelID
static final int LENGTH = 4 + 8 + 8 + 4 + 4 + 4 + ChannelId.ID_LENGTH;
private int magicNum;
private long bundleTs;
private long lastMessageId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.io.BaseEncoding;
import io.ray.api.id.ObjectId;
import java.lang.ref.Reference;
import java.nio.ByteBuffer;
import java.util.Random;
Expand All @@ -16,7 +17,7 @@
*/
public class ChannelId {

public static final int ID_LENGTH = 20;
public static final int ID_LENGTH = ObjectId.LENGTH;
private static final FinalizableReferenceQueue REFERENCE_QUEUE = new FinalizableReferenceQueue();
// This ensures that the FinalizablePhantomReference itself is not garbage-collected.
private static final Set<Reference<?>> references = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -82,15 +83,15 @@ public static String genRandomIdStr() {
}

/**
* Generate channel name, which will be 20 character
* Generate channel name, which will be {@link ChannelId#ID_LENGTH} character
*
* @param fromTaskId upstream task id
* @param toTaskId downstream task id Returns channel name
*/
public static String genIdStr(int fromTaskId, int toTaskId, long ts) {
/*
| Head | Timestamp | Empty | From | To |
| 8 bytes | 4bytes | 4bytes| 2bytes| 2bytes |
| Head | Timestamp | Empty | From | To | padding |
| 8 bytes | 4bytes | 4bytes| 2bytes| 2bytes | |
*/
Preconditions.checkArgument(
fromTaskId < Short.MAX_VALUE,
Expand All @@ -99,7 +100,7 @@ public static String genIdStr(int fromTaskId, int toTaskId, long ts) {
Short.MAX_VALUE);
Preconditions.checkArgument(
toTaskId < Short.MAX_VALUE, "toTaskId %s is larger than %s", fromTaskId, Short.MAX_VALUE);
byte[] channelName = new byte[20];
byte[] channelName = new byte[ID_LENGTH];

for (int i = 11; i >= 8; i--) {
channelName[i] = (byte) (ts & 0xff);
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/includes/transfer.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ cdef c_vector[CObjectID] bytes_list_to_qid_vec(list py_queue_ids) except *:
c_string q_id_data
for q_id in py_queue_ids:
q_id_data = q_id
assert q_id_data.size() == CObjectID.Size()
assert q_id_data.size() == CObjectID.Size(), f"{q_id_data.size()}, {CObjectID.Size()}"
obj_id = CObjectID.FromBinary(q_id_data)
queue_id_vec.push_back(obj_id)
return queue_id_vec
Expand Down
6 changes: 3 additions & 3 deletions streaming/python/runtime/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ray._raylet import PythonFunctionDescriptor
from ray._raylet import Language

CHANNEL_ID_LEN = 20
CHANNEL_ID_LEN = ray.ObjectID.nil().size()
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -58,8 +58,8 @@ def gen_random_id():

@staticmethod
def gen_id(from_index, to_index, ts):
"""Generate channel id, which is 20 character"""
channel_id = bytearray(20)
"""Generate channel id, which is `CHANNEL_ID_LEN` character"""
channel_id = bytearray(CHANNEL_ID_LEN)
for i in range(11, 7, -1):
channel_id[i] = ts & 0xff
ts >>= 8
Expand Down
8 changes: 1 addition & 7 deletions streaming/python/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import os
import subprocess
import sys

import ray
from ray.streaming import StreamingContext
Expand Down Expand Up @@ -31,12 +29,8 @@ def test_hybrid_stream():
"../../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar")
jar_path = os.path.abspath(jar_path)
print("jar_path", jar_path)
java_worker_options = json.dumps(["-classpath", jar_path])
print("java_worker_options", java_worker_options)
assert not ray.is_initialized()
ray.init(
job_config=ray.job_config.JobConfig(code_search_path=sys.path),
_java_worker_options=java_worker_options)
ray.init(job_config=ray.job_config.JobConfig(code_search_path=[jar_path]))

sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt"
if os.path.exists(sink_file):
Expand Down

0 comments on commit 33089c4

Please sign in to comment.