Skip to content

Commit

Permalink
yard remarks (karafka#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Nov 4, 2023
1 parent 7fc07fa commit 9709183
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 87 deletions.
6 changes: 5 additions & 1 deletion lib/rdkafka.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# frozen_string_literal: true

require "rdkafka/version"
require "logger"
require "objspace"
require "ffi"
require "json"

require "rdkafka/version"
require "rdkafka/helpers/time"
require "rdkafka/abstract_handle"
require "rdkafka/admin"
Expand Down
2 changes: 0 additions & 2 deletions lib/rdkafka/abstract_handle.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "ffi"

module Rdkafka
# This class serves as an abstract base class to represent handles within the Rdkafka module.
# As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle
Expand Down
13 changes: 6 additions & 7 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "objspace"

module Rdkafka
class Admin
# @private
Expand Down Expand Up @@ -30,11 +28,12 @@ def closed?

# Create a topic with the given partition count and replication factor
#
# @return [CreateTopicHandle] Create topic handle that can be used to wait for the result of
# creating the topic
#
# @raise [ConfigError] When the partition count or replication factor are out of valid range
# @raise [RdkafkaError] When the topic name is invalid or the topic already exists
# @raise [RdkafkaError] When the topic configuration is invalid
#
# @return [CreateTopicHandle] Create topic handle that can be used to wait for the result of creating the topic
def create_topic(topic_name, partition_count, replication_factor, topic_config={})
closed_admin_check(__method__)

Expand Down Expand Up @@ -107,11 +106,11 @@ def create_topic(topic_name, partition_count, replication_factor, topic_config={
create_topic_handle
end

# Delete the named topic
# Deletes the named topic
#
# @return [DeleteTopicHandle] Delete topic handle that can be used to wait for the result of
# deleting the topic
# @raise [RdkafkaError] When the topic name is invalid or the topic does not exist
#
# @return [DeleteTopicHandle] Delete topic handle that can be used to wait for the result of deleting the topic
def delete_topic(topic_name)
closed_admin_check(__method__)

Expand Down
4 changes: 0 additions & 4 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# frozen_string_literal: true

require "ffi"
require "json"
require "logger"

module Rdkafka
# @private
module Bindings
Expand Down
18 changes: 8 additions & 10 deletions lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "logger"

module Rdkafka
# Configuration for a Kafka consumer or producer. You can create an instance and use
# the consumer and producer methods to create a client. Documentation of the available
Expand Down Expand Up @@ -142,12 +140,12 @@ def consumer_rebalance_listener=(listener)
@consumer_rebalance_listener = listener
end

# Create a consumer with this configuration.
# Creates a consumer with this configuration.
#
# @return [Consumer] The created consumer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Consumer] The created consumer
def consumer
opaque = Opaque.new
config = native_config(opaque)
Expand Down Expand Up @@ -175,10 +173,10 @@ def consumer

# Create a producer with this configuration.
#
# @return [Producer] The created producer
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Producer] The created producer
def producer
# Create opaque
opaque = Opaque.new
Expand All @@ -196,12 +194,12 @@ def producer
end
end

# Create an admin instance with this configuration.
# Creates an admin instance with this configuration.
#
# @return [Admin] The created admin instance
#
# @raise [ConfigError] When the configuration contains invalid options
# @raise [ClientCreationError] When the native client cannot be created
#
# @return [Admin] The created admin instance
def admin
opaque = Opaque.new
config = native_config(opaque)
Expand Down
72 changes: 23 additions & 49 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ def closed?
@native_kafka.closed?
end

# Subscribe to one or more topics letting Kafka handle partition assignments.
# Subscribes to one or more topics letting Kafka handle partition assignments.
#
# @param topics [Array<String>] One or more topic names
#
# @raise [RdkafkaError] When subscribing fails
#
# @return [nil]
# @raise [RdkafkaError] When subscribing fails
def subscribe(*topics)
closed_consumer_check(__method__)

Expand All @@ -78,9 +76,8 @@ def subscribe(*topics)

# Unsubscribe from all subscribed topics.
#
# @raise [RdkafkaError] When unsubscribing fails
#
# @return [nil]
# @raise [RdkafkaError] When unsubscribing fails
def unsubscribe
closed_consumer_check(__method__)

Expand All @@ -95,10 +92,8 @@ def unsubscribe
# Pause producing or consumption for the provided list of partitions
#
# @param list [TopicPartitionList] The topic with partitions to pause
#
# @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
#
# @return [nil]
# @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
def pause(list)
closed_consumer_check(__method__)

Expand All @@ -122,13 +117,11 @@ def pause(list)
end
end

# Resume producing consumption for the provided list of partitions
# Resumes producing or consumption for the provided list of partitions
#
# @param list [TopicPartitionList] The topic with partitions to resume
#
# @raise [RdkafkaError] When resume subscription fails.
#
# @return [nil]
# @raise [RdkafkaError] When resume subscription fails.
def resume(list)
closed_consumer_check(__method__)

Expand All @@ -150,11 +143,10 @@ def resume(list)
end
end

# Return the current subscription to topics and partitions
#
# @raise [RdkafkaError] When getting the subscription fails.
# Returns the current subscription to topics and partitions
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the subscription fails.
def subscription
closed_consumer_check(__method__)

Expand All @@ -179,7 +171,6 @@ def subscription
# Atomic assignment of partitions to consume
#
# @param list [TopicPartitionList] The topic with partitions to assign
#
# @raise [RdkafkaError] When assigning fails
def assign(list)
closed_consumer_check(__method__)
Expand All @@ -204,9 +195,8 @@ def assign(list)

# Returns the current partition assignment.
#
# @raise [RdkafkaError] When getting the assignment fails.
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the assignment fails.
def assignment
closed_consumer_check(__method__)

Expand All @@ -232,14 +222,14 @@ def assignment
end

# Return the current committed offset per partition for this consumer group.
# The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition.
# The offset field of each requested partition will either be set to stored offset or to -1001
# in case there was no stored offset for that partition.
#
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil
# to use the current subscription.
# @param timeout_ms [Integer] The timeout for fetching this information.
#
# @raise [RdkafkaError] When getting the committed positions fails.
#
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the committed positions fails.
def committed(list=nil, timeout_ms=1200)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -269,10 +259,8 @@ def committed(list=nil, timeout_ms=1200)
# @param topic [String] The topic to query
# @param partition [Integer] The partition to query
# @param timeout_ms [Integer] The timeout for querying the broker
#
# @raise [RdkafkaError] When querying the broker fails.
#
# @return [Integer] The low and high watermark
# @raise [RdkafkaError] When querying the broker fails.
def query_watermark_offsets(topic, partition, timeout_ms=200)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -306,10 +294,9 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
#
# @param topic_partition_list [TopicPartitionList] The list to calculate lag for.
# @param watermark_timeout_ms [Integer] The timeout for each query watermark call.
#
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag
# per partition
# @raise [RdkafkaError] When querying the broker fails.
#
# @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag per partition
def lag(topic_partition_list, watermark_timeout_ms=100)
out = {}

Expand Down Expand Up @@ -358,10 +345,8 @@ def member_id
# When using this `enable.auto.offset.store` should be set to `false` in the config.
#
# @param message [Rdkafka::Consumer::Message] The message which offset will be stored
#
# @raise [RdkafkaError] When storing the offset fails
#
# @return [nil]
# @raise [RdkafkaError] When storing the offset fails
def store_offset(message)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -392,10 +377,8 @@ def store_offset(message)
# message at the given offset.
#
# @param message [Rdkafka::Consumer::Message] The message to which to seek
#
# @raise [RdkafkaError] When seeking fails
#
# @return [nil]
# @raise [RdkafkaError] When seeking fails
def seek(message)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -434,10 +417,8 @@ def seek(message)
#
# @param list [TopicPartitionList,nil] The topic with partitions to commit
# @param async [Boolean] Whether to commit async or wait for the commit to finish
#
# @raise [RdkafkaError] When committing fails
#
# @return [nil]
# @raise [RdkafkaError] When committing fails
def commit(list=nil, async=false)
closed_consumer_check(__method__)

Expand All @@ -462,10 +443,8 @@ def commit(list=nil, async=false)
# Poll for the next message on one of the subscribed topics
#
# @param timeout_ms [Integer] Timeout of this poll
#
# @raise [RdkafkaError] When polling fails
#
# @return [Message, nil] A message or nil if there was no new message within the timeout
# @raise [RdkafkaError] When polling fails
def poll(timeout_ms)
closed_consumer_check(__method__)

Expand Down Expand Up @@ -494,14 +473,11 @@ def poll(timeout_ms)
# Poll for new messages and yield for each received one. Iteration
# will end when the consumer is closed.
#
# If `enable.partition.eof` is turned on in the config this will raise an
# error when an eof is reached, so you probably want to disable that when
# using this method of iteration.
# If `enable.partition.eof` is turned on in the config this will raise an error when an eof is
# reached, so you probably want to disable that when using this method of iteration.
#
# @raise [RdkafkaError] When polling fails
#
# @yieldparam message [Message] Received message
#
# @return [nil]
def each
loop do
Expand Down Expand Up @@ -554,9 +530,7 @@ def each
# that you may or may not see again.
#
# @param max_items [Integer] Maximum size of the yielded array of messages
#
# @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
#
# @param timeout_ms [Integer] max time to wait for up to max_items
#
# @raise [RdkafkaError] When polling fails
Expand Down
6 changes: 2 additions & 4 deletions lib/rdkafka/consumer/headers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ def [](key)

# Reads a librdkafka native message's headers and returns them as a Ruby Hash
#
# @param [librdkafka message] native_message
# @private
#
# @param [librdkafka message] native_message
# @return [Hash<String, String>] headers Hash for the native_message
#
# @raise [Rdkafka::RdkafkaError] when it fails to read the headers
#
# @private
def self.from_native(native_message)
headers_ptrptr = FFI::MemoryPointer.new(:pointer)
err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr)
Expand Down
16 changes: 6 additions & 10 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require "objspace"

module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
Expand Down Expand Up @@ -108,18 +106,16 @@ def flush(timeout_ms=5_000)
end

# Partition count for a given topic.
# NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
#
# @param topic [String] The topic name.
# @return [Integer] partition count for a given topic
#
# @return partition count [Integer,nil]
#
# We cache the partition count for a given topic for given time
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
# @note If 'allow.auto.create.topics' is set to true in the broker, the topic will be
# auto-created after returning nil.
#
# @param [String] topic name
# @return [Integer] partition count for a given topic
# @note We cache the partition count for a given topic for given time.
# This prevents us in case someone uses `partition_key` from querying for the count with
# each message. Instead we query once every 30 seconds at most
def partition_count(topic)
closed_producer_check(__method__)

Expand Down

0 comments on commit 9709183

Please sign in to comment.