Skip to content

Commit

Permalink
[kafka_consumer] add timeout for zk/kafka connections
Browse files Browse the repository at this point in the history
Because the default timeouts are pretty high in the kafka-python and
kazoo library (resp. 120s and 10s), we put them at a much lower
threshold (5s) for the default cases and offer a way to override in the
YAML init_config section.

Fixes DataDog#1589.
  • Loading branch information
LeoCavaille committed May 6, 2015
1 parent 4c58b50 commit ce85591
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
24 changes: 17 additions & 7 deletions checks.d/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
# stdlib
from collections import defaultdict
import random

# project
from checks import AgentCheck

# 3rd party
# 3p
from kafka.client import KafkaClient
from kafka.common import OffsetRequest
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

# project
from checks import AgentCheck

DEFAULT_KAFKA_TIMEOUT = 5
DEFAULT_ZK_TIMEOUT = 5


class KafkaCheck(AgentCheck):

SOURCE_TYPE_NAME = 'kafka'

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances)
self.zk_timeout = int(
init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT))
self.kafka_timeout = int(
init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT))

def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
Expand All @@ -26,7 +36,7 @@ def check(self, instance):
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'

# Connect to Zookeeper
zk_conn = KazooClient(zk_connect_str)
zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout)
zk_conn.start()

try:
Expand Down Expand Up @@ -56,7 +66,7 @@ def check(self, instance):
self.log.exception('Error cleaning up Zookeeper connection')

# Connect to Kafka
kafka_conn = KafkaClient(kafka_host_ports)
kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout)

try:
# Query Kafka for the broker offsets
Expand Down
4 changes: 4 additions & 0 deletions conf.d/kafka_consumer.yaml.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
init_config:
# Customize the ZooKeeper connection timeout here
# zk_timeout: 5
# Customize the Kafka connection timeout here
# kafka_timeout: 5

instances:
# - kafka_connect_str: localhost:19092
Expand Down

0 comments on commit ce85591

Please sign in to comment.