trap fetch related IOErrors in blocking.Kafka
David Ormsbee committed Jan 5, 2012
1 parent 3de20e4 commit 3138014
Showing 3 changed files with 78 additions and 16 deletions.
24 changes: 15 additions & 9 deletions brod/base.py
@@ -314,15 +314,21 @@ def fetch(self, topic, offset, partition=None, max_size=None, callback=None, inc

        # Send the request. The logic for handling the response
        # is in _read_fetch_response().
-        return self._write(
-            fetch_request_size,
-            partial(self._wrote_request_size,
-                    fetch_request,
-                    partial(self._read_fetch_response,
-                            callback,
-                            offset,
-                            include_corrupt
-                            )))
+        try:
+            result = self._write(
+                fetch_request_size,
+                partial(self._wrote_request_size,
+                        fetch_request,
+                        partial(self._read_fetch_response,
+                                callback,
+                                offset,
+                                include_corrupt
+                                )))
+        except IOError as io_err:
+            kafka_log.exception(io_err)
+            raise ConnectionFailure("Fetch failure because of: {0}".format(io_err))
+
+        return result

    def offsets(self, topic, time_val, max_offsets, partition=None, callback=None):

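The effect of this change is that a dropped connection during a fetch now surfaces to callers as brod's own ConnectionFailure instead of a bare IOError. A minimal caller-side sketch of what that enables, assuming only the fetch() signature shown above (fetch_with_retry, its attempts count, and the back-off are illustrative assumptions, not part of brod's API):

import time

from brod import Kafka, ConnectionFailure

def fetch_with_retry(kafka, topic, offset, partition=0, attempts=3):
    # Hypothetical helper for this sketch: retry a fetch a few times
    # before giving up, now that connection problems arrive as a single
    # well-defined exception type instead of raw socket errors.
    for attempt in range(attempts):
        try:
            return kafka.fetch(topic, offset, partition=partition)
        except ConnectionFailure:
            if attempt == attempts - 1:
                raise  # out of retries; let the caller decide
            time.sleep(1)  # broker may be restarting; back off briefly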
63 changes: 59 additions & 4 deletions brod/test/test_zk.py
@@ -87,7 +87,7 @@ def test_004_something_else():

from zc.zk import ZooKeeper

-from brod import Kafka
+from brod import Kafka, ConnectionFailure
from brod.zk import ZKConsumer, ZKProducer

class ServerTopology(namedtuple('ServerTopology',
@@ -404,16 +404,71 @@ def test_001_zookeeper_invalid_offset():
        k = Kafka("localhost", kafka_server.kafka_config.port)
        for partition in range(topology_001.partitions_per_broker):
            k.produce("topic_001_zookeeper_invalid_offset", ["world"], partition)
-    time.sleep(1)
+    time.sleep(1.5)

    result = c2.fetch()
    assert result
    for msg_set in result:
        assert_equals(msg_set.messages, ["world"])

def test_001_reconnects():
-    """Test that we keep trying to read, even if our brokers go down."""
-    1/0
+    """Test that we keep trying to read, even if our brokers go down.
+
+    We're going to:
+
+    1. Send messages to all partitions in a topic, across all brokers.
+    2. Start polling (this will cause the Consumer to rebalance itself and
+       find everything).
+    3. Set the Consumer to disable rebalancing.
+    4. Shut down one of the brokers.
+    5. Assert that nothing blows up.
+    6. Restart the broker and assert that it continues to run.
+
+    Note that the partition split is always based on what's in ZooKeeper. So
+    even if the broker is dead or unreachable, we still keep its partitions
+    and try to contact it. Maybe there's a firewall issue preventing our
+    server from hitting it. We don't want to risk messing up other consumers
+    by grabbing partitions that might belong to them.
+    """
+    for kafka_server in RunConfig.kafka_servers:
+        k = Kafka("localhost", kafka_server.kafka_config.port)
+        for partition in range(topology_001.partitions_per_broker):
+            k.produce("topic_001_reconnects", ["Rusty"], partition)
+    time.sleep(1.5)
+
+    fail_server = RunConfig.kafka_servers[0]
+
+    c1 = ZKConsumer(ZK_CONNECT_STR, "group_001_reconnects", "topic_001_reconnects")
+    result = c1.fetch()
+
+    # Disable rebalancing to force the consumer to read from the broker we're
+    # going to kill.
+    c1.disable_rebalance()
+    fail_server.stop()
+    time.sleep(1.5)
+
+    assert_raises(ConnectionFailure, c1.fetch)
+    # We're also not catching the socket error properly here, which is what
+    # happens on subsequent fetches after the initial fetch failure. But that
+    # should be handled deeper in the stack.
+    # 1/0
+
+    # if i == 4:
+    #     # By now we've done a couple of read attempts and not blown up.
+    #     fail_server.start()
+    #
+    # if i == 6:
+    #     # Now send some messages to the broker
+    #     k = Kafka("localhost", fail_server.kafka_config.port)
+    #     for partition in range(topology_001.partitions_per_broker):
+    #         k.produce("topic_001_reconnects", ["Clyde"])
+    #     time.sleep(1)
+    #
+    # if i == 7:
+    #     assert_equal
7 changes: 4 additions & 3 deletions brod/zk.py
@@ -18,7 +18,8 @@
import zookeeper
from zc.zk import ZooKeeper, FailedConnect

-from brod.base import BrokerPartition, ConsumerStats, FetchResult, KafkaError, MessageSet, OffsetOutOfRange
+from brod.base import BrokerPartition, ConsumerStats, MessageSet
+from brod.base import ConnectionFailure, FetchResult, KafkaError, OffsetOutOfRange
from brod.blocking import Kafka

log = logging.getLogger('brod.zk')
@@ -151,7 +152,7 @@ def save_offsets_for(self, consumer_group, bps_to_next_offsets):
            # None is the default value when we don't know what the next
            # offset is, possibly because the MessageSet is empty...
            if next_offset is not None:
-                print "Node %s: setting to %s" % (offset_node, next_offset)
+                log.debug("Node %s: setting to %s" % (offset_node, next_offset))
                offset_node.set(string_value=str(next_offset))

    def broker_ids_for(self, topic):
@@ -422,7 +423,7 @@ def simple_consumer(self, bp_ids_to_offsets):

        return SimpleConsumer(self.topic, broker_partitions)

-    def fetch(self, max_size=None):
+    def fetch(self, max_size=None, retry_limit=3):
        """Return a FetchResult, which can be iterated over as a list of
        MessageSets.
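The diff is truncated before the body of ZKConsumer.fetch, so how the new retry_limit parameter is consumed isn't shown here. Purely as a sketch of the shape such a loop could take (the _fetch_once helper is hypothetical, not brod's actual code):

def fetch(self, max_size=None, retry_limit=3):
    # Hypothetical sketch: bound the number of reconnect attempts per
    # fetch, re-raising once the limit is exhausted.
    for attempt in range(retry_limit):
        try:
            return self._fetch_once(max_size)  # hypothetical helper
        except ConnectionFailure:
            if attempt == retry_limit - 1:
                raise  # exhausted retries; surface the failure
            log.debug("Fetch attempt %d failed; retrying", attempt)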
