Skip to content

Commit

Permalink
backport incremental alter configs (karafka#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored May 3, 2024
1 parent bfdb0a2 commit 4eb360f
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 50 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Rdkafka Changelog

## 0.16.0 (Unreleased)
- **[Feature]** Introduce ability to discover cluster and topic configuration.
- **[Feature]** Support incremental config describe + alter API.
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
- [Enhancement] Provide `Rrdkafka::Admin#describe_errors` to get errors descriptions (mensfeld)
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
Expand Down
2 changes: 2 additions & 0 deletions lib/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
require "rdkafka/admin/describe_acl_report"
require "rdkafka/admin/describe_configs_handle"
require "rdkafka/admin/describe_configs_report"
require "rdkafka/admin/incremental_alter_configs_handle"
require "rdkafka/admin/incremental_alter_configs_report"
require "rdkafka/admin/acl_binding_result"
require "rdkafka/admin/config_binding_result"
require "rdkafka/admin/config_resource_binding_result"
Expand Down
92 changes: 87 additions & 5 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,6 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip

# Describe configs
#
# @param resource_type - values of type rd_kafka_ResourceType_t that support configs
# https://github.com/confluentinc/librdkafka/blob/292d2a66b9921b783f08147807992e603c7af059/src/rdkafka.h#L7307
# valid values are:
# RD_KAFKA_RESOURCE_TOPIC = 2
# RD_KAFKA_RESOURCE_BROKER = 4
# @param resources [Array<Hash>] Array where elements are hashes with two keys:
# - `:resource_type` - numerical resource type based on Kafka API
# - `:resource_name` - string with resource name
Expand Down Expand Up @@ -742,6 +737,93 @@ def describe_configs(resources)
handle
end

# Alters in an incremental way all the configs provided for given resources
#
# @param resources_with_configs [Array<Hash>] resources with the configs key that contains
# name, value and the proper op_type to perform on this value.
#
# @return [IncrementalAlterConfigsHandle] Incremental alter configs handle that can be used to
# wait for the result of altering resources with their appropriate configs
#
# @raise [RdkafkaError]
#
# @note Several resources can be requested at one go, but only one broker at a time
# @note The results won't contain altered values but only the altered resources
def incremental_alter_configs(resources_with_configs)
closed_admin_check(__method__)

handle = IncrementalAlterConfigsHandle.new
handle[:pending] = true
handle[:response] = -1

queue_ptr = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_queue_get_background(inner)
end

if queue_ptr.null?
raise Rdkafka::Config::ConfigError.new("rd_kafka_queue_get_background was NULL")
end

admin_options_ptr = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_AdminOptions_new(
inner,
Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS
)
end

IncrementalAlterConfigsHandle.register(handle)
Rdkafka::Bindings.rd_kafka_AdminOptions_set_opaque(admin_options_ptr, handle.to_ptr)

# Tu poprawnie tworzyc
pointer_array = resources_with_configs.map do |resource_details|
# First build the appropriate resource representation
resource_ptr = Rdkafka::Bindings.rd_kafka_ConfigResource_new(
resource_details.fetch(:resource_type),
FFI::MemoryPointer.from_string(
resource_details.fetch(:resource_name)
)
)

resource_details.fetch(:configs).each do |config|
Bindings.rd_kafka_ConfigResource_add_incremental_config(
resource_ptr,
config.fetch(:name),
config.fetch(:op_type),
config.fetch(:value)
)
end

resource_ptr
end

configs_array_ptr = FFI::MemoryPointer.new(:pointer, pointer_array.size)
configs_array_ptr.write_array_of_pointer(pointer_array)


begin
@native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_IncrementalAlterConfigs(
inner,
configs_array_ptr,
pointer_array.size,
admin_options_ptr,
queue_ptr
)
end
rescue Exception
IncrementalAlterConfigsHandle.remove(handle.to_ptr.address)

raise
ensure
Rdkafka::Bindings.rd_kafka_ConfigResource_destroy_array(
configs_array_ptr,
pointer_array.size
) if configs_array_ptr
end

handle
end

private

def closed_admin_check(method)
Expand Down
4 changes: 2 additions & 2 deletions lib/rdkafka/admin/describe_configs_handle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def create_result

def raise_error
raise RdkafkaError.new(
self[:response],
broker_message: self[:response_string].read_string
self[:response],
broker_message: self[:response_string].read_string
)
end
end
Expand Down
3 changes: 1 addition & 2 deletions lib/rdkafka/admin/describe_configs_report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def validate!(config_resource_result_ptr)
raise(
RdkafkaError.new(
code,
nil,
broker_message: Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
)
)
end
Expand Down
33 changes: 33 additions & 0 deletions lib/rdkafka/admin/incremental_alter_configs_handle.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Rdkafka
class Admin
class IncrementalAlterConfigsHandle < AbstractHandle
layout :pending, :bool,
:response, :int,
:response_string, :pointer,
:config_entries, :pointer,
:entry_count, :int

# @return [String] the name of the operation.
def operation_name
"incremental alter configs"
end

# @return [DescribeAclReport] instance with an array of acls that matches the request filters.
def create_result
IncrementalAlterConfigsReport.new(
config_entries: self[:config_entries],
entry_count: self[:entry_count]
)
end

def raise_error
raise RdkafkaError.new(
self[:response],
broker_message: self[:response_string].read_string
)
end
end
end
end
54 changes: 54 additions & 0 deletions lib/rdkafka/admin/incremental_alter_configs_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

module Rdkafka
class Admin
class IncrementalAlterConfigsReport
attr_reader :resources

def initialize(config_entries:, entry_count:)
@resources=[]

return if config_entries == FFI::Pointer::NULL

config_entries
.read_array_of_pointer(entry_count)
.each { |config_resource_result_ptr| validate!(config_resource_result_ptr) }
.each do |config_resource_result_ptr|
config_resource_result = ConfigResourceBindingResult.new(config_resource_result_ptr)

pointer_to_size_t = FFI::MemoryPointer.new(:int32)
configs_ptr = Bindings.rd_kafka_ConfigResource_configs(
config_resource_result_ptr,
pointer_to_size_t
)

configs_ptr
.read_array_of_pointer(pointer_to_size_t.read_int)
.map { |config_ptr| ConfigBindingResult.new(config_ptr) }
.each { |config_binding| config_resource_result.configs << config_binding }

@resources << config_resource_result
end
ensure
return if config_entries == FFI::Pointer::NULL

Bindings.rd_kafka_ConfigResource_destroy_array(config_entries, entry_count)
end

private

def validate!(config_resource_result_ptr)
code = Bindings.rd_kafka_ConfigResource_error(config_resource_result_ptr)

return if code.zero?

raise(
RdkafkaError.new(
code,
Bindings.rd_kafka_ConfigResource_error_string(config_resource_result_ptr)
)
)
end
end
end
end
12 changes: 12 additions & 0 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,22 @@ class ConfigResource < FFI::Struct
attach_function :rd_kafka_ConfigEntry_synonyms, [:pointer, :pointer], :pointer
attach_function :rd_kafka_ConfigResource_error, [:pointer], :int
attach_function :rd_kafka_ConfigResource_error_string, [:pointer], :string
attach_function :rd_kafka_IncrementalAlterConfigs, [:pointer, :pointer, :size_t, :pointer, :pointer], :void, blocking: true
attach_function :rd_kafka_IncrementalAlterConfigs_result_resources, [:pointer, :pointer], :pointer
attach_function :rd_kafka_ConfigResource_add_incremental_config, [:pointer, :string, :int32, :string], :pointer
attach_function :rd_kafka_event_IncrementalAlterConfigs_result, [:pointer], :pointer

RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS = 5
RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT = 104

RD_KAFKA_ADMIN_OP_INCREMENTALALTERCONFIGS = 16
RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT = 131072

RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET = 0
RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE = 1
RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND = 2
RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT = 3

# Errors
class NativeErrorDesc < FFI::Struct
layout :code, :int,
Expand Down
46 changes: 42 additions & 4 deletions lib/rdkafka/callbacks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def initialize(event_ptr)
end
end

class DescribeConfigResult
class DescribeConfigsResult
attr_reader :result_error, :error_string, :results, :results_count

def initialize(event_ptr)
Expand All @@ -122,10 +122,28 @@ def initialize(event_ptr)
@error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr)

if @result_error == 0
acl_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeConfigs_result(event_ptr)
configs_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeConfigs_result(event_ptr)
# Get the number of matching acls
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
@results = Rdkafka::Bindings.rd_kafka_DescribeConfigs_result_resources(acl_describe_result, pointer_to_size_t)
@results = Rdkafka::Bindings.rd_kafka_DescribeConfigs_result_resources(configs_describe_result, pointer_to_size_t)
@results_count = pointer_to_size_t.read_int
end
end
end

class IncrementalAlterConfigsResult
attr_reader :result_error, :error_string, :results, :results_count

def initialize(event_ptr)
@results=[]
@result_error = Rdkafka::Bindings.rd_kafka_event_error(event_ptr)
@error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr)

if @result_error == 0
incremental_alter_result = Rdkafka::Bindings.rd_kafka_event_IncrementalAlterConfigs_result(event_ptr)
# Get the number of matching acls
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
@results = Rdkafka::Bindings.rd_kafka_IncrementalAlterConfigs_result_resources(incremental_alter_result, pointer_to_size_t)
@results_count = pointer_to_size_t.read_int
end
end
Expand All @@ -146,6 +164,8 @@ def self.call(_, event_ptr, _)
process_create_topic(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT
process_describe_configs(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_EVENT_INCREMENTALALTERCONFIGS_RESULT
process_incremental_alter_configs(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
process_delete_topic(event_ptr)
when Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
Expand Down Expand Up @@ -182,7 +202,7 @@ def self.process_create_topic(event_ptr)
end

def self.process_describe_configs(event_ptr)
describe_configs = DescribeConfigResult.new(event_ptr)
describe_configs = DescribeConfigsResult.new(event_ptr)
describe_configs_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

if describe_configs_handle = Rdkafka::Admin::DescribeConfigsHandle.remove(describe_configs_handle_ptr.address)
Expand All @@ -199,6 +219,24 @@ def self.process_describe_configs(event_ptr)
end
end

def self.process_incremental_alter_configs(event_ptr)
incremental_alter = IncrementalAlterConfigsResult.new(event_ptr)
incremental_alter_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)

if incremental_alter_handle = Rdkafka::Admin::IncrementalAlterConfigsHandle.remove(incremental_alter_handle_ptr.address)
incremental_alter_handle[:response] = incremental_alter.result_error
incremental_alter_handle[:response_string] = incremental_alter.error_string
incremental_alter_handle[:pending] = false

if incremental_alter.result_error == 0
incremental_alter_handle[:config_entries] = incremental_alter.results
incremental_alter_handle[:entry_count] = incremental_alter.results_count
end

incremental_alter_handle.unlock
end
end

def self.process_delete_groups(event_ptr)
delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)

Expand Down
Loading

0 comments on commit 4eb360f

Please sign in to comment.