Skip to content

Commit

Permalink
Add jetstream spec
Browse files Browse the repository at this point in the history
  • Loading branch information
Envek committed Jun 2, 2023
1 parent 347f25b commit cc2732e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
4 changes: 2 additions & 2 deletions sig/nats/io/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ module NATS
include MonitorMixin
include Status

@ruby_pid: Integer

attr_reader status: Integer
attr_reader server_info: Hash[Symbol, untyped]
attr_reader server_pool: Array[untyped]
Expand All @@ -45,6 +43,8 @@ module NATS
SUB_OP: 'SUB'
EMPTY_MSG: ''

INSTANCES: ObjectSpace::WeakMap

@options: Hash[Symbol, untyped]

@io: NATS::IO::Socket?
Expand Down
39 changes: 31 additions & 8 deletions spec/client_fork_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,28 @@
describe 'Client - Fork detection' do

before(:all) do
@s = NatsServerControl.new
@tmpdir = Dir.mktmpdir("ruby-jetstream-fork")
@s = NatsServerControl.new("nats://127.0.0.1:4524", "/tmp/test-nats.pid", "-js -sd=#{@tmpdir}")
@s.start_server(true)
end

after(:all) do
@s.kill_server
FileUtils.remove_entry(@tmpdir)
end

class Component
attr_reader :nats, :options
attr_accessor :msgs

def initialize(options={})
@nats = NATS::IO::Client.new
@nats = NATS.connect("nats://127.0.0.1:4524", options)
@msgs = []
@options = options
end

def connect!
@nats.connect(@options)
end
end

let(:options) { {} }
let!(:component) { Component.new(options).tap(&:connect!) }
let!(:component) { Component.new(options) }

it 'should be able to publish messages from child process after forking' do
received = nil
Expand Down Expand Up @@ -114,6 +111,32 @@ def connect!
Process.wait(pid)
end

it "should be able to use jetstreams from child process after forking" do
js = component.nats.jetstream
js.add_stream(name: "forked-stream", subjects: ["foo"])

from_child, to_parent = IO.pipe

pid = fork do # child process
from_child.close # close unused ends

psub = js.pull_subscribe("foo", "bar")
msgs = psub.fetch(1)
msgs.each(&:ack)

to_parent.write(msgs.first.data)
end
to_parent.close

js.publish("foo", "Hey JetStream!")

result = from_child.read
expect(result).to eq("Hey JetStream!")

from_child.close
Process.wait(pid)
end

context "when reconnection is disabled" do
let(:options) { { reconnect: false } }

Expand Down

0 comments on commit cc2732e

Please sign in to comment.