From 75b3ec921dcc0c8117b1edcd631d973b1e111f96 Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Thu, 22 May 2014 17:16:47 +0200 Subject: [PATCH 01/20] Initial support for native protocol v2 No new features added. --- include/erlcql.hrl | 3 +- src/erlcql.erl | 3 +- src/erlcql_client.erl | 89 ++++++++++++++++++++++++++----------------- src/erlcql_decode.erl | 52 +++++++------------------ src/erlcql_encode.erl | 85 ++++++++++++++++++++++------------------- 5 files changed, 116 insertions(+), 116 deletions(-) diff --git a/include/erlcql.hrl b/include/erlcql.hrl index 791eefa..782e3e8 100644 --- a/include/erlcql.hrl +++ b/include/erlcql.hrl @@ -21,7 +21,8 @@ %% @author Krzysztof Rutka -define(APP, erlcql). --define(VERSION, 1). + +-type version() :: 1 | 2. %% Directions -define(REQUEST, 0). diff --git a/src/erlcql.erl b/src/erlcql.erl index ddcad75..df58213 100644 --- a/src/erlcql.erl +++ b/src/erlcql.erl @@ -104,4 +104,5 @@ default(use) -> undefined; default(prepare) -> []; default(reconnect_start) -> 1000; default(reconnect_max) -> 30000; -default(keepalive) -> false. +default(keepalive) -> false; +default(version) -> 2. diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index bd135dc..8a9139f 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -64,7 +64,8 @@ prepare :: [{Name :: atom(), Query :: iodata()}], prepared_ets :: ets(), socket :: undefined | socket(), - streams = lists:seq(1, 127) :: [integer()] + streams = lists:seq(1, 127) :: [integer()], + version :: version() }). -type state() :: #state{}. @@ -208,6 +209,7 @@ init_state(Opts) -> Parser = erlcql_decode:new_parser(), Prepare = get_env_opt(prepare, Opts), PreparedETS = maybe_create_prepared_ets(Opts), + Version = get_env_opt(version, Opts), #state{async_ets = AsyncETS, auto_reconnect = AutoReconnect, backoff = Backoff, @@ -222,7 +224,8 @@ init_state(Opts) -> parent = Parent, parser = Parser, prepare = Prepare, - prepared_ets = PreparedETS}. + prepared_ets = PreparedETS, + version = Version}. %% State: startup ------------------------------------------------------------- @@ -286,47 +289,55 @@ ready(Event, State) -> ready({_Ref, _}, _From, #state{streams = []} = State) -> ?CRITICAL("Too many requests!"), {reply, {error, too_many_requests}, ready, State}; -ready({Ref, {'query', QueryString, Consistency}}, {From, _}, State) -> - Query = erlcql_encode:'query'(QueryString, Consistency), +ready({Ref, {'query', QueryString, Consistency}}, {From, _}, + #state{version = Version} = State) -> + Query = erlcql_encode:'query'(Version, QueryString, Consistency), send(Query, {Ref, From}, State); -ready({Ref, {prepare, Query}}, {From, _}, State) -> - Prepare = erlcql_encode:prepare(Query), +ready({Ref, {prepare, Query}}, {From, _}, + #state{version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), send(Prepare, {Ref, From}, State); ready({Ref, {prepare, Query, Name}}, {From, _}, - #state{prepared_ets = PreparedETS} = State) -> - Prepare = erlcql_encode:prepare(Query), - Fun = fun({ok, QueryId, Types}) -> + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), + Fun = fun({ok, QueryId, {ReqSpecs, _}} = Response) -> + {_, Types} = lists:unzip(ReqSpecs), true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), - {ok, QueryId}; + Response; ({error, _} = Response) -> Response end, send(Prepare, {Ref, From, Fun}, State); -ready({Ref, {execute, QueryId, Values, Consistency}}, - {From, _}, State) when is_binary(QueryId) -> - Execute = erlcql_encode:execute(QueryId, Values, Consistency), +ready({Ref, {execute, QueryId, Values, Consistency}}, {From, _}, + #state{version = Version} = State) when is_binary(QueryId) -> + Execute = erlcql_encode:execute(Version, QueryId, Values, Consistency), send(Execute, {Ref, From}, State); ready({Ref, {execute, Name, Values, Consistency}}, {From, _}, - #state{prepared_ets = PreparedETS} = State) when is_atom(Name) -> + #state{prepared_ets = PreparedETS, + version = Version} = State) when is_atom(Name) -> case ets:lookup(PreparedETS, Name) of [{Name, Query, QueryId, undefined}] -> ?DEBUG("Executing ~s: ~s, ~p", [Name, Query, Values]), - Execute = erlcql_encode:execute(QueryId, Values, Consistency), + Execute = erlcql_encode:execute(Version, QueryId, + Values, Consistency), send(Execute, {Ref, From}, State); [{Name, Query, QueryId, Types}] -> TypedValues = lists:zip(Types, Values), ?DEBUG("Executing ~s: ~s, ~p", [Name, Query, Values]), - Execute = erlcql_encode:execute(QueryId, TypedValues, Consistency), + Execute = erlcql_encode:execute(Version, QueryId, + TypedValues, Consistency), send(Execute, {Ref, From}, State); [] -> ?DEBUG("Execute failed, invalid query name: ~s", [Name]), {reply, {error, invalid_query_name}, ready, State} end; -ready({Ref, options}, {From, _}, State) -> - Options = erlcql_encode:options(), +ready({Ref, options}, {From, _}, #state{version = Version} = State) -> + Options = erlcql_encode:options(Version), send(Options, {Ref, From}, State); -ready({Ref, {register, Events}}, {From, _}, State) -> - Register = erlcql_encode:register(Events), +ready({Ref, {register, Events}}, {From, _}, + #state{version = Version} = State) -> + Register = erlcql_encode:register(Version, Events), send(Register, {Ref, From}, State); ready(Event, _From, State) -> ?ERROR("Bad event (ready/sync): ~p", [Event]), @@ -419,8 +430,9 @@ apply_funs([Fun | Rest], State) -> end. -spec send_options(state()) -> {ok, state()} | {error, Reason :: term()}. -send_options(#state{cql_version = undefined} = State) -> - Options = erlcql_encode:options(), +send_options(#state{cql_version = undefined, + version = Version} = State) -> + Options = erlcql_encode:options(Version), ok = send_request(Options, 0, State), case wait_for_response(State) of {ok, Supported} -> @@ -435,8 +447,9 @@ send_options(State) -> -spec send_startup(state()) -> {ok, state()} | {error, Reason :: term()}. send_startup(#state{flags = {Compression, Tracing}, - cql_version = CQLVersion} = State) -> - Startup = erlcql_encode:startup(Compression, CQLVersion), + cql_version = CQLVersion, + version = Version} = State) -> + Startup = erlcql_encode:startup(Version, Compression, CQLVersion), ok = send_request(Startup, 0, State#state{flags = {false, Tracing}}), wait_for_ready(State). @@ -458,10 +471,11 @@ wait_for_ready(State) -> -spec try_auth(bitstring(), state()) -> ok | {error, term()}. try_auth(<<"org.apache.cassandra.auth.PasswordAuthenticator">>, - #state{credentials = {Username, Password}} = State) -> + #state{credentials = {Username, Password}, + version = Version} = State) -> Map = [{<<"username">>, Username}, {<<"password">>, Password}], - Credentials = erlcql_encode:credentials(Map), + Credentials = erlcql_encode:credentials(Version, Map), ok = send_request(Credentials, 0, State); try_auth(Other, _State) -> {error, {unknown_auth_class, Other}}. @@ -471,8 +485,9 @@ register_to_events(#state{events = undefined} = State) -> {ok, State}; register_to_events(#state{events = []} = State) -> {ok, State}; -register_to_events(#state{events = Events} = State) -> - Register = erlcql_encode:register(Events), +register_to_events(#state{events = Events, + version = Version} = State) -> + Register = erlcql_encode:register(Version, Events), ok = send_request(Register, 0, State), case wait_for_response(State) of ready -> @@ -484,8 +499,9 @@ register_to_events(#state{events = Events} = State) -> -spec use_keyspace(state()) -> {ok, state()} | {error, Reason :: term()}. use_keyspace(#state{keyspace = undefined} = State) -> {ok, State}; -use_keyspace(#state{keyspace = Keyspace} = State) -> - Use = erlcql_encode:'query'([<<"USE ">>, Keyspace], any), +use_keyspace(#state{keyspace = Keyspace, + version = Version} = State) -> + Use = erlcql_encode:'query'(Version, [<<"USE ">>, Keyspace], any), ok = send_request(Use, 0, State), case wait_for_response(State) of {ok, Keyspace} -> @@ -505,11 +521,13 @@ prepare_queries(#state{prepare = Queries} = State) -> prepare_queries([], State) -> {ok, State}; prepare_queries([{Name, Query} | Rest], - #state{prepared_ets = PreparedETS} = State) -> - Prepare = erlcql_encode:prepare(Query), + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + Prepare = erlcql_encode:prepare(Version, Query), ok = send_request(Prepare, 0, State), case wait_for_response(State) of - {ok, QueryId, Types} -> + {ok, QueryId, {ReqSpecs, _}} -> + {_, Types} = lists:unzip(ReqSpecs), true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), prepare_queries(Rest, State); {error, _Reason} = Error -> @@ -518,8 +536,9 @@ prepare_queries([{Name, Query} | Rest], -spec send_request(request(), integer(), state()) -> ok. send_request(Body, Stream, #state{socket = Socket, - flags = Flags}) -> - Frame = erlcql_encode:frame(Body, Flags, Stream), + flags = Flags, + version = Version}) -> + Frame = erlcql_encode:frame(Version, Body, Flags, Stream), ok = gen_tcp:send(Socket, Frame). -spec wait_for_response(state()) -> diff --git a/src/erlcql_decode.erl b/src/erlcql_decode.erl index 21d689b..7b65670 100644 --- a/src/erlcql_decode.erl +++ b/src/erlcql_decode.erl @@ -22,19 +22,15 @@ %% @author Krzysztof Rutka -module(erlcql_decode). -%% API --export([new_parser/0, - parse/3]). - +-export([new_parser/0]). +-export([parse/3]). -export([decode/2]). -include("erlcql.hrl"). -define(STRING(Length), Length/bytes). -%%----------------------------------------------------------------------------- -%% API functions -%%----------------------------------------------------------------------------- +%% API functions -------------------------------------------------------------- %% @doc Returns a new parser. -spec new_parser() -> parser(). @@ -52,9 +48,7 @@ parse(Data, #parser{buffer = Buffer} = Parser, Compression) -> NewParser = Parser#parser{buffer = NewBuffer}, parse_loop(NewParser, Compression, []). -%%----------------------------------------------------------------------------- -%% Parser functions -%%----------------------------------------------------------------------------- +%% Parser functions ----------------------------------------------------------- -spec parse_loop(parser(), compression(), [{integer(), response()}]) -> {ok, Responses :: [{Stream :: integer(), @@ -100,14 +94,12 @@ run_decode(#parser{buffer = Buffer} = Parser, Compression) -> {error, Other} end. -%%----------------------------------------------------------------------------- -%% Decode functions -%%----------------------------------------------------------------------------- +%% Decode functions ----------------------------------------------------------- -spec decode(binary(), compression()) -> {ok, Stream :: integer(), Response :: response(), Rest :: binary()} | {error, Reason :: term()}. -decode(<>, Compression) -> Data2 = maybe_decompress(Decompress, Compression, Data), @@ -147,8 +139,6 @@ opcode(16#06) -> supported; opcode(16#08) -> result; opcode(16#0c) -> event. -%% Error ---------------------------------------------------------------------- - -spec error2(binary()) -> cql_error(). error2(<>) -> Error = case error_code(ErrorCode) of @@ -244,20 +234,14 @@ consistency(5) -> all; consistency(6) -> local_quorum; consistency(7) -> each_quorum. -%% Ready ---------------------------------------------------------------------- - -spec ready(binary()) -> ready. ready(<<>>) -> ready. -%% Authenticate --------------------------------------------------------------- - -spec authenticate(binary()) -> authenticate(). authenticate(<>) -> {authenticate, AuthClass}. -%% Supported ------------------------------------------------------------------ - -spec supported(binary()) -> supported(). supported(<>) -> Supported = supported_map(N, Data, []), @@ -280,8 +264,6 @@ supported_list(N, <>, Values) -> supported_list(N - 1, Rest, [Value | Values]). -%% Result --------------------------------------------------------------------- - -spec result(binary()) -> result(). result(<>) -> case result_kind(Kind) of @@ -304,14 +286,10 @@ result_kind(16#0003) -> set_keyspace; result_kind(16#0004) -> prepared; result_kind(16#0005) -> schema_change. -%% Result: Void - -spec void(binary()) -> void(). void(<<>>) -> {ok, void}. -%% Result: Rows - -spec rows(binary()) -> rows(). rows(Data) -> {ColumnCount, ColumnSpecs, RowData} = metadata(Data), @@ -321,6 +299,9 @@ rows(Data) -> {ok, {Rows, ColumnSpecs}}. -spec metadata(binary()) -> {integer(), column_specs(), Rest :: binary()}. +metadata(<<_:29, 0:1, _:2, 0:?INT, Rest/binary>>) -> + %% NEW: no_metadata + {0, [], Rest}; metadata(<<_:31, 0:1, ColumnCount:?INT, ColumnData/binary>>) -> {ColumnSpecs, Rest} = column_specs(false, ColumnCount, ColumnData, []), {ColumnCount, ColumnSpecs, Rest}; @@ -410,21 +391,16 @@ row_values(N, [Type | Types], < set_keyspace(). set_keyspace(<>) -> {ok, Keyspace}. -%% Result: Prepared - -spec prepared(binary()) -> {ok, bitstring(), [option()]}. prepared(<>) -> - {_ColumnCount, ColumnSpecs, <<>>} = metadata(Metadata), - {_, ColumnTypes} = lists:unzip(ColumnSpecs), - {ok, QueryId, ColumnTypes}. - -%% Result: Schema change + {_ColumnCount, ColumnSpecs, ResultMetadata} = metadata(Metadata), + %% {_, ColumnTypes} = lists:unzip(ColumnSpecs), + {_, ResultSpecs, <<>>} = metadata(ResultMetadata), %% NEW: result_metadata + {ok, QueryId, {ColumnSpecs, ResultSpecs}}. -spec schema_change(binary()) -> schema_change(). schema_change(<>) -> {ok, schema_change_type(Type)}. -%% Event ---------------------------------------------------------------------- - -spec event(binary()) -> event_res(). event(<>) -> Event = case event_type(Type) of diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 5cb1534..362c023 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -22,75 +22,84 @@ %% @author Krzysztof Rutka -module(erlcql_encode). -%% API --export([frame/3]). --export([startup/2, - credentials/1, - options/0, - 'query'/2, - prepare/1, - execute/3, - register/1]). +-export([frame/4]). +-export([startup/3]). +-export([credentials/2]). +-export([options/1]). +-export(['query'/3]). +-export([prepare/2]). +-export([execute/4]). +-export([register/2]). -include("erlcql.hrl"). -%%----------------------------------------------------------------------------- -%% API function -%%----------------------------------------------------------------------------- +%% API function --------------------------------------------------------------- %% @doc Encodes the entire request frame. --spec frame(request(), tuple(), integer()) -> Frame :: iolist(). -frame({Opcode, Payload}, {Compression, _}, Stream) -> +-spec frame(version(), request(), tuple(), integer()) -> Frame :: iolist(). +frame(V, {Opcode, Payload}, {Compression, _}, Stream) -> OpcodeByte = opcode(Opcode), {CompressionBit, Payload2} = maybe_compress(Compression, Payload), Length = int(iolist_size(Payload2)), - [<>, + [<>, Stream, OpcodeByte, Length, Payload2]. %% @doc Encodes the startup request message body. --spec startup(compression(), bitstring()) -> {startup, iolist()}. -startup(false, CQLVersion) -> +-spec startup(version(), compression(), bitstring()) -> {startup, iolist()}. +startup(_V, false, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}])}; -startup(Compression, CQLVersion) -> +startup(_V, Compression, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}, {<<"COMPRESSION">>, compression(Compression)}])}. %% @doc Encodes the credentials request message body. --spec credentials([{K :: bitstring(), V :: bitstring()}]) -> +-spec credentials(version(), [{K :: bitstring(), V :: bitstring()}]) -> {credentials, iolist()}. -credentials(Informations) -> +credentials(_V, Informations) -> {credentials, string_map(Informations)}. %% @doc Encodes the options request message body. --spec options() -> {options, iolist()}. -options() -> +-spec options(version()) -> {options, iolist()}. +options(_V) -> {options, []}. -%% @doc Encodes the 'query' request message body. --spec 'query'(iodata(), consistency()) -> {'query', iolist()}. -'query'(QueryString, Consistency) -> - {'query', [long_string(QueryString), consistency(Consistency)]}. +%% @doc Encodes the query request message body. +-spec 'query'(version(), iodata(), consistency()) -> {'query', iolist()}. +'query'(1, QueryString, Consistency) -> + {'query', [long_string(QueryString), consistency(Consistency)]}; +'query'(2, QueryString, Consistency) -> + %% TODO: Implement support for query flags + %% native_protocol_v2.spec#L292 + Flags = 0, + {'query', [long_string(QueryString), consistency(Consistency), Flags]}. %% @doc Encodes the prepare request message body. --spec prepare(iodata()) -> {prepare, iolist()}. -prepare(QueryString) -> +-spec prepare(version(), iodata()) -> {prepare, iolist()}. +prepare(_V, QueryString) -> {prepare, [long_string(QueryString)]}. %% @doc Encodes the execute request message body. --spec execute(binary(), values(), consistency()) -> {execute, iolist()}. -execute(QueryId, Values, Consistency) -> +-spec execute(version(), binary(), values(), consistency()) -> + {execute, iolist()}. +execute(1, QueryId, Values, Consistency) -> BinaryValues = erlcql_convert:to_binary(Values), {execute, [short_bytes(QueryId), BinaryValues, - consistency(Consistency)]}. + consistency(Consistency)]}; +execute(2, QueryId, Values, Consistency) -> + %% TODO: Implement support for query flags + %% native_protocol_v2.spec#L292 + %% TODO: Merge with regular query + BinaryValues = erlcql_convert:to_binary(Values), + Flags = 1, + {execute, [short_bytes(QueryId), consistency(Consistency), + Flags, BinaryValues]}. %% @doc Encodes the register request message body. --spec register([event_type()]) -> {register, iolist()}. -register(Events) -> +-spec register(version(), [event_type()]) -> {register, iolist()}. +register(_V, Events) -> {register, event_list(Events)}. -%%----------------------------------------------------------------------------- -%% Encode functions -%%----------------------------------------------------------------------------- +%% Encode functions ----------------------------------------------------------- -spec int(integer()) -> binary(). int(X) -> @@ -150,10 +159,6 @@ event_list(Events) -> N = length(Events), [short(N) | [string2(event(Event)) || Event <- Events]]. -%%----------------------------------------------------------------------------- -%% Internal functions -%%----------------------------------------------------------------------------- - %% @doc Compresses the payload if compression is enabled. -spec maybe_compress(compression(), iolist()) -> {0 | 1, iolist()}. maybe_compress(false, Payload) -> From 8cf57ba3f6b0e35454e2deeece9986a602f057f1 Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Mon, 26 May 2014 12:12:17 +0200 Subject: [PATCH 02/20] Add v2 authentication request/response messages --- src/erlcql_decode.erl | 17 +++++++++-- src/erlcql_encode.erl | 65 ++++++++++++++++++++++++++----------------- test/erlcql_SUITE.erl | 2 +- 3 files changed, 56 insertions(+), 28 deletions(-) diff --git a/src/erlcql_decode.erl b/src/erlcql_decode.erl index 7b65670..f08a558 100644 --- a/src/erlcql_decode.erl +++ b/src/erlcql_decode.erl @@ -115,7 +115,11 @@ decode(< result(Data2); event -> - event(Data2) + event(Data2); + auth_challenge -> + auth_challenge(Data2); + auth_success -> + auth_success(Data2) end, {ok, Stream, Response, Rest}; decode(_Other, _Compression) -> @@ -137,7 +141,9 @@ opcode(16#02) -> ready; opcode(16#03) -> authenticate; opcode(16#06) -> supported; opcode(16#08) -> result; -opcode(16#0c) -> event. +opcode(16#0c) -> event; +opcode(16#0e) -> auth_challenge; +opcode(16#10) -> auth_success. -spec error2(binary()) -> cql_error(). error2(<>) -> @@ -466,3 +472,10 @@ inet(4, <>) -> inet(16, <>) -> {{A, B, C, D, E, F, G, H}, Port}. +-spec auth_challenge(binary()) -> {auth_challenge, binary()}. +auth_challenge(<>) -> + {auth_challenge, Token}. + +-spec auth_success(binary()) -> {auth_success, binary()}. +auth_success(<>) -> + {auth_success, Token}. diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 362c023..118e1e2 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -30,6 +30,7 @@ -export([prepare/2]). -export([execute/4]). -export([register/2]). +-export([auth_response/2]). -include("erlcql.hrl"). @@ -55,7 +56,7 @@ startup(_V, Compression, CQLVersion) -> %% @doc Encodes the credentials request message body. -spec credentials(version(), [{K :: bitstring(), V :: bitstring()}]) -> {credentials, iolist()}. -credentials(_V, Informations) -> +credentials(1, Informations) -> {credentials, string_map(Informations)}. %% @doc Encodes the options request message body. @@ -66,12 +67,13 @@ options(_V) -> %% @doc Encodes the query request message body. -spec 'query'(version(), iodata(), consistency()) -> {'query', iolist()}. 'query'(1, QueryString, Consistency) -> - {'query', [long_string(QueryString), consistency(Consistency)]}; + {'query', [long_string(QueryString), short(consistency(Consistency))]}; 'query'(2, QueryString, Consistency) -> %% TODO: Implement support for query flags %% native_protocol_v2.spec#L292 Flags = 0, - {'query', [long_string(QueryString), consistency(Consistency), Flags]}. + {'query', [long_string(QueryString), + short(consistency(Consistency)), Flags]}. %% @doc Encodes the prepare request message body. -spec prepare(version(), iodata()) -> {prepare, iolist()}. @@ -84,14 +86,14 @@ prepare(_V, QueryString) -> execute(1, QueryId, Values, Consistency) -> BinaryValues = erlcql_convert:to_binary(Values), {execute, [short_bytes(QueryId), BinaryValues, - consistency(Consistency)]}; + short(consistency(Consistency))]}; execute(2, QueryId, Values, Consistency) -> %% TODO: Implement support for query flags %% native_protocol_v2.spec#L292 %% TODO: Merge with regular query BinaryValues = erlcql_convert:to_binary(Values), Flags = 1, - {execute, [short_bytes(QueryId), consistency(Consistency), + {execute, [short_bytes(QueryId), short(consistency(Consistency)), Flags, BinaryValues]}. %% @doc Encodes the register request message body. @@ -99,6 +101,10 @@ execute(2, QueryId, Values, Consistency) -> register(_V, Events) -> {register, event_list(Events)}. +-spec auth_response(version(), binary()) -> {auth_response, iolist()}. +auth_response(2, Token) -> + {auth_response, [bytes(Token)]}. + %% Encode functions ----------------------------------------------------------- -spec int(integer()) -> binary(). @@ -119,6 +125,11 @@ long_string(String) -> Length = iolist_size(String), [int(Length), String]. +-spec bytes(binary()) -> iolist(). +bytes(Bytes) -> + Length = iolist_size(Bytes), + [int(Length), Bytes]. + -spec short_bytes(binary()) -> iolist(). short_bytes(Bytes) -> Length = iolist_size(Bytes), @@ -131,28 +142,32 @@ string_map(KeyValues) -> || {Key, Value} <- KeyValues]]. -spec opcode(atom()) -> integer(). -opcode(startup) -> 1; -opcode(credentials) -> 4; -opcode(options) -> 5; -opcode('query') -> 7; -opcode(prepare) -> 9; -opcode(execute) -> 10; -opcode(register) -> 11. - --spec consistency(consistency()) -> binary(). -consistency(any) -> short(0); -consistency(one) -> short(1); -consistency(two) -> short(2); -consistency(three) -> short(3); -consistency(quorum) -> short(4); -consistency(all) -> short(5); -consistency(local_quorum) -> short(6); -consistency(each_quorum) -> short(7). +opcode(startup) -> 16#01; +opcode(credentials) -> 16#04; +opcode(options) -> 16#05; +opcode('query') -> 16#07; +opcode(prepare) -> 16#09; +opcode(execute) -> 16#0a; +opcode(register) -> 16#0b; +opcode(auth_response) -> 16#0f. + +-spec consistency(consistency()) -> integer(). +consistency(any) -> 16#00; +consistency(one) -> 16#01; +consistency(two) -> 16#02; +consistency(three) -> 16#03; +consistency(quorum) -> 16#04; +consistency(all) -> 16#05; +consistency(local_quorum) -> 16#06; +consistency(each_quorum) -> 16#07; +consistency(serial) -> 16#08; +consistency(local_serial) -> 16#09; +consistency(local_one) -> 16#0a. -spec event(event_type()) -> bitstring(). event(topology_change) -> <<"TOPOLOGY_CHANGE">>; -event(status_change) -> <<"STATUS_CHANGE">>; -event(schema_change) -> <<"SCHEMA_CHANGE">>. +event(status_change) -> <<"STATUS_CHANGE">>; +event(schema_change) -> <<"SCHEMA_CHANGE">>. -spec event_list([event_type()]) -> iolist(). event_list(Events) -> @@ -176,4 +191,4 @@ maybe_compress(lz4, Payload) -> -spec compression(compression()) -> bitstring(). compression(snappy) -> <<"snappy">>; -compression(lz4) -> <<"lz4">>. +compression(lz4) -> <<"lz4">>. diff --git a/test/erlcql_SUITE.erl b/test/erlcql_SUITE.erl index 247772e..895f6b7 100644 --- a/test/erlcql_SUITE.erl +++ b/test/erlcql_SUITE.erl @@ -5,7 +5,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("proper/include/proper.hrl"). --define(OPTS, [{cql_version, <<"3.0.0">>}]). +-define(OPTS, [{version, 1}]). -define(KEYSPACE, <<"erlcql_tests">>). -define(CREATE_KEYSPACE, <<"CREATE KEYSPACE IF NOT EXISTS erlcql_tests ", From 6ca82ad80314ddf58d5c862625f87ab4dde75d76 Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Tue, 3 Jun 2014 16:16:42 +0200 Subject: [PATCH 03/20] Support for query parameters (e.g. paging, serial consistency) --- include/erlcql.hrl | 3 +- src/erlcql_client.erl | 81 +++++++++++++++++----------- src/erlcql_decode.erl | 123 ++++++++++++++++++++++++------------------ src/erlcql_encode.erl | 69 ++++++++++++++++++------ test/erlcql_SUITE.erl | 17 ++++-- test/erlcql_test.erl | 2 +- 6 files changed, 188 insertions(+), 107 deletions(-) diff --git a/include/erlcql.hrl b/include/erlcql.hrl index 782e3e8..668bbfe 100644 --- a/include/erlcql.hrl +++ b/include/erlcql.hrl @@ -38,7 +38,8 @@ %% Parser -record(parser, { buffer = <<>> :: binary(), - length :: pos_integer() + length :: pos_integer(), + version :: version() }). -type parser() :: #parser{}. diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 8a9139f..bb3a485 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -23,15 +23,22 @@ -module(erlcql_client). -behaviour(gen_fsm). -%% API -export([start_link/1]). --export(['query'/3, async_query/3, - execute/4, async_execute/4]). --export([prepare/2, prepare/3, - options/1, - register/2]). --export([await/1, - await/2]). + +-export(['query'/3]). +-export([execute/4]). + +-export([prepare/2]). +-export([prepare/3]). +-export([options/1]). +-export([register/2]). + +-export([async_query/3]). +-export([async_execute/4]). +-export([await/1]). +-export([await/2]). + +-export([get_env_opt/2]). %% gen_fsm callbacks -export([init/1, @@ -98,8 +105,8 @@ start_link(Opts) -> -spec 'query'(pid(), iodata(), consistency()) -> result() | {error, Reason :: term()}. -'query'(Pid, QueryString, Consistency) -> - async_call(Pid, {'query', QueryString, Consistency}). +'query'(Pid, QueryString, Params) -> + async_call(Pid, {'query', QueryString, Params}). -spec prepare(pid(), iodata()) -> prepared() | {error, Reason :: term()}. prepare(Pid, QueryString) -> @@ -126,8 +133,8 @@ register(Pid, Events) -> -spec async_query(pid(), iodata(), consistency()) -> {ok, QueryRef :: erlcql:query_ref()} | {error, Reason :: term()}. -async_query(Pid, QueryString, Consistency) -> - cast(Pid, {'query', QueryString, Consistency}). +async_query(Pid, QueryString, Params) -> + cast(Pid, {'query', QueryString, Params}). -spec async_execute(pid(), erlcql:uuid() | atom(), values(), consistency()) -> {ok, QueryRef :: erlcql:query_ref()} | {error, Reason :: term()}. @@ -198,18 +205,18 @@ init_state(Opts) -> Host = get_env_opt(host, Opts), Port = get_env_opt(port, Opts), Database = {Host, Port}, - EventFun = get_opt(event_fun, Opts), + EventFun = get_internal_opt(event_fun, Opts), Events = get_env_opt(register, Opts), Compression = get_env_opt(compression, Opts), Tracing = get_env_opt(tracing, Opts), Flags = {Compression, Tracing}, Keepalive = get_env_opt(keepalive, Opts), Keyspace = get_env_opt(use, Opts), - Parent = get_opt(parent, Opts), - Parser = erlcql_decode:new_parser(), + Parent = get_internal_opt(parent, Opts), Prepare = get_env_opt(prepare, Opts), PreparedETS = maybe_create_prepared_ets(Opts), Version = get_env_opt(version, Opts), + Parser = erlcql_decode:new_parser(Version), #state{async_ets = AsyncETS, auto_reconnect = AutoReconnect, backoff = Backoff, @@ -289,9 +296,9 @@ ready(Event, State) -> ready({_Ref, _}, _From, #state{streams = []} = State) -> ?CRITICAL("Too many requests!"), {reply, {error, too_many_requests}, ready, State}; -ready({Ref, {'query', QueryString, Consistency}}, {From, _}, +ready({Ref, {'query', QueryString, Params}}, {From, _}, #state{version = Version} = State) -> - Query = erlcql_encode:'query'(Version, QueryString, Consistency), + Query = erlcql_encode:'query'(Version, QueryString, Params), send(Query, {Ref, From}, State); ready({Ref, {prepare, Query}}, {From, _}, #state{version = Version} = State) -> @@ -301,9 +308,10 @@ ready({Ref, {prepare, Query, Name}}, {From, _}, #state{prepared_ets = PreparedETS, version = Version} = State) -> Prepare = erlcql_encode:prepare(Version, Query), - Fun = fun({ok, QueryId, {ReqSpecs, _}} = Response) -> - {_, Types} = lists:unzip(ReqSpecs), - true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), + Fun = fun({ok, {QueryId, RequestMetadata, _ResultMetadata}} = Response) -> + Types = proplists:get_value(types, RequestMetadata), + Entry = {Name, Query, QueryId, Types}, + true = ets:insert(PreparedETS, Entry), Response; ({error, _} = Response) -> Response @@ -394,12 +402,8 @@ terminate(_Reason, _StateName, #state{socket = Socket}) -> -spec maybe_create_prepared_ets(proplist()) -> ets(). maybe_create_prepared_ets(Opts) -> - case get_opt(prepared_statements_ets_tid, Opts) of - undefined -> - ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS); - Tid -> - Tid - end. + New = fun() -> ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS) end, + get_opt(prepared_statements_ets_tid, Opts, New). -spec event_fun(event_fun() | pid()) -> event_fun(). event_fun(Fun) when is_function(Fun) -> @@ -501,7 +505,8 @@ use_keyspace(#state{keyspace = undefined} = State) -> {ok, State}; use_keyspace(#state{keyspace = Keyspace, version = Version} = State) -> - Use = erlcql_encode:'query'(Version, [<<"USE ">>, Keyspace], any), + Use = erlcql_encode:'query'(Version, [<<"USE ">>, Keyspace], + [{consistency, any}]), ok = send_request(Use, 0, State), case wait_for_response(State) of {ok, Keyspace} -> @@ -526,8 +531,8 @@ prepare_queries([{Name, Query} | Rest], Prepare = erlcql_encode:prepare(Version, Query), ok = send_request(Prepare, 0, State), case wait_for_response(State) of - {ok, QueryId, {ReqSpecs, _}} -> - {_, Types} = lists:unzip(ReqSpecs), + {ok, {QueryId, RequestMetadata, _}} -> + Types = proplists:get_value(types, RequestMetadata), true = ets:insert(PreparedETS, {Name, Query, QueryId, Types}), prepare_queries(Rest, State); {error, _Reason} = Error -> @@ -566,8 +571,9 @@ wait_for_body(<<_:32, Length:32>> = Header, -spec decode_response(binary(), state()) -> ready() | authenticate() | response() | {error, term()}. -decode_response(Binary, #state{flags = {Compression, _}}) -> - case erlcql_decode:decode(Binary, Compression) of +decode_response(Binary, #state{version = Version, + flags = {Compression, _}}) -> + case erlcql_decode:decode(Version, Binary, Compression) of {ok, 0, Response, <<>>} -> Response; {error, _} = Error -> @@ -681,11 +687,22 @@ get_env_opt(Opt, Opts) -> get_env(Opt) end. +get_internal_opt(Opt, Opts) -> + {Opt, Value} = lists:keyfind(Opt, 1, Opts), + Value. + -spec get_opt(term(), proplist()) -> Value :: term(). get_opt(Opt, Opts) -> get_opt(Opt, Opts, undefined). --spec get_opt(term(), proplist(), term()) -> Value :: term(). +-spec get_opt(term(), proplist(), term() | function()) -> Value :: term(). +get_opt(Opt, Opts, Default) when is_function(Default) -> + case lists:keyfind(Opt, 1, Opts) of + {Opt, Value} -> + Value; + false -> + Default() + end; get_opt(Opt, Opts, Default) -> case lists:keyfind(Opt, 1, Opts) of {Opt, Value} -> diff --git a/src/erlcql_decode.erl b/src/erlcql_decode.erl index f08a558..424c15e 100644 --- a/src/erlcql_decode.erl +++ b/src/erlcql_decode.erl @@ -22,20 +22,21 @@ %% @author Krzysztof Rutka -module(erlcql_decode). --export([new_parser/0]). +-export([new_parser/1]). -export([parse/3]). --export([decode/2]). +-export([decode/3]). -include("erlcql.hrl"). -define(STRING(Length), Length/bytes). +-define(BYTES(Length), Length/bytes). %% API functions -------------------------------------------------------------- %% @doc Returns a new parser. --spec new_parser() -> parser(). -new_parser() -> - #parser{}. +-spec new_parser(version()) -> parser(). +new_parser(Version) -> + #parser{version = Version}. %% @doc Parses given data using a parser. -spec parse(binary(), parser(), compression()) -> @@ -84,8 +85,9 @@ get_length(#parser{length = undefined, buffer = Buffer} = Parser) -> Parser#parser{length = Length + 8}; get_length(Parser) -> Parser. -run_decode(#parser{buffer = Buffer} = Parser, Compression) -> - case decode(Buffer, Compression) of +run_decode(#parser{version = Version, + buffer = Buffer} = Parser, Compression) -> + case decode(Version, Buffer, Compression) of {ok, Stream, Response, Leftovers} -> NewParser = Parser#parser{length = undefined, buffer = Leftovers}, @@ -96,16 +98,16 @@ run_decode(#parser{buffer = Buffer} = Parser, Compression) -> %% Decode functions ----------------------------------------------------------- --spec decode(binary(), compression()) -> +-spec decode(version(), binary(), compression()) -> {ok, Stream :: integer(), Response :: response(), Rest :: binary()} | {error, Reason :: term()}. -decode(<>, Compression) -> +decode(V, <>, Compression) -> Data2 = maybe_decompress(Decompress, Compression, Data), Response = case opcode(Opcode) of - error -> - error2(Data2); + cql_error -> + cql_error(Data2); ready -> ready(Data2); authenticate -> @@ -122,8 +124,11 @@ decode(< - {error, bad_header}. +decode(_V, <<_Other:32, Length:32, _Data:Length/binary, + _Rest/binary>>, _Compression) -> + {error, bad_header}; +decode(_V, _Other, _Compression) -> + {error, binary_too_small}. -spec maybe_decompress(0 | 1, compression(), binary()) -> binary(). maybe_decompress(0, _Compression, Data) -> @@ -136,7 +141,7 @@ maybe_decompress(1, lz4, <>) -> UnpackedData. -spec opcode(integer()) -> response_opcode(). -opcode(16#00) -> error; +opcode(16#00) -> cql_error; opcode(16#02) -> ready; opcode(16#03) -> authenticate; opcode(16#06) -> supported; @@ -145,8 +150,8 @@ opcode(16#0c) -> event; opcode(16#0e) -> auth_challenge; opcode(16#10) -> auth_success. --spec error2(binary()) -> cql_error(). -error2(<>) -> +-spec cql_error(binary()) -> cql_error(). +cql_error(<>) -> Error = case error_code(ErrorCode) of unavailable_exception -> unavailable_exception(Data); @@ -167,7 +172,7 @@ error2(<>) -> -spec error_code(integer()) -> error_code(). error_code(16#0000) -> server_error; -error_code(16#000A) -> protocol_error; +error_code(16#000a) -> protocol_error; error_code(16#0100) -> bad_credentials; error_code(16#1000) -> unavailable_exception; error_code(16#1001) -> overloaded; @@ -298,39 +303,52 @@ void(<<>>) -> -spec rows(binary()) -> rows(). rows(Data) -> - {ColumnCount, ColumnSpecs, RowData} = metadata(Data), + {ColumnCount, Metadata, RowData} = metadata(Data), <> = RowData, - {_, ColumnTypes} = lists:unzip(ColumnSpecs), - Rows = rows(RowCount, ColumnCount, ColumnTypes, RowContent, []), - {ok, {Rows, ColumnSpecs}}. + Types = proplists:get_value(types, Metadata), + Rows = rows(RowCount, ColumnCount, Types, RowContent, []), + {ok, {Rows, Metadata}}. -spec metadata(binary()) -> {integer(), column_specs(), Rest :: binary()}. -metadata(<<_:29, 0:1, _:2, 0:?INT, Rest/binary>>) -> - %% NEW: no_metadata - {0, [], Rest}; -metadata(<<_:31, 0:1, ColumnCount:?INT, ColumnData/binary>>) -> - {ColumnSpecs, Rest} = column_specs(false, ColumnCount, ColumnData, []), - {ColumnCount, ColumnSpecs, Rest}; -metadata(<<_:31, 1:1, ColumnCount:?INT, - Length:?SHORT, _Keyspace:?STRING(Length), - Length2:?SHORT, _Table:?STRING(Length2), ColumnData/binary>>) -> - {ColumnSpecs, Rest} = column_specs(true, ColumnCount, ColumnData, []), - {ColumnCount, ColumnSpecs, Rest}. - --spec column_specs(boolean(), integer(), binary(), column_specs()) -> +metadata(<<_:29, NoMetadata:1, HasMorePages:1, GlobalSpec:1, + ColumnCount:?INT, Rest/binary>>) -> + {Params2, Rest2} = maybe_more_pages(HasMorePages, [], Rest), + {Params3, Rest3} = maybe_global_spec(GlobalSpec, Params2, Rest2), + {Params4, Rest4} = maybe_column_spec(NoMetadata, GlobalSpec, + ColumnCount, Params3, Rest3), + {ColumnCount, Params4, Rest4}. + +maybe_more_pages(0, Params, Data) -> {Params, Data}; +maybe_more_pages(1, Params, Data) -> + <> = Data, + {[{paging_state, PagingState} | Params], Rest}. + +maybe_global_spec(0, Params, Data) -> {Params, Data}; +maybe_global_spec(1, Params, Data) -> + <> = Data, + {[{keyspace, Keyspace}, {table, Table} | Params], Rest}. + +maybe_column_spec(1, _, 0, Params, Data) -> {Params, Data}; +maybe_column_spec(0, Global, N, Params, Data) -> + {Columns, Types, Rest} = column_specs(Global == 1, N, Data, [], []), + {[{columns, Columns}, {types, Types} | Params], Rest}. + +-spec column_specs(boolean(), integer(), binary(), column_specs(), term()) -> {column_specs(), Rest :: binary()}. -column_specs(_Global, 0, Rest, ColumnSpecs) -> - {lists:reverse(ColumnSpecs), Rest}; -column_specs(true, N, <>, - ColumnSpecs) -> +column_specs(_, 0, Rest, Columns, Types) -> + {lists:reverse(Columns), lists:reverse(Types), Rest}; +column_specs(true, N, <>, + Columns, Types) -> {Type, Rest} = option(TypeData), - column_specs(true, N - 1, Rest, [{Name, Type} | ColumnSpecs]); + column_specs(true, N - 1, Rest, [Column | Columns], [Type | Types]); column_specs(false, N, <>, - ColumnSpecs) -> + Length3:?SHORT, Column:?STRING(Length3), + TypeData/binary>>, + Columns, Types) -> {Type, Rest} = option(TypeData), - column_specs(false, N - 1, Rest, [{Name, Type} | ColumnSpecs]). + column_specs(false, N - 1, Rest, [Column | Columns], [Type | Types]). -spec option(binary()) -> {option(), Rest :: binary()}. option(<>) -> @@ -402,11 +420,10 @@ set_keyspace(<>) -> {ok, Keyspace}. -spec prepared(binary()) -> {ok, bitstring(), [option()]}. -prepared(<>) -> - {_ColumnCount, ColumnSpecs, ResultMetadata} = metadata(Metadata), - %% {_, ColumnTypes} = lists:unzip(ColumnSpecs), - {_, ResultSpecs, <<>>} = metadata(ResultMetadata), %% NEW: result_metadata - {ok, QueryId, {ColumnSpecs, ResultSpecs}}. +prepared(<>) -> + {_ColumnCount, RequestMetadata, Rest} = metadata(Data), + {_, ResultMetadata, <<>>} = metadata(Rest), + {ok, {QueryId, RequestMetadata, ResultMetadata}}. -spec schema_change(binary()) -> schema_change(). schema_change(<>) -> -spec event_type(bitstring()) -> event_type(). event_type(<<"TOPOLOGY_CHANGE">>) -> topology_change; -event_type(<<"STATUS_CHANGE">>) -> status_change; -event_type(<<"SCHEMA_CHANGE">>) -> schema_change. +event_type(<<"STATUS_CHANGE">>) -> status_change; +event_type(<<"SCHEMA_CHANGE">>) -> schema_change. -spec topology_change_event(binary()) -> {topology_change, Type :: atom(), Inet :: inet()}. @@ -437,7 +454,7 @@ topology_change_event(<>) -> {topology_change, topology_change_type(Type), inet(Data)}. -spec topology_change_type(bitstring()) -> atom(). -topology_change_type(<<"NEW_NODE">>) -> new_node; +topology_change_type(<<"NEW_NODE">>) -> new_node; topology_change_type(<<"REMOVED_NODE">>) -> removed_node. -spec status_change_event(binary()) -> @@ -446,7 +463,7 @@ status_change_event(<>) -> {status_change, status_change_type(Type), inet(Data)}. -spec status_change_type(bitstring()) -> atom(). -status_change_type(<<"UP">>) -> up; +status_change_type(<<"UP">>) -> up; status_change_type(<<"DOWN">>) -> down. -spec schema_change_event(binary()) -> diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 118e1e2..ab909ea 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -34,6 +34,8 @@ -include("erlcql.hrl"). +-type query_parameters() :: [{atom(), term()}]. + %% API function --------------------------------------------------------------- %% @doc Encodes the entire request frame. @@ -65,15 +67,54 @@ options(_V) -> {options, []}. %% @doc Encodes the query request message body. --spec 'query'(version(), iodata(), consistency()) -> {'query', iolist()}. -'query'(1, QueryString, Consistency) -> - {'query', [long_string(QueryString), short(consistency(Consistency))]}; -'query'(2, QueryString, Consistency) -> - %% TODO: Implement support for query flags - %% native_protocol_v2.spec#L292 - Flags = 0, +-spec 'query'(version(), iodata(), query_parameters()) -> + {'query', iolist()}. +'query'(1, QueryString, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), {'query', [long_string(QueryString), - short(consistency(Consistency)), Flags]}. + short(consistency(Consistency))]}; +'query'(2, QueryString, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), + Params2 = [{values, []}, + {skip_metadata, false} | Params], + {'query', [long_string(QueryString), short(consistency(Consistency)), + query_parameters(Params2)]}. + +query_parameters(Params) -> + Flags = [{values, + fun([]) -> false; + (Values) -> erlcql_convert:to_binary(Values) end}, + {skip_metadata, fun(Skip) -> Skip end}, + {page_size, + fun(undefined) -> false; + (PageSize) -> int(PageSize) end}, + {paging_state, + fun(undefined) -> false; + (PagingState) -> bytes(PagingState) end}, + {serial_consistency, + fun(undefined) -> false; + (Consistency) -> short(consistency(Consistency)) end}], + Params2 = [{Flag, Fun(proplists:get_value(Flag, Params))} + || {Flag, Fun} <- Flags], + process_flags(Params2, fun query_flag/1). + +process_flags(Params, BitFun) -> + P = fun({_Name, false}, {S, B}) -> + {S, B}; + ({Name, true}, {S, B}) -> + {S + BitFun(Name), B}; + ({Name, Value}, {S, B}) -> + {S + BitFun(Name), [Value | B]} + end, + {Flags, Body} = lists:foldl(P, {0, []}, Params), + [Flags, lists:reverse(Body)]. + +-spec query_flag(atom()) -> integer(). +query_flag(values) -> 16#01; +query_flag(skip_metadata) -> 16#02; +query_flag(page_size) -> 16#04; +query_flag(paging_state) -> 16#08; +query_flag(serial_consistency) -> 16#10. %% @doc Encodes the prepare request message body. -spec prepare(version(), iodata()) -> {prepare, iolist()}. @@ -87,14 +128,12 @@ execute(1, QueryId, Values, Consistency) -> BinaryValues = erlcql_convert:to_binary(Values), {execute, [short_bytes(QueryId), BinaryValues, short(consistency(Consistency))]}; -execute(2, QueryId, Values, Consistency) -> - %% TODO: Implement support for query flags - %% native_protocol_v2.spec#L292 - %% TODO: Merge with regular query - BinaryValues = erlcql_convert:to_binary(Values), - Flags = 1, +execute(2, QueryId, Values, Params) -> + Consistency = erlcql_client:get_env_opt(consistency, Params), + Params2 = [{values, Values}, + {skip_metadata, false} | Params], {execute, [short_bytes(QueryId), short(consistency(Consistency)), - Flags, BinaryValues]}. + query_parameters(Params2)]}. %% @doc Encodes the register request message body. -spec register(version(), [event_type()]) -> {register, iolist()}. diff --git a/test/erlcql_SUITE.erl b/test/erlcql_SUITE.erl index 895f6b7..a94e048 100644 --- a/test/erlcql_SUITE.erl +++ b/test/erlcql_SUITE.erl @@ -5,7 +5,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("proper/include/proper.hrl"). --define(OPTS, [{version, 1}]). +-define(OPTS, []). -define(KEYSPACE, <<"erlcql_tests">>). -define(CREATE_KEYSPACE, <<"CREATE KEYSPACE IF NOT EXISTS erlcql_tests ", @@ -20,8 +20,6 @@ -define(PROPTEST(A), true = proper:quickcheck(A())). -define(PROPTEST(A, Args), true = proper:quickcheck(A(Args), {numtests, 1000})). --import(erlcql, [q/2, q/3]). - %% Fixtures ------------------------------------------------------------------- init_per_suite(Config) -> @@ -179,7 +177,9 @@ insert(Config) -> Pid = get_pid(Config), {ok, void} = q(Pid, <<"INSERT INTO t (k, v) VALUES (1, 'one')">>), Rows = q(Pid, <<"SELECT * FROM t">>, one), - {ok, {[[1, <<"one">>]], [{<<"k">>, int}, {<<"v">>, varchar}]}} = Rows. + {ok, {[[1, <<"one">>]], Metadata}} = Rows, + [<<"k">>, <<"v">>] = proplists:get_value(columns, Metadata), + [int, varchar] = proplists:get_value(types, Metadata). %% Client tests @@ -202,6 +202,11 @@ start_with_use(_Config) -> get_pid(Config) -> proplists:get_value(pid, Config). +q(Pid, Query) -> q(Pid, Query, one). + +q(Pid, Query, Consistency) -> + erlcql_client:'query'(Pid, Query, [{consistency, Consistency}]). + get_keyspaces(Pid) -> Check = <<"SELECT keyspace_name FROM system.schema_keyspaces">>, {ok, {Keyspaces, _}} = q(Pid, Check, one), @@ -233,7 +238,9 @@ get_value(Pid, <<"map">>) -> get_value(Pid, {map, varchar, boolean}); get_value(Pid, Type) -> Res = q(Pid, <<"SELECT v FROM erlcql_tests.t">>, one), - {ok, {[[Value]], [{<<"v">>, Type}]}} = Res, + {ok, {[[Value]], Metadata}} = Res, + [<<"v">>] = proplists:get_value(columns, Metadata), + [Type] = proplists:get_value(types, Metadata), Value. clean_type_table(TestCase, Config) -> diff --git a/test/erlcql_test.erl b/test/erlcql_test.erl index 17ed79d..7f6bc82 100644 --- a/test/erlcql_test.erl +++ b/test/erlcql_test.erl @@ -4,7 +4,7 @@ -define(CQL_VERSION, <<"3.0.0">>). -define(OPTS, [{cql_version, ?CQL_VERSION}]). --define(CONSISTENCY, quorum). +-define(CONSISTENCY, [{consistency, quorum}]). -spec create_keyspace() -> Keyspace :: bitstring(). create_keyspace() -> From db3e36489f7be80098bd8b06f46c321dcde0ef2b Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Tue, 2 Sep 2014 10:20:48 +0200 Subject: [PATCH 04/20] Add support for BATCH request --- include/erlcql.hrl | 6 +++++- src/erlcql.erl | 4 +++- src/erlcql_client.erl | 11 +++++++++++ src/erlcql_encode.erl | 21 ++++++++++++++++++--- 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/include/erlcql.hrl b/include/erlcql.hrl index 668bbfe..085f6df 100644 --- a/include/erlcql.hrl +++ b/include/erlcql.hrl @@ -104,7 +104,11 @@ | quorum | all | local_quorum - | each_quorum. + | each_quorum + | local_one. + +-type serial_consistency() :: serial + | local_serial. -type event_type() :: topology_change | status_change diff --git a/src/erlcql.erl b/src/erlcql.erl index df58213..40d9c1c 100644 --- a/src/erlcql.erl +++ b/src/erlcql.erl @@ -98,10 +98,12 @@ default(compression) -> false; default(tracing) -> false; default(event_handler) -> self(); default(consistency) -> quorum; -default(auto_reconnect) -> false; +default(serial_consistency) -> serial; +default(batch_type) -> logged; default(register) -> []; default(use) -> undefined; default(prepare) -> []; +default(auto_reconnect) -> false; default(reconnect_start) -> 1000; default(reconnect_max) -> 30000; default(keepalive) -> false; diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index bb3a485..26f4fff 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -27,6 +27,7 @@ -export(['query'/3]). -export([execute/4]). +-export([batch/3]). -export([prepare/2]). -export([prepare/3]). @@ -121,6 +122,9 @@ prepare(Pid, QueryString, Name) -> execute(Pid, QueryId, Values, Consistency) -> async_call(Pid, {execute, QueryId, Values, Consistency}). +batch(Pid, Queries, Params) -> + async_call(Pid, {batch, Queries, Params}). + -spec options(pid()) -> supported() | {error, Reason :: term()}. options(Pid) -> async_call(Pid, options). @@ -271,6 +275,8 @@ startup({_Ref, {prepare, Query, _}}, _From, State) -> not_ready(prepare, Query, State); startup({_Ref, {execute, Query, _, _}}, _From, State) -> not_ready(execute, Query, State); +startup({_Ref, {batch, _, _, _}}, _From, State) -> + not_ready(batch, State); startup({_Ref, options}, _From, State) -> not_ready(options, State); startup({_Ref, {register, _}}, _From, State) -> @@ -340,6 +346,10 @@ ready({Ref, {execute, Name, Values, Consistency}}, {From, _}, ?DEBUG("Execute failed, invalid query name: ~s", [Name]), {reply, {error, invalid_query_name}, ready, State} end; +ready({Ref, {batch, Queries, Params}}, {From, _}, + #state{version = Version} = State) -> + Batch = erlcql_encode:batch(Version, Queries, Params), + send(Batch, {Ref, From}, State); ready({Ref, options}, {From, _}, #state{version = Version} = State) -> Options = erlcql_encode:options(Version), send(Options, {Ref, From}, State); @@ -648,6 +658,7 @@ parse_response(Data, #state{parser = Parser, {stop, Reason, State} end. +handle_responses([], State) -> State; handle_responses(Responses, State) -> lists:foldl(fun handle_response/2, State, Responses). diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index ab909ea..43d0e27 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -29,6 +29,7 @@ -export(['query'/3]). -export([prepare/2]). -export([execute/4]). +-export([batch/3]). -export([register/2]). -export([auth_response/2]). @@ -116,12 +117,10 @@ query_flag(page_size) -> 16#04; query_flag(paging_state) -> 16#08; query_flag(serial_consistency) -> 16#10. -%% @doc Encodes the prepare request message body. -spec prepare(version(), iodata()) -> {prepare, iolist()}. prepare(_V, QueryString) -> {prepare, [long_string(QueryString)]}. -%% @doc Encodes the execute request message body. -spec execute(version(), binary(), values(), consistency()) -> {execute, iolist()}. execute(1, QueryId, Values, Consistency) -> @@ -135,7 +134,18 @@ execute(2, QueryId, Values, Params) -> {execute, [short_bytes(QueryId), short(consistency(Consistency)), query_parameters(Params2)]}. -%% @doc Encodes the register request message body. +batch(2, Queries, Params) when is_list(Queries) -> + BatchType = erlcql_client:get_env_opt(batch_type, Params), + Consistency = erlcql_client:get_env_opt(consistency, Params), + Queries2 = [batch_query(Q) || Q <- Queries], + {batch, [batch_type(BatchType), short(length(Queries)), Queries2, + short(consistency(Consistency))]}. + +batch_query({Query, Values}) when is_atom(Query) -> + [1, short_bytes(Query), erlcql_convert:to_binary(Values)]; +batch_query({Query, Values}) -> + [1, long_string(Query), erlcql_convert:to_binary(Values)]. + -spec register(version(), [event_type()]) -> {register, iolist()}. register(_V, Events) -> {register, event_list(Events)}. @@ -188,6 +198,7 @@ opcode('query') -> 16#07; opcode(prepare) -> 16#09; opcode(execute) -> 16#0a; opcode(register) -> 16#0b; +opcode(batch) -> 16#0d; opcode(auth_response) -> 16#0f. -spec consistency(consistency()) -> integer(). @@ -203,6 +214,10 @@ consistency(serial) -> 16#08; consistency(local_serial) -> 16#09; consistency(local_one) -> 16#0a. +batch_type(logged) -> 0; +batch_type(unlogged) -> 1; +batch_type(counter) -> 2. + -spec event(event_type()) -> bitstring(). event(topology_change) -> <<"TOPOLOGY_CHANGE">>; event(status_change) -> <<"STATUS_CHANGE">>; From bf6b0194cd9d773eaf76b4de24f4b02c4b9ba305 Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Tue, 2 Sep 2014 10:23:00 +0200 Subject: [PATCH 05/20] Some irrelevant changes --- .travis.yml | 1 + README.md | 9 +++---- src/erlcql.app.src | 12 ++++------ src/erlcql.erl | 47 +++++++++++++++++-------------------- {include => src}/erlcql.hrl | 15 ------------ src/erlcql_client.erl | 47 ++++++++----------------------------- src/erlcql_convert.erl | 12 ++++------ src/erlcql_decode.erl | 10 -------- src/erlcql_encode.erl | 11 --------- test/erlcql_SUITE.erl | 5 +++- 10 files changed, 47 insertions(+), 122 deletions(-) rename {include => src}/erlcql.hrl (93%) diff --git a/.travis.yml b/.travis.yml index 82af05d..d1e2666 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,6 @@ language: erlang otp_release: + - 17.1 - 17.0 - R16B03-1 - R16B03 diff --git a/README.md b/README.md index b7cd2bf..a69bd7b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # erlCQL [![Build Status][travis_ci_image]][travis_ci] -[![Bitdeli Badge][bitdeli_image]][bitdeli] Cassandra native protocol CQL Erlang client. @@ -61,11 +60,9 @@ erlcql:start_link(Options :: proplists:proplist()) -> ### Versions -Supported versions: [`v1`][proto_v1]. +Supported versions: [`v1`][proto_v1], [`v2`][proto_v2] [travis_ci]: https://travis-ci.org/rpt/erlcql [travis_ci_image]: https://travis-ci.org/rpt/erlcql.png -[bitdeli]: https://bitdeli.com/free -[bitdeli_image]: https://d2weczhvl823v0.cloudfront.net/rpt/erlcql/trend.png -[proto_v1]: -https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v1.spec +[proto_v1]: https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v1.spec +[proto_v2]: https://raw.github.com/apache/cassandra/trunk/doc/native_protocol_v2.spec diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 1a33b7a..3b9987e 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,12 +1,8 @@ -%% -*- mode: erlang -*- -%% vi: set ft=erlang : - {application, erlcql, [{description, "Cassandra native protocol CQL client"}, {vsn, "0.1.7"}, {registered, []}, - {applications, - [kernel, - stdlib, - snappy, - lz4]}]}. + {applications, [kernel, + stdlib, + snappy, + lz4]}]}. diff --git a/src/erlcql.erl b/src/erlcql.erl index 40d9c1c..f28b82b 100644 --- a/src/erlcql.erl +++ b/src/erlcql.erl @@ -18,16 +18,15 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc API module. -%% @author Krzysztof Rutka -module(erlcql). -%% API --export([start_link/0, - start_link/1, - start_link/2]). --export([q/2, q/3, - e/3, e/4]). +-export([start_link/0]). +-export([start_link/1]). +-export([start_link/2]). +-export([q/2]). +-export([q/3]). +-export([e/3]). +-export([e/4]). -export([default/1]). -include("erlcql.hrl"). @@ -36,24 +35,20 @@ Pid :: pid(), Stream :: integer()}. --export_type([response/0, - consistency/0, - compression/0, - event_type/0, - event_fun/0, - query_ref/0]). --export_type([values/0, - type/0, - native_type/0, - uuid/0, - collection_type/0, - erlcql_list/0, - erlcql_set/0, - erlcql_map/0]). - -%%----------------------------------------------------------------------------- -%% API functions -%%----------------------------------------------------------------------------- +-export_type([response/0]). +-export_type([consistency/0]). +-export_type([compression/0]). +-export_type([event_type/0]). +-export_type([event_fun/0]). +-export_type([query_ref/0]). +-export_type([values/0]). +-export_type([type/0]). +-export_type([native_type/0]). +-export_type([uuid/0]). +-export_type([collection_type/0]). +-export_type([erlcql_list/0]). +-export_type([erlcql_set/0]). +-export_type([erlcql_map/0]). -spec start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}. start_link() -> diff --git a/include/erlcql.hrl b/src/erlcql.hrl similarity index 93% rename from include/erlcql.hrl rename to src/erlcql.hrl index 085f6df..aaa5763 100644 --- a/include/erlcql.hrl +++ b/src/erlcql.hrl @@ -18,24 +18,19 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @author Krzysztof Rutka - -define(APP, erlcql). -type version() :: 1 | 2. -%% Directions -define(REQUEST, 0). -define(RESPONSE, 1). -%% Encode/decode types -define(INT, 4/big-signed-integer-unit:8). -define(SHORT, 2/big-unsigned-integer-unit:8). -define(int(X), <>). -define(short(X), <>). -%% Parser -record(parser, { buffer = <<>> :: binary(), length :: pos_integer(), @@ -45,10 +40,6 @@ -type event_fun() :: fun((event()) -> any()). -%%----------------------------------------------------------------------------- -%% Types -%%----------------------------------------------------------------------------- - -type proplist() :: proplists:proplist(). -type socket() :: inet:socket(). -type ets() :: ets:tid(). @@ -170,8 +161,6 @@ -type inet() :: {inet:ip_address(), inet:port_number()}. -%% Types ---------------------------------------------------------------------- - -type uuid() :: bitstring(). -type native_type() :: binary() @@ -195,10 +184,6 @@ -type values() :: [type() | {option(), type()}]. -%%----------------------------------------------------------------------------- -%% Logging macros -%%----------------------------------------------------------------------------- - -ifdef(ERLCQL_NO_LOGS). -define(ERROR(Format, Data), begin Format, Data, ok end). -define(EMERGENCY(Format, Data), begin Format, Data, ok end). diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 26f4fff..6947b86 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -18,40 +18,33 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol CQL client module. -%% @author Krzysztof Rutka -module(erlcql_client). -behaviour(gen_fsm). -export([start_link/1]). - -export(['query'/3]). -export([execute/4]). -export([batch/3]). - -export([prepare/2]). -export([prepare/3]). -export([options/1]). -export([register/2]). - -export([async_query/3]). -export([async_execute/4]). -export([await/1]). -export([await/2]). - -export([get_env_opt/2]). -%% gen_fsm callbacks --export([init/1, - handle_event/3, - handle_sync_event/4, - handle_info/3, - code_change/4, - terminate/3]). --export([startup/2, - startup/3, - ready/2, - ready/3]). +-export([init/1]). +-export([handle_event/3]). +-export([handle_sync_event/4]). +-export([handle_info/3]). +-export([code_change/4]). +-export([terminate/3]). +-export([startup/2]). +-export([startup/3]). +-export([ready/2]). +-export([ready/3]). -include("erlcql.hrl"). @@ -94,16 +87,12 @@ }). -type backoff() :: #backoff{}. -%% Start API ------------------------------------------------------------------ - start_link(Opts) -> Opts2 = [{parent, self()} | Opts], EventFun = event_fun(get_env_opt(event_handler, Opts)), Opts3 = [{event_fun, EventFun} | Opts2], gen_fsm:start_link(?MODULE, proplists:unfold(Opts3), []). -%% Request API ---------------------------------------------------------------- - -spec 'query'(pid(), iodata(), consistency()) -> result() | {error, Reason :: term()}. 'query'(Pid, QueryString, Params) -> @@ -133,8 +122,6 @@ options(Pid) -> register(Pid, Events) -> async_call(Pid, {register, Events}). -%% Async request API ---------------------------------------------------------- - -spec async_query(pid(), iodata(), consistency()) -> {ok, QueryRef :: erlcql:query_ref()} | {error, Reason :: term()}. async_query(Pid, QueryString, Params) -> @@ -162,8 +149,6 @@ await({error, _Reason} = Error, _Timeout) -> await(QueryRef, Timeout) -> do_await(QueryRef, Timeout). -%% FSM: init ------------------------------------------------------------------ - init(Opts) -> State = init_state(Opts), case State#state.auto_reconnect of @@ -238,8 +223,6 @@ init_state(Opts) -> prepared_ets = PreparedETS, version = Version}. -%% State: startup ------------------------------------------------------------- - startup(reconnect, #state{backoff = Backoff, database = {Host, Port}, keepalive = Keepalive} = State) -> @@ -293,8 +276,6 @@ not_ready(Request, Info, State) -> ?DEBUG("Connection not ready for '~s': ~p", [Request, Info]), {reply, {error, not_ready}, startup, State}. -%% State: ready --------------------------------------------------------------- - ready(Event, State) -> ?ERROR("Bad event (ready): ~p", [Event]), {stop, {bad_event, Event}, State}. @@ -361,8 +342,6 @@ ready(Event, _From, State) -> ?ERROR("Bad event (ready/sync): ~p", [Event]), {stop, {bad_event, Event}, State}. -%% FSM: event handling -------------------------------------------------------- - handle_event({timeout, Stream}, StateName, #state{async_ets = AsyncETS, streams = Streams} = State) -> @@ -406,10 +385,6 @@ code_change(_OldVsn, StateName, State, _Extra) -> terminate(_Reason, _StateName, #state{socket = Socket}) -> close_socket(Socket). -%%----------------------------------------------------------------------------- -%% Internal functions -%%----------------------------------------------------------------------------- - -spec maybe_create_prepared_ets(proplist()) -> ets(). maybe_create_prepared_ets(Opts) -> New = fun() -> ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS) end, @@ -687,8 +662,6 @@ send_response(Stream, Response, Pid, #state{async_ets = AsyncETS, Pid ! Response, State#state{streams = [Stream | Streams]}. -%% Helper functions ----------------------------------------------------------- - -spec get_env_opt(term(), proplist()) -> Value :: term(). get_env_opt(Opt, Opts) -> case lists:keyfind(Opt, 1, Opts) of diff --git a/src/erlcql_convert.erl b/src/erlcql_convert.erl index 54980ac..dbee1b0 100644 --- a/src/erlcql_convert.erl +++ b/src/erlcql_convert.erl @@ -18,17 +18,15 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Value encoding/decoding module. -%% @author Krzysztof Rutka -module(erlcql_convert). --export([from_binary/2, from_null/1, - to_binary/1, to_binary/2]). +-export([from_binary/2]). +-export([from_null/1]). +-export([to_binary/1]). +-export([to_binary/2]). -include("erlcql.hrl"). -%% From binary ---------------------------------------------------------------- - -spec from_binary(option(), binary()) -> type(). from_binary(ascii, Binary) -> Binary; @@ -116,8 +114,6 @@ decode_signed(Binary) -> <> = Binary, Int. -%% To binary ------------------------------------------------------------------ - -spec to_binary(Values) -> Binaries when Values :: [Value | TypedValue], Value :: type() | null, diff --git a/src/erlcql_decode.erl b/src/erlcql_decode.erl index 424c15e..513ee51 100644 --- a/src/erlcql_decode.erl +++ b/src/erlcql_decode.erl @@ -18,8 +18,6 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol decoder/parser. -%% @author Krzysztof Rutka -module(erlcql_decode). -export([new_parser/1]). @@ -31,14 +29,10 @@ -define(STRING(Length), Length/bytes). -define(BYTES(Length), Length/bytes). -%% API functions -------------------------------------------------------------- - -%% @doc Returns a new parser. -spec new_parser(version()) -> parser(). new_parser(Version) -> #parser{version = Version}. -%% @doc Parses given data using a parser. -spec parse(binary(), parser(), compression()) -> {ok, Responses :: [{Stream :: integer(), Response :: response()}], @@ -49,8 +43,6 @@ parse(Data, #parser{buffer = Buffer} = Parser, Compression) -> NewParser = Parser#parser{buffer = NewBuffer}, parse_loop(NewParser, Compression, []). -%% Parser functions ----------------------------------------------------------- - -spec parse_loop(parser(), compression(), [{integer(), response()}]) -> {ok, Responses :: [{Stream :: integer(), Response :: response()}], @@ -96,8 +88,6 @@ run_decode(#parser{version = Version, {error, Other} end. -%% Decode functions ----------------------------------------------------------- - -spec decode(version(), binary(), compression()) -> {ok, Stream :: integer(), Response :: response(), Rest :: binary()} | {error, Reason :: term()}. diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 43d0e27..01c2e14 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -18,8 +18,6 @@ %% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS %% IN THE SOFTWARE. -%% @doc Native protocol request encoding. -%% @author Krzysztof Rutka -module(erlcql_encode). -export([frame/4]). @@ -37,9 +35,6 @@ -type query_parameters() :: [{atom(), term()}]. -%% API function --------------------------------------------------------------- - -%% @doc Encodes the entire request frame. -spec frame(version(), request(), tuple(), integer()) -> Frame :: iolist(). frame(V, {Opcode, Payload}, {Compression, _}, Stream) -> OpcodeByte = opcode(Opcode), @@ -48,7 +43,6 @@ frame(V, {Opcode, Payload}, {Compression, _}, Stream) -> [<>, Stream, OpcodeByte, Length, Payload2]. -%% @doc Encodes the startup request message body. -spec startup(version(), compression(), bitstring()) -> {startup, iolist()}. startup(_V, false, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}])}; @@ -56,18 +50,15 @@ startup(_V, Compression, CQLVersion) -> {startup, string_map([{<<"CQL_VERSION">>, CQLVersion}, {<<"COMPRESSION">>, compression(Compression)}])}. -%% @doc Encodes the credentials request message body. -spec credentials(version(), [{K :: bitstring(), V :: bitstring()}]) -> {credentials, iolist()}. credentials(1, Informations) -> {credentials, string_map(Informations)}. -%% @doc Encodes the options request message body. -spec options(version()) -> {options, iolist()}. options(_V) -> {options, []}. -%% @doc Encodes the query request message body. -spec 'query'(version(), iodata(), query_parameters()) -> {'query', iolist()}. 'query'(1, QueryString, Params) -> @@ -154,8 +145,6 @@ register(_V, Events) -> auth_response(2, Token) -> {auth_response, [bytes(Token)]}. -%% Encode functions ----------------------------------------------------------- - -spec int(integer()) -> binary(). int(X) -> <>. diff --git a/test/erlcql_SUITE.erl b/test/erlcql_SUITE.erl index a94e048..9c202fe 100644 --- a/test/erlcql_SUITE.erl +++ b/test/erlcql_SUITE.erl @@ -186,7 +186,10 @@ insert(Config) -> start_without_use(_Config) -> {ok, Pid} = erlcql:start_link("localhost", ?OPTS), unlink(Pid), - Msg = <<"no keyspace has been specified">>, + %% Msg = <<"no keyspace has been specified">>, + %% TODO: Different messages for v1 and v2 + Msg = <<"No keyspace has been specified. " + "USE a keyspace, or explicity specify keyspace.tablename">>, {error, {invalid, Msg, _}} = q(Pid, ?CREATE_TABLE), exit(Pid, kill). From e94594eb0f2dc7b3459d2db5825e3a7c0fbc0f2d Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Wed, 10 Sep 2014 11:13:53 +0200 Subject: [PATCH 06/20] Add support for batch request Right now it only supports prepared queries. Bump to 0.2.0-pre1. --- src/erlcql.app.src | 2 +- src/erlcql_client.erl | 31 +++++++++++++++++++++++++++---- src/erlcql_encode.erl | 6 ++---- test/erlcql_prepare_SUITE.erl | 26 +++++++++++++++++++++++++- test/erlcql_test.erl | 5 +++++ test/erlcql_test.hrl | 1 + 6 files changed, 61 insertions(+), 10 deletions(-) diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 3b9987e..88316b7 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,6 +1,6 @@ {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.1.7"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [kernel, stdlib, diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 6947b86..963882b 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -111,6 +111,8 @@ prepare(Pid, QueryString, Name) -> execute(Pid, QueryId, Values, Consistency) -> async_call(Pid, {execute, QueryId, Values, Consistency}). +-spec batch(pid(), [{atom(), values()}], proplist()) -> + result() | {error, Reason :: term()}. batch(Pid, Queries, Params) -> async_call(Pid, {batch, Queries, Params}). @@ -258,7 +260,7 @@ startup({_Ref, {prepare, Query, _}}, _From, State) -> not_ready(prepare, Query, State); startup({_Ref, {execute, Query, _, _}}, _From, State) -> not_ready(execute, Query, State); -startup({_Ref, {batch, _, _, _}}, _From, State) -> +startup({_Ref, {batch, _, _}}, _From, State) -> not_ready(batch, State); startup({_Ref, options}, _From, State) -> not_ready(options, State); @@ -328,9 +330,15 @@ ready({Ref, {execute, Name, Values, Consistency}}, {From, _}, {reply, {error, invalid_query_name}, ready, State} end; ready({Ref, {batch, Queries, Params}}, {From, _}, - #state{version = Version} = State) -> - Batch = erlcql_encode:batch(Version, Queries, Params), - send(Batch, {Ref, From}, State); + #state{prepared_ets = PreparedETS, + version = Version} = State) -> + case expand_prepared(PreparedETS, Queries, []) of + {error, Error} -> + {reply, {error, Error}, ready, State}; + Queries2 -> + Batch = erlcql_encode:batch(Version, Queries2, Params), + send(Batch, {Ref, From}, State) + end; ready({Ref, options}, {From, _}, #state{version = Version} = State) -> Options = erlcql_encode:options(Version), send(Options, {Ref, From}, State); @@ -385,6 +393,21 @@ code_change(_OldVsn, StateName, State, _Extra) -> terminate(_Reason, _StateName, #state{socket = Socket}) -> close_socket(Socket). +-spec expand_prepared(ets(), [{atom(), values()}], [{binary(), values()}]) -> + [{binary(), values()}] | {error, invalid_query_name}. +expand_prepared(_, [], Acc) -> + lists:reverse(Acc); +expand_prepared(ETS, [{Name, Values} | Qs], Acc) when is_atom(Name) -> + case ets:lookup(ETS, Name) of + [{Name, _Query, QueryId, undefined}] -> + expand_prepared(ETS, Qs, [{QueryId, Values} | Acc]); + [{Name, _Query, QueryId, Types}] -> + TypedValues = lists:zip(Types, Values), + expand_prepared(ETS, Qs, [{QueryId, TypedValues} | Acc]); + [] -> + {error, invalid_query_name} + end. + -spec maybe_create_prepared_ets(proplist()) -> ets(). maybe_create_prepared_ets(Opts) -> New = fun() -> ets:new(?PREPARED_ETS_NAME, ?PREPARED_ETS_OPTS) end, diff --git a/src/erlcql_encode.erl b/src/erlcql_encode.erl index 01c2e14..34a24a7 100644 --- a/src/erlcql_encode.erl +++ b/src/erlcql_encode.erl @@ -132,10 +132,8 @@ batch(2, Queries, Params) when is_list(Queries) -> {batch, [batch_type(BatchType), short(length(Queries)), Queries2, short(consistency(Consistency))]}. -batch_query({Query, Values}) when is_atom(Query) -> - [1, short_bytes(Query), erlcql_convert:to_binary(Values)]; -batch_query({Query, Values}) -> - [1, long_string(Query), erlcql_convert:to_binary(Values)]. +batch_query({QueryId, Values}) -> + [1, short_bytes(QueryId), erlcql_convert:to_binary(Values)]. -spec register(version(), [event_type()]) -> {register, iolist()}. register(_V, Events) -> diff --git a/test/erlcql_prepare_SUITE.erl b/test/erlcql_prepare_SUITE.erl index 3931311..34bea29 100644 --- a/test/erlcql_prepare_SUITE.erl +++ b/test/erlcql_prepare_SUITE.erl @@ -8,7 +8,8 @@ %% Suite ---------------------------------------------------------------------- all() -> - [prepare_execute]. + [prepare_execute, + prepare_batch]. %% Fixtures ------------------------------------------------------------------- @@ -68,3 +69,26 @@ prepare_execute(Config) -> {[Row], _} = execute(Client, select, [hd(Values)]), Row = Values. + +prepare_batch(Config) -> + Keyspace = ?c(keyspace, Config), + Table = gen_table_name(), + Create = [<<"CREATE TABLE ">>, Keyspace, <<".">>, Table, + <<" (x int PRIMARY KEY, y int)">>], + single_query(Create), + + Insert = [<<"INSERT INTO ">>, Table, + <<" (x, y) VALUES (?, ?)">>], + Select = [<<"SELECT * FROM ">>, Table], + Prepare = [{insert, Insert}, + {select, Select}], + Opts = [{use, Keyspace}, + {prepare, Prepare}], + Client = start_client(Opts), + + Points = [[1, 1], [2, 3], [3, 5]], + Queries = [{insert, V} || V <- Points], + batch(Client, Queries), + + {Rows, _} = execute(Client, select, []), + Points = lists:sort(Rows). diff --git a/test/erlcql_test.erl b/test/erlcql_test.erl index 7f6bc82..d2b214d 100644 --- a/test/erlcql_test.erl +++ b/test/erlcql_test.erl @@ -62,6 +62,11 @@ execute(Pid, Name, Values) -> {ok, Response} = erlcql_client:execute(Pid, Name, Values, ?CONSISTENCY), Response. +batch(Pid, Queries) -> + Opts = [{batch_type, logged} | ?CONSISTENCY], + {ok, Response} = erlcql_client:batch(Pid, Queries, Opts), + Response. + -spec stop_client(pid()) -> ok. stop_client(Pid) -> true = unlink(Pid), diff --git a/test/erlcql_test.hrl b/test/erlcql_test.hrl index b1fa795..6cdea20 100644 --- a/test/erlcql_test.hrl +++ b/test/erlcql_test.hrl @@ -5,6 +5,7 @@ single_query/1, 'query'/2, execute/3, + batch/2, start_client/1, stop_client/1]). From 1cc263aa04f8afc706c7f0bc809516b2d05dbf1e Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Fri, 10 Oct 2014 14:07:56 +0200 Subject: [PATCH 07/20] Return name and query when erlcql_client:prepare/3 fails Fixes #26 --- src/erlcql_client.erl | 4 ++-- test/erlcql_SUITE.erl | 6 +----- test/erlcql_prepare_SUITE.erl | 17 ++++++++++++++++- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 963882b..c7a46b1 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -302,8 +302,8 @@ ready({Ref, {prepare, Query, Name}}, {From, _}, Entry = {Name, Query, QueryId, Types}, true = ets:insert(PreparedETS, Entry), Response; - ({error, _} = Response) -> - Response + ({error, Reason}) -> + {error, {Name, Query, Reason}} end, send(Prepare, {Ref, From, Fun}, State); ready({Ref, {execute, QueryId, Values, Consistency}}, {From, _}, diff --git a/test/erlcql_SUITE.erl b/test/erlcql_SUITE.erl index 9c202fe..fe47262 100644 --- a/test/erlcql_SUITE.erl +++ b/test/erlcql_SUITE.erl @@ -186,11 +186,7 @@ insert(Config) -> start_without_use(_Config) -> {ok, Pid} = erlcql:start_link("localhost", ?OPTS), unlink(Pid), - %% Msg = <<"no keyspace has been specified">>, - %% TODO: Different messages for v1 and v2 - Msg = <<"No keyspace has been specified. " - "USE a keyspace, or explicity specify keyspace.tablename">>, - {error, {invalid, Msg, _}} = q(Pid, ?CREATE_TABLE), + {error, {invalid, _, _}} = q(Pid, ?CREATE_TABLE), exit(Pid, kill). start_with_use(_Config) -> diff --git a/test/erlcql_prepare_SUITE.erl b/test/erlcql_prepare_SUITE.erl index 34bea29..b798db1 100644 --- a/test/erlcql_prepare_SUITE.erl +++ b/test/erlcql_prepare_SUITE.erl @@ -9,7 +9,8 @@ all() -> [prepare_execute, - prepare_batch]. + prepare_batch, + prepare_error]. %% Fixtures ------------------------------------------------------------------- @@ -92,3 +93,17 @@ prepare_batch(Config) -> {Rows, _} = execute(Client, select, []), Points = lists:sort(Rows). + +prepare_error(Config) -> + Keyspace = ?c(keyspace, Config), + Table = gen_table_name(), + Create = [<<"CREATE TABLE ">>, Keyspace, <<".">>, Table, + <<" (x int PRIMARY KEY, y int)">>], + single_query(Create), + + Insert = [<<"INSERT INTO ">>, Table, <<" (x, y) VALUES (?)">>], + Opts = [{use, Keyspace}], + Client = start_client(Opts), + + {error, {broken_insert, Insert, {invalid, _, undefined}}} = + erlcql_client:prepare(Client, Insert, broken_insert). From ca5a2ebdfbe939183dc721f2d87c16cd9ec0db64 Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Tue, 25 Nov 2014 12:49:28 +0100 Subject: [PATCH 08/20] Don't test with R14 on Travis --- .travis.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index d1e2666..9f6d597 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,8 +11,5 @@ otp_release: - R15B02 - R15B01 - R15B - - R14B04 - - R14B03 - - R14B02 services: cassandra script: make test From 202687836ac782e527a09d47c694b6e64ff78b4e Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Tue, 25 Nov 2014 16:09:31 +0100 Subject: [PATCH 09/20] Add default_timeout config option --- README.md | 1 + src/erlcql.erl | 3 ++- src/erlcql_client.erl | 7 ++++--- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index a69bd7b..769f8fe 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ erlcql:start_link(Options :: proplists:proplist()) -> | reconnect_start | pos_integer() | 1000 | | reconnect_max | pos_integer() | 30000 | | keepalive | boolean() | false | +| default_timeout | pos_integer() | 5000 | ### Query diff --git a/src/erlcql.erl b/src/erlcql.erl index f28b82b..13dc60e 100644 --- a/src/erlcql.erl +++ b/src/erlcql.erl @@ -102,4 +102,5 @@ default(auto_reconnect) -> false; default(reconnect_start) -> 1000; default(reconnect_max) -> 30000; default(keepalive) -> false; -default(version) -> 2. +default(version) -> 2; +default(default_timeout) -> timer:seconds(5). diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index c7a46b1..0cfd715 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -78,7 +78,6 @@ -define(PREPARED_ETS_NAME, erlcql_prepared). -define(PREPARED_ETS_OPTS, [set, private, {read_concurrency, true}]). --define(TIMEOUT, timer:seconds(5)). -record(backoff, { start :: pos_integer(), @@ -136,11 +135,13 @@ async_execute(Pid, QueryId, Values, Consistency) -> -spec await(erlcql:query_ref()) -> response() | {error, Reason :: term()}. await({ok, QueryRef}) -> - do_await(QueryRef, ?TIMEOUT); + Timeout = get_env(default_timeout), + do_await(QueryRef, Timeout); await({error, _Reason} = Error) -> Error; await(QueryRef) -> - do_await(QueryRef, ?TIMEOUT). + Timeout = get_env(default_timeout), + do_await(QueryRef, Timeout). -spec await(erlcql:query_ref(), integer()) -> response() | {error, Reason :: term()}. From 8a0362997a2cb461f1bd5b6465e153620182b6cd Mon Sep 17 00:00:00 2001 From: Krzysztof Rutka Date: Tue, 25 Nov 2014 12:18:36 +0100 Subject: [PATCH 10/20] Only return stream id when the response gets to the right caller WIP. --- src/erlcql_client.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 0cfd715..33fc3b1 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -351,7 +351,7 @@ ready(Event, _From, State) -> ?ERROR("Bad event (ready/sync): ~p", [Event]), {stop, {bad_event, Event}, State}. -handle_event({timeout, Stream}, StateName, +handle_event({return, Stream}, StateName, #state{async_ets = AsyncETS, streams = Streams} = State) -> true = ets:delete(AsyncETS, Stream), @@ -631,9 +631,10 @@ cast(Pid, Request) -> do_await({Ref, Pid, Stream}, Timeout) -> receive {Ref, Response} -> + gen_fsm:send_all_state_event(Pid, {return, Stream}), Response after Timeout -> - gen_fsm:send_all_state_event(Pid, {timeout, Stream}), + gen_fsm:send_all_state_event(Pid, {return, Stream}), {error, timeout} end. @@ -680,11 +681,9 @@ handle_response({Stream, Response}, #state{async_ets = AsyncETS} = State) -> -spec send_response(integer(), {erlcql:query_ref(), erlcql:response()}, pid(), state()) -> state(). -send_response(Stream, Response, Pid, #state{async_ets = AsyncETS, - streams = Streams} = State) -> - true = ets:delete(AsyncETS, Stream), +send_response(_Stream, Response, Pid, State) -> Pid ! Response, - State#state{streams = [Stream | Streams]}. + State. -spec get_env_opt(term(), proplist()) -> Value :: term(). get_env_opt(Opt, Opts) -> From 5076fa483399d4a27e7ed2ff740f3824ad6f3f4c Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Wed, 26 Nov 2014 10:14:06 +0100 Subject: [PATCH 11/20] Only return stream when we get a response on it --- src/erlcql_client.erl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 33fc3b1..ae179de 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -351,11 +351,6 @@ ready(Event, _From, State) -> ?ERROR("Bad event (ready/sync): ~p", [Event]), {stop, {bad_event, Event}, State}. -handle_event({return, Stream}, StateName, - #state{async_ets = AsyncETS, - streams = Streams} = State) -> - true = ets:delete(AsyncETS, Stream), - {next_state, StateName, State#state{streams = [Stream | Streams]}}; handle_event(Event, StateName, State) -> ?ERROR("Bad event (~s/handle_event): ~p", [StateName, Event]), {stop, {bad_event, Event}, State}. @@ -628,13 +623,11 @@ cast(Pid, Request) -> -spec do_await(erlcql:query_ref(), integer()) -> response() | {error, Reason :: term()}. -do_await({Ref, Pid, Stream}, Timeout) -> +do_await({Ref, _Pid, _Stream}, Timeout) -> receive {Ref, Response} -> - gen_fsm:send_all_state_event(Pid, {return, Stream}), Response after Timeout -> - gen_fsm:send_all_state_event(Pid, {return, Stream}), {error, timeout} end. @@ -681,9 +674,10 @@ handle_response({Stream, Response}, #state{async_ets = AsyncETS} = State) -> -spec send_response(integer(), {erlcql:query_ref(), erlcql:response()}, pid(), state()) -> state(). -send_response(_Stream, Response, Pid, State) -> +send_response(Stream, Response, Pid, State=#state{async_ets = AsyncETS, streams = Streams}) -> Pid ! Response, - State. + true = ets:delete(AsyncETS, Stream), + State#state{streams = [Stream | Streams]}. -spec get_env_opt(term(), proplist()) -> Value :: term(). get_env_opt(Opt, Opts) -> From 193192f08776cbe5bdb1ca0726cba6a3e8f70a5f Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Thu, 27 Nov 2014 12:44:29 +0100 Subject: [PATCH 12/20] vsn 0.2.1 --- src/erlcql.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 88316b7..523daee 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,6 +1,6 @@ {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, From abdd552a9c4f141fc6133b19d6e847229c4d5d1d Mon Sep 17 00:00:00 2001 From: Aaron France Date: Thu, 27 Nov 2014 15:30:28 +0100 Subject: [PATCH 13/20] Add metrics --- rebar.config | 3 +- src/erlcql_client.erl | 12 ++++++ src/erlcql_folsom.erl | 86 ++++++++++++++++++++++++++++++++++++++++++ src/erlcql_metrics.hrl | 8 ++++ 4 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 src/erlcql_folsom.erl create mode 100644 src/erlcql_metrics.hrl diff --git a/rebar.config b/rebar.config index 90b0444..cd51a73 100644 --- a/rebar.config +++ b/rebar.config @@ -4,6 +4,7 @@ {deps, [{snappy, "", {git, "https://github.com/rpt/snappy.git"}}, {lz4, "", {git, "https://github.com/szktty/erlang-lz4.git"}}, - {proper, "", {git, "https://github.com/manopapad/proper.git"}}]}. + {proper, "", {git, "https://github.com/manopapad/proper.git"}}, + {folsomite, ".*", {git, "git://github.com/puzza007/folsomite.git"}}]}. {cover_enabled, true}. diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index ae179de..85307e8 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -47,6 +47,8 @@ -export([ready/3]). -include("erlcql.hrl"). +-include("erlcql_metrics.hrl"). + -record(state, { async_ets :: ets(), @@ -176,6 +178,7 @@ try_connect(#state{database = {Host, Port}, {ok, ready, State3}; {error, Reason} = Error -> ?ERROR("Connection init failed: ~s", [Reason]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_FAILURE_METRIC, 1), {stop, Error} end; {error, Reason} = Error -> @@ -251,6 +254,7 @@ startup(reconnect, #state{backoff = Backoff, end; startup(Event, State) -> ?ERROR("Bad event (startup): ~p", [Event]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_STARTUP_FAIL_METRIC, 1), {stop, {bad_event, Event}, State}. startup({_Ref, {'query', Query, _}}, _From, State) -> @@ -285,6 +289,7 @@ ready(Event, State) -> ready({_Ref, _}, _From, #state{streams = []} = State) -> ?CRITICAL("Too many requests!"), + ok = erlcql_folsom:notify_histogram(?CONNECTION_STREAMS_EXHAUSTED_METRIC, 1), {reply, {error, too_many_requests}, ready, State}; ready({Ref, {'query', QueryString, Params}}, {From, _}, #state{version = Version} = State) -> @@ -366,18 +371,22 @@ handle_info({tcp, Socket, Data}, ready, #state{socket = Socket} = State) -> handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p closed", [Socket]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), {stop, tcp_closed, State}; handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p closed", [Socket]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), try_again(State); handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p error: ~p", [Socket, Reason]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), {stop, {tcp_error, Reason}, State}; handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p error: ~p", [Socket, Reason]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), try_again(State); handle_info(Info, StateName, State) -> ?ERROR("Bad info (~s/handle_info): ~p", [StateName, Info]), @@ -628,6 +637,7 @@ do_await({Ref, _Pid, _Stream}, Timeout) -> {Ref, Response} -> Response after Timeout -> + ok = erclql_folsom:notify_histogram(?CONNECTION_QUERY_TIMEOUT, 1), {error, timeout} end. @@ -648,6 +658,7 @@ parse_response(Data, #state{parser = Parser, {next_state, ready, State2#state{parser = Parser2}}; {error, Reason} -> ?ERROR("Parsing response failed: ~p", [Reason]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_PARSE_ERROR, 1), {stop, Reason, State} end. @@ -669,6 +680,7 @@ handle_response({Stream, Response}, #state{async_ets = AsyncETS} = State) -> send_response(Stream, {Ref, Response2}, Pid, State); [] -> ?WARNING("Unexpected response (~p): ~p", [Stream, Response]), + ok = erlcql_folsom:notify_histogram(?CONNECTION_UNEXPECTED_RESPONSE, 1), State end. diff --git a/src/erlcql_folsom.erl b/src/erlcql_folsom.erl new file mode 100644 index 0000000..d29289f --- /dev/null +++ b/src/erlcql_folsom.erl @@ -0,0 +1,86 @@ +-module(erlcql_folsom). + +-export([notify_counter/1]). +-export([notify_counter/2]). +-export([notify_gauge/1]). +-export([notify_gauge/2]). +-export([notify_histogram/1]). +-export([notify_histogram/2]). +-export([notify_history/1]). +-export([notify_history/2]). +-export([notify_meter/1]). +-export([notify_meter/2]). +-export([notify_meter_reader/1]). +-export([notify_meter_reader/2]). +-export([notify_duration/1]). +-export([notify_duration/2]). +-export([notify_spiral/1]). +-export([notify_spiral/2]). +-export([notify_timed/1]). +-export([begin_timed/1]). + + +notify_counter(Event) -> + notify(new_counter, Event). +notify_counter(Name, Value) -> + notify(new_counter, Name, Value). + +notify_gauge(Event) -> + notify(new_gauge, Event). +notify_gauge(Name, Value) -> + notify(new_gauge, Name, Value). + +notify_histogram(Event) -> + notify(new_histogram, Event). +notify_histogram(Name, Value) -> + notify(new_histogram, Name, Value). + +notify_history(Event) -> + notify(new_history, Event). +notify_history(Name, Value) -> + notify(new_history, Name, Value). + +notify_meter(Event) -> + notify(new_meter, Event). +notify_meter(Name, Value) -> + notify(new_meter, Name, Value). + +notify_meter_reader(Event) -> + notify(new_meter_reader, Event). +notify_meter_reader(Name, Value) -> + notify(new_meter_reader, Name, Value). + +notify_duration(Event) -> + notify(new_duration, Event). +notify_duration(Name, Value) -> + notify(new_duration, Name, Value). + +notify_spiral(Event) -> + notify(new_spiral, Event). +notify_spiral(Name, Value) -> + notify(new_spiral, Name, Value). + +notify_timed(Timer) -> + case folsom_metrics:safely_histogram_timed_notify(Timer) of + {error, Name, nonexistent_metric} -> + folsom_metrics:new_histogram(Name), + folsom_metrics:safely_histogram_timed_notify(Timer); + ok -> + ok + end. + +begin_timed(Name) -> + folsom_metrics:histogram_timed_begin(Name). + +notify(Fun, {Name, Value}) -> + notify(Fun, Name, Value). + +notify(Fun, Name, Value) -> + case folsom_metrics:safely_notify(Name, Value) of + {error, Name, nonexistent_metric} -> + folsom_metrics:Fun(Name), + folsom_metrics:safely_notify(Name, Value); + ok -> + ok + end. + diff --git a/src/erlcql_metrics.hrl b/src/erlcql_metrics.hrl new file mode 100644 index 0000000..a7fbc55 --- /dev/null +++ b/src/erlcql_metrics.hrl @@ -0,0 +1,8 @@ +-define(CONNECTION_FAILURE_METRIC, <<"erlcql.connection.failure">>). +-define(CONNECTION_STARTUP_FAIL_METRIC, <<"erlcql.connection.startup_fail">>). +-define(CONNECTION_STREAMS_EXHAUSTED_METRIC, <<"erlcql.connection.streams_exhausted">>). +-define(CONNECTION_SOCKET_CLOSED, <<"erlcql.connection.socket_closed">>). +-define(CONNECTION_SOCKET_ERROR, <<"erlcql.connection.socket_error">>). +-define(CONNECTION_PARSE_ERROR, <<"erlcql.connection.parse_error">>). +-define(CONNECTION_UNEXPECTED_RESPONSE, <<"erlcql.connection.unexpected_response">>). +-define(CONNECTION_QUERY_TIMEOUT, <<"erlcql.connection.query_timeout">>). From ea20ae307a9fad4d14dc276f5147ac352f8479eb Mon Sep 17 00:00:00 2001 From: Aaron France Date: Thu, 27 Nov 2014 15:30:47 +0100 Subject: [PATCH 14/20] Lager is the default, always --- rebar.config | 3 ++- src/erlcql.hrl | 22 ---------------------- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/rebar.config b/rebar.config index cd51a73..c2736ea 100644 --- a/rebar.config +++ b/rebar.config @@ -2,7 +2,8 @@ %% vi: set ft=erlang : {deps, - [{snappy, "", {git, "https://github.com/rpt/snappy.git"}}, + [{lager, ".*", {git, "https://github.com/basho/lager.git"}}, + {snappy, "", {git, "https://github.com/rpt/snappy.git"}}, {lz4, "", {git, "https://github.com/szktty/erlang-lz4.git"}}, {proper, "", {git, "https://github.com/manopapad/proper.git"}}, {folsomite, ".*", {git, "git://github.com/puzza007/folsomite.git"}}]}. diff --git a/src/erlcql.hrl b/src/erlcql.hrl index aaa5763..de4c38c 100644 --- a/src/erlcql.hrl +++ b/src/erlcql.hrl @@ -184,17 +184,6 @@ -type values() :: [type() | {option(), type()}]. --ifdef(ERLCQL_NO_LOGS). --define(ERROR(Format, Data), begin Format, Data, ok end). --define(EMERGENCY(Format, Data), begin Format, Data, ok end). --define(ALERT(Format, Data), begin Format, Data, ok end). --define(CRITICAL(Format, Data), begin Format, Data, ok end). --define(WARNING(Format, Data), begin Format, Data, ok end). --define(INFO(Format, Data), begin Format, Data, ok end). --define(NOTICE(Format, Data), begin Format, Data, ok end). --define(DEBUG(Format, Data), begin Format, Data, ok end). --else. --ifdef(ERLCQL_LAGER). -compile({parse_transform, lager_transform}). -define(EMERGENCY(Format, Data), lager:emergency(Format, Data)). -define(ALERT(Format, Data), lager:alert(Format, Data)). @@ -204,17 +193,6 @@ -define(NOTICE(Format, Data), lager:notice(Format, Data)). -define(INFO(Format, Data), lager:info(Format, Data)). -define(DEBUG(Format, Data), lager:debug(Format, Data)). --else. --define(ERROR(Format, Data), error_logger:error_msg(Format ++ "~n", Data)). --define(EMERGENCY(Format, Data), ?ERROR(Format, Data)). --define(ALERT(Format, Data), ?ERROR(Format, Data)). --define(CRITICAL(Format, Data), ?ERROR(Format, Data)). --define(WARNING(Format, Data), error_logger:warning_msg(Format ++ "~n", Data)). --define(INFO(Format, Data), error_logger:info_msg(Format ++ "~n", Data)). --define(NOTICE(Format, Data), ?INFO(Format, Data)). --define(DEBUG(Format, Data), ?INFO(Format, Data)). --endif. --endif. -define(EMERGENCY(Format), ?EMERGENCY(Format, [])). -define(ALERT(Format), ?ALERT(Format, [])). From 3259b50cc18b6e150762907ab145e88906db988a Mon Sep 17 00:00:00 2001 From: Aaron France Date: Thu, 27 Nov 2014 15:30:53 +0100 Subject: [PATCH 15/20] Tweak Makefile --- Makefile | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 5df5186..f4bfe7d 100644 --- a/Makefile +++ b/Makefile @@ -11,26 +11,22 @@ all: deps compile deps: @ rebar get-deps -compile: +compile: deps @ rebar compile dialyze: compile $(PLT) - @ echo "==> (dialyze)" @ dialyzer --plt $(PLT) ebin \ -Wunmatched_returns \ -Wno_undefined_callbacks $(PLT): dialyzer.apps - @ echo "==> (dialyze)" - @ printf "Building $(PLT) file..." @- dialyzer -q --build_plt --output_plt $(PLT) \ --apps $(shell cat $(APPS)) - @ echo " done" wait: @ ./scripts/wait.escript $(DB_HOST) $(DB_PORT) $(TIMEOUT) -test: compile wait +test: deps compile wait @ rebar skip_deps=true ct console: compile From 41e2cf9cc61d37154b3962da5b56cf1495570635 Mon Sep 17 00:00:00 2001 From: Aaron France Date: Thu, 27 Nov 2014 16:36:38 +0100 Subject: [PATCH 16/20] Pin to lager 2.0.3 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index c2736ea..8289a0d 100644 --- a/rebar.config +++ b/rebar.config @@ -2,7 +2,7 @@ %% vi: set ft=erlang : {deps, - [{lager, ".*", {git, "https://github.com/basho/lager.git"}}, + [{lager, ".*", {git, "https://github.com/basho/lager.git", {tag, "2.0.3"}}}, {snappy, "", {git, "https://github.com/rpt/snappy.git"}}, {lz4, "", {git, "https://github.com/szktty/erlang-lz4.git"}}, {proper, "", {git, "https://github.com/manopapad/proper.git"}}, From 7b859797a352c266f61fe086205fe87e4b7946c6 Mon Sep 17 00:00:00 2001 From: Aaron France Date: Mon, 1 Dec 2014 02:01:00 +0100 Subject: [PATCH 17/20] Use quintana --- rebar.config | 3 +- src/erlcql_client.erl | 18 ++++----- src/erlcql_folsom.erl | 86 ------------------------------------------- 3 files changed, 11 insertions(+), 96 deletions(-) delete mode 100644 src/erlcql_folsom.erl diff --git a/rebar.config b/rebar.config index 8289a0d..64c7250 100644 --- a/rebar.config +++ b/rebar.config @@ -6,6 +6,7 @@ {snappy, "", {git, "https://github.com/rpt/snappy.git"}}, {lz4, "", {git, "https://github.com/szktty/erlang-lz4.git"}}, {proper, "", {git, "https://github.com/manopapad/proper.git"}}, - {folsomite, ".*", {git, "git://github.com/puzza007/folsomite.git"}}]}. + {folsomite, ".*", {git, "git://github.com/puzza007/folsomite.git"}}, + {quintana, ".*", {git, "https://github.com/puzza007/quintana.git"}}]}. {cover_enabled, true}. diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 85307e8..a713b41 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -178,7 +178,7 @@ try_connect(#state{database = {Host, Port}, {ok, ready, State3}; {error, Reason} = Error -> ?ERROR("Connection init failed: ~s", [Reason]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_FAILURE_METRIC, 1), + ok = quintana:notify_histogram(?CONNECTION_FAILURE_METRIC, 1), {stop, Error} end; {error, Reason} = Error -> @@ -254,7 +254,7 @@ startup(reconnect, #state{backoff = Backoff, end; startup(Event, State) -> ?ERROR("Bad event (startup): ~p", [Event]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_STARTUP_FAIL_METRIC, 1), + ok = quintana:notify_histogram(?CONNECTION_STARTUP_FAIL_METRIC, 1), {stop, {bad_event, Event}, State}. startup({_Ref, {'query', Query, _}}, _From, State) -> @@ -289,7 +289,7 @@ ready(Event, State) -> ready({_Ref, _}, _From, #state{streams = []} = State) -> ?CRITICAL("Too many requests!"), - ok = erlcql_folsom:notify_histogram(?CONNECTION_STREAMS_EXHAUSTED_METRIC, 1), + ok = quintana:notify_histogram(?CONNECTION_STREAMS_EXHAUSTED_METRIC, 1), {reply, {error, too_many_requests}, ready, State}; ready({Ref, {'query', QueryString, Params}}, {From, _}, #state{version = Version} = State) -> @@ -371,22 +371,22 @@ handle_info({tcp, Socket, Data}, ready, #state{socket = Socket} = State) -> handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p closed", [Socket]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), {stop, tcp_closed, State}; handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p closed", [Socket]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_CLOSED, 1), try_again(State); handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = false} = State) -> ?ERROR("TCP socket ~p error: ~p", [Socket, Reason]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), {stop, {tcp_error, Reason}, State}; handle_info({tcp_error, Socket, Reason}, _StateName, #state{socket = Socket, auto_reconnect = true} = State) -> ?WARNING("TCP socket ~p error: ~p", [Socket, Reason]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), + ok = quintana:notify_histogram(?CONNECTION_SOCKET_ERROR, 1), try_again(State); handle_info(Info, StateName, State) -> ?ERROR("Bad info (~s/handle_info): ~p", [StateName, Info]), @@ -658,7 +658,7 @@ parse_response(Data, #state{parser = Parser, {next_state, ready, State2#state{parser = Parser2}}; {error, Reason} -> ?ERROR("Parsing response failed: ~p", [Reason]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_PARSE_ERROR, 1), + ok = quintana:notify_histogram(?CONNECTION_PARSE_ERROR, 1), {stop, Reason, State} end. @@ -680,7 +680,7 @@ handle_response({Stream, Response}, #state{async_ets = AsyncETS} = State) -> send_response(Stream, {Ref, Response2}, Pid, State); [] -> ?WARNING("Unexpected response (~p): ~p", [Stream, Response]), - ok = erlcql_folsom:notify_histogram(?CONNECTION_UNEXPECTED_RESPONSE, 1), + ok = quintana:notify_histogram(?CONNECTION_UNEXPECTED_RESPONSE, 1), State end. diff --git a/src/erlcql_folsom.erl b/src/erlcql_folsom.erl deleted file mode 100644 index d29289f..0000000 --- a/src/erlcql_folsom.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(erlcql_folsom). - --export([notify_counter/1]). --export([notify_counter/2]). --export([notify_gauge/1]). --export([notify_gauge/2]). --export([notify_histogram/1]). --export([notify_histogram/2]). --export([notify_history/1]). --export([notify_history/2]). --export([notify_meter/1]). --export([notify_meter/2]). --export([notify_meter_reader/1]). --export([notify_meter_reader/2]). --export([notify_duration/1]). --export([notify_duration/2]). --export([notify_spiral/1]). --export([notify_spiral/2]). --export([notify_timed/1]). --export([begin_timed/1]). - - -notify_counter(Event) -> - notify(new_counter, Event). -notify_counter(Name, Value) -> - notify(new_counter, Name, Value). - -notify_gauge(Event) -> - notify(new_gauge, Event). -notify_gauge(Name, Value) -> - notify(new_gauge, Name, Value). - -notify_histogram(Event) -> - notify(new_histogram, Event). -notify_histogram(Name, Value) -> - notify(new_histogram, Name, Value). - -notify_history(Event) -> - notify(new_history, Event). -notify_history(Name, Value) -> - notify(new_history, Name, Value). - -notify_meter(Event) -> - notify(new_meter, Event). -notify_meter(Name, Value) -> - notify(new_meter, Name, Value). - -notify_meter_reader(Event) -> - notify(new_meter_reader, Event). -notify_meter_reader(Name, Value) -> - notify(new_meter_reader, Name, Value). - -notify_duration(Event) -> - notify(new_duration, Event). -notify_duration(Name, Value) -> - notify(new_duration, Name, Value). - -notify_spiral(Event) -> - notify(new_spiral, Event). -notify_spiral(Name, Value) -> - notify(new_spiral, Name, Value). - -notify_timed(Timer) -> - case folsom_metrics:safely_histogram_timed_notify(Timer) of - {error, Name, nonexistent_metric} -> - folsom_metrics:new_histogram(Name), - folsom_metrics:safely_histogram_timed_notify(Timer); - ok -> - ok - end. - -begin_timed(Name) -> - folsom_metrics:histogram_timed_begin(Name). - -notify(Fun, {Name, Value}) -> - notify(Fun, Name, Value). - -notify(Fun, Name, Value) -> - case folsom_metrics:safely_notify(Name, Value) of - {error, Name, nonexistent_metric} -> - folsom_metrics:Fun(Name), - folsom_metrics:safely_notify(Name, Value); - ok -> - ok - end. - From 7c9268455cc0607d0a6fb6d46b28e60e8484bbb3 Mon Sep 17 00:00:00 2001 From: Aaron France Date: Fri, 5 Dec 2014 16:19:45 +0100 Subject: [PATCH 18/20] Fix bad reference to erlcql_folsom --- src/erlcql.app.src | 2 +- src/erlcql_client.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 523daee..21e6c8e 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,6 +1,6 @@ {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [kernel, stdlib, diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index a713b41..02b485f 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -637,7 +637,7 @@ do_await({Ref, _Pid, _Stream}, Timeout) -> {Ref, Response} -> Response after Timeout -> - ok = erclql_folsom:notify_histogram(?CONNECTION_QUERY_TIMEOUT, 1), + ok = quintana:notify_histogram(?CONNECTION_QUERY_TIMEOUT, 1), {error, timeout} end. From 2b38f6e4d74f25a4936b7faed644b2a3672cef5c Mon Sep 17 00:00:00 2001 From: Aaron France Date: Fri, 5 Dec 2014 16:41:57 +0100 Subject: [PATCH 19/20] Regulate versions --- src/erlcql.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 21e6c8e..2b76fa6 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,6 +1,6 @@ {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.2.2"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [kernel, stdlib, From 77020133874003005d7a72c4c94731e749ac14f5 Mon Sep 17 00:00:00 2001 From: Aaron France Date: Fri, 5 Dec 2014 17:39:41 +0100 Subject: [PATCH 20/20] Hibernate after sending a response Reasoning: * The processes themselves do relatively little * The processes are typically *pooled* and thus aren't doing much of an application's whole workload. * Memory usage with this change does not register --- src/erlcql.app.src | 2 +- src/erlcql_client.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/erlcql.app.src b/src/erlcql.app.src index 2b76fa6..b996f30 100644 --- a/src/erlcql.app.src +++ b/src/erlcql.app.src @@ -1,6 +1,6 @@ {application, erlcql, [{description, "Cassandra native protocol CQL client"}, - {vsn, "0.2.4"}, + {vsn, "0.2.5"}, {registered, []}, {applications, [kernel, stdlib, diff --git a/src/erlcql_client.erl b/src/erlcql_client.erl index 02b485f..41e7a34 100644 --- a/src/erlcql_client.erl +++ b/src/erlcql_client.erl @@ -655,7 +655,7 @@ parse_response(Data, #state{parser = Parser, case erlcql_decode:parse(Data, Parser, Compression) of {ok, Responses, Parser2} -> State2 = handle_responses(Responses, State), - {next_state, ready, State2#state{parser = Parser2}}; + {next_state, ready, State2#state{parser = Parser2}, hibernate}; {error, Reason} -> ?ERROR("Parsing response failed: ~p", [Reason]), ok = quintana:notify_histogram(?CONNECTION_PARSE_ERROR, 1),