Skip to content

Commit

Permalink
add a toy rpc based on netty and protobuf.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshuo committed Aug 26, 2011
1 parent 54ff817 commit 420c985
Show file tree
Hide file tree
Showing 23 changed files with 4,752 additions and 3 deletions.
4 changes: 2 additions & 2 deletions protobuf/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ inline std::string encode(const google::protobuf::Message& message)
if (succeed)
{
const char* begin = result.c_str() + kHeaderLen;
int32_t checkSum = adler32(0, reinterpret_cast<const Bytef*>(begin), result.size()-kHeaderLen);
int32_t checkSum = adler32(1, reinterpret_cast<const Bytef*>(begin), result.size()-kHeaderLen);
int32_t be32 = ::htonl(checkSum);
result.append(reinterpret_cast<char*>(&be32), sizeof be32);

Expand Down Expand Up @@ -105,7 +105,7 @@ inline google::protobuf::Message* decode(const std::string& buf)
{
int32_t expectedCheckSum = asInt32(buf.c_str() + buf.size() - kHeaderLen);
const char* begin = buf.c_str();
int32_t checkSum = adler32(0, reinterpret_cast<const Bytef*>(begin), len-kHeaderLen);
int32_t checkSum = adler32(1, reinterpret_cast<const Bytef*>(begin), len-kHeaderLen);
if (checkSum == expectedCheckSum)
{
int32_t nameLen = asInt32(buf.c_str());
Expand Down
3 changes: 2 additions & 1 deletion protobuf/query.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package muduo;

option java_outer_classname = "Proto";
option java_package = "muduo.codec.tests";
option java_outer_classname = "QueryProtos";

message Query {
required int64 id = 1;
Expand Down
58 changes: 58 additions & 0 deletions protorpc/muduo/codec/ProtobufDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package muduo.codec;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Adler32;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;

import com.google.protobuf.Message;

@Sharable
public class ProtobufDecoder extends OneToOneDecoder {

private Map<String, Message> knownTypes = new HashMap<String, Message>();

@Override
public Object decode(ChannelHandlerContext ctx, Channel channel, Object obj)
throws Exception {
if (!(obj instanceof ChannelBuffer)) {
return obj;
}
ChannelBuffer buffer = (ChannelBuffer) obj;
if (buffer.readableBytes() >= 10 && checksum(buffer)) {
int nameLen = buffer.readInt();
String typeName = buffer.toString(buffer.readerIndex(), nameLen - 1,
Charset.defaultCharset());
buffer.readerIndex(buffer.readerIndex() + nameLen);
Message prototype = knownTypes.get(typeName);
if (prototype != null) {
return prototype.newBuilderForType().mergeFrom(buffer.array(),
buffer.arrayOffset() + buffer.readerIndex(),
buffer.readableBytes() - 4).build();
}
}
return obj;
}

private boolean checksum(ChannelBuffer buffer) {
Adler32 adler32 = new Adler32();
adler32.update(buffer.array(),
buffer.arrayOffset() + buffer.readerIndex(),
buffer.readableBytes() - 4);
buffer.markReaderIndex();
buffer.readerIndex(buffer.writerIndex() - 4);
int checksum = buffer.readInt();
buffer.resetReaderIndex();
return checksum == (int) adler32.getValue();
}

public void addMessageType(Message message) {
knownTypes.put(message.getDescriptorForType().getFullName(), message);
}
}
42 changes: 42 additions & 0 deletions protorpc/muduo/codec/ProtobufEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package muduo.codec;

import java.util.zip.Adler32;

import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

import com.google.protobuf.Message;

@Sharable
public class ProtobufEncoder extends OneToOneEncoder {

public ProtobufEncoder() {
super();
}

@Override
public Object encode(ChannelHandlerContext ctx, Channel channel, Object obj)
throws Exception {
if (!(obj instanceof Message)) {
return obj;
}
Message message = (Message) obj;
String name = message.getDescriptorForType().getFullName();
int size = message.getSerializedSize();
ChannelBuffer buffer = new BigEndianHeapChannelBuffer(4 + name.length() + 1 + size + 4);
buffer.writeInt(name.length() + 1);
buffer.writeBytes(name.getBytes());
buffer.writeZero(1);
buffer.writeBytes(message.toByteArray());

Adler32 checksum = new Adler32();
checksum.update(buffer.array(), buffer.arrayOffset(), buffer.readableBytes());
buffer.writeInt((int) checksum.getValue());

return buffer;
}
}
71 changes: 71 additions & 0 deletions protorpc/muduo/codec/tests/CodecTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package muduo.codec.tests;

import static org.junit.Assert.assertEquals;
import muduo.codec.ProtobufDecoder;
import muduo.codec.ProtobufEncoder;
import muduo.codec.tests.QueryProtos.Empty;
import muduo.codec.tests.QueryProtos.Query;

import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.junit.Test;

import com.google.protobuf.Message;

public class CodecTest {

@Test
public void testEncoderEmpty() throws Exception {
ProtobufEncoder encoder = new ProtobufEncoder();
Empty empty = Empty.getDefaultInstance();
encoder.encode(null, null, empty);
}

@Test
public void testDecodeEmpty() throws Exception {
ProtobufEncoder encoder = new ProtobufEncoder();
Empty empty = Empty.getDefaultInstance();
ChannelBuffer buf = (ChannelBuffer) encoder.encode(null, null, empty);

ProtobufDecoder decoder = new ProtobufDecoder();
decoder.addMessageType(Empty.getDefaultInstance());
Message message = (Message) decoder.decode(null, null, buf);
assertEquals(empty, message);
}

@Test
public void testQuery() throws Exception {
ProtobufEncoder encoder = new ProtobufEncoder();
Query query = Query.newBuilder()
.setId(1)
.setQuestioner("Chen Shuo")
.addQuestion("Running?")
.build();
ChannelBuffer buf = (ChannelBuffer) encoder.encode(null, null, query);

ProtobufDecoder decoder = new ProtobufDecoder();
decoder.addMessageType(Query.getDefaultInstance());
Message message = (Message) decoder.decode(null, null, buf);
assertEquals(query, message);
}

@Test
public void testQuery2() throws Exception {
ProtobufEncoder encoder = new ProtobufEncoder();
Query query = Query.newBuilder()
.setId(1)
.setQuestioner("Chen Shuo")
.addQuestion("Running?")
.build();
ChannelBuffer buf = (ChannelBuffer) encoder.encode(null, null, query);
ChannelBuffer buf2 = new BigEndianHeapChannelBuffer(buf.readableBytes() + 8);
buf2.writeInt(123);
buf2.writeBytes(buf);

buf2.readInt();
ProtobufDecoder decoder = new ProtobufDecoder();
decoder.addMessageType(Query.getDefaultInstance());
Message message = (Message) decoder.decode(null, null, buf2);
assertEquals(query, message);
}
}
Loading

0 comments on commit 420c985

Please sign in to comment.