Skip to content

Commit

Permalink
small pause between connected and subscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
yzh44yzh committed May 5, 2023
1 parent ae7f73e commit 0379499
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 19 deletions.
1 change: 1 addition & 0 deletions include/fox.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
connection :: pid() | undefined,
connection_ref :: reference() | undefined,
connection_params :: #amqp_params_network{},
connection_ready = false :: boolean(),
reconnect_attempt = 0 :: non_neg_integer(),
subscribers = [] :: [pid()],
registered_name :: atom()
Expand Down
54 changes: 41 additions & 13 deletions src/connection/fox_conn_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ stop(Pid) ->
init({RegName, ConnParams}) ->
put('$module', ?MODULE),
self() ! connect,
{ok, #conn_worker_state{connection_params = ConnParams, registered_name = RegName}}.
{ok, #conn_worker_state{
connection_ready = false,
connection_params = ConnParams,
registered_name = RegName
}}.


-spec handle_call(gs_request(), gs_from(), gs_reply()) -> gs_call_reply().
Expand All @@ -66,11 +70,15 @@ handle_call(Any, _From, State) ->


-spec handle_cast(gs_request(), gs_state()) -> gs_cast_reply().
handle_cast({register_subscriber, Pid}, #conn_worker_state{connection = Conn, subscribers = Subs} = State) ->
case Conn of
undefined -> do_nothing;
_ -> fox_subs_worker:connection_established(Pid, Conn)
end,
handle_cast({register_subscriber, Pid},
#conn_worker_state{connection_ready = false, subscribers = Subs} = State
) ->
{noreply, State#conn_worker_state{subscribers = [Pid | Subs]}};

handle_cast({register_subscriber, Pid},
#conn_worker_state{connection_ready = true, connection = Conn, subscribers = Subs} = State
) ->
fox_subs_worker:connection_established(Pid, Conn),
{noreply, State#conn_worker_state{subscribers = [Pid | Subs]}};

handle_cast({remove_subscriber, Pid}, #conn_worker_state{subscribers = Subs} = State) ->
Expand All @@ -87,28 +95,45 @@ handle_info(connect,
#conn_worker_state{
connection = undefined, connection_ref = undefined,
connection_params = Params, reconnect_attempt = Attempt,
subscribers = Subscribers,
registered_name = RegName
} = State) ->
SParams = fox_utils:params_network_to_str(Params),
case amqp_connection:start(Params) of
{ok, Conn} ->
Ref = erlang:monitor(process, Conn),
logger:notice("~s connected to ~s", [RegName, SParams]),
[fox_subs_worker:connection_established(Pid, Conn) || Pid <- Subscribers],
{noreply, State#conn_worker_state{
%% Need a small pause here
%% because RabbitMQ is not ready to accept subscriptions
%% immediatelly after restart.
{ok, SubscribeTimeout} = application:get_env(fox, subscribe_timeout),
erlang:send_after(SubscribeTimeout, self(), notify_subscribers),
State2 = State#conn_worker_state{
connection = Conn,
connection_ref = Ref,
reconnect_attempt = 0}};
connection_ready = false,
reconnect_attempt = 0
},
{noreply, State2};
{error, Reason} ->
logger:error("~s could not connect to ~s ~w", [RegName, SParams, Reason]),
fox_priv_utils:reconnect(Attempt),
{noreply, State#conn_worker_state{
State2 = State#conn_worker_state{
connection = undefined,
connection_ref = undefined,
reconnect_attempt = Attempt + 1}}
connection_ready = true,
reconnect_attempt = Attempt + 1
},
{noreply, State2}
end;

handle_info(notify_subscribers,
#conn_worker_state{
connection = Conn,
subscribers = Subscribers
} = State) ->
[fox_subs_worker:connection_established(Pid, Conn) || Pid <- Subscribers],
{noreply, State#conn_worker_state{connection_ready = true}};

handle_info({'DOWN', Ref, process, Conn, Reason},
#conn_worker_state{
connection = Conn,
Expand All @@ -118,7 +143,10 @@ handle_info({'DOWN', Ref, process, Conn, Reason},
} = State) ->
fox_priv_utils:error_or_info(Reason, "~s, connection is DOWN: ~0p", [RegName, Reason]),
fox_priv_utils:reconnect(Attempt),
{noreply, State#conn_worker_state{connection = undefined, connection_ref = undefined}};
{noreply, State#conn_worker_state{
connection_ready = false,
connection = undefined,
connection_ref = undefined}};


handle_info(Request, State) ->
Expand Down
1 change: 1 addition & 0 deletions src/fox.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{connection_timeout, 10000},
{max_reconnect_timeout, 5000},
{min_reconnect_timeout, 100},
{subscribe_timeout, 300},
{num_publish_channels, 20}
]},
{licenses, ["MIT"]}
Expand Down
9 changes: 3 additions & 6 deletions src/fox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,11 @@ test_run() ->
{ok, _Ref1} = subscribe("my_pool", <<"queue_1">>, sample_subs_callback, [<<"queue_1">>, <<"key_1">>]),

Q = #'basic.consume'{queue = <<"queue_2">>},
{ok, Ref2} = subscribe("my_pool", Q, sample_subs_callback, [<<"queue_2">>, <<"key_2">>]),

timer:sleep(200),
{ok, _Ref2} = subscribe("my_pool", Q, sample_subs_callback, [<<"queue_2">>, <<"key_2">>]),

ok = publish("my_pool", <<"my_exchange">>, <<"key_1">>, <<"Hello 1">>, #{synchronous => true}),
ok = publish("my_pool", <<"my_exchange">>, <<"key_2">>, <<"Hello 2">>),

timer:sleep(2000),

unsubscribe("my_pool", Ref2),
%% timer:sleep(2000),
%% unsubscribe("my_pool", Ref2),
ok.
1 change: 1 addition & 0 deletions test/fox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ subscribe_state_test(_Config) ->
#conn_worker_state{subscribers = Subscribers} = ConnState,
?assertEqual([SubsPid], Subscribers),

timer:sleep(?DELAY),
SubsState = sys:get_state(SubsPid),
ct:log("SubsState: ~p", [SubsState]),
?assertMatch(#subscription{
Expand Down

0 comments on commit 0379499

Please sign in to comment.