Skip to content

Commit

Permalink
more tests, update README
Browse files Browse the repository at this point in the history
  • Loading branch information
yzh44yzh committed May 12, 2023
1 parent 47fa3b9 commit cb0e117
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 17 deletions.
30 changes: 14 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Client library for RabbitMQ build on top of [amqp_client](https://github.com/rab

**fox** allows to create one or seleral connection pools, monitors connections inside pool and reconnects them if needed.

**fox:create_connection_pool(PoolName, ConenctionParams)** creates pool with several connections (5 by default). PoolName should be _atom()_, _string()_ or _binary()_. ConnectionParams should be record [#amqp_params_network{}](https://github.com/jbrisbin/amqp_client/blob/master/include/amqp_client.hrl#L25) or _map()_ with the same fields.
**fox:create_connection_pool(PoolName, ConenctionParams)** creates pool with several connections (5 by default). PoolName should be an `atom()`, `string()` or `binary()`. ConnectionParams should be record [#amqp_params_network{}](https://github.com/jbrisbin/amqp_client/blob/master/include/amqp_client.hrl#L25) or `map()` with the same fields.

```erlang
Params = #{host => "localhost",
Expand All @@ -18,16 +18,14 @@ Params = #{host => "localhost",
fox:create_connection_pool(my_pool, Params),
```

**fox:close_connection_pool(PoolName)** closes all connections and removes pool.
**fox:close_connection_pool(PoolName)** closes all connections and removes the pool.

Before stating the pool you can check your connection params with call to **fox:validate_params_network(Params)**. If params are valid to connect to RabbitMQ, function returns _ok_, otherwise it returns _{error, Reason}_.
Before stating a pool you can check your connection params by calling **fox:validate_params_network(Params)**. If params are valid to connect to RabbitMQ, function returns `ok`, otherwise it returns `{error, Reason}`.


## Working with Channels

Many APIs need a channel.
You can get channel with **fox:get_channel(PoolName)**.
Function returns _{ok, ChannelPid}_ or _{error, Reason}_.
Many APIs need a channel. You can get channel with **fox:get_channel(PoolName)**. Function returns `{ok, ChannelPid}` or `{error, Reason}`.

```erlang
{ok, Channel} = fox:get_channel(my_pool),
Expand All @@ -39,11 +37,11 @@ fox:bind_queue(Channel, <<"my_queue">>, <<"my_exchange">>, <<"my_key">>)),

## Wrappers for amqp_channel:call/cast

**amqp_client** library communicates with RabbitMQ with **amqp_channel:call/cast** calls. It uses records like _#'basic.publish'{}_, _#'exchange.declare'{}_, _#'queue.bind'{}_ etc. And this is not very convenient.
**amqp_client** library communicates with RabbitMQ with **amqp_channel:call/cast** calls. It uses records like `#'basic.publish'{}`, `#'exchange.declare'{}`, `#'queue.bind'{}` etc. And this is not very convenient.

Full list of this records you can find here: [rabbit_framing.hrl](https://github.com/jbrisbin/rabbit_common/blob/master/include/rabbit_framing.hrl).
Here you can find the full list of records: [rabbit_framing.hrl](https://github.com/jbrisbin/rabbit_common/blob/master/include/rabbit_framing.hrl).

**fox** provides wrappers for most ofter used actions. For example, instead of
**fox** provides wrappers for most often used actions. For example, instead of

```erlang
BPublish = #'basic.publish'{exchange = Exchange, routing_key = RKey},
Expand All @@ -57,7 +55,7 @@ you can use
fox:publish(Channel, Exchange, RKey, <<"foobar">>)
```

And insread of
And instead of

```erlang
BPublish = #'basic.publish'{exchange = Exchange, routing_key = RKey},
Expand All @@ -71,7 +69,7 @@ you can use
```erlang
fox:publish(Channel, Exchange, RKey, <<"foobar">>, #{delivery_mode => 2})
```
There are wrappers for _publish_, _declare\_exchange_, _delete\_exchange_, _declare\_queue_, _delete\_queue_, _bind\_queue_, _unbind\_queue_ and _qos_.
There are wrappers for `publish`, `declare_exchange`, `delete_exchange`, `declare_queue`, `delete_queue`, `bind_queue`, `unbind_queue` and `qos`.


## Publishing
Expand All @@ -87,17 +85,17 @@ fox:publish(my_pool, Exchange, RougingKey, <<"Message">>)

The most sophisticated part of working with **amqp_channel** is subscription. You need gen\_server process to accept messages, and send pid of this process as argument to **amqp_client:subscribe/3**.

**fox** provides other way. You should create callback module implementing **fox_subs_worker** behaviour. This is something similar to cowboy handler. Then you call **fox:subscribe**. **fox** creates channel and new process for you module, subscribes to queues and routes messages to your callback module.
**fox** works differently. First you create callback module implementing **fox_subs_worker** behaviour. This is something similar to cowboy handler. Then you call `fox:subscribe`. **fox** creates channel and new process for you module, subscribes to queues and routes messages to your callback module.

```erlang
{ok, Ref} = fox:subscribe(my_pool, <<"my_queue">>, my_callback_module, CallbackInitArgs)
```

The first argument is a pool name. The second argument is queue. Queue can be a queue name (_binary()_) or a _#'basic.consume'{}_ record. Use _basic.consume_ if you need to defined some additional parameters for queue like exclusive, nowait, no\_ack etc. The forth argument (Args) used to init you callback module. **subscribe** returns _reference()_ needed fo unsubscribe.
The first argument is a pool name. The second argument is queue. Queue can be a queue name (`binary()`) or a `#'basic.consume'{}` record. Use `basic.consume` if you need to defined some additional parameters for queue like `exclusive`, `nowait`, `no_ack` etc. The forth argument (Args) used to init you callback module. **subscribe** returns `reference()` needed fo unsubscribe.

**fox_subs_worker** behaviour includes 3 functions:

**init(ChannelPid, Args)** gets channel pid and Args (forth argument to fox:subscribe). Here you can do any initialization steps, like, for example, creating exchanges and queues and bindings. It should create some state which will be later used in other callbacks.
**init(ChannelPid, Args)** gets channel pid and Args (the forth argument to `fox:subscribe`). Here you can do any initialization steps, like creating exchanges, queues and bindings. `init` returns some state which will be later used in other callbacks.

```erlang
init(Channel, Args) ->
Expand All @@ -108,7 +106,7 @@ init(Channel, Args) ->
{ok, State}.
```

**handle(Data, ChannelPid, State)** called each time new message arrives. Here you can process message, reply with _#'basic.ack'{}_ or _#'basic.reject'{}_, or don't reply at all. Function should return new state.
**handle(Data, ChannelPid, State)** called each time new message arrives. Here you can process message, reply with `#'basic.ack'{}` or `#'basic.reject'{}`, or don't reply at all. Function should return a new state.

```erlang
handle({#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}}, ChannelPid, State) ->
Expand All @@ -117,7 +115,7 @@ handle({#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Payload}}, Cha
{ok, State};
```

**terminate(ChannelPid, State)** called on _unsubscribe_ or pool closing. Here you can free resources, remove exchanges and queue.
**terminate(ChannelPid, State)** is called on `unsubscribe` or pool closing. Here you can free resources, remove exchanges and queue.

```erlang
terminate(ChannelPid, State) ->
Expand Down
55 changes: 54 additions & 1 deletion test/fox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ all() ->
subscribe_test,
subscribe_2_queues_test,
subscribe_state_test,
subs_worker_crash_test
subs_worker_crash_test,
channel_crash_test
].


Expand Down Expand Up @@ -308,6 +309,7 @@ subscribe_state_test(_Config) ->
fox:unsubscribe(subscribe_state_test, Ref),
ok.

-spec subs_worker_crash_test(list()) -> ok.
subs_worker_crash_test(_Config) ->
PoolName = subs_worker_crash_test,

Expand Down Expand Up @@ -357,6 +359,57 @@ subs_worker_crash_test(_Config) ->
ok.


-spec channel_crash_test(list()) -> ok.
channel_crash_test(_Config) ->
PoolName = channel_crash_test,

T = ets:new(PoolName, [public, named_table]),
E = <<"my_exchange">>,
Q = <<"channel_crash_test_queue">>,
RK = <<"channel_crash_test_key">>,
Args = {T, E, Q, RK},
{ok, Ref} = fox:subscribe(PoolName, Q, subs_test_callback, Args),

timer:sleep(?DELAY),
ct:log("~p", [get_subs_log(T)]),
?assertMatch([{1, Q, init, Args}], get_subs_log(T)),

fox:publish(PoolName, E, RK, <<"Event 1">>),
timer:sleep(?DELAY),
ct:log("~p", [get_subs_log(T)]),
?assertMatch([
{1, Q, init, Args},
{2, Q, handle_basic_deliver, <<"Event 1">>}
], get_subs_log(T)),

fox:publish(PoolName, E, RK, <<"Event 2">>),
timer:sleep(?DELAY),
ct:log("~p", [get_subs_log(T)]),
?assertMatch([
{1, Q, init, Args},
{2, Q, handle_basic_deliver, <<"Event 1">>},
{3, Q, handle_basic_deliver, <<"Event 2">>}
], get_subs_log(T)),

%% crash channel
#subs_meta{subs_worker = SubsPid} = fox_conn_pool:get_subs_meta(PoolName, Ref),
#subscription{channel = Channel} = sys:get_state(SubsPid),
exit(Channel, boom),
timer:sleep(?DELAY),

fox:publish(PoolName, E, RK, <<"Event 3">>),
timer:sleep(?DELAY),
ct:log("~p", [get_subs_log(T)]),
?assertMatch([
{1, Q, init, Args},
{2, Q, handle_basic_deliver, <<"Event 1">>},
{3, Q, handle_basic_deliver, <<"Event 2">>},
{4, Q, init, Args},
{5, Q, handle_basic_deliver, <<"Event 3">>}
], get_subs_log(T)),
ok.


-spec get_subs_log(ets:tab()) -> [tuple()].
get_subs_log(Ets) ->
lists:sort(
Expand Down

0 comments on commit cb0e117

Please sign in to comment.