Skip to content

Commit

Permalink
Introduce partition count cache key for partition_key usage (karafk…
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Oct 29, 2023
1 parent 5ea1244 commit 5202af1
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# 0.14.0 (Unreleased)
* [Enhancement] Introduce producer partitions count metadata cache (mensfeld)
* [Enhancement] Increase metadata timeout request from `250 ms` to `2000 ms` default to allow for remote cluster operations via `rdkafka-ruby` (mensfeld)
* [Change] Remove support for Ruby 2.6 due to it being EOL and WeakMap incompatibilities (mensfeld)

# 0.13.0
* Support cooperative sticky partition assignment in the rebalance callback (methodmissing)
Expand Down
23 changes: 22 additions & 1 deletion lib/rdkafka/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,18 @@ module Rdkafka
class Metadata
attr_reader :brokers, :topics

def initialize(native_client, topic_name = nil, timeout_ms = 250)
# Errors upon which we retry the metadata fetch
RETRIED_ERRORS = %i[
timed_out
leader_not_available
].freeze

private_constant :RETRIED_ERRORS

def initialize(native_client, topic_name = nil, timeout_ms = 2_000)
attempt ||= 0
attempt += 1

native_topic = if topic_name
Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
end
Expand All @@ -22,6 +33,16 @@ def initialize(native_client, topic_name = nil, timeout_ms = 250)
raise Rdkafka::RdkafkaError.new(result) unless result.zero?

metadata_from_native(ptr.read_pointer)
rescue ::Rdkafka::RdkafkaError => e
raise unless RETRIED_ERRORS.include?(e.code)
raise if attempt > 10

backoff_factor = 2**attempt
timeout = backoff_factor * 0.1

sleep(timeout)

retry
ensure
Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic) if topic_name
Rdkafka::Bindings.rd_kafka_metadata_destroy(ptr.read_pointer)
Expand Down
38 changes: 36 additions & 2 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
# Cache partitions count for 30 seconds
PARTITIONS_COUNT_TTL = 30

private_constant :PARTITIONS_COUNT_TTL

# @private
# Returns the current delivery callback, by default this is nil.
#
Expand All @@ -24,6 +29,19 @@ def initialize(native_kafka, partitioner_name)

# Makes sure, that native kafka gets closed before it gets GCed by Ruby
ObjectSpace.define_finalizer(self, native_kafka.finalizer)

@_partitions_count_cache = Hash.new do |cache, topic|
topic_metadata = nil

@native_kafka.with_inner do |inner|
topic_metadata = ::Rdkafka::Metadata.new(inner, topic).topics&.first
end

cache[topic] = [
monotonic_now,
topic_metadata ? topic_metadata[:partition_count] : nil
]
end
end

# Set a callback that will be called every time a message is successfully produced.
Expand Down Expand Up @@ -68,11 +86,21 @@ def flush(timeout_ms=5_000)
# @param topic [String] The topic name.
#
# @return partition count [Integer,nil]
#
# We cache the partition count for a given topic for given time
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
#
# @param topic [String] topic name
# @return [Integer] partition count for a given topic
def partition_count(topic)
closed_producer_check(__method__)
@native_kafka.with_inner do |inner|
Rdkafka::Metadata.new(inner, topic).topics&.first[:partition_count]

@_partitions_count_cache.delete_if do |_, cached|
monotonic_now - cached.first > PARTITIONS_COUNT_TTL
end

@_partitions_count_cache[topic].last
end

# Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered.
Expand Down Expand Up @@ -193,6 +221,12 @@ def arity(callback)
end

private

def monotonic_now
# needed because Time.now can go backwards
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def closed_producer_check(method)
raise Rdkafka::ClosedProducerError.new(method) if closed?
end
Expand Down
42 changes: 42 additions & 0 deletions spec/rdkafka/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,46 @@ def call(_, handle)
end
end
end

describe '#partition_count' do
it { expect(producer.partition_count('consume_test_topic')).to eq(3) }

context 'when the partition count value is already cached' do
before do
producer.partition_count('consume_test_topic')
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
end

it 'expect not to query it again' do
producer.partition_count('consume_test_topic')
expect(::Rdkafka::Metadata).not_to have_received(:new)
end
end

context 'when the partition count value was cached but time expired' do
before do
allow(::Process).to receive(:clock_gettime).and_return(0, 30.02)
producer.partition_count('consume_test_topic')
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
end

it 'expect not to query it again' do
producer.partition_count('consume_test_topic')
expect(::Rdkafka::Metadata).to have_received(:new)
end
end

context 'when the partition count value was cached and time did not expire' do
before do
allow(::Process).to receive(:clock_gettime).and_return(0, 29.001)
producer.partition_count('consume_test_topic')
allow(::Rdkafka::Metadata).to receive(:new).and_call_original
end

it 'expect not to query it again' do
producer.partition_count('consume_test_topic')
expect(::Rdkafka::Metadata).not_to have_received(:new)
end
end
end
end

0 comments on commit 5202af1

Please sign in to comment.