diff --git a/.travis.yml b/.travis.yml index 82af05d..9f6d597 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: erlang otp_release: + - 17.1 - 17.0 - R16B03-1 - R16B03 @@ -10,8 +11,5 @@ otp_release: - R15B02 - R15B01 - R15B - - R14B04 - - R14B03 - - R14B02 services: cassandra script: make test diff --git a/Makefile b/Makefile index 5df5186..f4bfe7d 100644 --- a/Makefile +++ b/Makefile @@ -11,26 +11,22 @@ all: deps compile deps: @ rebar get-deps -compile: +compile: deps @ rebar compile dialyze: compile $(PLT) - @ echo "==> (dialyze)" @ dialyzer --plt $(PLT) ebin \ -Wunmatched_returns \ -Wno_undefined_callbacks $(PLT): dialyzer.apps - @ echo "==> (dialyze)" - @ printf "Building $(PLT) file..." @- dialyzer -q --build_plt --output_plt $(PLT) \ --apps $(shell cat $(APPS)) - @ echo " done" wait: @ ./scripts/wait.escript $(DB_HOST) $(DB_PORT) $(TIMEOUT) -test: compile wait +test: deps compile wait @ rebar skip_deps=true ct console: compile diff --git a/README.md b/README.md index b7cd2bf..769f8fe 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # erlCQL [![Build Status][travis_ci_image]][travis_ci] -[![Bitdeli Badge][bitdeli_image]][bitdeli] Cassandra native protocol CQL Erlang client. @@ -30,6 +29,7 @@ erlcql:start_link(Options :: proplists:proplist()) -> | reconnect_start | pos_integer() | 1000 | | reconnect_max | pos_integer() | 30000 | | keepalive | boolean() | false | +| default_timeout | pos_integer() | 5000 | ### Query @@ -61,11 +61,9 @@ erlcql:start_link(Options :: proplists:proplist()) -> ### Versions -Supported versions: [`v1`][proto_v1]. +Supported versions: [`v1`][proto_v1], [`v2`][proto_v2] [travis_ci]: https://travis-ci.org/rpt/erlcql [travis_ci_image]: https://travis-ci.org/rpt/erlcql.png -[bitdeli]: https://bitdeli.com/free -[bitdeli_image]: https://d2weczhvl823v0.cloudfront.net/rpt/erlcql/trend.png -[proto_v1]: -https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v1.spec +[proto_v1]: https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v1.spec +[proto_v2]: https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v2.spec diff --git a/rebar.config b/rebar.config index 90b0444..64c7250 100644 --- a/rebar.config +++ b/rebar.config @@ -2,8 +2,11 @@ %% vi: set ft=erlang : {deps, - [{snappy, "", {git, "https://github.com/rpt/snappy.git"}}, + [{lager, ".*", {git, "https://github.com/basho/lager.git", {tag, "2.0.3"}}}, + {snappy, "", {git, "https://github.com/rpt/snappy.git"}}, {lz4, "", {git, "https://github.com/szktty/erlang-lz4.git"}}, - {proper, "", {git, "https://github.com/manopapad/proper.git"}}]}. + {proper, "", {git, "https://github.com/manopapad/proper.git"}}, + {folsomite, ".*", {git, "git://github.com/puzza007/folsomite.git"}}, + {quintana, ".*", {git, "https://github.com/puzza007/quintana.git"}}]}. {cover_enabled, true}. diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 1a33b7a..b996f30 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,12 +1,8 @@ -%% -*- mode: erlang -*- -%% vi: set ft=erlang : - {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.1.7"}, + {vsn, "0.2.5"}, {registered, []}, - {applications, - [kernel, - stdlib, - snappy, - lz4]}]}. + {applications, [kernel, + stdlib, + snappy, + lz4]}]}. diff --git a/src/erlcql.erl b/src/erlcql.erl index ddcad75..13dc60e 100644 --- a/src/erlcql.erl +++ b/src/erlcql.erl @@ -18,16 +18,15 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc API module. -%% @author Krzysztof Rutka -module(erlcql). -%% API --export([start_link/0, - start_link/1, - start_link/2]). --export([q/2, q/3, - e/3, e/4]). +-export([start_link/0]). +-export([start_link/1]). +-export([start_link/2]). +-export([q/2]). +-export([q/3]). +-export([e/3]). +-export([e/4]). -export([default/1]). -include("erlcql.hrl"). @@ -36,24 +35,20 @@ Pid :: pid(), Stream :: integer()}. --export_type([response/0, - consistency/0, - compression/0, - event_type/0, - event_fun/0, - query_ref/0]). --export_type([values/0, - type/0, - native_type/0, - uuid/0, - collection_type/0, - erlcql_list/0, - erlcql_set/0, - erlcql_map/0]). - -%%----------------------------------------------------------------------------- -%% API functions -%%----------------------------------------------------------------------------- +-export_type([response/0]). +-export_type([consistency/0]). +-export_type([compression/0]). +-export_type([event_type/0]). +-export_type([event_fun/0]). +-export_type([query_ref/0]). +-export_type([values/0]). +-export_type([type/0]). +-export_type([native_type/0]). +-export_type([uuid/0]). +-export_type([collection_type/0]). +-export_type([erlcql_list/0]). +-export_type([erlcql_set/0]). +-export_type([erlcql_map/0]). -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> @@ -98,10 +93,14 @@ default(compression) -> false; default(tracing) -> false; default(event_handler) -> self(); default(consistency) -> quorum; -default(auto_reconnect) -> false; +default(serial_consistency) -> serial; +default(batch_type) -> logged; default(register) -> []; default(use) -> undefined; default(prepare) -> []; +default(auto_reconnect) -> false; default(reconnect_start) -> 1000; default(reconnect_max) -> 30000; -default(keepalive) -> false. +default(keepalive) -> false; +default(version) -> 2; +default(default_timeout) -> timer:seconds(5). diff --git a/include/erlcql.hrl b/src/erlcql.hrl similarity index 80% rename from include/erlcql.hrl rename to src/erlcql.hrl index 791eefa..de4c38c 100644 --- a/include/erlcql.hrl +++ b/src/erlcql.hrl @@ -18,35 +18,28 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @author Krzysztof Rutka - -define(APP, erlcql). --define(VERSION, 1). -%% Directions +-type version() :: 1 | 2. + -define(REQUEST, 0). -define(RESPONSE, 1). -%% Encode/decode types -define(INT, 4/big-signed-integer-unit:8). -define(SHORT, 2/big-unsigned-integer-unit:8). -define(int(X), <>). -define(short(X), <>). -%% Parser -record(parser, { buffer = <<>> :: binary(), - length :: pos_integer() + length :: pos_integer(), + version :: version() }). -type parser() :: #parser{}. -type event_fun() :: fun((event()) -> any()). -%%----------------------------------------------------------------------------- -%% Types -%%----------------------------------------------------------------------------- - -type proplist() :: proplists:proplist(). -type socket() :: inet:socket(). -type ets() :: ets:tid(). @@ -102,7 +95,11 @@ | quorum | all | local_quorum - | each_quorum. + | each_quorum + | local_one. + +-type serial_consistency() :: serial + | local_serial. -type event_type() :: topology_change | status_change @@ -164,8 +161,6 @@ -type inet() :: {inet:ip_address(), inet:port_number()}. -%% Types ---------------------------------------------------------------------- - -type uuid() :: bitstring(). -type native_type() :: binary() @@ -189,21 +184,6 @@ -type values() :: [type() | {option(), type()}]. -%%----------------------------------------------------------------------------- -%% Logging macros -%%----------------------------------------------------------------------------- - --ifdef(ERLCQL_NO_LOGS). --define(ERROR(Format, Data), begin Format, Data, ok end). --define(EMERGENCY(Format, Data), begin Format, Data, ok end). --define(ALERT(Format, Data), begin Format, Data, ok end). --define(CRITICAL(Format, Data), begin Format, Data, ok end). --define(WARNING(Format, Data), begin Format, Data, ok end). --define(INFO(Format, Data), begin Format, Data, ok end). --define(NOTICE(Format, Data), begin Format, Data, ok end). --define(DEBUG(Format, Data), begin Format, Data, ok end). --else. --ifdef(ERLCQL_LAGER). -compile({parse_transform, lager_transform}). -define(EMERGENCY(Format, Data), lager:emergency(Format, Data)). -define(ALERT(Format, Data), lager:alert(Format, Data)). @@ -213,17 +193,6 @@ -define(NOTICE(Format, Data), lager:notice(Format, Data)). -define(INFO(Format, Data), lager:info(Format, Data)). -define(DEBUG(Format, Data), lager:debug(Format, Data)). --else. --define(ERROR(Format, Data), error_logger:error_msg(Format ++ "~n", Data)). --define(EMERGENCY(Format, Data), ?ERROR(Format, Data)). --define(ALERT(Format, Data), ?ERROR(Format, Data)). --define(CRITICAL(Format, Data), ?ERROR(Format, Data)). --define(WARNING(Format, Data), error_logger:warning_msg(Format ++ "~n", Data)). --define(INFO(Format, Data), error_logger:info_msg(Format ++ "~n", Data)). --define(NOTICE(Format, Data), ?INFO(Format, Data)). --define(DEBUG(Format, Data), ?INFO(Format, Data)). --endif. --endif. -define(EMERGENCY(Format), ?EMERGENCY(Format, [])). -define(ALERT(Format), ?ALERT(Format, [])). diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index bd135dc..41e7a34 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -18,34 +18,37 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol CQL client module. -%% @author Krzysztof Rutka -module(erlcql_client). -behaviour(gen_fsm). -%% API -export([start_link/1]). --export(['query'/3, async_query/3, - execute/4, async_execute/4]). --export([prepare/2, prepare/3, - options/1, - register/2]). --export([await/1, - await/2]). - -%% gen_fsm callbacks --export([init/1, - handle_event/3, - handle_sync_event/4, - handle_info/3, - code_change/4, - terminate/3]). --export([startup/2, - startup/3, - ready/2, - ready/3]). +-export(['query'/3]). +-export([execute/4]). +-export([batch/3]). +-export([prepare/2]). +-export([prepare/3]). +-export([options/1]). +-export([register/2]). +-export([async_query/3]). +-export([async_execute/4]). +-export([await/1]). +-export([await/2]). +-export([get_env_opt/2]). + +-export([init/1]). +-export([handle_event/3]). +-export([handle_sync_event/4]). +-export([handle_info/3]). +-export([code_change/4]). +-export([terminate/3]). +-export([startup/2]). +-export([startup/3]). +-export([ready/2]). +-export([ready/3]). -include("erlcql.hrl"). +-include("erlcql_metrics.hrl"). + -record(state, { async_ets :: ets(), @@ -64,7 +67,8 @@ prepare :: [{Name :: atom(), Query :: iodata()}], prepared_ets :: ets(), socket :: undefined | socket(), - streams = lists:seq(1, 127) :: [integer()] + streams = lists:seq(1, 127) :: [integer()], + version :: version() }). -type state() :: #state{}. @@ -76,7 +80,6 @@ -define(PREPARED_ETS_NAME, erlcql_prepared). -define(PREPARED_ETS_OPTS, [set, private, {read_concurrency, true}]). --define(TIMEOUT, timer:seconds(5)). -record(backoff, { start :: pos_integer(), @@ -85,20 +88,16 @@ }). -type backoff() :: #backoff{}. -%% Start API ------------------------------------------------------------------ - start_link(Opts) -> Opts2 = [{parent, self()} | Opts], EventFun = event_fun(get_env_opt(event_handler, Opts)), Opts3 = [{event_fun, EventFun} | Opts2], gen_fsm:start_link(?MODULE, proplists:unfold(Opts3), []). -%% Request API ---------------------------------------------------------------- - -spec 'query'(pid(), iodata(), consistency()) -> result() | {error, Reason :: term()}. -'query'(Pid, QueryString, Consistency) -> - async_call(Pid, {'query', QueryString, Consistency}). +'query'(Pid, QueryString, Params) -> + async_call(Pid, {'query', QueryString, Params}). -spec prepare(pid(), iodata()) -> prepared() | {error, Reason :: term()}. prepare(Pid, QueryString) -> @@ -113,6 +112,11 @@ prepare(Pid, QueryString, Name) -> execute(Pid, QueryId, Values, Consistency) -> async_call(Pid, {execute, QueryId, Values, Consistency}). +-spec batch(pid(), [{atom(), values()}], proplist()) -> + result() | {error, Reason :: term()}. +batch(Pid, Queries, Params) -> + async_call(Pid, {batch, Queries, Params}). + -spec options(pid()) -> supported() | {error, Reason :: term()}. options(Pid) -> async_call(Pid, options). @@ -121,12 +125,10 @@ options(Pid) -> register(Pid, Events) -> async_call(Pid, {register, Events}). -%% Async request API ---------------------------------------------------------- - -spec async_query(pid(), iodata(), consistency()) -> {ok, QueryRef :: erlcql:query_ref()} | {error, Reason :: term()}. -async_query(Pid, QueryString, Consistency) -> - cast(Pid, {'query', QueryString, Consistency}). +async_query(Pid, QueryString, Params) -> + cast(Pid, {'query', QueryString, Params}). -spec async_execute(pid(), erlcql:uuid() | atom(), values(), consistency()) -> {ok, QueryRef :: erlcql:query_ref()} | {error, Reason :: term()}. @@ -135,11 +137,13 @@ async_execute(Pid, QueryId, Values, Consistency) -> -spec await(erlcql:query_ref()) -> response() | {error, Reason :: term()}. await({ok, QueryRef}) -> - do_await(QueryRef, ?TIMEOUT); + Timeout = get_env(default_timeout), + do_await(QueryRef, Timeout); await({error, _Reason} = Error) -> Error; await(QueryRef) -> - do_await(QueryRef, ?TIMEOUT). + Timeout = get_env(default_timeout), + do_await(QueryRef, Timeout). -spec await(erlcql:query_ref(), integer()) -> response() | {error, Reason :: term()}. @@ -150,8 +154,6 @@ await({error, _Reason} = Error, _Timeout) -> await(QueryRef, Timeout) -> do_await(QueryRef, Timeout). -%% FSM: init ------------------------------------------------------------------ - init(Opts) -> State = init_state(Opts), case State#state.auto_reconnect of @@ -176,6 +178,7 @@ try_connect(#state{database = {Host, Port}, {ok, ready, State3}; {error, Reason} = Error -> ?ERROR("Connection init failed: ~s", [Reason]), + ok = quintana:notify_histogram(?CONNECTION_FAILURE_METRIC, 1), {stop, Error} end; {error, Reason} = Error -> @@ -197,17 +200,18 @@ init_state(Opts) -> Host = get_env_opt(host, Opts), Port = get_env_opt(port, Opts), Database = {Host, Port}, - EventFun = get_opt(event_fun, Opts), + EventFun = get_internal_opt(event_fun, Opts), Events = get_env_opt(register, Opts), Compression = get_env_opt(compression, Opts), Tracing = get_env_opt(tracing, Opts), Flags = {Compression, Tracing}, Keepalive = get_env_opt(keepalive, Opts), Keyspace = get_env_opt(use, Opts), - Parent = get_opt(parent, Opts), - Parser = erlcql_decode:new_parser(), + Parent = get_internal_opt(parent, Opts), Prepare = get_env_opt(prepare, Opts), PreparedETS = maybe_create_prepared_ets(Opts), + Version = get_env_opt(version, Opts), + Parser = erlcql_decode:new_parser(Version), #state{async_ets = AsyncETS, auto_reconnect = AutoReconnect, backoff = Backoff, @@ -222,9 +226,8 @@ init_state(Opts) -> parent = Parent, parser = Parser, prepare = Prepare, - prepared_ets = PreparedETS}. - -%% State: startup ------------------------------------------------------------- + prepared_ets = PreparedETS, + version = Version}. startup(reconnect, #state{backoff = Backoff, database = {Host, Port}, @@ -251,6 +254,7 @@ startup(reconnect, #state{backoff = Backoff, end; startup(Event, State) -> ?ERROR("Bad event (startup): ~p", [Event]), + ok = quintana:notify_histogram(?CONNECTION_STARTUP_FAIL_METRIC, 1), {stop, {bad_event, Event}, State}. startup({_Ref, {'query', Query, _}}, _From, State) -> @@ -261,6 +265,8 @@ startup({_Ref, {prepare, Query, _}}, _From, State) -> not_ready(prepare, Query, State); startup({_Ref, {execute, Query, _, _}}, _From, State) -> not_ready(execute, Query, State); +startup({_Ref, {batch, _, _}}, _From, State) -> + not_ready(batch, State); startup({_Ref, options}, _From, State) -> not_ready(options, State); startup({_Ref, {register, _}}, _From, State) -> @@ -277,68 +283,79 @@ not_ready(Request, Info, State) -> ?DEBUG("Connection not ready for '~s': ~p", [Request, Info]), {reply, {error, not_ready}, startup, State}. -%% State: ready --------------------------------------------------------------- - ready(Event, State) -> ?ERROR("Bad event (ready): ~p", [Event]), {stop, {bad_event, Event}, State}. ready({_Ref, _}, _From, #state{streams = []} = State) -> ?CRITICAL("Too many requests!"), + ok = quintana:notify_histogram(?CONNECTION_STREAMS_EXHAUSTED_METRIC, 1), {reply, {error, too_many_requests}, ready, State}; -ready({Ref, {'query', QueryString, Consistency}}, {From, _}, State) -> - Query = erlcql_encode:'query'(QueryString, Consistency), +ready({Ref, {'query', QueryString, Params}}, {From, _}, + #state{version = Version} = State) -> + Query = erlcql_encode:'query'(Version, QueryString, Params), send(Query, {Ref, From}, State); -ready({Ref, {prepare, Query}}, {From, _}, State) -> - Prepare = erlcql_encode:prepare(Query), +ready({Ref, {prepare, Query}}, {From, _}, + #state{version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), send(Prepare, {Ref, From}, State); ready({Ref, {prepare, Query, Name}}, {From, _}, - #state{prepared_ets = PreparedETS} = State) -> - Prepare = erlcql_encode:prepare(Query), - Fun = fun({ok, QueryId, Types}) -> - true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), - {ok, QueryId}; - ({error, _} = Response) -> - Response + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), + Fun = fun({ok, {QueryId, RequestMetadata, _ResultMetadata}} = Response) -> + Types = proplists:get_value(types, RequestMetadata), + Entry = {Name, Query, QueryId, Types}, + true = ets:insert(PreparedETS, Entry), + Response; + ({error, Reason}) -> + {error, {Name, Query, Reason}} end, send(Prepare, {Ref, From, Fun}, State); -ready({Ref, {execute, QueryId, Values, Consistency}}, - {From, _}, State) when is_binary(QueryId) -> - Execute = erlcql_encode:execute(QueryId, Values, Consistency), +ready({Ref, {execute, QueryId, Values, Consistency}}, {From, _}, + #state{version = Version} = State) when is_binary(QueryId) -> + Execute = erlcql_encode:execute(Version, QueryId, Values, Consistency), send(Execute, {Ref, From}, State); ready({Ref, {execute, Name, Values, Consistency}}, {From, _}, - #state{prepared_ets = PreparedETS} = State) when is_atom(Name) -> + #state{prepared_ets = PreparedETS, + version = Version} = State) when is_atom(Name) -> case ets:lookup(PreparedETS, Name) of [{Name, Query, QueryId, undefined}] -> ?DEBUG("Executing ~s: ~s, ~p", [Name, Query, Values]), - Execute = erlcql_encode:execute(QueryId, Values, Consistency), + Execute = erlcql_encode:execute(Version, QueryId, + Values, Consistency), send(Execute, {Ref, From}, State); [{Name, Query, QueryId, Types}] -> TypedValues = lists:zip(Types, Values), ?DEBUG("Executing ~s: ~s, ~p", [Name, Query, Values]), - Execute = erlcql_encode:execute(QueryId, TypedValues, Consistency), + Execute = erlcql_encode:execute(Version, QueryId, + TypedValues, Consistency), send(Execute, {Ref, From}, State); [] -> ?DEBUG("Execute failed, invalid query name: ~s", [Name]), {reply, {error, invalid_query_name}, ready, State} end; -ready({Ref, options}, {From, _}, State) -> - Options = erlcql_encode:options(), +ready({Ref, {batch, Queries, Params}}, {From, _}, + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + case expand_prepared(PreparedETS, Queries, []) of + {error, Error} -> + {reply, {error, Error}, ready, State}; + Queries2 -> + Batch = erlcql_encode:batch(Version, Queries2, Params), + send(Batch, {Ref, From}, State) + end; +ready({Ref, options}, {From, _}, #state{version = Version} = State) -> + Options = erlcql_encode:options(Version), send(Options, {Ref, From}, State); -ready({Ref, {register, Events}}, {From, _}, State) -> - Register = erlcql_encode:register(Events), +ready({Ref, {register, Events}}, {From, _}, + #state{version = Version} = State) -> + Register = erlcql_encode:register(Version, Events), send(Register, {Ref, From}, State); ready(Event, _From, State) -> ?ERROR("Bad event (ready/sync): ~p", [Event]), {stop, {bad_event, Event}, State}. -%% FSM: event handling -------------------------------------------------------- - -handle_event({timeout, Stream}, StateName, - #state{async_ets = AsyncETS, - streams = Streams} = State) -> - true = ets:delete(AsyncETS, Stream), - {next_state, StateName, State#state{streams = [Stream | Streams]}}; handle_event(Event, StateName, State) -> ?ERROR("Bad event (~s/handle_event): ~p", [StateName, Event]), {stop, {bad_event, Event}, State}. @@ -354,18 +371,22 @@ handle_info({tcp, Socket, Data}, ready, #state{socket = Socket} = State) -> handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p closed", [Socket]), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), {stop, tcp_closed, State}; handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p closed", [Socket]), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), try_again(State); handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p error: ~p", [Socket, Reason]), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), {stop, {tcp_error, Reason}, State}; handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p error: ~p", [Socket, Reason]), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), try_again(State); handle_info(Info, StateName, State) -> ?ERROR("Bad info (~s/handle_info): ~p", [StateName, Info]), @@ -377,18 +398,25 @@ code_change(_OldVsn, StateName, State, _Extra) -> terminate(_Reason, _StateName, #state{socket = Socket}) -> close_socket(Socket). -%%----------------------------------------------------------------------------- -%% Internal functions -%%----------------------------------------------------------------------------- +-spec expand_prepared(ets(), [{atom(), values()}], [{binary(), values()}]) -> + [{binary(), values()}] | {error, invalid_query_name}. +expand_prepared(_, [], Acc) -> + lists:reverse(Acc); +expand_prepared(ETS, [{Name, Values} | Qs], Acc) when is_atom(Name) -> + case ets:lookup(ETS, Name) of + [{Name, _Query, QueryId, undefined}] -> + expand_prepared(ETS, Qs, [{QueryId, Values} | Acc]); + [{Name, _Query, QueryId, Types}] -> + TypedValues = lists:zip(Types, Values), + expand_prepared(ETS, Qs, [{QueryId, TypedValues} | Acc]); + [] -> + {error, invalid_query_name} + end. -spec maybe_create_prepared_ets(proplist()) -> ets(). maybe_create_prepared_ets(Opts) -> - case get_opt(prepared_statements_ets_tid, Opts) of - undefined -> - ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS); - Tid -> - Tid - end. + New = fun() -> ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS) end, + get_opt(prepared_statements_ets_tid, Opts, New). -spec event_fun(event_fun() | pid()) -> event_fun(). event_fun(Fun) when is_function(Fun) -> @@ -419,8 +447,9 @@ apply_funs([Fun | Rest], State) -> end. -spec send_options(state()) -> {ok, state()} | {error, Reason :: term()}. -send_options(#state{cql_version = undefined} = State) -> - Options = erlcql_encode:options(), +send_options(#state{cql_version = undefined, + version = Version} = State) -> + Options = erlcql_encode:options(Version), ok = send_request(Options, 0, State), case wait_for_response(State) of {ok, Supported} -> @@ -435,8 +464,9 @@ send_options(State) -> -spec send_startup(state()) -> {ok, state()} | {error, Reason :: term()}. send_startup(#state{flags = {Compression, Tracing}, - cql_version = CQLVersion} = State) -> - Startup = erlcql_encode:startup(Compression, CQLVersion), + cql_version = CQLVersion, + version = Version} = State) -> + Startup = erlcql_encode:startup(Version, Compression, CQLVersion), ok = send_request(Startup, 0, State#state{flags = {false, Tracing}}), wait_for_ready(State). @@ -458,10 +488,11 @@ wait_for_ready(State) -> -spec try_auth(bitstring(), state()) -> ok | {error, term()}. try_auth(<<"org.apache.cassandra.auth.PasswordAuthenticator">>, - #state{credentials = {Username, Password}} = State) -> + #state{credentials = {Username, Password}, + version = Version} = State) -> Map = [{<<"username">>, Username}, {<<"password">>, Password}], - Credentials = erlcql_encode:credentials(Map), + Credentials = erlcql_encode:credentials(Version, Map), ok = send_request(Credentials, 0, State); try_auth(Other, _State) -> {error, {unknown_auth_class, Other}}. @@ -471,8 +502,9 @@ register_to_events(#state{events = undefined} = State) -> {ok, State}; register_to_events(#state{events = []} = State) -> {ok, State}; -register_to_events(#state{events = Events} = State) -> - Register = erlcql_encode:register(Events), +register_to_events(#state{events = Events, + version = Version} = State) -> + Register = erlcql_encode:register(Version, Events), ok = send_request(Register, 0, State), case wait_for_response(State) of ready -> @@ -484,8 +516,10 @@ register_to_events(#state{events = Events} = State) -> -spec use_keyspace(state()) -> {ok, state()} | {error, Reason :: term()}. use_keyspace(#state{keyspace = undefined} = State) -> {ok, State}; -use_keyspace(#state{keyspace = Keyspace} = State) -> - Use = erlcql_encode:'query'([<<"USE ">>, Keyspace], any), +use_keyspace(#state{keyspace = Keyspace, + version = Version} = State) -> + Use = erlcql_encode:'query'(Version, [<<"USE ">>, Keyspace], + [{consistency, any}]), ok = send_request(Use, 0, State), case wait_for_response(State) of {ok, Keyspace} -> @@ -505,11 +539,13 @@ prepare_queries(#state{prepare = Queries} = State) -> prepare_queries([], State) -> {ok, State}; prepare_queries([{Name, Query} | Rest], - #state{prepared_ets = PreparedETS} = State) -> - Prepare = erlcql_encode:prepare(Query), + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), ok = send_request(Prepare, 0, State), case wait_for_response(State) of - {ok, QueryId, Types} -> + {ok, {QueryId, RequestMetadata, _}} -> + Types = proplists:get_value(types, RequestMetadata), true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), prepare_queries(Rest, State); {error, _Reason} = Error -> @@ -518,8 +554,9 @@ prepare_queries([{Name, Query} | Rest], -spec send_request(request(), integer(), state()) -> ok. send_request(Body, Stream, #state{socket = Socket, - flags = Flags}) -> - Frame = erlcql_encode:frame(Body, Flags, Stream), + flags = Flags, + version = Version}) -> + Frame = erlcql_encode:frame(Version, Body, Flags, Stream), ok = gen_tcp:send(Socket, Frame). -spec wait_for_response(state()) -> @@ -547,8 +584,9 @@ wait_for_body(<<_:32, Length:32>> = Header, -spec decode_response(binary(), state()) -> ready() | authenticate() | response() | {error, term()}. -decode_response(Binary, #state{flags = {Compression, _}}) -> - case erlcql_decode:decode(Binary, Compression) of +decode_response(Binary, #state{version = Version, + flags = {Compression, _}}) -> + case erlcql_decode:decode(Version, Binary, Compression) of {ok, 0, Response, <<>>} -> Response; {error, _} = Error -> @@ -594,12 +632,12 @@ cast(Pid, Request) -> -spec do_await(erlcql:query_ref(), integer()) -> response() | {error, Reason :: term()}. -do_await({Ref, Pid, Stream}, Timeout) -> +do_await({Ref, _Pid, _Stream}, Timeout) -> receive {Ref, Response} -> Response after Timeout -> - gen_fsm:send_all_state_event(Pid, {timeout, Stream}), + ok = quintana:notify_histogram(?CONNECTION_QUERY_TIMEOUT, 1), {error, timeout} end. @@ -617,12 +655,14 @@ parse_response(Data, #state{parser = Parser, case erlcql_decode:parse(Data, Parser, Compression) of {ok, Responses, Parser2} -> State2 = handle_responses(Responses, State), - {next_state, ready, State2#state{parser = Parser2}}; + {next_state, ready, State2#state{parser = Parser2}, hibernate}; {error, Reason} -> ?ERROR("Parsing response failed: ~p", [Reason]), + ok = quintana:notify_histogram(?CONNECTION_PARSE_ERROR, 1), {stop, Reason, State} end. +handle_responses([], State) -> State; handle_responses(Responses, State) -> lists:foldl(fun handle_response/2, State, Responses). @@ -640,19 +680,17 @@ handle_response({Stream, Response}, #state{async_ets = AsyncETS} = State) -> send_response(Stream, {Ref, Response2}, Pid, State); [] -> ?WARNING("Unexpected response (~p): ~p", [Stream, Response]), + ok = quintana:notify_histogram(?CONNECTION_UNEXPECTED_RESPONSE, 1), State end. -spec send_response(integer(), {erlcql:query_ref(), erlcql:response()}, pid(), state()) -> state(). -send_response(Stream, Response, Pid, #state{async_ets = AsyncETS, - streams = Streams} = State) -> - true = ets:delete(AsyncETS, Stream), +send_response(Stream, Response, Pid, State=#state{async_ets = AsyncETS, streams = Streams}) -> Pid ! Response, + true = ets:delete(AsyncETS, Stream), State#state{streams = [Stream | Streams]}. -%% Helper functions ----------------------------------------------------------- - -spec get_env_opt(term(), proplist()) -> Value :: term(). get_env_opt(Opt, Opts) -> case lists:keyfind(Opt, 1, Opts) of @@ -662,11 +700,22 @@ get_env_opt(Opt, Opts) -> get_env(Opt) end. +get_internal_opt(Opt, Opts) -> + {Opt, Value} = lists:keyfind(Opt, 1, Opts), + Value. + -spec get_opt(term(), proplist()) -> Value :: term(). get_opt(Opt, Opts) -> get_opt(Opt, Opts, undefined). --spec get_opt(term(), proplist(), term()) -> Value :: term(). +-spec get_opt(term(), proplist(), term() | function()) -> Value :: term(). +get_opt(Opt, Opts, Default) when is_function(Default) -> + case lists:keyfind(Opt, 1, Opts) of + {Opt, Value} -> + Value; + false -> + Default() + end; get_opt(Opt, Opts, Default) -> case lists:keyfind(Opt, 1, Opts) of {Opt, Value} -> diff --git a/src/erlcql_convert.erl b/src/erlcql_convert.erl index 54980ac..dbee1b0 100644 --- a/src/erlcql_convert.erl +++ b/src/erlcql_convert.erl @@ -18,17 +18,15 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Value encoding/decoding module. -%% @author Krzysztof Rutka -module(erlcql_convert). --export([from_binary/2, from_null/1, - to_binary/1, to_binary/2]). +-export([from_binary/2]). +-export([from_null/1]). +-export([to_binary/1]). +-export([to_binary/2]). -include("erlcql.hrl"). -%% From binary ---------------------------------------------------------------- - -spec from_binary(option(), binary()) -> type(). from_binary(ascii, Binary) -> Binary; @@ -116,8 +114,6 @@ decode_signed(Binary) -> <> = Binary, Int. -%% To binary ------------------------------------------------------------------ - -spec to_binary(Values) -> Binaries when Values :: [Value | TypedValue], Value :: type() | null, diff --git a/src/erlcql_decode.erl b/src/erlcql_decode.erl index 21d689b..513ee51 100644 --- a/src/erlcql_decode.erl +++ b/src/erlcql_decode.erl @@ -18,30 +18,21 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol decoder/parser. -%% @author Krzysztof Rutka -module(erlcql_decode). -%% API --export([new_parser/0, - parse/3]). - --export([decode/2]). +-export([new_parser/1]). +-export([parse/3]). +-export([decode/3]). -include("erlcql.hrl"). -define(STRING(Length), Length/bytes). +-define(BYTES(Length), Length/bytes). -%%----------------------------------------------------------------------------- -%% API functions -%%----------------------------------------------------------------------------- - -%% @doc Returns a new parser. --spec new_parser() -> parser(). -new_parser() -> - #parser{}. +-spec new_parser(version()) -> parser(). +new_parser(Version) -> + #parser{version = Version}. -%% @doc Parses given data using a parser. -spec parse(binary(), parser(), compression()) -> {ok, Responses :: [{Stream :: integer(), Response :: response()}], @@ -52,10 +43,6 @@ parse(Data, #parser{buffer = Buffer} = Parser, Compression) -> NewParser = Parser#parser{buffer = NewBuffer}, parse_loop(NewParser, Compression, []). -%%----------------------------------------------------------------------------- -%% Parser functions -%%----------------------------------------------------------------------------- - -spec parse_loop(parser(), compression(), [{integer(), response()}]) -> {ok, Responses :: [{Stream :: integer(), Response :: response()}], @@ -90,8 +77,9 @@ get_length(#parser{length = undefined, buffer = Buffer} = Parser) -> Parser#parser{length = Length + 8}; get_length(Parser) -> Parser. -run_decode(#parser{buffer = Buffer} = Parser, Compression) -> - case decode(Buffer, Compression) of +run_decode(#parser{version = Version, + buffer = Buffer} = Parser, Compression) -> + case decode(Version, Buffer, Compression) of {ok, Stream, Response, Leftovers} -> NewParser = Parser#parser{length = undefined, buffer = Leftovers}, @@ -100,20 +88,16 @@ run_decode(#parser{buffer = Buffer} = Parser, Compression) -> {error, Other} end. -%%----------------------------------------------------------------------------- -%% Decode functions -%%----------------------------------------------------------------------------- - --spec decode(binary(), compression()) -> +-spec decode(version(), binary(), compression()) -> {ok, Stream :: integer(), Response :: response(), Rest :: binary()} | {error, Reason :: term()}. -decode(<>, Compression) -> +decode(V, <>, Compression) -> Data2 = maybe_decompress(Decompress, Compression, Data), Response = case opcode(Opcode) of - error -> - error2(Data2); + cql_error -> + cql_error(Data2); ready -> ready(Data2); authenticate -> @@ -123,11 +107,18 @@ decode(< result(Data2); event -> - event(Data2) + event(Data2); + auth_challenge -> + auth_challenge(Data2); + auth_success -> + auth_success(Data2) end, {ok, Stream, Response, Rest}; -decode(_Other, _Compression) -> - {error, bad_header}. +decode(_V, <<_Other:32, Length:32, _Data:Length/binary, + _Rest/binary>>, _Compression) -> + {error, bad_header}; +decode(_V, _Other, _Compression) -> + {error, binary_too_small}. -spec maybe_decompress(0 | 1, compression(), binary()) -> binary(). maybe_decompress(0, _Compression, Data) -> @@ -140,17 +131,17 @@ maybe_decompress(1, lz4, <>) -> UnpackedData. -spec opcode(integer()) -> response_opcode(). -opcode(16#00) -> error; +opcode(16#00) -> cql_error; opcode(16#02) -> ready; opcode(16#03) -> authenticate; opcode(16#06) -> supported; opcode(16#08) -> result; -opcode(16#0c) -> event. +opcode(16#0c) -> event; +opcode(16#0e) -> auth_challenge; +opcode(16#10) -> auth_success. -%% Error ---------------------------------------------------------------------- - --spec error2(binary()) -> cql_error(). -error2(<>) -> +-spec cql_error(binary()) -> cql_error(). +cql_error(<>) -> Error = case error_code(ErrorCode) of unavailable_exception -> unavailable_exception(Data); @@ -171,7 +162,7 @@ error2(<>) -> -spec error_code(integer()) -> error_code(). error_code(16#0000) -> server_error; -error_code(16#000A) -> protocol_error; +error_code(16#000a) -> protocol_error; error_code(16#0100) -> bad_credentials; error_code(16#1000) -> unavailable_exception; error_code(16#1001) -> overloaded; @@ -244,20 +235,14 @@ consistency(5) -> all; consistency(6) -> local_quorum; consistency(7) -> each_quorum. -%% Ready ---------------------------------------------------------------------- - -spec ready(binary()) -> ready. ready(<<>>) -> ready. -%% Authenticate --------------------------------------------------------------- - -spec authenticate(binary()) -> authenticate(). authenticate(<>) -> {authenticate, AuthClass}. -%% Supported ------------------------------------------------------------------ - -spec supported(binary()) -> supported(). supported(<>) -> Supported = supported_map(N, Data, []), @@ -280,8 +265,6 @@ supported_list(N, <>, Values) -> supported_list(N - 1, Rest, [Value | Values]). -%% Result --------------------------------------------------------------------- - -spec result(binary()) -> result(). result(<>) -> case result_kind(Kind) of @@ -304,46 +287,58 @@ result_kind(16#0003) -> set_keyspace; result_kind(16#0004) -> prepared; result_kind(16#0005) -> schema_change. -%% Result: Void - -spec void(binary()) -> void(). void(<<>>) -> {ok, void}. -%% Result: Rows - -spec rows(binary()) -> rows(). rows(Data) -> - {ColumnCount, ColumnSpecs, RowData} = metadata(Data), + {ColumnCount, Metadata, RowData} = metadata(Data), <> = RowData, - {_, ColumnTypes} = lists:unzip(ColumnSpecs), - Rows = rows(RowCount, ColumnCount, ColumnTypes, RowContent, []), - {ok, {Rows, ColumnSpecs}}. + Types = proplists:get_value(types, Metadata), + Rows = rows(RowCount, ColumnCount, Types, RowContent, []), + {ok, {Rows, Metadata}}. -spec metadata(binary()) -> {integer(), column_specs(), Rest :: binary()}. -metadata(<<_:31, 0:1, ColumnCount:?INT, ColumnData/binary>>) -> - {ColumnSpecs, Rest} = column_specs(false, ColumnCount, ColumnData, []), - {ColumnCount, ColumnSpecs, Rest}; -metadata(<<_:31, 1:1, ColumnCount:?INT, - Length:?SHORT, _Keyspace:?STRING(Length), - Length2:?SHORT, _Table:?STRING(Length2), ColumnData/binary>>) -> - {ColumnSpecs, Rest} = column_specs(true, ColumnCount, ColumnData, []), - {ColumnCount, ColumnSpecs, Rest}. - --spec column_specs(boolean(), integer(), binary(), column_specs()) -> +metadata(<<_:29, NoMetadata:1, HasMorePages:1, GlobalSpec:1, + ColumnCount:?INT, Rest/binary>>) -> + {Params2, Rest2} = maybe_more_pages(HasMorePages, [], Rest), + {Params3, Rest3} = maybe_global_spec(GlobalSpec, Params2, Rest2), + {Params4, Rest4} = maybe_column_spec(NoMetadata, GlobalSpec, + ColumnCount, Params3, Rest3), + {ColumnCount, Params4, Rest4}. + +maybe_more_pages(0, Params, Data) -> {Params, Data}; +maybe_more_pages(1, Params, Data) -> + <> = Data, + {[{paging_state, PagingState} | Params], Rest}. + +maybe_global_spec(0, Params, Data) -> {Params, Data}; +maybe_global_spec(1, Params, Data) -> + <> = Data, + {[{keyspace, Keyspace}, {table, Table} | Params], Rest}. + +maybe_column_spec(1, _, 0, Params, Data) -> {Params, Data}; +maybe_column_spec(0, Global, N, Params, Data) -> + {Columns, Types, Rest} = column_specs(Global == 1, N, Data, [], []), + {[{columns, Columns}, {types, Types} | Params], Rest}. + +-spec column_specs(boolean(), integer(), binary(), column_specs(), term()) -> {column_specs(), Rest :: binary()}. -column_specs(_Global, 0, Rest, ColumnSpecs) -> - {lists:reverse(ColumnSpecs), Rest}; -column_specs(true, N, <>, - ColumnSpecs) -> +column_specs(_, 0, Rest, Columns, Types) -> + {lists:reverse(Columns), lists:reverse(Types), Rest}; +column_specs(true, N, <>, + Columns, Types) -> {Type, Rest} = option(TypeData), - column_specs(true, N - 1, Rest, [{Name, Type} | ColumnSpecs]); + column_specs(true, N - 1, Rest, [Column | Columns], [Type | Types]); column_specs(false, N, <>, - ColumnSpecs) -> + Length3:?SHORT, Column:?STRING(Length3), + TypeData/binary>>, + Columns, Types) -> {Type, Rest} = option(TypeData), - column_specs(false, N - 1, Rest, [{Name, Type} | ColumnSpecs]). + column_specs(false, N - 1, Rest, [Column | Columns], [Type | Types]). -spec option(binary()) -> {option(), Rest :: binary()}. option(<>) -> @@ -410,21 +405,15 @@ row_values(N, [Type | Types], < set_keyspace(). set_keyspace(<>) -> {ok, Keyspace}. -%% Result: Prepared - -spec prepared(binary()) -> {ok, bitstring(), [option()]}. -prepared(<>) -> - {_ColumnCount, ColumnSpecs, <<>>} = metadata(Metadata), - {_, ColumnTypes} = lists:unzip(ColumnSpecs), - {ok, QueryId, ColumnTypes}. - -%% Result: Schema change +prepared(<>) -> + {_ColumnCount, RequestMetadata, Rest} = metadata(Data), + {_, ResultMetadata, <<>>} = metadata(Rest), + {ok, {QueryId, RequestMetadata, ResultMetadata}}. -spec schema_change(binary()) -> schema_change(). schema_change(<>) -> {ok, schema_change_type(Type)}. -%% Event ---------------------------------------------------------------------- - -spec event(binary()) -> event_res(). event(<>) -> Event = case event_type(Type) of @@ -448,8 +435,8 @@ event(<>) -> -spec event_type(bitstring()) -> event_type(). event_type(<<"TOPOLOGY_CHANGE">>) -> topology_change; -event_type(<<"STATUS_CHANGE">>) -> status_change; -event_type(<<"SCHEMA_CHANGE">>) -> schema_change. +event_type(<<"STATUS_CHANGE">>) -> status_change; +event_type(<<"SCHEMA_CHANGE">>) -> schema_change. -spec topology_change_event(binary()) -> {topology_change, Type :: atom(), Inet :: inet()}. @@ -457,7 +444,7 @@ topology_change_event(<>) -> {topology_change, topology_change_type(Type), inet(Data)}. -spec topology_change_type(bitstring()) -> atom(). -topology_change_type(<<"NEW_NODE">>) -> new_node; +topology_change_type(<<"NEW_NODE">>) -> new_node; topology_change_type(<<"REMOVED_NODE">>) -> removed_node. -spec status_change_event(binary()) -> @@ -466,7 +453,7 @@ status_change_event(<>) -> {status_change, status_change_type(Type), inet(Data)}. -spec status_change_type(bitstring()) -> atom(). -status_change_type(<<"UP">>) -> up; +status_change_type(<<"UP">>) -> up; status_change_type(<<"DOWN">>) -> down. -spec schema_change_event(binary()) -> @@ -492,3 +479,10 @@ inet(4, <>) -> inet(16, <>) -> {{A, B, C, D, E, F, G, H}, Port}. +-spec auth_challenge(binary()) -> {auth_challenge, binary()}. +auth_challenge(<>) -> + {auth_challenge, Token}. + +-spec auth_success(binary()) -> {auth_success, binary()}. +auth_success(<>) -> + {auth_success, Token}. diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 5cb1534..34a24a7 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -18,79 +18,130 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol request encoding. -%% @author Krzysztof Rutka -module(erlcql_encode). -%% API --export([frame/3]). --export([startup/2, - credentials/1, - options/0, - 'query'/2, - prepare/1, - execute/3, - register/1]). +-export([frame/4]). +-export([startup/3]). +-export([credentials/2]). +-export([options/1]). +-export(['query'/3]). +-export([prepare/2]). +-export([execute/4]). +-export([batch/3]). +-export([register/2]). +-export([auth_response/2]). -include("erlcql.hrl"). -%%----------------------------------------------------------------------------- -%% API function -%%----------------------------------------------------------------------------- +-type query_parameters() :: [{atom(), term()}]. -%% @doc Encodes the entire request frame. --spec frame(request(), tuple(), integer()) -> Frame :: iolist(). -frame({Opcode, Payload}, {Compression, _}, Stream) -> +-spec frame(version(), request(), tuple(), integer()) -> Frame :: iolist(). +frame(V, {Opcode, Payload}, {Compression, _}, Stream) -> OpcodeByte = opcode(Opcode), {CompressionBit, Payload2} = maybe_compress(Compression, Payload), Length = int(iolist_size(Payload2)), - [<>, + [<>, Stream, OpcodeByte, Length, Payload2]. -%% @doc Encodes the startup request message body. --spec startup(compression(), bitstring()) -> {startup, iolist()}. -startup(false, CQLVersion) -> +-spec startup(version(), compression(), bitstring()) -> {startup, iolist()}. +startup(_V, false, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}])}; -startup(Compression, CQLVersion) -> +startup(_V, Compression, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}, {<<"COMPRESSION">>, compression(Compression)}])}. -%% @doc Encodes the credentials request message body. --spec credentials([{K :: bitstring(), V :: bitstring()}]) -> +-spec credentials(version(), [{K :: bitstring(), V :: bitstring()}]) -> {credentials, iolist()}. -credentials(Informations) -> +credentials(1, Informations) -> {credentials, string_map(Informations)}. -%% @doc Encodes the options request message body. --spec options() -> {options, iolist()}. -options() -> +-spec options(version()) -> {options, iolist()}. +options(_V) -> {options, []}. -%% @doc Encodes the 'query' request message body. --spec 'query'(iodata(), consistency()) -> {'query', iolist()}. -'query'(QueryString, Consistency) -> - {'query', [long_string(QueryString), consistency(Consistency)]}. - -%% @doc Encodes the prepare request message body. --spec prepare(iodata()) -> {prepare, iolist()}. -prepare(QueryString) -> +-spec 'query'(version(), iodata(), query_parameters()) -> + {'query', iolist()}. +'query'(1, QueryString, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), + {'query', [long_string(QueryString), + short(consistency(Consistency))]}; +'query'(2, QueryString, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), + Params2 = [{values, []}, + {skip_metadata, false} | Params], + {'query', [long_string(QueryString), short(consistency(Consistency)), + query_parameters(Params2)]}. + +query_parameters(Params) -> + Flags = [{values, + fun([]) -> false; + (Values) -> erlcql_convert:to_binary(Values) end}, + {skip_metadata, fun(Skip) -> Skip end}, + {page_size, + fun(undefined) -> false; + (PageSize) -> int(PageSize) end}, + {paging_state, + fun(undefined) -> false; + (PagingState) -> bytes(PagingState) end}, + {serial_consistency, + fun(undefined) -> false; + (Consistency) -> short(consistency(Consistency)) end}], + Params2 = [{Flag, Fun(proplists:get_value(Flag, Params))} + || {Flag, Fun} <- Flags], + process_flags(Params2, fun query_flag/1). + +process_flags(Params, BitFun) -> + P = fun({_Name, false}, {S, B}) -> + {S, B}; + ({Name, true}, {S, B}) -> + {S + BitFun(Name), B}; + ({Name, Value}, {S, B}) -> + {S + BitFun(Name), [Value | B]} + end, + {Flags, Body} = lists:foldl(P, {0, []}, Params), + [Flags, lists:reverse(Body)]. + +-spec query_flag(atom()) -> integer(). +query_flag(values) -> 16#01; +query_flag(skip_metadata) -> 16#02; +query_flag(page_size) -> 16#04; +query_flag(paging_state) -> 16#08; +query_flag(serial_consistency) -> 16#10. + +-spec prepare(version(), iodata()) -> {prepare, iolist()}. +prepare(_V, QueryString) -> {prepare, [long_string(QueryString)]}. -%% @doc Encodes the execute request message body. --spec execute(binary(), values(), consistency()) -> {execute, iolist()}. -execute(QueryId, Values, Consistency) -> +-spec execute(version(), binary(), values(), consistency()) -> + {execute, iolist()}. +execute(1, QueryId, Values, Consistency) -> BinaryValues = erlcql_convert:to_binary(Values), {execute, [short_bytes(QueryId), BinaryValues, - consistency(Consistency)]}. - -%% @doc Encodes the register request message body. --spec register([event_type()]) -> {register, iolist()}. -register(Events) -> + short(consistency(Consistency))]}; +execute(2, QueryId, Values, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), + Params2 = [{values, Values}, + {skip_metadata, false} | Params], + {execute, [short_bytes(QueryId), short(consistency(Consistency)), + query_parameters(Params2)]}. + +batch(2, Queries, Params) when is_list(Queries) -> + BatchType = erlcql_client:get_env_opt(batch_type, Params), + Consistency = erlcql_client:get_env_opt(consistency, Params), + Queries2 = [batch_query(Q) || Q <- Queries], + {batch, [batch_type(BatchType), short(length(Queries)), Queries2, + short(consistency(Consistency))]}. + +batch_query({QueryId, Values}) -> + [1, short_bytes(QueryId), erlcql_convert:to_binary(Values)]. + +-spec register(version(), [event_type()]) -> {register, iolist()}. +register(_V, Events) -> {register, event_list(Events)}. -%%----------------------------------------------------------------------------- -%% Encode functions -%%----------------------------------------------------------------------------- +-spec auth_response(version(), binary()) -> {auth_response, iolist()}. +auth_response(2, Token) -> + {auth_response, [bytes(Token)]}. -spec int(integer()) -> binary(). int(X) -> @@ -110,6 +161,11 @@ long_string(String) -> Length = iolist_size(String), [int(Length), String]. +-spec bytes(binary()) -> iolist(). +bytes(Bytes) -> + Length = iolist_size(Bytes), + [int(Length), Bytes]. + -spec short_bytes(binary()) -> iolist(). short_bytes(Bytes) -> Length = iolist_size(Bytes), @@ -122,38 +178,43 @@ string_map(KeyValues) -> || {Key, Value} <- KeyValues]]. -spec opcode(atom()) -> integer(). -opcode(startup) -> 1; -opcode(credentials) -> 4; -opcode(options) -> 5; -opcode('query') -> 7; -opcode(prepare) -> 9; -opcode(execute) -> 10; -opcode(register) -> 11. - --spec consistency(consistency()) -> binary(). -consistency(any) -> short(0); -consistency(one) -> short(1); -consistency(two) -> short(2); -consistency(three) -> short(3); -consistency(quorum) -> short(4); -consistency(all) -> short(5); -consistency(local_quorum) -> short(6); -consistency(each_quorum) -> short(7). +opcode(startup) -> 16#01; +opcode(credentials) -> 16#04; +opcode(options) -> 16#05; +opcode('query') -> 16#07; +opcode(prepare) -> 16#09; +opcode(execute) -> 16#0a; +opcode(register) -> 16#0b; +opcode(batch) -> 16#0d; +opcode(auth_response) -> 16#0f. + +-spec consistency(consistency()) -> integer(). +consistency(any) -> 16#00; +consistency(one) -> 16#01; +consistency(two) -> 16#02; +consistency(three) -> 16#03; +consistency(quorum) -> 16#04; +consistency(all) -> 16#05; +consistency(local_quorum) -> 16#06; +consistency(each_quorum) -> 16#07; +consistency(serial) -> 16#08; +consistency(local_serial) -> 16#09; +consistency(local_one) -> 16#0a. + +batch_type(logged) -> 0; +batch_type(unlogged) -> 1; +batch_type(counter) -> 2. -spec event(event_type()) -> bitstring(). event(topology_change) -> <<"TOPOLOGY_CHANGE">>; -event(status_change) -> <<"STATUS_CHANGE">>; -event(schema_change) -> <<"SCHEMA_CHANGE">>. +event(status_change) -> <<"STATUS_CHANGE">>; +event(schema_change) -> <<"SCHEMA_CHANGE">>. -spec event_list([event_type()]) -> iolist(). event_list(Events) -> N = length(Events), [short(N) | [string2(event(Event)) || Event <- Events]]. -%%----------------------------------------------------------------------------- -%% Internal functions -%%----------------------------------------------------------------------------- - %% @doc Compresses the payload if compression is enabled. -spec maybe_compress(compression(), iolist()) -> {0 | 1, iolist()}. maybe_compress(false, Payload) -> @@ -171,4 +232,4 @@ maybe_compress(lz4, Payload) -> -spec compression(compression()) -> bitstring(). compression(snappy) -> <<"snappy">>; -compression(lz4) -> <<"lz4">>. +compression(lz4) -> <<"lz4">>. diff --git a/src/erlcql_metrics.hrl b/src/erlcql_metrics.hrl new file mode 100644 index 0000000..a7fbc55 --- /dev/null +++ b/src/erlcql_metrics.hrl @@ -0,0 +1,8 @@ +-define(CONNECTION_FAILURE_METRIC, <<"erlcql.connection.failure">>). +-define(CONNECTION_STARTUP_FAIL_METRIC, <<"erlcql.connection.startup_fail">>). +-define(CONNECTION_STREAMS_EXHAUSTED_METRIC, <<"erlcql.connection.streams_exhausted">>). +-define(CONNECTION_SOCKET_CLOSED, <<"erlcql.connection.socket_closed">>). +-define(CONNECTION_SOCKET_ERROR, <<"erlcql.connection.socket_error">>). +-define(CONNECTION_PARSE_ERROR, <<"erlcql.connection.parse_error">>). +-define(CONNECTION_UNEXPECTED_RESPONSE, <<"erlcql.connection.unexpected_response">>). +-define(CONNECTION_QUERY_TIMEOUT, <<"erlcql.connection.query_timeout">>). diff --git a/test/erlcql_SUITE.erl b/test/erlcql_SUITE.erl index 247772e..fe47262 100644 --- a/test/erlcql_SUITE.erl +++ b/test/erlcql_SUITE.erl @@ -5,7 +5,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("proper/include/proper.hrl"). --define(OPTS, [{cql_version, <<"3.0.0">>}]). +-define(OPTS, []). -define(KEYSPACE, <<"erlcql_tests">>). -define(CREATE_KEYSPACE, <<"CREATE KEYSPACE IF NOT EXISTS erlcql_tests ", @@ -20,8 +20,6 @@ -define(PROPTEST(A), true = proper:quickcheck(A())). -define(PROPTEST(A, Args), true = proper:quickcheck(A(Args), {numtests, 1000})). --import(erlcql, [q/2, q/3]). - %% Fixtures ------------------------------------------------------------------- init_per_suite(Config) -> @@ -179,15 +177,16 @@ insert(Config) -> Pid = get_pid(Config), {ok, void} = q(Pid, <<"INSERT INTO t (k, v) VALUES (1, 'one')">>), Rows = q(Pid, <<"SELECT * FROM t">>, one), - {ok, {[[1, <<"one">>]], [{<<"k">>, int}, {<<"v">>, varchar}]}} = Rows. + {ok, {[[1, <<"one">>]], Metadata}} = Rows, + [<<"k">>, <<"v">>] = proplists:get_value(columns, Metadata), + [int, varchar] = proplists:get_value(types, Metadata). %% Client tests start_without_use(_Config) -> {ok, Pid} = erlcql:start_link("localhost", ?OPTS), unlink(Pid), - Msg = <<"no keyspace has been specified">>, - {error, {invalid, Msg, _}} = q(Pid, ?CREATE_TABLE), + {error, {invalid, _, _}} = q(Pid, ?CREATE_TABLE), exit(Pid, kill). start_with_use(_Config) -> @@ -202,6 +201,11 @@ start_with_use(_Config) -> get_pid(Config) -> proplists:get_value(pid, Config). +q(Pid, Query) -> q(Pid, Query, one). + +q(Pid, Query, Consistency) -> + erlcql_client:'query'(Pid, Query, [{consistency, Consistency}]). + get_keyspaces(Pid) -> Check = <<"SELECT keyspace_name FROM system.schema_keyspaces">>, {ok, {Keyspaces, _}} = q(Pid, Check, one), @@ -233,7 +237,9 @@ get_value(Pid, <<"map">>) -> get_value(Pid, {map, varchar, boolean}); get_value(Pid, Type) -> Res = q(Pid, <<"SELECT v FROM erlcql_tests.t">>, one), - {ok, {[[Value]], [{<<"v">>, Type}]}} = Res, + {ok, {[[Value]], Metadata}} = Res, + [<<"v">>] = proplists:get_value(columns, Metadata), + [Type] = proplists:get_value(types, Metadata), Value. clean_type_table(TestCase, Config) -> diff --git a/test/erlcql_prepare_SUITE.erl b/test/erlcql_prepare_SUITE.erl index 3931311..b798db1 100644 --- a/test/erlcql_prepare_SUITE.erl +++ b/test/erlcql_prepare_SUITE.erl @@ -8,7 +8,9 @@ %% Suite ---------------------------------------------------------------------- all() -> - [prepare_execute]. + [prepare_execute, + prepare_batch, + prepare_error]. %% Fixtures ------------------------------------------------------------------- @@ -68,3 +70,40 @@ prepare_execute(Config) -> {[Row], _} = execute(Client, select, [hd(Values)]), Row = Values. + +prepare_batch(Config) -> + Keyspace = ?c(keyspace, Config), + Table = gen_table_name(), + Create = [<<"CREATE TABLE ">>, Keyspace, <<".">>, Table, + <<" (x int PRIMARY KEY, y int)">>], + single_query(Create), + + Insert = [<<"INSERT INTO ">>, Table, + <<" (x, y) VALUES (?, ?)">>], + Select = [<<"SELECT * FROM ">>, Table], + Prepare = [{insert, Insert}, + {select, Select}], + Opts = [{use, Keyspace}, + {prepare, Prepare}], + Client = start_client(Opts), + + Points = [[1, 1], [2, 3], [3, 5]], + Queries = [{insert, V} || V <- Points], + batch(Client, Queries), + + {Rows, _} = execute(Client, select, []), + Points = lists:sort(Rows). + +prepare_error(Config) -> + Keyspace = ?c(keyspace, Config), + Table = gen_table_name(), + Create = [<<"CREATE TABLE ">>, Keyspace, <<".">>, Table, + <<" (x int PRIMARY KEY, y int)">>], + single_query(Create), + + Insert = [<<"INSERT INTO ">>, Table, <<" (x, y) VALUES (?)">>], + Opts = [{use, Keyspace}], + Client = start_client(Opts), + + {error, {broken_insert, Insert, {invalid, _, undefined}}} = + erlcql_client:prepare(Client, Insert, broken_insert). diff --git a/test/erlcql_test.erl b/test/erlcql_test.erl index 17ed79d..d2b214d 100644 --- a/test/erlcql_test.erl +++ b/test/erlcql_test.erl @@ -4,7 +4,7 @@ -define(CQL_VERSION, <<"3.0.0">>). -define(OPTS, [{cql_version, ?CQL_VERSION}]). --define(CONSISTENCY, quorum). +-define(CONSISTENCY, [{consistency, quorum}]). -spec create_keyspace() -> Keyspace :: bitstring(). create_keyspace() -> @@ -62,6 +62,11 @@ execute(Pid, Name, Values) -> {ok, Response} = erlcql_client:execute(Pid, Name, Values, ?CONSISTENCY), Response. +batch(Pid, Queries) -> + Opts = [{batch_type, logged} | ?CONSISTENCY], + {ok, Response} = erlcql_client:batch(Pid, Queries, Opts), + Response. + -spec stop_client(pid()) -> ok. stop_client(Pid) -> true = unlink(Pid), diff --git a/test/erlcql_test.hrl b/test/erlcql_test.hrl index b1fa795..6cdea20 100644 --- a/test/erlcql_test.hrl +++ b/test/erlcql_test.hrl @@ -5,6 +5,7 @@ single_query/1, 'query'/2, execute/3, + batch/2, start_client/1, stop_client/1]).