Skip to content

Commit

Permalink
fix: function update and related tests are broken (apache#2755)
Browse files Browse the repository at this point in the history
* fix: function update is broken since assignments of updated functions are not propagated

* remove test code
  • Loading branch information
jerrypeng authored and srkukarni committed Oct 10, 2018
1 parent 8d396c3 commit 6d6155e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void invokeScheduler() {

if (!assignment.getInstance().equals(instance)) {
functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
}
}
if (functionMap.isEmpty()) {
Expand Down Expand Up @@ -222,7 +223,7 @@ public void invokeScheduler() {

List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
assignments.addAll(unassignedInstances.getRight());

if (log.isDebugEnabled()) {
log.debug("New assignments computed: {}", assignments);
}
Expand Down Expand Up @@ -251,7 +252,7 @@ private void publishNewAssignment(Assignment assignment, boolean deleted) {
try {
String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
// publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
// message
// message
producer.newMessage().key(fullyQualifiedInstanceId)
.value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Slf4j
Expand Down Expand Up @@ -400,6 +401,9 @@ public void testScalingUp() throws Exception {
.build();
Assert.assertEquals(assignments, assignment2);

// updating assignments
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);

// scale up

Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
Expand Down Expand Up @@ -510,7 +514,6 @@ public void testScalingDown() throws Exception {
}
});


Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
Expand All @@ -531,6 +534,11 @@ public void testScalingDown() throws Exception {
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));

// updating assignments
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);

// scale down

Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder()
Expand All @@ -551,11 +559,9 @@ public void testScalingDown() throws Exception {
callSchedule();

invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
Assert.assertEquals(invocations.size(), 4);
Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
send = (byte[]) invocations.get(0).getRawArguments()[0];
assignments = Assignment.parseFrom(send);

Set<Assignment> allAssignments2 = Sets.newHashSet();
invocations.forEach(invocation -> {
Expand Down Expand Up @@ -671,15 +677,6 @@ public void testUpdate() throws Exception {

callSchedule();

List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
Assert.assertEquals(invocations.size(), 3);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
byte[] send = (byte[]) invocations.get(0).getRawArguments()[0];
Assignment assignments = Assignment.parseFrom(send);

log.info("assignmentsUpdate: {}", assignments);

Function.Assignment assignment2_1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1")
.setInstance(Function.Instance.newBuilder()
Expand All @@ -696,12 +693,10 @@ public void testUpdate() throws Exception {
.setFunctionMetaData(function2).setInstanceId(2).build())
.build();

invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync"));
Assert.assertEquals(invocations.size(), 3);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
send = (byte[]) invocations.get(0).getRawArguments()[0];
assignments = Assignment.parseFrom(send);

Set<Assignment> allAssignments = Sets.newHashSet();
invocations.forEach(invocation -> {
Expand All @@ -712,11 +707,17 @@ public void testUpdate() throws Exception {
}
});

assertEquals(allAssignments.size(), 3);
assertTrue(allAssignments.contains(assignment2_1));
assertTrue(allAssignments.contains(assignment2_2));
assertTrue(allAssignments.contains(assignment2_3));

// scale down
// updating assignments
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
currentAssignments.get("worker-1").put(Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);

// update field

Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
Expand Down Expand Up @@ -750,8 +751,6 @@ public void testUpdate() throws Exception {
Assert.assertEquals(invocations.size(), 6);
invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value",
Object.class));
send = (byte[]) invocations.get(0).getRawArguments()[0];
assignments = Assignment.parseFrom(send);

Set<Assignment> allAssignments2 = Sets.newHashSet();
invocations.forEach(invocation -> {
Expand Down

0 comments on commit 6d6155e

Please sign in to comment.