Skip to content

Commit

Permalink
js: implement consumer with name
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <[email protected]>
  • Loading branch information
wallyqs committed Oct 1, 2022
1 parent 9926494 commit dbc8f17
Showing 2 changed files with 137 additions and 4 deletions.
23 changes: 19 additions & 4 deletions lib/nats/io/js.rb
Original file line number Diff line number Diff line change
@@ -222,7 +222,9 @@ def subscribe(subject, params={}, &cb)
# @option params [Hash] :config Configuration for the consumer.
# @return [NATS::JetStream::PullSubscription]
def pull_subscribe(subject, durable, params={})
raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name") if durable.empty?
if durable.empty? && !params[:consumer]
raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name")
end
params[:consumer] ||= durable
stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]

@@ -329,7 +331,16 @@ def add_consumer(stream, config, params={})
else
config
end
req_subject = if config[:durable_name]

req_subject = case
when config[:name]
# NOTE: Only supported after nats-server v2.9.0
if config[:filter_subject] && config[:filter_subject] != ">"
"#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}"
else
"#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}"
end
when config[:durable_name]
"#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}"
else
"#{@prefix}.CONSUMER.CREATE.#{stream}"
@@ -341,6 +352,10 @@ def add_consumer(stream, config, params={})
raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer)
config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS
end
if config[:inactive_threshold]
raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer)
config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS
end

req = {
stream_name: stream,
@@ -1203,7 +1218,7 @@ def initialize(opts={})
# @return [Integer]
# @!attribute max_ack_pending
# @return [Integer]
ConsumerConfig = Struct.new(:durable_name, :description,
ConsumerConfig = Struct.new(:name, :durable_name, :description,
:deliver_policy, :opt_start_seq, :opt_start_time,
:ack_policy, :ack_wait, :max_deliver, :backoff,
:filter_subject, :replay_policy, :rate_limit_bps,
@@ -1220,7 +1235,7 @@ def initialize(opts={})
# now can be configured directly.
:num_replicas,
# Force memory storage
:memory_storage,
:mem_storage,
keyword_init: true) do
def initialize(opts={})
# Filter unrecognized fields just in case.
118 changes: 118 additions & 0 deletions spec/js_spec.rb
Original file line number Diff line number Diff line change
@@ -535,6 +535,124 @@
nc.close
nc2.close
end

it 'should create and bind to consumer with name' do
nc = NATS.connect(@s.uri)
js = nc.jetstream

js.add_stream(name: "ctests", subjects: ['a', 'b', 'c.>'])
js.publish('a', 'hello world!')
js.publish('b', 'hello world!!')
js.publish('c.d', 'hello world!!!')
js.publish('c.d.e', 'hello world!!!!')

tsub = nc.subscribe("$JS.API.CONSUMER.>")

# ephemeral consumer
consumer_name = 'ephemeral'
cinfo = js.add_consumer("ctests", name: consumer_name, ack_policy: "explicit")
expect(cinfo.config.name).to eql(consumer_name)

msg = tsub.next_msg
expect(msg.subject).to eql('$JS.API.CONSUMER.CREATE.ctests.ephemeral')

sub = js.pull_subscribe("", "", stream: 'ctests', consumer: 'ephemeral')
cinfo = sub.consumer_info
expect(cinfo.config.name).to eql(consumer_name)
msgs = sub.fetch(1)
expect(msgs.first.data).to eql('hello world!')
msgs.first.ack_sync
msg = tsub.next_msg()
expect(msg.subject).to eql('$JS.API.CONSUMER.INFO.ctests.ephemeral')
tsub.unsubscribe

# Create durable pull consumer with a name.
tsub = nc.subscribe("$JS.API.CONSUMER.>")
consumer_name = 'durable'
cinfo = js.add_consumer("ctests",
name: consumer_name,
durable_name: consumer_name,
ack_policy: "explicit",
)
expect(cinfo.config.name).to eql(consumer_name)
msg = tsub.next_msg()
expect(msg.subject).to eql('$JS.API.CONSUMER.CREATE.ctests.durable')
sub = js.pull_subscribe("", "durable", stream: 'ctests')
cinfo = sub.consumer_info
expect(cinfo.config.name).to eql(consumer_name)
msgs = sub.fetch(1)
expect(msgs.first.data).to eql('hello world!')
msgs.first.ack_sync
msg = tsub.next_msg()
expect(msg.subject).to eql('$JS.API.CONSUMER.INFO.ctests.durable')
tsub.unsubscribe

# Create durable pull consumer with a name and a filter_subject
tsub = nc.subscribe("$JS.API.CONSUMER.>")
consumer_name = 'durable2'
cinfo = js.add_consumer("ctests",
name: consumer_name,
durable_name: consumer_name,
filter_subject: 'b',
ack_policy: "explicit",
)
expect(cinfo.config.name).to eql(consumer_name)
msg = tsub.next_msg()
expect(msg.subject).to eql('$JS.API.CONSUMER.CREATE.ctests.durable2.b')
sub = js.pull_subscribe("", "durable2", stream: 'ctests')
msgs = sub.fetch(1)
expect(msgs.first.data).to eql('hello world!!')
msgs.first.ack_sync
tsub.unsubscribe

# Create durable pull consumer with a name and a filter_subject
tsub = nc.subscribe("$JS.API.CONSUMER.>")
consumer_name = 'durable3'
cinfo = js.add_consumer("ctests",
name: consumer_name,
durable_name: consumer_name,
filter_subject: '>',
ack_policy: "explicit",
)
expect(cinfo.config.name).to eql(consumer_name)
msg = tsub.next_msg()
expect(msg.subject).to eql('$JS.API.CONSUMER.CREATE.ctests.durable3')
sub = js.pull_subscribe("", "durable3", stream: 'ctests')
msgs = sub.fetch(1)
expect(msgs.first.data).to eql('hello world!')
msgs.first.ack_sync
tsub.unsubscribe

# name and durable must match if both present.
expect do
js.add_consumer("ctests",
name: "foo",
durable_name: "bar",
ack_policy: "explicit",
)
end.to raise_error NATS::JetStream::Error::BadRequest
begin
js.add_consumer("ctests",
name: "foo",
durable_name: "bar",
ack_policy: "explicit",
)
rescue => e
expect(e.description).to eql(%Q(consumer name in subject does not match durable name in request))
end

# consumer name and inactive
consumer_name = 'inactive'
cinfo = js.add_consumer("ctests",
name: consumer_name,
durable_name: consumer_name,
inactive_threshold: 2,
ack_policy: "explicit",
mem_storage: true
)
expect(cinfo.config.inactive_threshold).to eql(2000000000)
expect(cinfo.config.mem_storage).to eql(true)
end
end

describe 'Push Subscribe' do

0 comments on commit dbc8f17

Please sign in to comment.