Skip to content

Commit

Permalink
Update test schema for GC
Browse files Browse the repository at this point in the history
  • Loading branch information
thesiddharth committed Jul 9, 2015
1 parent ddb385d commit b3deab3
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2009-2015 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.norbert
package network
package netty

import java.util.concurrent._

import org.specs.util.WaitFor
import com.linkedin.norbert.cluster._
import com.linkedin.norbert.network.common.SampleMessage
import org.specs.SpecificationWithJUnit
import org.specs.mock.Mockito

/**
* @author: sishah
* @date: 07/08/15
* @version: 1.0
*/
class GcAwareNettyNetworkServerSpec extends SpecificationWithJUnit with Mockito with SampleMessage with WaitFor {

val cycleTime = 6000
val slotTime = 2000
val slaTime = 1000

val goodGcParams = new GcParamWrapper(slaTime, cycleTime, slotTime)

val networkConfig = spy(new NetworkServerConfig)
networkConfig.clusterClient = mock[ClusterClient]
networkConfig.clusterClient.clientName returns Some("Test")
networkConfig.clusterClient.serviceName returns "Test"
networkConfig.gcParams = goodGcParams
networkConfig.requestTimeoutMillis = 2000L
networkConfig.requestThreadCorePoolSize = 1
networkConfig.requestThreadMaxPoolSize = 1
networkConfig.requestThreadKeepAliveTimeSecs = 2

val networkServer = spy(new NettyNetworkServer(networkConfig))

val node0 = Node(0, "", false,Set.empty,None,None,Some(0))
val node1 = Node(1, "", false,Set.empty,None,None,Some(1))
val node2 = Node(2, "", false,Set.empty,None,None,Some(0))

val listenerKey: ClusterListenerKey = ClusterListenerKey(1)

networkServer.clusterClient.nodeWithId(1) returns Some(node0)
networkServer.clusterClient.addListener(any[ClusterListener]) returns listenerKey

"NetworkServer" should {
doAfter {
networkServer.shutdown
}

"have a valid GC Thread " in {

networkServer.gcThread must be_!=(None)

}

"schedule a new recurring GC event" in {

while(System.currentTimeMillis()%cycleTime != 0){}

val timeTillNextGC = networkServer.timeTillNextGC(node0.offset.get)
verifyDelay(timeTillNextGC, cycleTime) must beTrue

networkServer.schedulePeriodicGc(node0)
networkServer.gcFuture must eventually(be_!=(None))
networkServer.currOffset must be_==(0)

}

"adapt the GC event to the binding of a new node" in {

networkServer.schedulePeriodicGc(node0)

while(System.currentTimeMillis()%cycleTime != 0){}

networkServer.schedulePeriodicGc(node1)

val timeTillNextGC = networkServer.timeTillNextGC(node1.offset.get)

verifyDelay(timeTillNextGC, slotTime) must beTrue
networkServer.gcFuture must eventually(be_!=(None))
networkServer.currOffset must be_==(1)

}

"Not cancel the initial GC event if the offset of the new node is the same" in {

networkServer.schedulePeriodicGc(node0)

waitFor(100.ms)

networkServer.gcFuture = Some(mock[ScheduledFuture[_]])
networkServer.schedulePeriodicGc(node2)

networkServer.gcFuture must eventually(be_!=(None))
networkServer.currOffset must be_==(0)
there was no(networkServer.gcFuture.get).cancel(true)

}

}

def verifyDelay(obsDelay:Long, expDelay:Long): Boolean = {
//A little arbitrary. 20ms gap.
expDelay-obsDelay <= 20
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import org.specs.SpecificationWithJUnit

class GcAwarePartitionedLoadBalancerSpec extends SpecificationWithJUnit {

val cycleTime = 6000
val slotTime = 2000

class TestLBF(numPartitions: Int, csr: Boolean = true)
extends GcAwarePartitionedLoadBalancerFactory[Int](numPartitions, 27000, 9000, csr)
extends GcAwarePartitionedLoadBalancerFactory[Int](numPartitions, cycleTime, slotTime, csr)
{
protected def calculateHash(id: Int) = HashFunctions.fnv(BigInt(id).toByteArray)

Expand Down Expand Up @@ -67,7 +70,7 @@ class GcAwarePartitionedLoadBalancerSpec extends SpecificationWithJUnit {

val nodes = Set(node1, node2, node3, node4, node5, node6)

"Set cover load balancer" should {
"Set cover GC-aware load balancer" should {
"nodesForPartitions returns nodes cover the input partitioned Ids" in {
val loadbalancer = loadBalancerFactory.newLoadBalancer(toEndpoints(nodes))
val res = loadbalancer.nodesForPartitionedIds(Set(0,1,3,4), Some(0L), Some(0L))
Expand Down Expand Up @@ -111,7 +114,7 @@ class GcAwarePartitionedLoadBalancerSpec extends SpecificationWithJUnit {

"not return a Node which is currently GCing" in {
val loadbalancer = loadBalancerFactory.newLoadBalancer(toEndpoints(nodes))
while(System.currentTimeMillis()%27000 != 0){}
while(System.currentTimeMillis()%cycleTime != 0){}

val possibleNodeSet = scala.collection.mutable.Set(node3,node5)
val res = loadbalancer.nextNode(1210)
Expand All @@ -120,7 +123,7 @@ class GcAwarePartitionedLoadBalancerSpec extends SpecificationWithJUnit {
val res2 = loadbalancer.nextNode(1210)
res2 must beSome[Node].which(possibleNodeSet must contain(_))

while(System.currentTimeMillis()%9000 != 0){}
while(System.currentTimeMillis()%slotTime != 0){}
val possibleNodeSet2 = scala.collection.mutable.Set(node2,node6)
val res3 = loadbalancer.nextNode(1318)
res3 must beSome[Node].which(possibleNodeSet2 must contain(_))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2009-2015 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/**
* @author: sishah
* @date: 07/07/15
* @version: 1.0
*/
package com.linkedin.norbert
package network
package server

import com.linkedin.norbert.cluster.Node
import com.linkedin.norbert.network.netty.GcParamWrapper

class GcAwareMessageExecutorSpec extends MessageExecutorSpec {

val cycleTime = 6000
val slotTime = 2000
val slaTime = 1000

val goodGcParams = new GcParamWrapper(slaTime, cycleTime, slotTime)

val badNode = Node(1, "localhost:31313", true)
def noNode:Node = {throw new NetworkServerNotBoundException}
val goodNode = Node(1, "localhost:31313", true, Set.empty, None, None, Some(0))

var nodeFlag = "badNode"

def getNode = {

Some (
nodeFlag match {
case "badNode" => badNode
case "noNode" => noNode
case "goodNode" => goodNode
}
)

}

// MessageExecutorSpec by default runs all tests with no GcParams and no defined node.
// This spec overrides the message executor to have valid GcParams, and a node based on a flag.
override val messageExecutor = new ThreadPoolMessageExecutor(None, "service",
messageHandlerRegistry,
filters,
1000L,
1,
1,
1,
100,
1000L,
-1,
goodGcParams,
getNode
)

"GcAwareMessageExecutor" should {

doAfter {
messageExecutor.shutdown
}

//No node is bound
"successfully respond (with no bound node) in" in {
nodeFlag = "noNode"

generalExecutorTests
}

//A node is connected, but it receives the request in its GC period
"throw a GC Exception (with a GC-ing bound node) in" in {

nodeFlag = "goodNode"

while(System.currentTimeMillis()%cycleTime != 0){}

messageHandlerRegistry.handlerFor(request) returns returnHandler _

messageExecutor.executeMessage(request, Some((either: Either[Exception, Ping]) => null:Unit), None) must throwA[GcException]

waitFor(50.ms)

there was no(messageHandlerRegistry).handlerFor(request)
}

//These tests occur outside the GC period
"successfully respond (with a not-currently-GCing node) in" in {

nodeFlag = "goodNode"

while(System.currentTimeMillis()%cycleTime != 0){}
waitFor((slotTime + 10).ms)

generalExecutorTests
}

}

}
Loading

0 comments on commit b3deab3

Please sign in to comment.