Skip to content

Commit

Permalink
Add support for protocol buffers based network format. Add new client…
Browse files Browse the repository at this point in the history
… config to avoid crazy constructors on the client. Add alpha support for server-side routing (still needs support on the client, though).
  • Loading branch information
jkreps committed Apr 27, 2009
1 parent 5ba2b99 commit c49df1b
Show file tree
Hide file tree
Showing 100 changed files with 8,649 additions and 1,516 deletions.
1 change: 1 addition & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/java"/>
<classpathentry kind="src" path="contrib/mongodb/example"/>
<classpathentry kind="src" path="contrib/batch-indexer/test"/>
<classpathentry kind="src" path="contrib/mongodb/src/java"/>
<classpathentry kind="src" path="contrib/mongodb/test"/>
Expand Down
16 changes: 15 additions & 1 deletion NOTES
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,18 @@ Background Resources
- Origin of vector clocks: http://research.microsoft.com/users/lamport/pubs/time-clocks.pdf
- Brewer's conjecture: http://citeseer.ist.psu.edu/544596.html

Current build is from r19
Current build is from r19


Supporting other clients
- Each store is available via all protcols, they are seperated by port
- Wire format vs. protocol (HTTP vs. Tcp/IP), protocol buffers
- How to abstract wire format?
- A given serialization type may or may not be supported by the client language

Socket servers share threadpool?

A WireProtocol takes bytes and creates a voldemort request, and takes objects to create a voldemort response.
Likewise the client does the opposite

Connector.handleRequest()
2 changes: 1 addition & 1 deletion bin/run-class.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ done
CLASSPATH=$CLASSPATH:$base_dir/dist/resources

export CLASSPATH
java -Xmx2G -server -cp $CLASSPATH ${1} ${2} ${3} ${4} ${5} ${6} ${7}
java -Xmx2G -server -Dcom.sun.management.jmxremote -cp $CLASSPATH ${1} ${2} ${3} ${4} ${5} ${6} ${7}
3 changes: 2 additions & 1 deletion build.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Main source
src.dir=src
java.dir=src/java
protobuff.dir=src/proto
lib.dir=lib
classes.dir=dist/classes
resources.dir=dist/resources
Expand Down Expand Up @@ -31,4 +32,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort

## Release
curr.release=0.51
curr.release=0.52
33 changes: 23 additions & 10 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@
<fileset dir="${testclasses.dir}" />
</jar>
</target>

<target name="protobuff" description="Generate source files from .proto files">
<path id="proto-files">
<fileset dir="${protobuff.dir}" />
</path>
<property name="proto.path" location="${protobuff.dir}"/>
<property name="javaout.path" location="${java.dir}"/>
<exec executable="protoc" failonerror="true">
<arg value="--proto_path=${proto.path}"/>
<arg value="--java_out=${javaout.path}"/>
<arg pathref="proto-files"/>
</exec>
</target>

<target name="jar" depends="build" description="Build server jar file">
<jar destfile="${dist.dir}/${name}-${curr.release}.jar">
Expand All @@ -115,6 +128,16 @@
</fileset>
</jar>
</target>

<target name="war" depends="build" description="Build server war file">
<war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
<classes dir="${classes.dir}"/>
<lib dir="${lib.dir}">
<exclude name="${lib.dir}/servlet-api.jar"/>
<exclude name="${lib.dir}/src/**"/>
</lib>
</war>
</target>

<target name="contrib-build" depends="build, buildtest" description="Compile contrib packages (java and test) ">
<replace-dir dir="${contrib.classes.dir}" />
Expand Down Expand Up @@ -206,16 +229,6 @@
<target name="junit-all" depends="junit, contrib-junit" description="Run All junit tests including contrib.">
</target>

<target name="war" depends="build" description="Build server war file">
<war destfile="${dist.dir}/${name}.war" webxml="web.xml" basedir="${classes.dir}">
<classes dir="${classes.dir}" />
<lib dir="${lib.dir}">
<exclude name="${lib.dir}/servlet-api.jar" />
<exclude name="${lib.dir}/src/**" />
</lib>
</war>
</target>

<macrodef name="make-javadocs">
<attribute name="dir" />
<attribute name="packagenames" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
public abstract class ReadOnlyBatchIndexMapper<K, V> implements
Mapper<K, V, BytesWritable, BytesWritable> {

private Cluster _cluster = null;
private StoreDefinition _storeDef = null;
private ConsistentRoutingStrategy _routingStrategy = null;
private Serializer<Object> _keySerializer;
private Serializer<Object> _valueSerializer;
private Cluster cluster = null;
private StoreDefinition storeDef = null;
private ConsistentRoutingStrategy routingStrategy = null;
private Serializer<Object> keySerializer;
private Serializer<Object> valueSerializer;

public abstract Object getKeyBytes(K key, V value);

Expand All @@ -51,10 +51,10 @@ public void map(K key,
V value,
OutputCollector<BytesWritable, BytesWritable> output,
Reporter reporter) throws IOException {
byte[] keyBytes = _keySerializer.toBytes(getKeyBytes(key, value));
byte[] valBytes = _valueSerializer.toBytes(getValueBytes(key, value));
byte[] keyBytes = keySerializer.toBytes(getKeyBytes(key, value));
byte[] valBytes = valueSerializer.toBytes(getValueBytes(key, value));

List<Node> nodes = _routingStrategy.routeRequest(keyBytes);
List<Node> nodes = routingStrategy.routeRequest(keyBytes);
for(Node node: nodes) {
ByteArrayOutputStream versionedValue = new ByteArrayOutputStream();
DataOutputStream valueDin = new DataOutputStream(versionedValue);
Expand Down Expand Up @@ -82,17 +82,17 @@ public void configure(JobConf conf) {
}

// get Cluster and Store details
_cluster = ContribUtils.getVoldemortClusterDetails(clusterFilePath);
_storeDef = ContribUtils.getVoldemortStoreDetails(storeFilePath,
conf.get("voldemort.store.name"));
cluster = ContribUtils.getVoldemortClusterDetails(clusterFilePath);
storeDef = ContribUtils.getVoldemortStoreDetails(storeFilePath,
conf.get("voldemort.store.name"));

_keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(_storeDef.getKeySerializer());
_valueSerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(_storeDef.getValueSerializer());
keySerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getKeySerializer());
valueSerializer = (Serializer<Object>) new DefaultSerializerFactory().getSerializer(storeDef.getValueSerializer());

_routingStrategy = new ConsistentRoutingStrategy(_cluster.getNodes(),
_storeDef.getReplicationFactor());
routingStrategy = new ConsistentRoutingStrategy(cluster.getNodes(),
storeDef.getReplicationFactor());

if(_routingStrategy == null) {
if(routingStrategy == null) {
throw new RuntimeException("Failed to create routing strategy");
}
} catch(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

public class ReadOnlyBatchIndexReducer implements Reducer<BytesWritable, BytesWritable, Text, Text> {

private DataOutputStream _indexFileStream = null;
private DataOutputStream _valueFileStream = null;
private DataOutputStream indexFileStream = null;
private DataOutputStream valueFileStream = null;

private long _position = 0;
private long position = 0;

private JobConf _conf = null;
private String _taskId = null;
private int _nodeId = -1;
private JobConf conf = null;
private String taskId = null;
private int nodeId = -1;

Path taskIndexFileName;
Path taskValueFileName;
Expand All @@ -46,21 +46,21 @@ public void reduce(BytesWritable key,
BytesWritable value = values.next();
byte[] valBytes = ByteUtils.copy(value.get(), 0, value.getSize());

if(_nodeId == -1) {
if(nodeId == -1) {
DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(valBytes));
_nodeId = buffer.readInt();
nodeId = buffer.readInt();
}
// strip first 4 bytes as node_id
byte[] value1 = ByteUtils.copy(valBytes, 4, valBytes.length);

// Write Index Key/ position
_indexFileStream.write(keyBytes);
_indexFileStream.writeLong(_position);
_valueFileStream.writeInt(value1.length);
_valueFileStream.write(value1);
_position += value1.length + 4;
indexFileStream.write(keyBytes);
indexFileStream.writeLong(position);
valueFileStream.writeInt(value1.length);
valueFileStream.write(value1);
position += value1.length + 4;

if(_position < 0) {
if(position < 0) {
throw new RuntimeException("Position bigger than Integer size, split input files.");
}
}
Expand All @@ -69,34 +69,34 @@ public void reduce(BytesWritable key,

public void configure(JobConf job) {
try {
_position = 0;
_conf = job;
position = 0;
conf = job;

_taskId = job.get("mapred.task.id");
taskId = job.get("mapred.task.id");

taskIndexFileName = new Path(FileOutputFormat.getOutputPath(_conf),
_conf.get("voldemort.index.filename") + "_" + _taskId);
taskValueFileName = new Path(FileOutputFormat.getOutputPath(_conf),
_conf.get("voldemort.data.filename") + "_" + _taskId);
taskIndexFileName = new Path(FileOutputFormat.getOutputPath(conf),
conf.get("voldemort.index.filename") + "_" + taskId);
taskValueFileName = new Path(FileOutputFormat.getOutputPath(conf),
conf.get("voldemort.data.filename") + "_" + taskId);

FileSystem fs = taskIndexFileName.getFileSystem(job);

_indexFileStream = fs.create(taskIndexFileName, (short) 1);
_valueFileStream = fs.create(taskValueFileName, (short) 1);
indexFileStream = fs.create(taskIndexFileName, (short) 1);
valueFileStream = fs.create(taskValueFileName, (short) 1);
} catch(IOException e) {
throw new RuntimeException("Failed to open Input/OutputStream", e);
}
}

public void close() throws IOException {

_indexFileStream.close();
_valueFileStream.close();
indexFileStream.close();
valueFileStream.close();

Path hdfsIndexFile = new Path(FileOutputFormat.getOutputPath(_conf), _nodeId + ".index");
Path hdfsValueFile = new Path(FileOutputFormat.getOutputPath(_conf), _nodeId + ".data");
Path hdfsIndexFile = new Path(FileOutputFormat.getOutputPath(conf), nodeId + ".index");
Path hdfsValueFile = new Path(FileOutputFormat.getOutputPath(conf), nodeId + ".data");

FileSystem fs = hdfsIndexFile.getFileSystem(_conf);
FileSystem fs = hdfsIndexFile.getFileSystem(conf);
fs.rename(taskIndexFileName, hdfsIndexFile);
fs.rename(taskValueFileName, hdfsValueFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void doOperation(int index) throws Exception {

reader.next(key, value);
store.put(new ByteArray(ByteUtils.copy(key.get(), 0, key.getSize())),
Versioned.of(ByteUtils.copy(value.get(), 0, value.getSize())));
Versioned.value(ByteUtils.copy(value.get(), 0, value.getSize())));
} catch(ObsoleteVersionException e) {
obsoletes.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void doOperation(int index) throws Exception {

reader.next(key, value);
store.put(new ByteArray(ByteUtils.copy(key.get(), 0, key.getSize())),
Versioned.of(ByteUtils.copy(value.get(), 0, value.getSize())));
Versioned.value(ByteUtils.copy(value.get(), 0, value.getSize())));
} catch(ObsoleteVersionException e) {
obsoletes.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ private String makeReadOnlyIndex(int minKey, int maxKey) throws Exception {

public void testswap() throws Throwable {
// assert that read-only store is working
Store<ByteArray, byte[]> store1 = server1.getStoreMap().get(storeName);
Store<ByteArray, byte[]> store2 = server2.getStoreMap().get(storeName);
Store<ByteArray, byte[]> store1 = server1.getStoreRepository().getStorageEngine(storeName);
Store<ByteArray, byte[]> store2 = server2.getStoreRepository().getStorageEngine(storeName);

SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
Serializer<Object> serializer = StoreUtils.unsafeGetSerializer(new DefaultSerializerFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private String makeReadOnlyIndex(int minKey, int maxKey) throws Exception {

public void testswap() throws Throwable {
// assert that read-only store is working
Store<ByteArray, byte[]> store1 = server1.getStoreMap().get(storeName);
Store<ByteArray, byte[]> store2 = server2.getStoreMap().get(storeName);
Store<ByteArray, byte[]> store1 = server1.getStoreRepository().getStorageEngine(storeName);
Store<ByteArray, byte[]> store2 = server2.getStoreRepository().getStorageEngine(storeName);

SerializerDefinition serDef = new SerializerDefinition("json", "'string'");
Serializer<Object> serializer = StoreUtils.unsafeGetSerializer(new DefaultSerializerFactory(),
Expand Down
Loading

0 comments on commit c49df1b

Please sign in to comment.