Skip to content

Commit

Permalink
Revert "Merge pull request linkedin#93 from ENuge/secondChannel_oneco…
Browse files Browse the repository at this point in the history
…mmit"

This reverts commit 96c2b7a, reversing
changes made to 8dcc10f.
  • Loading branch information
gabrielwong committed Apr 29, 2015
1 parent a440e75 commit 965c8c4
Show file tree
Hide file tree
Showing 45 changed files with 269 additions and 1,668 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion cluster/src/main/protobuf/norbert.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,4 @@ message Node {
required string url = 2;
repeated int32 partition = 3;
optional int64 persistentCapability = 4;
optional int32 altPort = 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,34 @@ trait ClusterClient extends Logging {
node
}


/**
* Adds a node to the cluster metadata.
*
* @param nodeId the id of the node to add
* @param url the url to be used to send requests to the node
*
* @return the newly added node
* @throws ClusterDisconnectedException thrown if the cluster is disconnected when the method is called
* @throws InvalidNodeException thrown if there is an error adding the new node to the cluster metadata
*/
def addNode(nodeId: Int, url: String): Node = addNode(nodeId, url, Set[Int]())

/**
* Adds a node to the cluster metadata.
*
* @param nodeId the id of the node to add
* @param url the url to be used to send requests to the node
* @param partitions the partitions for which the node can process requests. These are optional.
* @param altPort the secondary port for use with the second channel. This is optional.
* @param partitions the partitions for which the node can process requests
*
* @return the newly added node
* @throws ClusterDisconnectedException thrown if the cluster is disconnected when the method is called
* @throws InvalidNodeException thrown if there is an error adding the new node to the cluster metadata
*/
def addNode(nodeId: Int, url: String, partitions: Set[Int] = Set[Int](), altPort: Option[Int] = None): Node = doIfConnected {
def addNode(nodeId: Int, url: String, partitions: Set[Int]): Node = doIfConnected {
if (url == null) throw new NullPointerException

val node = Node(nodeId, url, false, partitions, altPort = altPort)
val node = Node(nodeId, url, false, partitions)
clusterManager !? ClusterManagerMessages.AddNode(node) match {
case ClusterManagerMessages.ClusterManagerResponse(Some(ex)) => throw ex
case ClusterManagerMessages.ClusterManagerResponse(None) => node
Expand Down
14 changes: 4 additions & 10 deletions cluster/src/main/scala/com/linkedin/norbert/cluster/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ object Node {
val partitions = node.getPartitionList.asInstanceOf[java.util.List[Int]].foldLeft(Set[Int]()) { (set, i) => set + i }

if(!node.hasPersistentCapability)
Node(node.getId, node.getUrl, available, partitions, capability, None, Some(node.getAltPort))
Node(node.getId, node.getUrl, available, partitions, capability, None)
else
Node(node.getId, node.getUrl, available, partitions, capability, Some(node.getPersistentCapability), Some(node.getAltPort))
Node(node.getId, node.getUrl, available, partitions, capability, Some(node.getPersistentCapability))
} catch {
case ex: InvalidProtocolBufferException => throw new InvalidNodeException("Error deserializing node", ex)
}
Expand All @@ -63,12 +63,6 @@ object Node {
case None => builder.setId(node.id).setUrl(node.url)
case Some(x) => builder.setId(node.id).setUrl(node.url).setPersistentCapability(x)
}
node.altPort match {
// Only set altPort if we actually have an altPort.
case None => // Do nothing!
case Some(x) => builder.setAltPort(x)
}

node.partitionIds.foreach(builder.addPartition(_))

builder.build.toByteArray
Expand All @@ -84,7 +78,7 @@ object Node {
* @param partitions the partitions for which the node can handle requests
* @param capability the 64 bits Long representing up to 64 node capabilities
*/
final case class Node(id: Int, url: String, available: Boolean, partitionIds: Set[Int] = Set.empty, capability: Option[Long] = None, persistentCapability: Option[Long] = None, altPort: Option[Int] = None) {
final case class Node(id: Int, url: String, available: Boolean, partitionIds: Set[Int] = Set.empty, capability: Option[Long] = None, persistentCapability: Option[Long] = None) {
if (url == null) throw new NullPointerException("url must not be null")
if (partitionIds == null) throw new NullPointerException("partitions must not be null")

Expand All @@ -95,7 +89,7 @@ final case class Node(id: Int, url: String, available: Boolean, partitionIds: Se
case _ => false
}

override def toString = "Node(%d,%s,[%s],%b,0x%08X,0x%08X,%d)".format(id, url, partitionIds.mkString(","), available, if (capability.isEmpty) 0L else capability.get, if (persistentCapability.isEmpty) 0L else persistentCapability.get, if (altPort.isEmpty) 0 else altPort.get)
override def toString = "Node(%d,%s,[%s],%b,0x%08X,0x%08X)".format(id, url, partitionIds.mkString(","), available, if (capability.isEmpty) 0L else capability.get, if (persistentCapability.isEmpty) 0L else persistentCapability.get)

def isCapableOf(c: Option[Long], pc: Option[Long]) : Boolean = {
val capabilityMatch: Boolean = (capability, c) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public String responseName() {
return "pong";
}

public int priority() {return 0;}

public byte[] requestToBytes(Ping message) {
return NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build().toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,11 @@ object NorbertNetworkClientMain {
cc.addNode(nodeId.toInt, url)
println("Joined Norbert cluster")

case nodeId :: url :: altPort :: Nil =>
cc.addNode(nodeId.toInt, url, altPort = Some(altPort.toInt))
case nodeId :: url :: partitions =>
cc.addNode(nodeId.toInt, url, Set() ++ partitions.map(_.toInt))
println("Joined Norbert cluster")

case nodeId :: url :: altPort :: partitions =>
cc.addNode(nodeId.toInt, url, Set() ++ partitions.map(_.toInt), Some(altPort.toInt))
println("Joined Norbert cluster")

case _ => println("Error: Invalid syntax: join nodeId url altPort partition1 partition2...")
case _ => println("Error: Invalid syntax: join nodeId url partition1 partition2...")
}
println("Joined Norbert cluster")

Expand Down Expand Up @@ -114,19 +110,6 @@ object NorbertNetworkClientMain {
}
}

case "altPing" =>
if (args.length < 1) {
println("Invalid syntax: altPing nodeId")
} else {
val node = cc.nodeWithId(args.head.toInt)
node match {
case Some(n) =>
nc.sendAltMessageToNode(Ping(System.currentTimeMillis), n)

case None => println("No node with id: %d".format(args.head.toInt))
}
}

case "exit" => System.exit(0)

case "quit" => System.exit(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ public interface Node {
boolean isCapableOf(Long c, Long pc);
Long getCapability();
Long getPersistentCapability();
Integer getAltPort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ object JavaNode {
if (node.partitionIds != null) {
node.partitionIds.foreach {id => s.add(id)}
}
JavaNode(node.id, node.url, node.available, s, node.capability, node.persistentCapability, node.altPort)
JavaNode(node.id, node.url, node.available, s, node.capability, node.persistentCapability)
}
}
}

case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer], capability: Option[Long] = None, persistentCapability: Option[Long] = None, altPort: Option[Int] = None) extends Node {
case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer], capability: Option[Long] = None, persistentCapability: Option[Long] = None) extends Node {
def isAvailable = available
def isCapableOf(c: java.lang.Long) : Boolean = isCapableOf(c, 0L)
def isCapableOf(c: java.lang.Long, pc: java.lang.Long) : Boolean =
Expand All @@ -52,9 +52,4 @@ case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanPrope
case Some(nc) => nc.longValue
case None => null
}
def getAltPort() : java.lang.Integer =
altPort match {
case Some(nc) => nc.intValue
case None => null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.linkedin.norbert

import java.util

import javacompat.cluster.{JavaNode, Node => JNode}
import com.linkedin.norbert.cluster.{Node => SNode}

Expand Down Expand Up @@ -55,9 +53,8 @@ package object javacompat {
}

SNode(node.getId, node.getUrl, node.isAvailable, partitionIds,
if(node.getCapability == null) None else Some(node.getCapability.longValue),
if(node.getPersistentCapability == null) None else Some(node.getPersistentCapability.longValue),
Some(node.getAltPort.intValue))
if(node.getCapability == null) None else Some(node.getCapability.longValue),
if(node.getPersistentCapability == null) None else Some(node.getPersistentCapability.longValue))
}
}

Expand Down
Loading

0 comments on commit 965c8c4

Please sign in to comment.