Skip to content

Commit

Permalink
ruby client: Add offset request to the consumer.
Browse files Browse the repository at this point in the history
  • Loading branch information
noj committed May 26, 2011
1 parent e28022f commit a2825b2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
1 change: 1 addition & 0 deletions clients/ruby/lib/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

require File.join(File.dirname(__FILE__), "kafka", "io")
require File.join(File.dirname(__FILE__), "kafka", "request_type")
require File.join(File.dirname(__FILE__), "kafka", "error_codes")
require File.join(File.dirname(__FILE__), "kafka", "batch")
require File.join(File.dirname(__FILE__), "kafka", "message")
require File.join(File.dirname(__FILE__), "kafka", "producer")
Expand Down
55 changes: 54 additions & 1 deletion clients/ruby/lib/kafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Consumer
CONSUME_REQUEST_TYPE = Kafka::RequestType::FETCH
MAX_SIZE = 1048576 # 1 MB
DEFAULT_POLLING_INTERVAL = 2 # 2 seconds
MAX_OFFSETS = 100

attr_accessor :topic, :partition, :offset, :max_size, :request_type, :polling

Expand All @@ -14,11 +15,18 @@ def initialize(options = {})
self.partition = options[:partition] || 0
self.host = options[:host] || "localhost"
self.port = options[:port] || 9092
self.offset = options[:offset] || 0
self.offset = options[:offset] || -2
self.max_size = options[:max_size] || MAX_SIZE
self.request_type = options[:request_type] || CONSUME_REQUEST_TYPE
self.polling = options[:polling] || DEFAULT_POLLING_INTERVAL
self.connect(self.host, self.port)

if @offset < 0
send_offsets_request
offsets = read_offsets_response
raise Exception, "No offsets for #@topic-#@partition" if offsets.empty?
@offset = offsets[0]
end
end

# REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
Expand All @@ -40,6 +48,25 @@ def encode_request(request_type, topic, partition, offset, max_size)
request_type + topic + partition + offset + max_size
end

def offsets_request_size
2 + 2 + topic.length + 4 + 8 +4
end

def encode_offsets_request_size
[offsets_request_size].pack('N')
end

# Query the server for the offsets
def encode_offsets_request(topic, partition, time, max_offsets)
req = [Kafka::RequestType::OFFSETS].pack('n')
topic = [topic.length].pack('n') + topic
partition = [partition].pack('N')
time = [time].pack("q").reverse # DIY 64bit big endian integer
max_offsets = [max_offsets].pack('N')

req + topic + partition + time + max_offsets
end

def consume
self.send_consume_request # request data
data = self.read_data_response # read data response
Expand All @@ -66,6 +93,32 @@ def send_consume_request
self.write(self.encode_request(self.request_type, self.topic, self.partition, self.offset, self.max_size)) # write request
end

def send_offsets_request
self.write(self.encode_offsets_request_size) # write request_size
self.write(self.encode_offsets_request(@topic, @partition, -2, MAX_OFFSETS)) # write request
end

def read_offsets_response
data_length = self.socket.read(4).unpack('N').shift # read length
data = self.socket.read(data_length) # read message

pos = 0
error_code = data[pos,2].unpack('n')[0]
raise Exception, Kafka::ErrorCodes::to_s(error_code) if error_code != Kafka::ErrorCodes::NO_ERROR

pos += 2
count = data[pos,4].unpack('N')[0]
pos += 4

res = []
while pos != data.size
res << data[pos,8].reverse.unpack('q')[0]
pos += 8
end

res
end

def parse_message_set_from(data)
messages = []
processed = 0
Expand Down
21 changes: 21 additions & 0 deletions clients/ruby/lib/kafka/error_codes.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module Kafka
module ErrorCodes
NO_ERROR = 0
OFFSET_OUT_OF_RANGE = 1
INVALID_MESSAGE_CODE = 2
WRONG_PARTITION_CODE = 3
INVALID_RETCH_SIZE_CODE = 4

STRINGS = {
0 => 'No error',
1 => 'Offset out of range',
2 => 'Invalid message code',
3 => 'Wrong partition code',
4 => 'Invalid retch size code',
}

def self.to_s(code)
STRINGS[code] || 'Unknown error'
end
end
end

0 comments on commit a2825b2

Please sign in to comment.