diff --git a/cluster/src/main/java/com/linkedin/norbert/protos/NorbertExampleProtos.java b/cluster/src/main/java/com/linkedin/norbert/protos/NorbertExampleProtos.java index 0ebb5e81..79603582 100644 --- a/cluster/src/main/java/com/linkedin/norbert/protos/NorbertExampleProtos.java +++ b/cluster/src/main/java/com/linkedin/norbert/protos/NorbertExampleProtos.java @@ -94,10 +94,8 @@ public int getSerializedSize() { return size; } - private static final long serialVersionUID = 1L; @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { + protected Object writeReplace() throws java.io.ObjectStreamException { return super.writeReplace(); } @@ -440,10 +438,8 @@ public int getSerializedSize() { return size; } - private static final long serialVersionUID = 1L; @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { + protected Object writeReplace() throws java.io.ObjectStreamException { return super.writeReplace(); } diff --git a/cluster/src/main/java/com/linkedin/norbert/protos/NorbertProtos.java b/cluster/src/main/java/com/linkedin/norbert/protos/NorbertProtos.java index 5fc3e292..e3500a80 100644 --- a/cluster/src/main/java/com/linkedin/norbert/protos/NorbertProtos.java +++ b/cluster/src/main/java/com/linkedin/norbert/protos/NorbertProtos.java @@ -1594,10 +1594,6 @@ public interface NodeOrBuilder // optional int64 persistentCapability = 4; boolean hasPersistentCapability(); long getPersistentCapability(); - - // optional int32 altPort = 5; - boolean hasAltPort(); - int getAltPort(); } public static final class Node extends com.google.protobuf.GeneratedMessage @@ -1694,22 +1690,11 @@ public long getPersistentCapability() { return persistentCapability_; } - // optional int32 altPort = 5; - public static final int ALTPORT_FIELD_NUMBER = 5; - private int altPort_; - public boolean hasAltPort() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - public int getAltPort() { - return altPort_; - } - private void initFields() { id_ = 0; url_ = ""; partition_ = java.util.Collections.emptyList();; persistentCapability_ = 0L; - altPort_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1743,9 +1728,6 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeInt64(4, persistentCapability_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeInt32(5, altPort_); - } getUnknownFields().writeTo(output); } @@ -1776,10 +1758,6 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt64Size(4, persistentCapability_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(5, altPort_); - } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1912,8 +1890,6 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); persistentCapability_ = 0L; bitField0_ = (bitField0_ & ~0x00000008); - altPort_ = 0; - bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -1969,10 +1945,6 @@ public com.linkedin.norbert.protos.NorbertProtos.Node buildPartial() { to_bitField0_ |= 0x00000004; } result.persistentCapability_ = persistentCapability_; - if (((from_bitField0_ & 0x00000010) == 0x00000010)) { - to_bitField0_ |= 0x00000008; - } - result.altPort_ = altPort_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2008,9 +1980,6 @@ public Builder mergeFrom(com.linkedin.norbert.protos.NorbertProtos.Node other) { if (other.hasPersistentCapability()) { setPersistentCapability(other.getPersistentCapability()); } - if (other.hasAltPort()) { - setAltPort(other.getAltPort()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -2079,11 +2048,6 @@ public Builder mergeFrom( persistentCapability_ = input.readInt64(); break; } - case 40: { - bitField0_ |= 0x00000010; - altPort_ = input.readInt32(); - break; - } } } } @@ -2213,27 +2177,6 @@ public Builder clearPersistentCapability() { return this; } - // optional int32 altPort = 5; - private int altPort_ ; - public boolean hasAltPort() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - public int getAltPort() { - return altPort_; - } - public Builder setAltPort(int value) { - bitField0_ |= 0x00000010; - altPort_ = value; - onChanged(); - return this; - } - public Builder clearAltPort() { - bitField0_ = (bitField0_ & ~0x00000010); - altPort_ = 0; - onChanged(); - return this; - } - // @@protoc_insertion_point(builder_scope:norbert.Node) } @@ -2277,11 +2220,10 @@ public Builder clearAltPort() { "\r \001(\t\022.\n\006header\030\016 \003(\0132\036.norbert.NorbertM" + "essage.Header\032$\n\006Header\022\013\n\003key\030\001 \002(\t\022\r\n\005" + "value\030\002 \001(\t\"*\n\006Status\022\006\n\002OK\020\000\022\t\n\005ERROR\020\001" + - "\022\r\n\tHEAVYLOAD\020\002\"a\n\004Node\022\n\n\002id\030\001 \002(\005\022\013\n\003u" + + "\022\r\n\tHEAVYLOAD\020\002\"P\n\004Node\022\n\n\002id\030\001 \002(\005\022\013\n\003u" + "rl\030\002 \002(\t\022\021\n\tpartition\030\003 \003(\005\022\034\n\024persisten", - "tCapability\030\004 \001(\003\022\017\n\007altPort\030\005 \001(\005B.\n\033co" + - "m.linkedin.norbert.protosB\rNorbertProtos" + - "H\001" + "tCapability\030\004 \001(\003B.\n\033com.linkedin.norber" + + "t.protosB\rNorbertProtosH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -2309,7 +2251,7 @@ public com.google.protobuf.ExtensionRegistry assignDescriptors( internal_static_norbert_Node_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_norbert_Node_descriptor, - new java.lang.String[] { "Id", "Url", "Partition", "PersistentCapability", "AltPort", }, + new java.lang.String[] { "Id", "Url", "Partition", "PersistentCapability", }, com.linkedin.norbert.protos.NorbertProtos.Node.class, com.linkedin.norbert.protos.NorbertProtos.Node.Builder.class); return null; diff --git a/cluster/src/main/protobuf/norbert.proto b/cluster/src/main/protobuf/norbert.proto index 3ef7a5fb..04bd1d3d 100644 --- a/cluster/src/main/protobuf/norbert.proto +++ b/cluster/src/main/protobuf/norbert.proto @@ -31,5 +31,4 @@ message Node { required string url = 2; repeated int32 partition = 3; optional int64 persistentCapability = 4; - optional int32 altPort = 5; } diff --git a/cluster/src/main/scala/com/linkedin/norbert/cluster/ClusterClient.scala b/cluster/src/main/scala/com/linkedin/norbert/cluster/ClusterClient.scala index 18e38943..59b057b5 100644 --- a/cluster/src/main/scala/com/linkedin/norbert/cluster/ClusterClient.scala +++ b/cluster/src/main/scala/com/linkedin/norbert/cluster/ClusterClient.scala @@ -202,22 +202,34 @@ trait ClusterClient extends Logging { node } + + /** + * Adds a node to the cluster metadata. + * + * @param nodeId the id of the node to add + * @param url the url to be used to send requests to the node + * + * @return the newly added node + * @throws ClusterDisconnectedException thrown if the cluster is disconnected when the method is called + * @throws InvalidNodeException thrown if there is an error adding the new node to the cluster metadata + */ + def addNode(nodeId: Int, url: String): Node = addNode(nodeId, url, Set[Int]()) + /** * Adds a node to the cluster metadata. * * @param nodeId the id of the node to add * @param url the url to be used to send requests to the node - * @param partitions the partitions for which the node can process requests. These are optional. - * @param altPort the secondary port for use with the second channel. This is optional. + * @param partitions the partitions for which the node can process requests * * @return the newly added node * @throws ClusterDisconnectedException thrown if the cluster is disconnected when the method is called * @throws InvalidNodeException thrown if there is an error adding the new node to the cluster metadata */ - def addNode(nodeId: Int, url: String, partitions: Set[Int] = Set[Int](), altPort: Option[Int] = None): Node = doIfConnected { + def addNode(nodeId: Int, url: String, partitions: Set[Int]): Node = doIfConnected { if (url == null) throw new NullPointerException - val node = Node(nodeId, url, false, partitions, altPort = altPort) + val node = Node(nodeId, url, false, partitions) clusterManager !? ClusterManagerMessages.AddNode(node) match { case ClusterManagerMessages.ClusterManagerResponse(Some(ex)) => throw ex case ClusterManagerMessages.ClusterManagerResponse(None) => node diff --git a/cluster/src/main/scala/com/linkedin/norbert/cluster/Node.scala b/cluster/src/main/scala/com/linkedin/norbert/cluster/Node.scala index c0c362e9..7474e93e 100644 --- a/cluster/src/main/scala/com/linkedin/norbert/cluster/Node.scala +++ b/cluster/src/main/scala/com/linkedin/norbert/cluster/Node.scala @@ -42,9 +42,9 @@ object Node { val partitions = node.getPartitionList.asInstanceOf[java.util.List[Int]].foldLeft(Set[Int]()) { (set, i) => set + i } if(!node.hasPersistentCapability) - Node(node.getId, node.getUrl, available, partitions, capability, None, Some(node.getAltPort)) + Node(node.getId, node.getUrl, available, partitions, capability, None) else - Node(node.getId, node.getUrl, available, partitions, capability, Some(node.getPersistentCapability), Some(node.getAltPort)) + Node(node.getId, node.getUrl, available, partitions, capability, Some(node.getPersistentCapability)) } catch { case ex: InvalidProtocolBufferException => throw new InvalidNodeException("Error deserializing node", ex) } @@ -63,12 +63,6 @@ object Node { case None => builder.setId(node.id).setUrl(node.url) case Some(x) => builder.setId(node.id).setUrl(node.url).setPersistentCapability(x) } - node.altPort match { - // Only set altPort if we actually have an altPort. - case None => // Do nothing! - case Some(x) => builder.setAltPort(x) - } - node.partitionIds.foreach(builder.addPartition(_)) builder.build.toByteArray @@ -84,7 +78,7 @@ object Node { * @param partitions the partitions for which the node can handle requests * @param capability the 64 bits Long representing up to 64 node capabilities */ -final case class Node(id: Int, url: String, available: Boolean, partitionIds: Set[Int] = Set.empty, capability: Option[Long] = None, persistentCapability: Option[Long] = None, altPort: Option[Int] = None) { +final case class Node(id: Int, url: String, available: Boolean, partitionIds: Set[Int] = Set.empty, capability: Option[Long] = None, persistentCapability: Option[Long] = None) { if (url == null) throw new NullPointerException("url must not be null") if (partitionIds == null) throw new NullPointerException("partitions must not be null") @@ -95,7 +89,7 @@ final case class Node(id: Int, url: String, available: Boolean, partitionIds: Se case _ => false } - override def toString = "Node(%d,%s,[%s],%b,0x%08X,0x%08X,%d)".format(id, url, partitionIds.mkString(","), available, if (capability.isEmpty) 0L else capability.get, if (persistentCapability.isEmpty) 0L else persistentCapability.get, if (altPort.isEmpty) 0 else altPort.get) + override def toString = "Node(%d,%s,[%s],%b,0x%08X,0x%08X)".format(id, url, partitionIds.mkString(","), available, if (capability.isEmpty) 0L else capability.get, if (persistentCapability.isEmpty) 0L else persistentCapability.get) def isCapableOf(c: Option[Long], pc: Option[Long]) : Boolean = { val capabilityMatch: Boolean = (capability, c) match { diff --git a/examples/src/main/java/com/linkedin/norbert/javacompat/network/Ping.java b/examples/src/main/java/com/linkedin/norbert/javacompat/network/Ping.java index 2f12f01c..d6d5946d 100644 --- a/examples/src/main/java/com/linkedin/norbert/javacompat/network/Ping.java +++ b/examples/src/main/java/com/linkedin/norbert/javacompat/network/Ping.java @@ -36,8 +36,6 @@ public String responseName() { return "pong"; } - public int priority() {return 0;} - public byte[] requestToBytes(Ping message) { return NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build().toByteArray(); } diff --git a/examples/src/main/scala/com/linkedin/norbert/network/NorbertNetworkClientMain.scala b/examples/src/main/scala/com/linkedin/norbert/network/NorbertNetworkClientMain.scala index 28c4720e..bf53057d 100644 --- a/examples/src/main/scala/com/linkedin/norbert/network/NorbertNetworkClientMain.scala +++ b/examples/src/main/scala/com/linkedin/norbert/network/NorbertNetworkClientMain.scala @@ -74,15 +74,11 @@ object NorbertNetworkClientMain { cc.addNode(nodeId.toInt, url) println("Joined Norbert cluster") - case nodeId :: url :: altPort :: Nil => - cc.addNode(nodeId.toInt, url, altPort = Some(altPort.toInt)) + case nodeId :: url :: partitions => + cc.addNode(nodeId.toInt, url, Set() ++ partitions.map(_.toInt)) println("Joined Norbert cluster") - case nodeId :: url :: altPort :: partitions => - cc.addNode(nodeId.toInt, url, Set() ++ partitions.map(_.toInt), Some(altPort.toInt)) - println("Joined Norbert cluster") - - case _ => println("Error: Invalid syntax: join nodeId url altPort partition1 partition2...") + case _ => println("Error: Invalid syntax: join nodeId url partition1 partition2...") } println("Joined Norbert cluster") @@ -114,19 +110,6 @@ object NorbertNetworkClientMain { } } - case "altPing" => - if (args.length < 1) { - println("Invalid syntax: altPing nodeId") - } else { - val node = cc.nodeWithId(args.head.toInt) - node match { - case Some(n) => - nc.sendAltMessageToNode(Ping(System.currentTimeMillis), n) - - case None => println("No node with id: %d".format(args.head.toInt)) - } - } - case "exit" => System.exit(0) case "quit" => System.exit(0) diff --git a/java-cluster/src/main/java/com/linkedin/norbert/javacompat/cluster/Node.java b/java-cluster/src/main/java/com/linkedin/norbert/javacompat/cluster/Node.java index b84f5cce..f975fb53 100644 --- a/java-cluster/src/main/java/com/linkedin/norbert/javacompat/cluster/Node.java +++ b/java-cluster/src/main/java/com/linkedin/norbert/javacompat/cluster/Node.java @@ -26,5 +26,4 @@ public interface Node { boolean isCapableOf(Long c, Long pc); Long getCapability(); Long getPersistentCapability(); - Integer getAltPort(); } diff --git a/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/cluster/JavaNode.scala b/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/cluster/JavaNode.scala index b18faca5..a8d23a4e 100644 --- a/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/cluster/JavaNode.scala +++ b/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/cluster/JavaNode.scala @@ -27,12 +27,12 @@ object JavaNode { if (node.partitionIds != null) { node.partitionIds.foreach {id => s.add(id)} } - JavaNode(node.id, node.url, node.available, s, node.capability, node.persistentCapability, node.altPort) + JavaNode(node.id, node.url, node.available, s, node.capability, node.persistentCapability) } } } -case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer], capability: Option[Long] = None, persistentCapability: Option[Long] = None, altPort: Option[Int] = None) extends Node { +case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer], capability: Option[Long] = None, persistentCapability: Option[Long] = None) extends Node { def isAvailable = available def isCapableOf(c: java.lang.Long) : Boolean = isCapableOf(c, 0L) def isCapableOf(c: java.lang.Long, pc: java.lang.Long) : Boolean = @@ -52,9 +52,4 @@ case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanPrope case Some(nc) => nc.longValue case None => null } - def getAltPort() : java.lang.Integer = - altPort match { - case Some(nc) => nc.intValue - case None => null - } } diff --git a/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/package.scala b/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/package.scala index f4865b7f..7bbdd88b 100644 --- a/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/package.scala +++ b/java-cluster/src/main/scala/com/linkedin/norbert/javacompat/package.scala @@ -15,8 +15,6 @@ */ package com.linkedin.norbert -import java.util - import javacompat.cluster.{JavaNode, Node => JNode} import com.linkedin.norbert.cluster.{Node => SNode} @@ -55,9 +53,8 @@ package object javacompat { } SNode(node.getId, node.getUrl, node.isAvailable, partitionIds, - if(node.getCapability == null) None else Some(node.getCapability.longValue), - if(node.getPersistentCapability == null) None else Some(node.getPersistentCapability.longValue), - Some(node.getAltPort.intValue)) + if(node.getCapability == null) None else Some(node.getCapability.longValue), + if(node.getPersistentCapability == null) None else Some(node.getPersistentCapability.longValue)) } } diff --git a/java-network/src/main/java/com/linkedin/norbert/javacompat/network/NetworkClient.java b/java-network/src/main/java/com/linkedin/norbert/javacompat/network/NetworkClient.java index e101ca01..a10fb95c 100644 --- a/java-network/src/main/java/com/linkedin/norbert/javacompat/network/NetworkClient.java +++ b/java-network/src/main/java/com/linkedin/norbert/javacompat/network/NetworkClient.java @@ -15,18 +15,15 @@ */ package com.linkedin.norbert.javacompat.network; +import java.util.concurrent.Future; + import com.linkedin.norbert.cluster.ClusterDisconnectedException; import com.linkedin.norbert.cluster.InvalidClusterException; import com.linkedin.norbert.network.NoNodesAvailableException; import com.linkedin.norbert.network.Serializer; -import com.linkedin.norbert.network.javaobjects.*; - -import java.util.concurrent.Future; public interface NetworkClient extends BaseNetworkClient { - - - /** + /** * Sends a request to a node in the cluster. The NetworkClient defers to the current * LoadBalancer to decide which Node the request should be sent to. * @@ -47,21 +44,4 @@ public interface NetworkClient extends BaseNetworkClient { Future sendRequest(RequestMsg request, Serializer serializer, int maxRetry, long capability) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException; Future sendRequest(RequestMsg request, Serializer serializer, int maxRetry, long capability, long persistentCapability) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException; - - /** - * Sends a request to a node in the cluster. The NetworkClient defers to the current - * LoadBalancer to decide which Node the request should be sent to. - * - * @param requestSpecification the RequestSpecification object specifying the message - * @param nodeSpecification the NodeSpecification object specifying the number of replicas and clusterId - * @param retrySpecification the RetrySpecification object specifying the retry strategy and callback - * - * @return void - * @throws InvalidClusterException thrown if the cluster is currently in an invalid state - * @throws NoNodesAvailableException thrown if the LoadBalancer was unable to provide a Node - * to send the request to - * @throws ClusterDisconnectedException thrown if the cluster is not connected when the method is called - */ - void sendRequest(RequestSpecification requestSpecification,NodeSpecification nodeSpecification, RetrySpecification retrySpecification, - Serializer serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException; } diff --git a/java-network/src/main/java/com/linkedin/norbert/javacompat/network/PartitionedNetworkClient.java b/java-network/src/main/java/com/linkedin/norbert/javacompat/network/PartitionedNetworkClient.java index 0dfe3c6e..eba4609f 100644 --- a/java-network/src/main/java/com/linkedin/norbert/javacompat/network/PartitionedNetworkClient.java +++ b/java-network/src/main/java/com/linkedin/norbert/javacompat/network/PartitionedNetworkClient.java @@ -15,36 +15,17 @@ */ package com.linkedin.norbert.javacompat.network; +import java.util.Set; +import java.util.concurrent.Future; + import com.linkedin.norbert.cluster.ClusterDisconnectedException; import com.linkedin.norbert.cluster.InvalidClusterException; +import com.linkedin.norbert.javacompat.cluster.Node; import com.linkedin.norbert.network.NoNodesAvailableException; import com.linkedin.norbert.network.ResponseIterator; import com.linkedin.norbert.network.Serializer; -import com.linkedin.norbert.network.javaobjects.*; - -import java.util.Set; -import java.util.concurrent.Future; public interface PartitionedNetworkClient extends BaseNetworkClient { - - /** - * Sends a Message to the specified PartitionedId. The PartitionedNetworkClient - * will interact with the current PartitionedLoadBalancer to calculate which Node the message - * must be sent to. This method is asynchronous and will return immediately. - * - * @param requestSpecification the RequestSpecification object specifying the message - * @param nodeSpecification the NodeSpecification object specifying the number of replicas and clusterId - * @param retrySpecification the RetrySpecification object specifying the retry strategy and callback - * - * @return void - * @throws InvalidClusterException thrown if the cluster is currently in an invalid state - * @throws NoNodesAvailableException thrown if the PartitionedLoadBalancer was unable to provide a Node - * to send the request to - * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster - */ - void sendRequest(PartitionedRequestSpecification requestSpecification, PartitionedNodeSpecification nodeSpecification, PartitionedRetrySpecification retrySpecification, Serializer serializer) throws InvalidClusterException, NoNodesAvailableException, ClusterDisconnectedException; - - /** * Sends a Message to the specified PartitionedId. The PartitionedNetworkClient * will interact with the current PartitionedLoadBalancer to calculate which Node the message diff --git a/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/BaseNettyNetworkClient.scala b/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/BaseNettyNetworkClient.scala index beec8ca0..2cd58784 100644 --- a/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/BaseNettyNetworkClient.scala +++ b/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/BaseNettyNetworkClient.scala @@ -17,15 +17,14 @@ package com.linkedin.norbert package javacompat package network -import com.linkedin.norbert.EndpointConversions._ +import cluster.{Node, BaseClusterClient} import com.linkedin.norbert.cluster.{Node => SNode} -import com.linkedin.norbert.javacompat.cluster.{BaseClusterClient, Node} -import com.linkedin.norbert.network.client.loadbalancer.{LoadBalancer => SLoadBalancer, LoadBalancerFactory => SLoadBalancerFactory} -import com.linkedin.norbert.network.common.{Endpoint => SEndpoint} -import com.linkedin.norbert.network.partitioned.loadbalancer.{PartitionedLoadBalancer => SPartitionedLoadBalancer, PartitionedLoadBalancerFactory => SPartitionedLoadBalancerFactory} import com.linkedin.norbert.network.{ResponseIterator, Serializer} -import com.linkedin.norbert.network.javaobjects.{NodeSpecification, RequestSpecification, RetrySpecification, PartitionedNodeSpecification, PartitionedRequestSpecification, PartitionedRetrySpecification} +import com.linkedin.norbert.network.client.loadbalancer.{LoadBalancerFactory => SLoadBalancerFactory, LoadBalancer => SLoadBalancer} +import com.linkedin.norbert.network.partitioned.loadbalancer.{PartitionedLoadBalancerFactory => SPartitionedLoadBalancerFactory, PartitionedLoadBalancer => SPartitionedLoadBalancer} +import com.linkedin.norbert.network.common.{Endpoint => SEndpoint} +import EndpointConversions._ abstract class BaseNettyNetworkClient extends BaseNetworkClient { val underlying: com.linkedin.norbert.network.common.BaseNetworkClient @@ -106,9 +105,6 @@ class NettyNetworkClient(config: NetworkClientConfig, loadBalancerFactory: LoadB def sendRequest[RequestMsg, ResponseMsg](requestMsg: RequestMsg, serializer: Serializer[RequestMsg, ResponseMsg], maxRetry: Int, capability: Long, persistentCapability: Long) = underlying.sendRequest(requestMsg, maxRetry, Some(capability), Some(persistentCapability))(serializer, serializer) - def sendRequest[RequestMsg, ResponseMsg](requestSpecification: RequestSpecification[RequestMsg],nodeSpecification: NodeSpecification,retrySpecification: RetrySpecification[ResponseMsg], serializer:Serializer[RequestMsg, ResponseMsg]) = - underlying.sendRequest(requestSpecification, nodeSpecification, retrySpecification)(serializer, serializer) - } class NettyPartitionedNetworkClient[PartitionedId](config: NetworkClientConfig, loadBalancerFactory: PartitionedLoadBalancerFactory[PartitionedId], @@ -133,6 +129,7 @@ class NettyPartitionedNetworkClient[PartitionedId](config: NetworkClientConfig, underlying.sendRequest(ids: Set[PartitionedId], request)(serializer, serializer) def sendRequest[RequestMsg, ResponseMsg](ids: java.util.Set[PartitionedId], requestBuilder: RequestBuilder[PartitionedId, RequestMsg], serializer: Serializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = { + import collection.JavaConversions._ underlying.sendRequest(ids: java.util.Set[PartitionedId], (node: SNode, ids: Set[PartitionedId]) => requestBuilder(node, ids))(serializer, serializer) } @@ -146,9 +143,6 @@ class NettyPartitionedNetworkClient[PartitionedId](config: NetworkClientConfig, } - def sendRequest[RequestMsg, ResponseMsg](requestSpecification: PartitionedRequestSpecification[RequestMsg, PartitionedId], nodeSpecification: PartitionedNodeSpecification[PartitionedId], retrySpecification:PartitionedRetrySpecification[ResponseMsg], serializer:Serializer[RequestMsg, ResponseMsg]) = - underlying.sendRequest(requestSpecification, nodeSpecification, retrySpecification)(serializer, serializer) - def sendRequestToPartitions[RequestMsg, ResponseMsg](id: PartitionedId, partitions: java.util.Set[java.lang.Integer], requestBuilder: RequestBuilder[Integer, RequestMsg], serializer: Serializer[RequestMsg, ResponseMsg]) = { val sPartitions = partitions.foldLeft(Set.empty[Int])(_ + _.intValue()) diff --git a/network/src/main/scala/com/linkedin/norbert/network/NodeSpecification.scala b/network/src/main/scala/com/linkedin/norbert/network/NodeSpecification.scala deleted file mode 100644 index 3946a80f..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/NodeSpecification.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2009-2010 LinkedIn, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.linkedin.norbert.network - -import com.linkedin.norbert.network.javaobjects.{NodeSpecification => JNodeSpecification, PartitionedNodeSpecification => JPartitionedNodeSpecification} - -/** - * Partitioned and non-partitioned NodeSpecification wrapper objects for sendRequest - * - * A NodeSpecification object is used to store the necessary information to specify a node. - * For the non-partitioned version this is the capability and persistentCapability. - * The Partitioned version extends the non-partitioned version with numberOfReplicas and clusterId - */ - -/** - * NodeTrait is the trait that the NodeSpecification objects extend. - * @tparam NodeType Either a non-partitioned or partitioned nodeSpec - */ -trait NodeTrait[NodeType] { - var capability: Option[Long] = None - var persistentCapability: Option[Long] = None - var altPort: Option[Int] = None - - def setCapability(cap: Option[Long]): this.type = { - capability = cap - this - } - - def setPersistentCapability(persistentCap: Option[Long]): this.type = { - persistentCapability = persistentCap - this - } - - def setAltPort(port: Option[Int]): this.type = { - altPort = port - this - } - - def build: this.type = { - capability match { - case Some(cap) => this - case None => - persistentCapability match { - case Some(persCap) => throw new IllegalArgumentException("Cannot specify PersistentCapability without Capability") - case None => this - } - } - } -} - - -/** -* Non-Partitioned NodeSpecification. Defines getters for the java interface. -*/ -class NodeSpecification extends NodeTrait[NodeSpecification] with JNodeSpecification { - - // Converts Option[Long] to java.lang.Long - def getCapability(): java.lang.Long = { - capability match { - case Some(cap) => cap - case None => null - } - } - - // Converts Option[Long] to java.lang.Long - def getPersistentCapability: java.lang.Long= { - persistentCapability match { - case Some(pc) => pc - case None => null - } - } - - // Converts Option[Int] to java.lang.Integer - def getAltPort: java.lang.Integer = { - altPort match { - case Some(ap) => ap - case None => null - } - } -} - -/** -* Partitioned NodeSpecification. Defines getters for the java interface. -*/ - -class PartitionedNodeSpecification[PartitionedId](val ids: Set[PartitionedId]) extends NodeTrait[PartitionedNodeSpecification[_]] with JPartitionedNodeSpecification[PartitionedId] { - var numberOfReplicas: Int = 0 - var clusterId: Option[Int] = None - - def setNumberOfReplicas(_numberOfReplicas: Int): PartitionedNodeSpecification[PartitionedId] = { - this.numberOfReplicas = _numberOfReplicas - this - } - - def setClusterId(_clusterId: Option[Int]): PartitionedNodeSpecification[PartitionedId] = { - this.clusterId = _clusterId - this - } - - // Converts Option[Long] to java.lang.Long - def getCapability(): java.lang.Long = { - capability match { - case Some(cap) => cap - case None => null - } - } - - // Converts Option[Long] to java.lang.Long - def getPersistentCapability: java.lang.Long = { - persistentCapability match { - case Some(pc) => pc - case None => null - } - } - - // Converts Option[Int] to java.lang.Integer - def getAltPort: java.lang.Integer = { - altPort match { - case Some(ap) => ap - case None => null - } - } - - // Returns Int that is unboxed to int - def getNumberOfReplicas(): Int = numberOfReplicas - - - // Converts Option[Int] to java.lang.Int - def getClusterId(): java.lang.Integer = { - clusterId match { - case Some(cid) => cid - case None => null - } - } - - // Returns Set[PartitionedId] - def getIds(): Set[PartitionedId] = ids -} - - - - diff --git a/network/src/main/scala/com/linkedin/norbert/network/Request.scala b/network/src/main/scala/com/linkedin/norbert/network/Request.scala index 7b5c3a02..55915f13 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/Request.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/Request.scala @@ -13,90 +13,56 @@ * License for the specific language governing permissions and limitations under * the License. */ -package com.linkedin.norbert.network +package com.linkedin.norbert +package network import java.util.UUID - -import com.linkedin.norbert.cluster.{ClusterException, Node} -import com.linkedin.norbert.network.common.CachedNetworkStatistics - +import cluster.{ClusterException, Node} import scala.collection.mutable.Map +import common.CachedNetworkStatistics -object BaseRequest { +object Request { def apply[RequestMsg, ResponseMsg](message: RequestMsg, node: Node, - inputSerializer: InputSerializer[RequestMsg, ResponseMsg], - outputSerializer: OutputSerializer[RequestMsg, ResponseMsg]): BaseRequest[RequestMsg] = { - new BaseRequest(message, node, inputSerializer, outputSerializer) + inputSerializer: InputSerializer[RequestMsg, ResponseMsg], outputSerializer: OutputSerializer[RequestMsg, ResponseMsg], + callback: Option[Either[Throwable, ResponseMsg] => Unit], retryAttempt: Int = 0): Request[RequestMsg, ResponseMsg] = { + new Request(message, node, inputSerializer, outputSerializer, callback, retryAttempt) } } -class BaseRequest[RequestMsg](val message: RequestMsg, val node: Node, - val inputSerializer: InputSerializer[RequestMsg, _], - val outputSerializer: OutputSerializer[RequestMsg, _]) { +class Request[RequestMsg, ResponseMsg](val message: RequestMsg, val node: Node, + val inputSerializer: InputSerializer[RequestMsg, ResponseMsg], val outputSerializer: OutputSerializer[RequestMsg, ResponseMsg], + val callback: Option[Either[Throwable, ResponseMsg] => Unit], val retryAttempt: Int = 0) { val id = UUID.randomUUID val timestamp = System.currentTimeMillis val headers : Map[String, String] = Map.empty[String, String] - //currently there is an assumption in ClientChannelHandler that only the Request class and derivatives of it can expect responses - //if you extend baseRequest (and not request) with something that expects a response make sure to change that - val expectsResponse = false def name: String = { inputSerializer.requestName } + // serializer def requestBytes: Array[Byte] = outputSerializer.requestToBytes(message) def addHeader(key: String, value: String) = headers += (key -> value) - def startNettyTiming(stats : CachedNetworkStatistics[Node, UUID]) = { - stats.beginNetty(node, id, 0) - } - - override def toString: String = { - "[Request: %s, %s]".format(message, node) - } - def onFailure(exception: Throwable) { - // Nothing to do here! + if(!callback.isEmpty) callback.get(Left(exception)) } - def onSuccess(bytes: Array[Byte]) { - // Nothing to do here! + def endNettyTiming(stats: CachedNetworkStatistics[Node, UUID]) = { + stats.endNetty(node, id) } -} - -object Request { - def apply[RequestMsg, ResponseMsg](message: RequestMsg, node: Node, - inputSerializer: InputSerializer[RequestMsg, ResponseMsg], outputSerializer: OutputSerializer[RequestMsg, ResponseMsg], - callback: Option[Either[Throwable, ResponseMsg] => Unit], retryAttempt: Int = 0): Request[RequestMsg, ResponseMsg] = { - new Request(message, node, inputSerializer, outputSerializer, callback, retryAttempt) - } -} - -class Request[RequestMsg, ResponseMsg](override val message: RequestMsg, override val node: Node, - override val inputSerializer: InputSerializer[RequestMsg, ResponseMsg], override val outputSerializer: OutputSerializer[RequestMsg, ResponseMsg], - val callback: Option[Either[Throwable, ResponseMsg] => Unit], val retryAttempt: Int = 0) - extends BaseRequest[RequestMsg](message, node, inputSerializer, outputSerializer){ - - override val expectsResponse = !callback.isEmpty - - override def onFailure(exception: Throwable) { - callback match { - case Some(fn) => fn(Left(exception)) - case None => () - } + def startNettyTiming(stats : CachedNetworkStatistics[Node, UUID]) = { + stats.beginNetty(node, id, 0) } - override def onSuccess(bytes: Array[Byte]) { - callback match { - case Some(fn) => fn(try { - Right(inputSerializer.responseFromBytes(bytes)) - } catch { - case ex: Exception => Left(new ClusterException("Exception while deserializing response", ex)) - }) - case None => () - } + def onSuccess(bytes: Array[Byte]) { + if(!callback.isEmpty) callback.get(try { + Right(inputSerializer.responseFromBytes(bytes)) + } catch { + case ex: Exception => Left(new ClusterException("Exception while deserializing response", ex)) + }) } override def toString: String = { diff --git a/network/src/main/scala/com/linkedin/norbert/network/RequestSpecification.scala b/network/src/main/scala/com/linkedin/norbert/network/RequestSpecification.scala deleted file mode 100644 index 796f5d67..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/RequestSpecification.scala +++ /dev/null @@ -1,70 +0,0 @@ - -package com.linkedin.norbert.network -import com.linkedin.norbert.cluster.Node -import com.linkedin.norbert.network.javaobjects.{RequestSpecification => JRequestSpecification, PartitionedRequestSpecification => JPartitionedRequestSpecification} - -/** - * A RequestSpecification object is used to store the necessary information to specify the request message. - * For the non-partitioned version this is just the actual message being sent. - * There is a conversion from a PartitionedRequestSpecification, but it will error if the PartitionedRequestSpecification does not have a RequestMsg (which is possible). - */ - - -object RequestSpecification { - def apply[RequestMsg](message: RequestMsg): RequestSpecification[RequestMsg] = { - new RequestSpecification(message) - } - - implicit def convert[RequestMsg](partitionedSpec: PartitionedRequestSpecification[RequestMsg, _]): RequestSpecification[RequestMsg] = { - new RequestSpecification(partitionedSpec.message.get) - } - -} - -/** - * This is a RequestSpecification, it has a default constructor and no extra functionality. See the above companion object for more details. - * @param message The requestMsg to be sent to the node. - * @tparam RequestMsg The type of the request being sent to the node, should be the same as that used by the network client you will use to send the request. - */ -class RequestSpecification[RequestMsg](val message: RequestMsg) extends JRequestSpecification[RequestMsg]{ - def getMessage() = message -} - -/** - * This is the partitioned version of RequestSpecification. It serves the same purpose of storing the information regarding the message being sent. - * In this partitioned version the request can be specified either by giving the actual RequestMsg to be sent or by providing a requestBuilder. - * A RequestBuilder is a function which, given a node and a set of PartitionedIds will return a RequestMsg. - * which will generate a message from a set of partitionedIds. At least one of those must be specified. - * You can also convert a RequestSpecification into a PartitionedRequestSpecification, which will set the PartitionedRequestSpecification's message to that of the RequestSpecification and not specify a requestBuilder. - */ -object PartitionedRequestSpecification{ - def apply[RequestMsg, PartitionedId](message: Option[RequestMsg] = None, - requestBuilder: Option[(Node, Set[PartitionedId]) => RequestMsg] = None): PartitionedRequestSpecification[RequestMsg, PartitionedId] = { - new PartitionedRequestSpecification(message, requestBuilder) - } - implicit def convert[RequestMsg, PartitionedId](requestSpec: RequestSpecification[RequestMsg]): PartitionedRequestSpecification[RequestMsg, PartitionedId] = { - new PartitionedRequestSpecification(Some(requestSpec.message), None) - } -} - -/** - * See above for more information on the capabilities of a PartitionedRequestSpecification. It currently can only be constructed - * @param message The requestMsg to be sent to the node. - * @tparam RequestMsg The type of the request being sent to the node, should be the same as that used by the network client you will use to send the request. - * @param requestBuilder Builds a request using the specified set of partitionedIds. - */ -class PartitionedRequestSpecification[RequestMsg, PartitionedId](val message: Option[RequestMsg], - var requestBuilder: Option[(Node, Set[PartitionedId]) => RequestMsg]) extends JPartitionedRequestSpecification[RequestMsg, PartitionedId]{ - if (requestBuilder == None) { - if (message == None) { - /* error if both message and requestBuilder are none */ - throw new IllegalArgumentException("You must specify either message or requestBuilder") - } - requestBuilder = Some((node:Node, ids:Set[PartitionedId])=> message.getOrElse(throw new Exception("This should not happen"))) - } - - def getMessage() = message - - // Returns an optional anonymous function - def getRequestBuilder() = requestBuilder -} diff --git a/network/src/main/scala/com/linkedin/norbert/network/RetrySpecification.scala b/network/src/main/scala/com/linkedin/norbert/network/RetrySpecification.scala deleted file mode 100644 index 85eeb7fd..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/RetrySpecification.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2009-2010 LinkedIn, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.linkedin.norbert - -import runtime.BoxedUnit -import com.linkedin.norbert.network.UnitConversions -import com.linkedin.norbert.network.common.RetryStrategy -import com.linkedin.norbert.network.javaobjects.{RetrySpecification => JRetrySpecification, PartitionedRetrySpecification => JPartitionedRetrySpecification} - - -/** - * This is the companion object for the RoutingConfigs class. - */ -object RoutingConfigs { - val defaultRoutingConfigs = new RoutingConfigs(false, false) - def getDefaultRoutingConfigs():RoutingConfigs = { - defaultRoutingConfigs - } -} - -/** - * This class encapsulates the parameters used for request routing configurations. - * @param SelectiveRetry This indicates whether or not we want to use a specific retry strategy. - * @param DuplicatesOk This indicates whether or not we can have duplicates returned to a higher application layer. - */ -class RoutingConfigs(SelectiveRetry: Boolean, DuplicatesOk: Boolean ) { - val selectiveRetry = SelectiveRetry - val duplicatesOk = DuplicatesOk -} - -/** - * This is the companion object for the RetrySpecifications class. - */ -object RetrySpecification { - def apply[ResponseMsg](maxRetry: Int = 0, callback: Option[Either[Throwable, ResponseMsg] => Unit] = None) = { - new RetrySpecification[ResponseMsg](maxRetry, callback) - } -} - -/** - * This class encapsulates the retry specifications for a request. This class is the non-partitioned version - * which only contains two parameters. The class contains just a default constructor. - * - * @param maxRetry This is the maximum number of retry attempts for the request. If not otherwise specified, the value will be 0. - * @param callback This is a method to be called with either a Throwable in the case of an error along - * the way or a ResponseMsg representing the result. - * - * @throws IllegalArgumentException if the value for maxRetry is less than 0 and the callback is specified. - */ -class RetrySpecification[ResponseMsg](val maxRetry: Int, - val callback: Option[Either[Throwable, ResponseMsg] => Unit]) extends JRetrySpecification[ResponseMsg]{ - // Returns Int that is unboxed to int - def getMaxRetry() = maxRetry - // Returns an optional anonymous function - def getCallback() = { - val unitConversion = new UnitConversions[ResponseMsg] - unitConversion.curryImplicitly(callback.getOrElse(Either => ())) - } -} - -/** - * This is the companion object for the PartitionedRetrySpecification class. - */ -object PartitionedRetrySpecification { - def apply[ResponseMsg](maxRetry: Int = 0, - callback: Option[Either[Throwable, ResponseMsg] => Unit] = None, - retryStrategy: Option[RetryStrategy] = None, - routingConfigs: RoutingConfigs = RoutingConfigs.defaultRoutingConfigs) = { - new PartitionedRetrySpecification[ResponseMsg](maxRetry, callback, retryStrategy, routingConfigs) - } -} - -/** - * This is the partitioned version of the RetrySpecification class which encapsulates retry specifications. This class contains - * a default constructor and no additional functionality. - * - * @param maxRetry This is the maximum number of retry attempts for the request. If not otherwise specified, the value will be 0. - * @param callback This is a method to be called with either a Throwable in the case of an error along - * the way or a ResponseMsg representing the result. - * @param retryStrategy This is the strategy to apply when we run into timeout situation. - */ -class PartitionedRetrySpecification[ResponseMsg](maxRetry: Int, - callback: Option[Either[Throwable, ResponseMsg] => Unit], - var retryStrategy: Option[RetryStrategy], - var routingConfigs: RoutingConfigs = RoutingConfigs.defaultRoutingConfigs) - extends JPartitionedRetrySpecification[ResponseMsg]{ - def getMaxRetry() = maxRetry - def getCallback() = { - val unitConversion = new UnitConversions[ResponseMsg] - unitConversion.curryImplicitly(callback.getOrElse(Either => ())) - } - def getRetryStrategy() = retryStrategy - def getRoutingConfigs() = routingConfigs -} diff --git a/network/src/main/scala/com/linkedin/norbert/network/Serializer.scala b/network/src/main/scala/com/linkedin/norbert/network/Serializer.scala index 5a664aac..708a0f14 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/Serializer.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/Serializer.scala @@ -39,7 +39,6 @@ trait OutputSerializer[-RequestMsg, -ResponseMsg] { trait InputSerializer[+RequestMsg, +ResponseMsg] { def requestName: String - def priority: Int = 0 def requestFromBytes(bytes: Array[Byte]): RequestMsg def responseFromBytes(bytes: Array[Byte]): ResponseMsg } diff --git a/network/src/main/scala/com/linkedin/norbert/network/UnitConversions.scala b/network/src/main/scala/com/linkedin/norbert/network/UnitConversions.scala deleted file mode 100644 index 65408c97..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/UnitConversions.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* -* Copyright 2009-2010 LinkedIn, Inc -* -* Licensed under the Apache License, Version 2.0 (the "License"); you may not -* use this file except in compliance with the License. You may obtain a copy of -* the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -* License for the specific language governing permissions and limitations under -* the License. -*/ -package com.linkedin.norbert.network - -import runtime.BoxedUnit - -/** - * This class helps with converting the return type of callback between BoxedUnit and Unit for java/scala - * interoperability - */ -class UnitConversions[ResponseMsg] { - // Converts the return type from Unit to BoxedUnit.UNIT - def curryImplicitly[A](f: A => Unit): (A => BoxedUnit) = - (a: A) => { f(a); BoxedUnit.UNIT } - - - // Converts the return type from BoxedUnit to Unit - def uncurryImplicitly[A](f: A => BoxedUnit): (A => Unit) = - (a: A) => { f(a); () } - -} \ No newline at end of file diff --git a/network/src/main/scala/com/linkedin/norbert/network/client/Filter.scala b/network/src/main/scala/com/linkedin/norbert/network/client/Filter.scala index 71361946..fa9e9de5 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/client/Filter.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/client/Filter.scala @@ -7,5 +7,5 @@ package client */ trait Filter { - def onRequest[RequestMsg](request: BaseRequest[RequestMsg]): Unit + def onRequest[RequestMsg, ResponseMsg](request: Request[RequestMsg, ResponseMsg]): Unit } diff --git a/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala index 816e4817..6e99061d 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala @@ -22,15 +22,9 @@ import java.util.concurrent.Future import loadbalancer.{LoadBalancerFactory, LoadBalancer, LoadBalancerFactoryComponent} import server.{MessageExecutorComponent, NetworkServer} import cluster._ -import netty.NettyNetworkClient import network.common._ import network.client.DarkCanaryResponseHandler -import runtime.BoxedUnit -import com.linkedin.norbert.network.javaobjects.{NodeSpecification => JNodeSpecification, PartitionedNodeSpecification => JPartitionedNodeSpecification, - RetrySpecification => JRetrySpecification, PartitionedRetrySpecification => JPartitionedRetrySpecification, - RequestSpecification => JRequestSpecification, PartitionedRequestSpecification => JPartitionedRequestSpecification} -import com.linkedin.norbert.network.UnitConversions - +import netty.NettyNetworkClient object NetworkClientConfig { var defaultIteratorTimeout = NetworkDefaults.DEFAULT_ITERATOR_TIMEOUT; @@ -94,7 +88,6 @@ object NetworkClient { } } - /** * The network client interface for interacting with nodes in a cluster. */ @@ -116,31 +109,17 @@ trait NetworkClient extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the cluster is not connected when the method is called */ - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().build - val retrySpec = RetrySpecification(0, Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = + sendRequest(request, callback, None, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).build - val retrySpec = RetrySpecification(0, Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = + sendRequest(request, callback, capability, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = doIfConnected { - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = RetrySpecification(0, Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(request, callback, 0, capability, persistentCapability) } /** @@ -155,69 +134,38 @@ trait NetworkClient extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the cluster is not connected when the method is called */ - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") + def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { - val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().build - val retrySpec = RetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) - future - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = + sendRequest(request, None, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, maxRetry:Int) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { - val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().build - val retrySpec = RetrySpecification(maxRetry, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) - future - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = + sendRequest(request, maxRetry, None, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { - val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).build - val retrySpec = RetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) - future + sendRequest(request, capability, None) } - - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") + def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = RetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(request, future, capability, persistentCapability) future } - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, maxRetry: Int, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).build - val retrySpec = RetrySpecification(maxRetry, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(request, future, maxRetry, capability, None) future } - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, maxRetry: Int, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { val future = new FutureAdapterListener[ResponseMsg] - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = RetrySpecification(maxRetry, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(request, future, maxRetry, capability, persistentCapability) future } @@ -235,33 +183,16 @@ trait NetworkClient extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the cluster is not connected when the method is called */ - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, maxRetry: Int) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().build - val retrySpec = RetrySpecification(maxRetry, Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = + sendRequest(request, callback, maxRetry, None, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, maxRetry: Int, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os:OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - val requestSpec = RequestSpecification(request) - val nodeSpec = new NodeSpecification().setCapability(capability).build - val retrySpec = RetrySpecification(maxRetry, Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os:OutputSerializer[RequestMsg, ResponseMsg]): Unit = + sendRequest(request, callback, maxRetry, capability, None) - @deprecated("Use sendRequest(RequestSpecification[RequestMsg], NodeSpecification, RetrySpecification[ResponseMsg]), 12/17/2014") def sendRequest[RequestMsg, ResponseMsg](request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, maxRetry: Int, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = doIfConnected { - // TODO: This still reroutes to the old sendRequest method so that we can test with it - // TODO: Uncomment the below when we are completely satisfied to reroute it to the new sendRequest method. -// val requestSpec = RequestSpecification(request) -// val nodeSpec = new NodeSpecification().setCapability(capability).setPersistentCapability(persistentCapability).build -// val retrySpec = RetrySpecification(maxRetry, Some(callback)) -// sendRequest(requestSpec, nodeSpec, retrySpec) if (request == null) throw new NullPointerException val loadBalancerReady = loadBalancer.getOrElse(throw new ClusterDisconnectedException("Client has no node information")) @@ -272,42 +203,7 @@ trait NetworkClient extends BaseNetworkClient { node.getOrElse(throw new NoNodesAvailableException("No node available that can handle the request: %s".format(request))) }) - doSendRequest(Request(request, node, is, os, if (maxRetry == 0) Some(callback) else Some(retryCallback[RequestMsg, ResponseMsg](callback, maxRetry, capability, persistentCapability) _))) - } - - /** - * New sendRequest API. Functionally the same as the old one, but this takes three wrapper objects as input: - * requestSpec: Handles the request object. - * nodeSpec: Handles capability and persistent capability for the node. - * retrySpec: Handles the maxRetry and callback used for the retry function. - * - * These wrapper objects contain overloading and/or defaulting to simplify development; - * instead of adding new overloaded sendRequest methods, changes should be made to the - * wrapper objects whenever possible. - */ - - - def sendRequest[RequestMsg, ResponseMsg](requestSpec: JRequestSpecification[RequestMsg], nodeSpec: JNodeSpecification, retrySpec: JRetrySpecification[ResponseMsg]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os:OutputSerializer[RequestMsg, ResponseMsg]): Unit = doIfConnected { - if (requestSpec.getMessage() == null) throw new NullPointerException - // Convert return type of callback from BoxedUnit to Unit - val cb = retrySpec.getCallback(); - val unitConversion = new UnitConversions[ResponseMsg] - val callback = unitConversion.uncurryImplicitly(cb) - - - val loadBalancerReady = loadBalancer.getOrElse(throw new ClusterDisconnectedException("Client has no node information")) - - // Convert capability and persistentCapability from java.lang.Long to scala.Long - val capability = Option(Long.unbox(nodeSpec.getCapability())) - val persistentCapability = Option(Long.unbox(nodeSpec.getPersistentCapability())) - - val node = loadBalancerReady.fold(ex => throw ex, - lb => { - val node: Option[Node] = lb.nextNode(capability, persistentCapability) - node.getOrElse(throw new NoNodesAvailableException("No node available that can handle the request: %s".format(requestSpec.getMessage()))) - }) - doSendRequest(Request(requestSpec.getMessage(), node, is, os, if (retrySpec.getMaxRetry() == 0) Some(callback) else Some(retryCallback[RequestMsg, ResponseMsg](callback, retrySpec.getMaxRetry(), capability, persistentCapability) _))) + doSendRequest(Request(request, node, is, os, if (maxRetry == 0) Some(callback) else Some(retryCallback[RequestMsg, ResponseMsg](callback, maxRetry, capability, persistentCapability)_))) } @@ -354,49 +250,6 @@ trait NetworkClient extends BaseNetworkClient { } } - /** - * Sends the alternative one way message to a node in the cluster. The NetworkClient defers to the current - * LoadBalancer to decide which Node the request should be sent to. - * - * @param request the message to send - * - * @throws InvalidClusterException thrown if the cluster is currently in an invalid state - * @throws NoNodesAvailableException thrown if the LoadBalancer was unable to provide a Node - * to send the request to - * @throws ClusterDisconnectedException thrown if the cluster is not connected when the method is called - */ - - def sendAltMessage[RequestMsg, ResponseMsg](request: RequestMsg) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) { - doIfConnected { - sendAltMessage(request, None, None) - } - } - - def sendAltMessage[RequestMsg, ResponseMsg](request: RequestMsg, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) { - doIfConnected { - sendAltMessage(request, capability, None) - } - } - - def sendAltMessage[RequestMsg, ResponseMsg](request: RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) { - doIfConnected { - if (request == null) throw new NullPointerException - - val loadBalancerReady = loadBalancer.getOrElse(throw new ClusterDisconnectedException("Client has no node information")) - - val node = loadBalancerReady.fold(ex => throw ex, - lb => { - val node: Option[Node] = lb.nextNode(capability, persistentCapability) - node.getOrElse(throw new NoNodesAvailableException("No node available that can handle the request: %s".format(request))) - }) - - doSendAltMessage(BaseRequest(request, node, is, os)) - } - } - private[client] def retryCallback[RequestMsg, ResponseMsg](underlying: Either[Throwable, ResponseMsg] => Unit, maxRetry: Int, capability: Option[Long], persistentCapability: Option[Long])(res: Either[Throwable, ResponseMsg]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { def propagate(t: Throwable) { underlying(Left(t)) } diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala index 6f0d1fc6..eb2d6875 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/BaseNetworkClient.scala @@ -119,26 +119,6 @@ trait BaseNetworkClient extends Logging { future } - /** - * Sends a request to the specified node in the cluster using the alternate channel. - * - * @param request the message to send - * @param node the node to send the message to - * - * @return The second channel expects baseRequests and never returns responses - * @throws InvalidNodeException thrown if the node specified is not currently available - * @throws ClusterDisconnectedexception thrown if the cluster is not connected when the method is called - */ - def sendAltMessageToNode[RequestMsg, ResponseMsg](request: RequestMsg, node: Node) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = doIfConnected { - if (request == null || node == null) throw new NullPointerException - - val candidate = currentNodes.filter(_ == node) - if (candidate.size == 0) throw new InvalidNodeException("Unable to send message, %s is not available".format(node)) - - doSendAltMessage(BaseRequest(request, node, is, os)) - } - /** * Broadcasts a message to all the currently available nodes in the cluster. * @@ -175,12 +155,6 @@ trait BaseNetworkClient extends Logging { clusterIoClient.sendMessage(requestCtx.node, requestCtx) } - protected def doSendAltMessage[RequestMsg, ResponseMsg](requestCtx: BaseRequest[RequestMsg]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = { - filters.foreach { filter => filter.onRequest(requestCtx) } - clusterIoClient.sendAltMessage(requestCtx.node, requestCtx) - } - protected def doIfConnected[T](block: => T): T = { if (shutdownSwitch.get) throw new NetworkShutdownException else if (!startedSwitch.get) throw new NetworkNotStartedException diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/ClusterIoClientComponent.scala b/network/src/main/scala/com/linkedin/norbert/network/common/ClusterIoClientComponent.scala index 3fce4a4f..b4d93d06 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/ClusterIoClientComponent.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/ClusterIoClientComponent.scala @@ -24,7 +24,6 @@ trait ClusterIoClientComponent { trait ClusterIoClient { def sendMessage[RequestMsg, ResponseMsg](node: Node, request: Request[RequestMsg, ResponseMsg]): Unit - def sendAltMessage[RequestMsg](node: Node, request: BaseRequest[RequestMsg]): Unit def nodesChanged(nodes: Set[Node]): Set[Endpoint] def shutdown: Unit } diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/NodeSpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/NodeSpecification.java deleted file mode 100644 index 76ec64c3..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/NodeSpecification.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - - -/** - * A NodeSpecification interface is to be extended by NodeSpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a NodeSpecification. - */ - -public interface NodeSpecification { - Long getCapability(); - Long getPersistentCapability(); - Integer getAltPort(); -} - diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedNodeSpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedNodeSpecification.java deleted file mode 100644 index ea69152b..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedNodeSpecification.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - -import scala.collection.immutable.Set; - -/** - * A PartitionedNodeSpecification interface is to be extended by NodeSpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a PartitionedNodeSpecification. - */ - -public interface PartitionedNodeSpecification extends NodeSpecification { - Integer getAltPort(); - int getNumberOfReplicas(); - Integer getClusterId(); - Set getIds(); -} diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRequestSpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRequestSpecification.java deleted file mode 100644 index a1418301..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRequestSpecification.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - -import com.linkedin.norbert.cluster.Node; -import scala.Option; -import scala.collection.immutable.Set; -import scala.Function2; - -/** - * A PartitionedRequestSpecification interface is to be extended by RequestSpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a PartitionedRequestSpecification. - */ - -public interface PartitionedRequestSpecification { - Option getMessage(); - - // Returns an optional anonymous function that takes two arguments - Option, RequestMsg>> getRequestBuilder(); -} diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRetrySpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRetrySpecification.java deleted file mode 100644 index dee823c3..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/PartitionedRetrySpecification.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - -import com.linkedin.norbert.RoutingConfigs; -import scala.Option; -import scala.Either; -import scala.Function1; -import scala.runtime.BoxedUnit; -import com.linkedin.norbert.network.common.RetryStrategy; -import com.linkedin.norbert.RoutingConfigs; - -/** - * A PartitionedRetrySpecification interface is to be extended by RetrySpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a PartitionedRetrySpecification. - */ - -public interface PartitionedRetrySpecification { - int getMaxRetry(); - - // Returns an anonymous function - Function1, BoxedUnit> getCallback(); - - Option getRetryStrategy(); - RoutingConfigs getRoutingConfigs(); - -} diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RequestSpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RequestSpecification.java deleted file mode 100644 index 5cbe6995..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RequestSpecification.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - -/** - * A RequestSpecification interface is to be extended by RequestSpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a RequestSpecification. - */ - -public interface RequestSpecification { - RequestMsg getMessage(); -} - diff --git a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RetrySpecification.java b/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RetrySpecification.java deleted file mode 100644 index fda73efa..00000000 --- a/network/src/main/scala/com/linkedin/norbert/network/javaobjects/RetrySpecification.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.linkedin.norbert.network.javaobjects; - -import scala.Option; -import scala.Either; -import scala.Function1; -import scala.runtime.BoxedUnit; - -/** - * A RetrySpecification interface is to be extended by RetrySpecification.scala so that sendRequest can - * take java objects as arguments. This file specifies getters for a RetrySpecification. - */ - -public interface RetrySpecification { - int getMaxRetry(); - - // Returns an anonymous function - Function1, BoxedUnit> getCallback(); - -} - diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala index e96e21ca..a915c1fe 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala @@ -17,20 +17,23 @@ package com.linkedin.norbert package network package netty -import java.net.InetSocketAddress -import java.util.UUID -import java.util.concurrent._ -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} - -import com.linkedin.norbert.cluster.Node -import com.linkedin.norbert.jmx.JMX -import com.linkedin.norbert.jmx.JMX.MBean -import com.linkedin.norbert.logging.Logging -import com.linkedin.norbert.network.common.{BackoffStrategy, CachedNetworkStatistics} -import com.linkedin.norbert.norbertutils.{Clock, SystemClock} import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup} -import org.jboss.netty.channel.{Channel, ChannelFuture, ChannelFutureListener, ConnectTimeoutException} +import org.jboss.netty.channel.{ChannelFutureListener, ChannelFuture, Channel} +import org.jboss.netty.channel.ConnectTimeoutException +import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder +import java.util.concurrent._ +import java.net.InetSocketAddress +import jmx.JMX.MBean +import jmx.JMX +import logging.Logging +import cluster.{Node, ClusterClient} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import norbertutils.{Clock, SystemClock} +import java.io.IOException +import com.linkedin.norbert.network.common.{CachedNetworkStatistics, BackoffStrategy, SimpleBackoffStrategy} +import java.util.UUID +import scala.Some class ChannelPoolClosedException extends Exception @@ -80,7 +83,7 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } private val pool = new ArrayBlockingQueue[PoolEntry](maxConnections) - private val waitingWrites = new LinkedBlockingQueue[BaseRequest[_]] + private val waitingWrites = new LinkedBlockingQueue[Request[_, _]] private val poolSize = new AtomicInteger(0) private val closed = new AtomicBoolean private val softClosed = new AtomicBoolean @@ -129,6 +132,7 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi private val jmxHandle = JMX.register(new MBean(classOf[ChannelPoolMBean], "address=%s,port=%d".format(address.getHostName, address.getPort)) with ChannelPoolMBean { + import scala.math._ def getWriteQueueSize = waitingWrites.size def getOpenChannels = poolSize.get @@ -138,7 +142,7 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi def getNumberRequestsSent = requestsSent.get.abs }) - def sendRequest[RequestMsg](request: BaseRequest[RequestMsg]): Unit = if (closed.get) { + def sendRequest[RequestMsg, ResponseMsg](request: Request[RequestMsg, ResponseMsg]): Unit = if (closed.get) { throw new ChannelPoolClosedException } else { checkoutChannel match { @@ -216,7 +220,7 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi Option(poolEntry) } - private def openChannel(request: BaseRequest[_]) { + private def openChannel(request: Request[_, _]) { if (poolSize.incrementAndGet > maxConnections) { poolSize.decrementAndGet log.warn("Unable to open channel, pool is full. Waiting for another channel to return to queue before processing") @@ -236,9 +240,9 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } else { openFuture.getCause match { case _:ConnectTimeoutException => - log.warn(openFuture.getCause, "Timeout when opening channel to: %s, marking offline".format(address)) - case _ => - log.error(openFuture.getCause, "Error when opening channel to: %s, marking offline".format(address)) + log.warn("Timeout when opening channel to: %s, marking offline".format(address)) + case cause => + log.error(cause, "Error when opening channel to: %s, marking offline".format(address)) } errorStrategy.foreach(_.notifyFailure(request.node)) poolSize.decrementAndGet @@ -250,20 +254,21 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } } - private def writeRequestToChannel(request: BaseRequest[_], channel: Channel) { + private def writeRequestToChannel(request: Request[_, _], channel: Channel) { log.debug("Writing to %s: %s".format(channel, request)) requestsSent.incrementAndGet + request.startNettyTiming(stats) channel.write(request).addListener(new ChannelFutureListener { - def operationComplete(writeFuture: ChannelFuture) = if (!writeFuture.isSuccess) { - // Take the node out of rotation for a bit - log.warn("IO exception for " + request.node + ", marking node offline") - errorStrategy.foreach(_.notifyFailure(request.node)) - channel.close - request.onFailure(writeFuture.getCause) - } else { - request.startNettyTiming(stats) + def operationComplete(writeFuture: ChannelFuture) = { + request.endNettyTiming(stats) + if (!writeFuture.isSuccess) { + // Take the node out of rotation for a bit + log.warn("IO exception for " + request.node + ", marking node offline") + errorStrategy.foreach(_.notifyFailure(request.node)) + channel.close + request.onFailure(writeFuture.getCause) + } } - }) } diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala index 7725e3ed..5258007e 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala @@ -96,11 +96,10 @@ class ClientChannelHandler(clientName: Option[String], private val statsJMX = JMX.register(new NetworkClientStatisticsMBeanImpl(clientName, serviceName, stats, clientStatsStrategy)) override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = { - val request = e.getMessage.asInstanceOf[BaseRequest[_]] + val request = e.getMessage.asInstanceOf[Request[_, _]] log.debug("Writing request: %s".format(request)) - if(request.expectsResponse) { - //We assume that if a response is expected then the message is a Request or some derivative of Request. - requestMap.put(request.id, request.asInstanceOf[Request[_,_]]) + if(!request.callback.isEmpty) { + requestMap.put(request.id, request) stats.beginRequest(request.node, request.id, 0) } diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/DarkCanaryChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/DarkCanaryChannelHandler.scala index 5b4d35fd..bfb0dcfe 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/DarkCanaryChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/DarkCanaryChannelHandler.scala @@ -203,10 +203,10 @@ class DarkCanaryChannelHandler extends Logging { darkCanaryResponseHandler match { case Some(responseHandler) => { mirrorToHost.remove(requestId) match { - case null => log.error("Could not find hostRequestId for darkCanaryRequestId: %s".format(requestId.toString)) case hostRequestId => { responseHandler.upstreamCallback(true, hostRequestId, request, message) } + case null => log.error("Could not find hostRequestId for darkCanaryRequestId: %s".format(requestId.toString)) } } case None => diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoClientComponent.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoClientComponent.scala index a27bb8b1..3bf3e435 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoClientComponent.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoClientComponent.scala @@ -32,7 +32,6 @@ trait NettyClusterIoClientComponent extends ClusterIoClientComponent { class NettyClusterIoClient(channelPoolFactory: ChannelPoolFactory, strategy: CanServeRequestStrategy) extends ClusterIoClient with UrlParser with Logging { private val channelPools = new ConcurrentHashMap[Node, ChannelPool] - private val altChannelPools = new ConcurrentHashMap[Node, ChannelPool] def sendMessage[RequestMsg, ResponseMsg](node: Node, request: Request[RequestMsg, ResponseMsg]) { if (node == null || request == null) throw new NullPointerException @@ -47,19 +46,6 @@ trait NettyClusterIoClientComponent extends ClusterIoClientComponent { } } - def sendAltMessage[RequestMsg](node: Node, request: BaseRequest[RequestMsg]) { - if (node == null || request == null) throw new NullPointerException - - val pool = getAltChannelPool(node) - try { - pool.sendRequest(request) - } catch { - case ex: ChannelPoolClosedException => - // ChannelPool was closed, try again - sendAltMessage(node, request) - } - } - def getChannelPool(node: Node): ChannelPool = { // TODO: Theoretically, we might be able to get a null reference instead of a channel pool here import norbertutils._ @@ -69,22 +55,6 @@ trait NettyClusterIoClientComponent extends ClusterIoClientComponent { } } - def getAltChannelPool(node: Node): ChannelPool = { - // TODO: Theoretically, we might be able to get a null reference instead of a channel pool here - import norbertutils._ - atomicCreateIfAbsent(altChannelPools, node) { n: Node => - val (address, _) = parseUrl(n.url) - val port = n.altPort match { - case None => - throw new IllegalArgumentException("The altPort was never properly set.") - 0 - case Some(altPort) => - altPort - } - channelPoolFactory.newChannelPool(new InetSocketAddress(address, port)) - } - } - def nodesChanged(nodes: Set[Node]): Set[Endpoint] = { import scala.collection.JavaConversions._ channelPools.keySet.foreach { node => @@ -96,16 +66,6 @@ trait NettyClusterIoClientComponent extends ClusterIoClientComponent { } } - // Now we also do the same for our altChannelPools. - altChannelPools.keySet.foreach { node => - if (!nodes.contains(node)) { - altChannelPools.get(node).unregisterJMX - val pool = altChannelPools.remove(node) - pool.close - log.info("Closing alt channel pool for unavailable node: %s".format(node)) - } - } - nodes.map { n => val requestStrategy = strategy @@ -128,16 +88,6 @@ trait NettyClusterIoClientComponent extends ClusterIoClientComponent { } } - // Do the same for altChannelPools - altChannelPools.keySet.foreach { key => - altChannelPools.get(key) match { - case null => // do nothing - case pool => - pool.close - altChannelPools.remove(key) - } - } - channelPoolFactory.shutdown log.debug("NettyClusterIoClient shut down") diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoServerComponent.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoServerComponent.scala index bb4edcb1..ac7353d9 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoServerComponent.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyClusterIoServerComponent.scala @@ -28,10 +28,9 @@ import cluster.Node trait NettyClusterIoServerComponent extends ClusterIoServerComponent { class NettyClusterIoServer(bootstrap: ServerBootstrap, channelGroup: ChannelGroup) extends ClusterIoServer with UrlParser with Logging { private var serverChannel: Channel = _ - private var serverAltChannel: Channel = _ def bind(node: Node, wildcard: Boolean) = { - var (_, port) = parseUrl(node.url) + val (_, port) = parseUrl(node.url) try { val address = new InetSocketAddress(port) log.debug("Binding server socket to %s".format(address)) @@ -39,28 +38,12 @@ trait NettyClusterIoServerComponent extends ClusterIoServerComponent { } catch { case ex: ChannelException => throw new NetworkingException("Unable to bind to %s".format(node), ex) } - if(!node.altPort.isEmpty) { - port = node.altPort.get - try { - val address = new InetSocketAddress(port) - log.debug("Binding server socket to %s".format(address)) - serverAltChannel = bootstrap.bind(address) - } catch { - case ex: ChannelException =>throw new NetworkingException("Unable to bind to %s."format(node), ex) - } - } } - def shutdown = - if (serverChannel != null || serverAltChannel != null) { - if (serverChannel != null) { - serverChannel.close.awaitUninterruptibly - } - if (serverAltChannel != null) { - serverAltChannel.close.awaitUninterruptibly - } - channelGroup.close.awaitUninterruptibly - bootstrap.releaseExternalResources - } + def shutdown = if (serverChannel != null) { + serverChannel.close.awaitUninterruptibly + channelGroup.close.awaitUninterruptibly + bootstrap.releaseExternalResources + } } } diff --git a/network/src/main/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClient.scala index b58331cf..9dad0c7a 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClient.scala @@ -22,13 +22,22 @@ import common._ import loadbalancer.{PartitionedLoadBalancer, PartitionedLoadBalancerFactoryComponent, PartitionedLoadBalancerFactory} import server.{MessageExecutorComponent, NetworkServer} import netty.NettyPartitionedNetworkClient -import com.linkedin.norbert.network.client.NetworkClientConfig +import client.NetworkClientConfig import cluster.{Node, ClusterDisconnectedException, InvalidClusterException, ClusterClientComponent} import scala.util.Random import java.util -import com.linkedin.norbert.network.javaobjects.{NodeSpecification => JNodeSpecification, PartitionedNodeSpecification => JPartitionedNodeSpecification, -RetrySpecification => JRetrySpecification, PartitionedRetrySpecification => JPartitionedRetrySpecification, -RequestSpecification => JRequestSpecification, PartitionedRequestSpecification => JPartitionedRequestSpecification} + +object RoutingConfigs { + val defaultRoutingConfigs = new RoutingConfigs(false, false) + def getDefaultRoutingConfigs():RoutingConfigs = { + defaultRoutingConfigs + } +} + +class RoutingConfigs(SelectiveRetry: Boolean, DuplicatesOk: Boolean ) { + val selectiveRetry = SelectiveRetry + val duplicatesOk = DuplicatesOk +} object PartitionedNetworkClient { def apply[PartitionedId](config: NetworkClientConfig, loadBalancerFactory: PartitionedLoadBalancerFactory[PartitionedId]): PartitionedNetworkClient[PartitionedId] = { @@ -65,33 +74,22 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { @volatile private var loadBalancer: Option[Either[InvalidClusterException, PartitionedLoadBalancer[PartitionedId]]] = None - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).build - val retrySpec = PartitionedRetrySpecification(callback = Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(id, request, callback, None, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification(callback = Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + sendRequest(id, request, callback, capability, None) + def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, callback: Either[Throwable, ResponseMsg] => Unit, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = doIfConnected { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = PartitionedRetrySpecification(callback = Some(callback)) - sendRequest(requestSpec, nodeSpec, retrySpec) + if (id == null || request == null) throw new NullPointerException + + val node = loadBalancer.getOrElse(throw new ClusterDisconnectedException).fold(ex => throw ex, + lb => lb.nextNode(id, capability, persistentCapability).getOrElse(throw new NoNodesAvailableException("Unable to satisfy request, no node available for id %s".format(id)))) + + doSendRequest(PartitionedRequest(request, node, Set(id), (node: Node, ids: Set[PartitionedId]) => request, is, os, Option(callback))) } @@ -160,7 +158,8 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]) { doIfConnected { if (id == null || request == null) throw new NullPointerException - val node = loadBalancer.getOrElse(throw new ClusterDisconnectedException).fold(ex => throw ex, lb => lb.nextNode(id, capability, persistentCapability).getOrElse(throw new NoNodesAvailableException("Unable to satisfy request, no node available for id %s".format(id)))) + val node = loadBalancer.getOrElse(throw new ClusterDisconnectedException).fold(ex => throw ex, + lb => lb.nextNode(id, capability, persistentCapability).getOrElse(throw new NoNodesAvailableException("Unable to satisfy request, no node available for id %s".format(id)))) doSendRequest(PartitionedRequest(request, node, Set(id), (node: Node, ids: Set[PartitionedId]) => request, is, os, None)) } @@ -180,41 +179,19 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster */ - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = - { - val future = new FutureAdapterListener[ResponseMsg] - - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).build - val retrySpec = PartitionedRetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) - future - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + sendRequest(id, request, None, None) + def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = - { - val future = new FutureAdapterListener[ResponseMsg] - - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) - future - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = + sendRequest(id, request, capability, None) + def sendRequest[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Future[ResponseMsg] = { val future = new FutureAdapterListener[ResponseMsg] - - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](Set(id)).setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = PartitionedRetrySpecification(0, Some(future)) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(id, request, future, capability, persistentCapability) future } @@ -233,35 +210,17 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster */ - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], request: RequestMsg) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg]() - sendRequest(requestSpec, nodeSpec, retrySpec)(is, os) - } + sendRequest(ids, request, None, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], request: RequestMsg, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg]() - sendRequest(requestSpec, nodeSpec, retrySpec) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + sendRequest(ids, request, capability, None) + def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], request: RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](Some(request)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg]() - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, (node: Node, ids: Set[PartitionedId]) => request, capability, persistentCapability)(is, os) /** * Sends a Message to the specified PartitionedIds. The PartitionedNetworkClient @@ -279,104 +238,46 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { * to send the request to * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster */ - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, requestBuilder, None, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, numberOfReplicas, requestBuilder, None, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, numberOfReplicas, requestBuilder, capability, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, requestBuilder, capability, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], dupOk : Boolean) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val routingConfigs = new RoutingConfigs(retryStrategy != None, dupOk) - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, numberOfReplicas, requestBuilder, 0, capability, None, new RoutingConfigs(retryStrategy != None, dupOk)) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], dupOk : Boolean) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val routingConfigs = new RoutingConfigs(retryStrategy != None, dupOk) - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, 0, requestBuilder, 0, capability, None, new RoutingConfigs(retryStrategy != None, dupOk)) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], routingConfigs : RoutingConfigs) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, 0, requestBuilder, 0, capability, None, routingConfigs, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], routingConfigs : RoutingConfigs, retryStrategy: Option[RetryStrategy]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, 0, requestBuilder, 0, capability, None, routingConfigs, retryStrategy) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = doIfConnected { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(ids, 0, requestBuilder, 0, capability, persistentCapability) } - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = doIfConnected { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) + sendRequest(ids, numberOfReplicas, requestBuilder, 0, capability, persistentCapability) } /** @@ -398,37 +299,18 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster */ // TODO: investigate interplay between default parameter and implicits - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, maxRetry: Int) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](maxRetry, retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, numberOfReplicas, requestBuilder, maxRetry, None, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, maxRetry: Int) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](maxRetry, retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") + sendRequest(ids, 0, requestBuilder, maxRetry, None, None) + def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, maxRetry: Int, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](maxRetry, retryStrategy = retryStrategy) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = + sendRequest(ids, 0, requestBuilder, maxRetry, capability, None) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg]), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, maxRetry: Int, capability: Option[Long], persistentCapability: Option[Long], @@ -436,14 +318,9 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { retryStrategy: Option[RetryStrategy] = retryStrategy) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](maxRetry, retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } + sendRequest(ids, numberOfReplicas, None, requestBuilder, maxRetry, capability, persistentCapability, routingConfigs, + retryStrategy) - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg]), 2/2/2015") def sendRequest[RequestMsg, ResponseMsg](ids: Set[PartitionedId], numberOfReplicas: Int, clusterId: Option[Int], requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, maxRetry: Int, capability: Option[Long], persistentCapability: Option[Long], @@ -452,166 +329,25 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = doIfConnected { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).setClusterId(clusterId).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](maxRetry, retryStrategy = retryStrategy, routingConfigs = routingConfigs) - sendRequest(requestSpec, nodeSpec, retrySpec) - } - - /** - * Sends a Message to the specified PartitionedIds. The PartitionedNetworkClient - * will interact with the current PartitionedLoadBalancer to calculate which Nodes the message - * must be sent to. This method is synchronous and will return once the responseAggregator has returned a value. - * - * @param ids the PartitionedIds to which the message is addressed - * @param requestBuilder A method which allows the user to generate a specialized request for a set of partitions - * before it is sent to the Node. - * @param responseAggregator a callback method which allows the user to aggregate all the responses - * and return a single object to the caller. The callback will receive the original message passed to - * sendRequest and the ResponseIterator for the request. - * - * @return the return value of the responseAggregator - * @throws InvalidClusterException thrown if the cluster is currently in an invalid state - * @throws NoNodesAvailableException thrown if the PartitionedLoadBalancer was unable to provide a Node - * to send the request to - * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster - * @throws Exception any exception thrown by responseAggregator will be passed through to the client - */ - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") - def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], - requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, - responseAggregator: (ResponseIterator[ResponseMsg]) => Result) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - - sendRequest(requestSpec, nodeSpec, retrySpec, responseAggregator) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") - def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], - numberOfReplicas: Int, - requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, - responseAggregator: (ResponseIterator[ResponseMsg]) => Result) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - - sendRequest(requestSpec, nodeSpec, retrySpec, responseAggregator) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") - def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], - requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, - responseAggregator: (ResponseIterator[ResponseMsg]) => Result, - capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - - sendRequest(requestSpec, nodeSpec, retrySpec, responseAggregator) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") - def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], - requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, - responseAggregator: (ResponseIterator[ResponseMsg]) => Result, - capability: Option[Long], - persistentCapability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = - { - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - - sendRequest(requestSpec, nodeSpec, retrySpec, responseAggregator) - } - - @deprecated("Use sendRequest(PartitionedRequestSpecification[RequestMsg, PartitionedId], PartitionedNodeSpecification[PartitionedId], PartitionedRetrySpecification[ResponseMsg], responseAggregator), 2/2/2015") - def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], - numberOfReplicas: Int, - requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, - responseAggregator: (ResponseIterator[ResponseMsg]) => Result, - capability: Option[Long], - persistentCapability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = doIfConnected { - - val requestSpec = PartitionedRequestSpecification[RequestMsg, PartitionedId](requestBuilder = Some(requestBuilder)) - val nodeSpec = new PartitionedNodeSpecification[PartitionedId](ids).setCapability(capability).setPersistentCapability(persistentCapability).setNumberOfReplicas(numberOfReplicas).build - val retrySpec = PartitionedRetrySpecification[ResponseMsg](retryStrategy = retryStrategy) - - sendRequest(requestSpec, nodeSpec, retrySpec, responseAggregator) - } - - // A version of the new sendRequest API (see below), that also takes a responseAggregator, a callback method that - // allows the user to aggregate all the responses and return a single object to the caller. - def sendRequest[RequestMsg, ResponseMsg, Result](requestSpec: PartitionedRequestSpecification[RequestMsg, PartitionedId], - nodeSpec: PartitionedNodeSpecification[PartitionedId], - retrySpec: PartitionedRetrySpecification[ResponseMsg], - responseAggregator: (ResponseIterator[ResponseMsg]) => Result ) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = doIfConnected { - - if (responseAggregator == null) throw new NullPointerException - responseAggregator(sendRequest[RequestMsg, ResponseMsg](requestSpec, nodeSpec, retrySpec)) - } - - - /** - * Sends a Request according to the requestSpec, nodeSpec, and retrySpec object specifications. - * The PartitionedNetworkClient - * will interact with the current PartitionedLoadBalancer to calculate which Nodes the message - * must be sent to. This method is synchronous and will return once the responseAggregator has returned a value. - * - * @param requestSpec Specifies the message to be sent and the requestBuilder, a method which allows the user to - * generate a specialized request for a set of partitions before it is sent to the Node. - * @param nodeSpec Specifies the numberOfReplicas and the clusterId of the recipient Nodes. - * @param retrySpec Specifies the maxRetry (max number of retry attempts), a callback method, - * a retryStrategy to apply in case of timeout, and a routingConfigs. - * - * @return the return value of the responseAggregator - * @throws InvalidClusterException thrown if the cluster is currently in an invalid state - * @throws NoNodesAvailableException thrown if the PartitionedLoadBalancer was unable to provide a Node - * to send the request to - * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster - * @throws Exception any exception thrown by responseAggregator will be passed through to the client - */ - def sendRequest[RequestMsg, ResponseMsg](requestSpec: JPartitionedRequestSpecification[RequestMsg, PartitionedId], nodeSpec: JPartitionedNodeSpecification[PartitionedId], retrySpec: JPartitionedRetrySpecification[ResponseMsg]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = doIfConnected - { - val requestBuilder = requestSpec.getRequestBuilder().getOrElse(throw new Exception("Request spec automatically creates a builder - this shouldn't happen ever.")) - - if (nodeSpec.getIds() == null || requestBuilder == null) throw new NullPointerException - - // Convert clusterId from java.lang.Integer to scala.Int - val clusterId = Option(Int.unbox(nodeSpec.getClusterId())) - - // Convert capability and persistentCapability from java.lang.Long to scala.Long - val capability = Option(Long.unbox(nodeSpec.getCapability())) - val persistentCapability = Option(Long.unbox(nodeSpec.getPersistentCapability())) + if (ids == null || requestBuilder == null) throw new NullPointerException val nodes = clusterId match { - case Some(clusterId:Int) => calculateNodesFromIdsInCluster (nodeSpec.getIds(), clusterId, capability, persistentCapability) - case None => calculateNodesFromIds (nodeSpec.getIds(), nodeSpec.getNumberOfReplicas(), capability, persistentCapability) + case Some(clusterId:Int) => calculateNodesFromIdsInCluster (ids, clusterId, capability, persistentCapability) + case None => calculateNodesFromIds (ids, numberOfReplicas, capability, persistentCapability) } - log.debug("Total number of ids: %d, selected nodes: %d, ids per node: [%s]".format(nodeSpec.getIds().size, nodes.size, + log.debug("Total number of ids: %d, selected nodes: %d, ids per node: [%s]".format(ids.size, nodes.size, nodes.view.map { case (node, idsForNode) => idsForNode.size } mkString("", ",", "") )) - if (nodes.size <= 1 || !retrySpec.getRoutingConfigs().selectiveRetry || retrySpec.getRetryStrategy() == None) { + if (nodes.size <= 1 || !routingConfigs.selectiveRetry || retryStrategy == None) { val queue = new ResponseQueue[ResponseMsg] val resIter = new NorbertDynamicResponseIterator[ResponseMsg](nodes.size, queue) nodes.foreach { case (node, idsForNode) => try { - doSendRequest(PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, if (retrySpec.getMaxRetry() == 0) Some((a: Either[Throwable, ResponseMsg]) => {queue += a :Unit}) else Some(retryCallback[RequestMsg, ResponseMsg](queue.+=, retrySpec.getMaxRetry(), capability, persistentCapability)_), 0, Some(resIter))) + doSendRequest(PartitionedRequest(requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, if (maxRetry == 0) Some((a: Either[Throwable, ResponseMsg]) => {queue += a :Unit}) else Some(retryCallback[RequestMsg, ResponseMsg](queue.+=, maxRetry, capability, persistentCapability)_), 0, Some(resIter))) } catch { case ex: Exception => queue += Left(ex) } @@ -619,14 +355,14 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { return resIter } else { val nodes = clusterId match { - case Some(clusterId:Int) => calculateNodesFromIdsInCluster (nodeSpec.getIds(), clusterId, None, None) - case None => calculateNodesFromIds (nodeSpec.getIds(), nodeSpec.getNumberOfReplicas(), None, None) + case Some(clusterId:Int) => calculateNodesFromIdsInCluster (ids, clusterId, None, None) + case None => calculateNodesFromIds (ids, numberOfReplicas, None, None) } var setRequests: Map[PartitionedId, Node] = Map.empty[PartitionedId, Node] nodes.foreach { case (node, pids) => { pids.foreach{ - case(pid) => setRequests += pid->node + case(pid) => setRequests += pid->node } } } @@ -634,14 +370,14 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { /* wrapper so that iterator does not have to care about capability stuff */ def calculateNodesFromIdsSRetry(ids: Set[PartitionedId], excludedNodes: Set[Node], maxAttempts: Int) - :Map[Node, Set[PartitionedId]] = { + :Map[Node, Set[PartitionedId]] = { calculateNodesFromIds(ids, excludedNodes, maxAttempts, capability, persistentCapability).toMap } val resIter = new SelectiveRetryIterator[PartitionedId, RequestMsg, ResponseMsg]( - nodes.size, retrySpec.getRetryStrategy().get.initialTimeout, doSendRequest, setRequests, - queue, calculateNodesFromIdsSRetry, requestBuilder, is, os, retrySpec.getRetryStrategy(), - retrySpec.getRoutingConfigs().duplicatesOk) + nodes.size, retryStrategy.get.initialTimeout, doSendRequest, setRequests, + queue, calculateNodesFromIdsSRetry, requestBuilder, is, os, retryStrategy, + routingConfigs.duplicatesOk) nodes.foreach { case (node, idsForNode) => { @@ -653,9 +389,9 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { } try { doSendRequest(PartitionedRequest( - requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, - Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(resIter)) - ) + requestBuilder(node, idsForNode), node, idsForNode, requestBuilder, is, os, + Some((a: Either[Throwable, ResponseMsg]) => {callback(a)}), 0, Some(resIter)) + ) } catch { case ex: Exception => queue += Left(ex) } @@ -665,6 +401,64 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { } } + /** + * Sends a Message to the specified PartitionedIds. The PartitionedNetworkClient + * will interact with the current PartitionedLoadBalancer to calculate which Nodes the message + * must be sent to. This method is synchronous and will return once the responseAggregator has returned a value. + * + * @param ids the PartitionedIds to which the message is addressed + * @param requestBuilder A method which allows the user to generate a specialized request for a set of partitions + * before it is sent to the Node. + * @param responseAggregator a callback method which allows the user to aggregate all the responses + * and return a single object to the caller. The callback will receive the original message passed to + * sendRequest and the ResponseIterator for the request. + * + * @return the return value of the responseAggregator + * @throws InvalidClusterException thrown if the cluster is currently in an invalid state + * @throws NoNodesAvailableException thrown if the PartitionedLoadBalancer was unable to provide a Node + * to send the request to + * @throws ClusterDisconnectedException thrown if the PartitionedNetworkClient is not connected to the cluster + * @throws Exception any exception thrown by responseAggregator will be passed through to the client + */ + def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], + requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, + responseAggregator: (ResponseIterator[ResponseMsg]) => Result) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = + sendRequest(ids, requestBuilder, responseAggregator, None, None) + + def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], + numberOfReplicas: Int, + requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, + responseAggregator: (ResponseIterator[ResponseMsg]) => Result) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = + sendRequest(ids, numberOfReplicas, requestBuilder, responseAggregator, None, None) + + def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], + requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, + responseAggregator: (ResponseIterator[ResponseMsg]) => Result, + capability: Option[Long]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = + sendRequest(ids, requestBuilder, responseAggregator, capability, None) + + def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], + requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, + responseAggregator: (ResponseIterator[ResponseMsg]) => Result, + capability: Option[Long], + persistentCapability: Option[Long]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = + sendRequest(ids, 0, requestBuilder, responseAggregator, capability, persistentCapability) + + def sendRequest[RequestMsg, ResponseMsg, Result](ids: Set[PartitionedId], + numberOfReplicas: Int, + requestBuilder: (Node, Set[PartitionedId]) => RequestMsg, + responseAggregator: (ResponseIterator[ResponseMsg]) => Result, + capability: Option[Long], + persistentCapability: Option[Long]) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Result = doIfConnected { + if (responseAggregator == null) throw new NullPointerException + responseAggregator(sendRequest[RequestMsg, ResponseMsg](ids, numberOfReplicas, requestBuilder, capability, persistentCapability)) + } + /** * Sends a RequestMessage to one replica of the cluster. This is a broadcast intended for read operations on the cluster, like searching every partition for some data. * @@ -776,7 +570,7 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { def correctRequestPartitioning(nodes: Map[Node, Set[Int]], partitionToNodes: Map[Int, Set[Node]]): Map[Node, Set[Int]] = { partitionToNodes.foldLeft(Map.empty[Node, Set[Int]]) { case (map, (partitionId, candidates)) => - val nodeToUse = if (candidates.size == 1) { + val nodeToUse = if(candidates.size == 1) { candidates.head } else { // randomly select @@ -784,12 +578,10 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { val randomIndex = random.nextInt(candidateSeq.size) candidateSeq(randomIndex) } - { val nodePartitions = map.getOrElse(nodeToUse, Set.empty[Int]) map + (nodeToUse -> (nodePartitions + partitionId)) } - } } def sendRequestToReplicas[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, maxRetry : Int = 0) @@ -797,7 +589,7 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { sendRequestToReplicas(id, request, maxRetry, None, None) def sendRequestToReplicas[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, maxRetry : Int, capability: Option[Long]) - (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = sendRequestToReplicas(id, request, maxRetry, capability, None) + (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = sendRequestToReplicas(id, request, maxRetry, capability, None) def sendRequestToReplicas[RequestMsg, ResponseMsg](id: PartitionedId, request: RequestMsg, maxRetry : Int, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = doIfConnected { @@ -843,7 +635,7 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { def sendRequestToPartitions[RequestMsg, ResponseMsg](id: PartitionedId, partitions: Set[Int], requestBuilder: (Node, Set[Int]) => RequestMsg, capability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): ResponseIterator[ResponseMsg] = - sendRequestToPartitions(id, partitions, requestBuilder, None, None) + sendRequestToPartitions(id, partitions, requestBuilder, None, None) def sendRequestToPartitions[RequestMsg, ResponseMsg](id: PartitionedId, partitions: Set[Int], requestBuilder: (Node, Set[Int]) => RequestMsg, capability: Option[Long], persistentCapability: Option[Long]) (implicit is: InputSerializer[RequestMsg, ResponseMsg], @@ -928,7 +720,7 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { ids.foldLeft(Map[Node, Set[PartitionedId]]().withDefaultValue(Set())) { (map, id) => val node = lb.nextNode(id, capability, persistentCapability).getOrElse(throw new NoNodesAvailableException("Unable to satisfy request, no node available for id %s".format(id))) map.updated(node, map(node) + id) - } + } } private def calculateNodesFromIds(ids: Set[PartitionedId], numberOfReplicas: Int, capability: Option[Long], @@ -976,4 +768,3 @@ trait PartitionedNetworkClient[PartitionedId] extends BaseNetworkClient { } } - diff --git a/network/src/main/scala/com/linkedin/norbert/network/server/MessageExecutorComponent.scala b/network/src/main/scala/com/linkedin/norbert/network/server/MessageExecutorComponent.scala index f828dcea..5c4288e6 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/server/MessageExecutorComponent.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/server/MessageExecutorComponent.scala @@ -74,7 +74,7 @@ class ThreadPoolMessageExecutor(clientName: Option[String], private val statsActor = CachedNetworkStatistics[Int, Int](SystemClock, requestStatisticsWindow, 200L) private val totalNumRejected = new AtomicInteger - val requestQueue = new PriorityBlockingQueue[Runnable](maxWaitingQueueSize) + val requestQueue = new ArrayBlockingQueue[Runnable](maxWaitingQueueSize) val statsJmx = JMX.register(new RequestProcessorMBeanImpl(clientName, serviceName, statsActor, requestQueue, threadPool)) private val threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, requestQueue, @@ -126,35 +126,7 @@ class ThreadPoolMessageExecutor(clientName: Option[String], callback: Option[(Either[Exception, ResponseMsg]) => Unit], val queuedAt: Long = System.currentTimeMillis, val id: Int = idGenerator.getAndIncrement.abs, - implicit val is: InputSerializer[RequestMsg, ResponseMsg]) extends Runnable with Comparable[RequestRunner[_,_]] { - /** - * CompareTo compares this RequestRunner with another and returns an integer indicating which one has a higher priority - * to be executed. It pulls the priority from InputSerializer.priority (a higher priority goes first), and tiebreaks based on - * the queuedAt time (an older, or lower time, message goes first). - * - * @param rr the request being compared with - * @return a negative number if myPriority is higher than rrPriority (which would put my value closer to the front of the queue than rr's). - * a positive number if rrPriority is higher than myPriority - * a negative number if my message is older than rr's but they have the same priority - * a positive number if my message is newer than rr's but they have the same priority - * 0 if both messages have the same time and priority - */ - @Override - def compareTo(rr:RequestRunner[_,_]): Int = { - val myPriority = is.priority - val rrPriority = rr.is.priority - if (myPriority == rrPriority) { - // if the priorities are the same, we want the older request to go first, so if rr is older (has a smaller time) we want my > rr (a positive result) - val myTime = queuedAt - val rrTime = rr.queuedAt - return (myTime - rrTime).asInstanceOf[Int] - } - else { - // If rr has a higher priority then it should come out first - so we want my > rr (a positive result) if rrPriority>myPriority - return rrPriority - myPriority - } - } - + implicit val is: InputSerializer[RequestMsg, ResponseMsg]) extends Runnable { def run = { val now = System.currentTimeMillis if(now - queuedAt > reqTimeout) { @@ -219,7 +191,7 @@ class ThreadPoolMessageExecutor(clientName: Option[String], def getActivePoolSize: Int } - class RequestProcessorMBeanImpl(clientName: Option[String], serviceName: String, val stats: CachedNetworkStatistics[Int, Int], queue: PriorityBlockingQueue[Runnable], threadPool: ThreadPoolExecutor) + class RequestProcessorMBeanImpl(clientName: Option[String], serviceName: String, val stats: CachedNetworkStatistics[Int, Int], queue: ArrayBlockingQueue[Runnable], threadPool: ThreadPoolExecutor) extends MBean(classOf[RequestProcessorMBean], JMX.name(clientName, serviceName)) with RequestProcessorMBean { def getQueueSize = queue.size diff --git a/network/src/test/scala/com/linkedin/norbert/network/client/NetworkClientSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/client/NetworkClientSpec.scala index 07307cf2..54797bac 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/client/NetworkClientSpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/client/NetworkClientSpec.scala @@ -77,20 +77,6 @@ class NetworkClientSpec extends BaseNetworkClientSpecification { // clusterIoClient.sendMessage(node, message, null) was called } - "send the provided message to the node specified by the load balancer for sendAltMessage via its altPort" in { - clusterClient.nodes returns nodeSet - clusterClient.isConnected returns true - networkClient.clusterIoClient.nodesChanged(nodeSet) returns endpoints - networkClient.loadBalancerFactory.newLoadBalancer(endpoints) returns networkClient.lb - networkClient.lb.nextNode(Some(0x1), Some(2L)) returns Some(nodes(1)) - - networkClient.start - networkClient.sendAltMessage(request, Some(1L), Some(2L)) must notBeNull - - there was one(networkClient.lb).nextNode(Some(0x1), Some(2L)) - there was no(networkClient.lb).nextNode(None, None) - } - "send the provided message to the node specified by the load balancer for sendRequest with the requested capability " in { clusterClient.nodes returns nodeSet clusterClient.isConnected returns true @@ -193,12 +179,6 @@ class NetworkClientSpec extends BaseNetworkClientSpecification { def request = requestCtx }) } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - invocationCount += 1 - requestCtx.onFailure(new Exception with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } def nodesChanged(nodes: Set[Node]) = {NetworkClientSpec.this.endpoints} def shutdown {} } diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/BaseNetworkClientSpecification.scala b/network/src/test/scala/com/linkedin/norbert/network/common/BaseNetworkClientSpecification.scala index 57ef7184..c22e1d33 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/common/BaseNetworkClientSpecification.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/common/BaseNetworkClientSpecification.scala @@ -26,7 +26,7 @@ abstract class BaseNetworkClientSpecification extends SpecificationWithJUnit wit val clusterClient = mock[ClusterClient] val networkClient: BaseNetworkClient - val nodes = List(Node(1, "", true, altPort=Some(1)), Node(2, "", true, altPort=Some(2)), Node(3, "", true, altPort=Some(3))) + val nodes = List(Node(1, "", true), Node(2, "", true), Node(3, "", true)) val nodeSet = Set() ++ nodes val endpoints = nodeSet.map { n => new Endpoint { def node = n diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/NodeSpecificationSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/common/NodeSpecificationSpec.scala deleted file mode 100644 index 031d97e6..00000000 --- a/network/src/test/scala/com/linkedin/norbert/network/common/NodeSpecificationSpec.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2009-2010 LinkedIn, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.linkedin.norbert -package network -package client - -import org.specs.SpecificationWithJUnit -import org.specs.mock.Mockito -import network.common.SampleMessage -import scala.collection.mutable.MutableList -import com.linkedin.norbert.network.client - -/* - * Partitioned and non-partitioned nodeSpecification tests using specs - */ -class NodeSpecificationSpec extends SpecificationWithJUnit { - - "NodeSpecification" should { - "create a new NodeSpecification object if capability is set" in { - val nonPartitionedTest = new NodeSpecification() - .setCapability(Some(1)) - .setPersistentCapability(Some(2)) - .build - nonPartitionedTest.capability must beSome(1) - nonPartitionedTest.persistentCapability must beSome(2) - } - - "create a new PartitionedNodeSpecification object if capability is set" in { - val PartitionedTest = new PartitionedNodeSpecification(Set(1)) - .setCapability(Some(2)) - .setPersistentCapability(Some(3)) - .setNumberOfReplicas(4) - .setClusterId(Some(5)) - .build - PartitionedTest.capability must beSome(2) - PartitionedTest.persistentCapability must beSome(3) - PartitionedTest.numberOfReplicas must be equalTo(4) - PartitionedTest.clusterId must beSome(5) - } - - "create a new NodeSpecification object if altPort is set" in { - val altPortNonPartitionedTest = new NodeSpecification() - .setCapability(Some(1)) - .setPersistentCapability(Some(2)) - .setAltPort(Some(3)) - .build - altPortNonPartitionedTest.capability must beSome(1) - altPortNonPartitionedTest.persistentCapability must beSome(2) - altPortNonPartitionedTest.altPort must beSome(3) - } - - "create a new PartitionedNodeSpecification object if altPort is set" in { - val altPortPartitionedTest = new PartitionedNodeSpecification(Set(1)) - .setCapability(Some(2)) - .setPersistentCapability(Some(3)) - .setNumberOfReplicas(4) - .setClusterId(Some(5)) - .setAltPort(Some(6)) - .build - altPortPartitionedTest.capability must beSome(2) - altPortPartitionedTest.persistentCapability must beSome(3) - altPortPartitionedTest.numberOfReplicas must be equalTo(4) - altPortPartitionedTest.clusterId must beSome(5) - altPortPartitionedTest.altPort must beSome(6) - } - - "throw an IllegalArgumentException if persistentCapability is set but not capability" in { - - new NodeSpecification() - .setPersistentCapability(Some(2)) - .build must throwA[IllegalArgumentException] - new PartitionedNodeSpecification(Set(1)) - .setPersistentCapability(Some(2)) - .build must throwA[IllegalArgumentException] - } - - } -} - diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/RequestSpecificationsSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/common/RequestSpecificationsSpec.scala deleted file mode 100644 index aa1e597e..00000000 --- a/network/src/test/scala/com/linkedin/norbert/network/common/RequestSpecificationsSpec.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Partitioned and non-partitioned RequestSpecification tests using specs - */ - -package com.linkedin.norbert -package network -package server - -import org.specs.SpecificationWithJUnit -import org.specs.mock.Mockito -import network.common.SampleMessage -import scala.collection.mutable.MutableList - -class RequestSpecificationsSpec extends SpecificationWithJUnit { - - val requestSpecificationTest: RequestSpecification[String] = RequestSpecification[String]("test") - val partitionedRequestSpecificationTest2: PartitionedRequestSpecification[String, Int] = PartitionedRequestSpecification[String, Int](Some("partitionedTest")) - - "RequestSpecification" should { - "create a new RequestSpecification object" in { - requestSpecificationTest.message must be equalTo("test") - } - - "convert a RequestSpecification object to a PartitionedRequestSpecification object" in { - val partitionedRequestSpecificationTest: PartitionedRequestSpecification[String, Int] = requestSpecificationTest - partitionedRequestSpecificationTest.message must beSome("test") - } - - "create a new PartitionedRequestSpecification object" in { - partitionedRequestSpecificationTest2.message must beSome("partitionedTest") - } - - "convert a PartitionedRequestSpecification object to a RequestSpecification object" in { - val requestSpecificationTest2: RequestSpecification[String] = partitionedRequestSpecificationTest2 - requestSpecificationTest2.message must be equalTo("partitionedTest") - } - - - } -} - diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/RetryAndCallbackSpecificationSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/common/RetryAndCallbackSpecificationSpec.scala deleted file mode 100644 index 41338a87..00000000 --- a/network/src/test/scala/com/linkedin/norbert/network/common/RetryAndCallbackSpecificationSpec.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Partitioned and non-partitioned retrySpecification tests using specs - */ - -package com.linkedin.norbert -package network -package server - -import org.specs.SpecificationWithJUnit -import org.specs.mock.Mockito -import network.common.SampleMessage -import scala.collection.mutable.MutableList -import com.linkedin.norbert.network - -class RetryAndCallbackSpecificationSpec extends SpecificationWithJUnit { - "RetryAndCallbackSpecification" should { - "create a new retrySpecification object" in { - val retrySpecificationTest: RetrySpecification[String] = RetrySpecification[String](9) - retrySpecificationTest.getMaxRetry() must be equalTo(9) - } - - "create a new partitionedRetrySpecification object" in { - val partitionedRetrySpecificationTest: PartitionedRetrySpecification[String] = PartitionedRetrySpecification[String](9) - partitionedRetrySpecificationTest.getMaxRetry() must be equalTo(9) - } - } -} diff --git a/network/src/test/scala/com/linkedin/norbert/network/common/SampleMessage.scala b/network/src/test/scala/com/linkedin/norbert/network/common/SampleMessage.scala index d79b845e..9fb10c95 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/common/SampleMessage.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/common/SampleMessage.scala @@ -41,30 +41,4 @@ trait SampleMessage { case class Ping(timestamp: Long = System.currentTimeMillis) val request = new Ping - - // A Ping which has an increased priority for testing prioritization. - object PriorityPing extends Ping{ - implicit case object PriorityPingSerializer extends Serializer[PriorityPing, PriorityPing] { - def requestName = "ping" - def responseName = "pong" - override def priority = 5 - - def requestToBytes(message: PriorityPing) = - NorbertExampleProtos.Ping.newBuilder.setTimestamp(message.timestamp).build.toByteArray - - def requestFromBytes(bytes: Array[Byte]) = { - val timestamp = (NorbertExampleProtos.Ping.newBuilder.mergeFrom(bytes).build.getTimestamp) - PriorityPing(timestamp) - } - - def responseToBytes(message: PriorityPing) = - requestToBytes(message) - - def responseFromBytes(bytes: Array[Byte]) = - requestFromBytes(bytes) - } - } - - case class PriorityPing(timestamp: Long = System.currentTimeMillis) - val priorityRequest = new PriorityPing -} +} \ No newline at end of file diff --git a/network/src/test/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClientSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClientSpec.scala index 43b50c9c..fc0bcf30 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClientSpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/partitioned/PartitionedNetworkClientSpec.scala @@ -717,12 +717,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { def request = requestCtx }) } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - invocationCount += 1 - requestCtx.onFailure(new Exception with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } @@ -774,17 +768,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { requestCtx.onSuccess(requestCtx.outputSerializer.requestToBytes(requestCtx.message)) } } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - if (!succ) { - succ = true - requestCtx.onFailure(new RemoteException("FooBar", "ServerError") with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } else { - succ = false - requestCtx.onSuccess(requestCtx.outputSerializer.requestToBytes(requestCtx.message)) - } - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } @@ -832,16 +815,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { requestCtx.onSuccess(requestCtx.outputSerializer.requestToBytes(requestCtx.message)) } } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - if (failOnce) { - failOnce = false - requestCtx.onFailure(new RemoteException("FooBar", "ServerError") with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } else { - requestCtx.onSuccess(requestCtx.outputSerializer.requestToBytes(requestCtx.message)) - } - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } @@ -887,12 +860,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { def request = requestCtx }) } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - invocationCount += 1 - requestCtx.onFailure(new Exception with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } @@ -928,12 +895,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { def request = requestCtx }) } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - invocationCount += 1 - requestCtx.onFailure(new Exception with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } @@ -968,12 +929,6 @@ class PartitionedNetworkClientSpec extends BaseNetworkClientSpecification { def request = requestCtx }) } - def sendAltMessage[RequestMsg](node: Node, requestCtx: BaseRequest[RequestMsg]) { - invocationCount += 1 - requestCtx.onFailure(new Exception with RequestAccess[BaseRequest[RequestMsg]] { - def request = requestCtx - }) - } def nodesChanged(nodes: Set[Node]) = {PartitionedNetworkClientSpec.this.endpoints} def shutdown {} } diff --git a/network/src/test/scala/com/linkedin/norbert/network/server/MessageExecutorSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/server/MessageExecutorSpec.scala index 60a24537..e3f91ce0 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/server/MessageExecutorSpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/server/MessageExecutorSpec.scala @@ -44,12 +44,8 @@ class MessageExecutorSpec extends SpecificationWithJUnit with Mockito with WaitF -1) var handlerCalled = false - var priorityHandlerCalled = false var either: Either[Exception, Ping] = null - var priorityEither: Either[Exception, PriorityPing] = null - var messageCount = 0 - //this isn't used val unregisteredSerializer = { val s = mock[Serializer[Ping, Ping]] s.requestName returns ("Foo") @@ -58,17 +54,9 @@ class MessageExecutorSpec extends SpecificationWithJUnit with Mockito with WaitF def handler(e: Either[Exception, Ping]) { handlerCalled = true - messageCount += 1 either = e } - def priorityHandler(e: Either[Exception, PriorityPing]) { - priorityHandlerCalled = true - messageCount += 1 - priorityEither = e - } - - "MessageExecutor" should { doAfter { messageExecutor.shutdown @@ -103,33 +91,11 @@ class MessageExecutorSpec extends SpecificationWithJUnit with Mockito with WaitF messageExecutor.executeMessage(request, Some(handler _)) - handlerCalled must eventually(beTrue) either.isRight must beTrue either.right.get must be(request) } - "Run higher priority messages first" in { - messageHandlerRegistry.handlerFor(request) returns timeStampHandler _ - messageHandlerRegistry.handlerFor(priorityRequest) returns priorityTimeStampHandler _ - - // Put one priority request first to make sure that the second priorityRequest - // gets in the queue before we start handling the regular request. - messageExecutor.executeMessage(priorityRequest, Some(priorityHandler _)) - messageExecutor.executeMessage(request, Some(handler _)) - messageExecutor.executeMessage(priorityRequest, Some(priorityHandler _)) - messageExecutor.executeMessage(priorityRequest, Some(priorityHandler _)) - - handlerCalled must eventually(beTrue) - priorityHandlerCalled must beTrue - // need to have run all the messages when we finish running the regular message - messageCount must be(4) - either.isRight must beTrue - priorityEither.isRight must beTrue - // check that the regular request was handled after the last priorityRequest - priorityEither.right.get.timestamp must be_<(either.right.get.timestamp) - } - "not execute the responseHandler if the handler returns null" in { // messageHandlerRegistry.validResponseFor(request, null) returns true messageHandlerRegistry.handlerFor(request) returns nullHandler _ @@ -235,9 +201,6 @@ class MessageExecutorSpec extends SpecificationWithJUnit with Mockito with WaitF } def returnHandler(message: Ping): Ping = message - def timeStampHandler(message: Ping): Ping = { waitFor(50.ms); return new Ping} - def priorityReturnHandler(message: PriorityPing): PriorityPing = message - def priorityTimeStampHandler(message: PriorityPing): PriorityPing = {waitFor(50.ms); return new PriorityPing} def throwsHandler(message: Ping): Ping = throw exception def nullHandler(message: Ping): Ping = null } diff --git a/network/src/test/scala/com/linkedin/norbert/network/server/MessageHandlerRegistrySpec.scala b/network/src/test/scala/com/linkedin/norbert/network/server/MessageHandlerRegistrySpec.scala index 92678d0d..45b8000c 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/server/MessageHandlerRegistrySpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/server/MessageHandlerRegistrySpec.scala @@ -46,8 +46,6 @@ class MessageHandlerRegistrySpec extends SpecificationWithJUnit with Mockito wit messageHandlerRegistry.handlerFor(Ping.PingSerializer.requestName) must throwA[InvalidMessageException] } - //The below tests were no longer being used, they were not performing any checks -/* "return true if the provided response is a valid response for the given request" in { messageHandlerRegistry.registerHandler(handler) // messageHandlerRegistry.validResponseFor(proto, NorbertExampleProtos.Ping.newBuilder.setTimestamp(System.currentTimeMillis).build) must beTrue @@ -65,6 +63,5 @@ class MessageHandlerRegistrySpec extends SpecificationWithJUnit with Mockito wit // messageHandlerRegistry.registerHandler(proto, proto, handler) // messageHandlerRegistry.validResponseFor(proto, null) must beFalse // } - */ } }