diff --git a/README.md b/README.md index ef306b2..995bafc 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ See the full [API docs](https://hexdocs.pm/snap). - Streaming bulk operations - Connection pooling - Telemetry events +- High level interface over the [Multi Search API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html) ## Installation diff --git a/lib/snap/multi/multi.ex b/lib/snap/multi/multi.ex new file mode 100644 index 0000000..a675e25 --- /dev/null +++ b/lib/snap/multi/multi.ex @@ -0,0 +1,94 @@ +defmodule Snap.Multi do + @moduledoc """ + Provides a high level abstraction over the [Multi Search + API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html) + which allows the user to perform multiple searches in a single request. + + Example usage: + + Multi.new() + |> Multi.add(query_1, id: "query-1") + |> Multi.add(query_2, id: "query-2") + |> Multi.run(Cluster, index) + + This returns a `Snap.Multi.Response`, with a `searches` field containing a + list of responses. + + Each query can be named, using any value you like, by passing an `id: "foo"` + into `Multi.add`. The list in `Snap.Multi.Response` contains tuple pairs where + the first value is the ID and the second is the result of the query. + + If you choose not to name a query the ID in the tuple will be `nil`. + + If your query IDs are unique you can convert this to a `Map` for easy lookup + later using `Enum.into(response.searches, %{})`. + """ + + defstruct searches: [] + + @type t :: %__MODULE__{searches: list()} + + alias Snap.Multi.Response + alias Snap.Multi.Search + + @doc """ + Build a `Snap.Multi` request. + """ + @spec new() :: t() + def new() do + %__MODULE__{} + end + + @doc """ + Append to a `Snap.Multi` request. The `body` is required. If you pass an `id` + into the headers, this will be used to name the query in the responses list + and won't be passed through as a header in the request. + """ + @spec add(t(), map(), Keyword.t()) :: t() + def add(%__MODULE__{} = multi, body, headers \\ []) do + {id, headers} = Keyword.pop(headers, :id) + search = %Search{id: id, body: body, headers: headers} + + %__MODULE__{multi | searches: multi.searches ++ [search]} + end + + @doc """ + Perform the `Snap.Multi` request. This returns `{:ok, Snap.Multi.Response}` or + an error. + """ + @spec run(t(), atom(), String.t(), Keyword.t(), Keyword.t(), Keyword.t()) :: + {:ok, Snap.Multi.Response.t()} | {:error, Snap.Cluster.error()} + def run(%__MODULE__{} = multi, cluster, index_or_alias, params \\ [], headers \\ [], opts \\ []) do + ids = build_ids(multi.searches) + body = encode(multi) + headers = headers ++ [{"content-type", "application/x-ndjson"}] + + case cluster.post("/#{index_or_alias}/_msearch", body, params, headers, opts) do + {:ok, response} -> {:ok, Response.new(response, ids)} + err -> err + end + end + + defp encode(%__MODULE__{} = multi) do + multi.searches + |> Enum.flat_map(&encode_search/1) + end + + defp encode_search(%Search{headers: headers, body: body}) do + [encode_headers(headers), "\n", encode_body(body), "\n"] + end + + defp encode_headers(headers) do + headers + |> Enum.into(%{}) + |> Jason.encode!(pretty: false) + end + + defp encode_body(body) do + Jason.encode!(body, pretty: false) + end + + defp build_ids(searches) do + Enum.map(searches, & &1.id) + end +end diff --git a/lib/snap/multi/response.ex b/lib/snap/multi/response.ex new file mode 100644 index 0000000..a8ad962 --- /dev/null +++ b/lib/snap/multi/response.ex @@ -0,0 +1,22 @@ +defmodule Snap.Multi.Response do + @moduledoc """ + Represents a successful response for a `Snap.Multi` request. + """ + defstruct [:searches, :took] + + @type t :: %__MODULE__{ + searches: list({atom(), Snap.SearchResponse.t()}) + } + + alias Snap.SearchResponse + + @doc false + def new(body, ids) do + responses = Enum.map(body["responses"], &SearchResponse.new/1) + took = body["took"] + + searches = Enum.zip(ids, responses) + + %__MODULE__{searches: searches, took: took} + end +end diff --git a/lib/snap/multi/search.ex b/lib/snap/multi/search.ex new file mode 100644 index 0000000..1dcbb3c --- /dev/null +++ b/lib/snap/multi/search.ex @@ -0,0 +1,4 @@ +defmodule Snap.Multi.Search do + @moduledoc false + defstruct [:id, :headers, :body] +end diff --git a/test/multi/multi_test.exs b/test/multi/multi_test.exs new file mode 100644 index 0000000..13b782a --- /dev/null +++ b/test/multi/multi_test.exs @@ -0,0 +1,54 @@ +defmodule Snap.MultiTest do + use Snap.IntegrationCase + + alias Snap.Bulk.Action + alias Snap.Multi + alias Snap.Test.Cluster + + setup_all do + {:ok, _} = Snap.Indexes.create(Cluster, @test_index, %{}) + + 1..5 + |> Enum.map(fn i -> + doc = %{"title" => "Document #{i}"} + + %Action.Index{_id: i, doc: doc} + end) + |> Snap.Bulk.perform(Cluster, @test_index, refresh: :wait_for) + end + + test "simple multi search" do + {:ok, %Multi.Response{} = response} = + Multi.new() + |> Multi.add(%{query: %{query_string: %{query: "Document 1"}}}) + |> Multi.add(%{query: %{query_string: %{query: "Document 2"}}}) + |> Multi.run(Cluster, @test_index) + + assert length(response.searches) == 2 + end + + test "multi search with headers" do + {:ok, %Multi.Response{} = response} = + Multi.new() + |> Multi.add(%{query: %{query_string: %{query: "Document 1"}}}, request_cache: false) + |> Multi.add(%{query: %{query_string: %{query: "Document 2"}}}) + |> Multi.run(Cluster, @test_index) + + assert length(response.searches) == 2 + end + + test "multi search with named queries" do + {:ok, %Multi.Response{} = response} = + Multi.new() + |> Multi.add(%{query: %{query_string: %{query: "Document 1"}}}, id: "foo") + |> Multi.add(%{query: %{query_string: %{query: "Document 2"}}}, id: "bar") + |> Multi.run(Cluster, @test_index) + + assert length(response.searches) == 2 + + searches_map = Enum.into(response.searches, %{}) + + assert Map.has_key?(searches_map, "foo") + assert Map.has_key?(searches_map, "bar") + end +end diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index aa603b6..4ee8cd9 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -6,8 +6,18 @@ defmodule Snap.IntegrationCase do @prefix "snap-test" + setup_all do + clear_indexes() + end + # Clean out any indexes remaining after each test run setup do + on_exit(fn -> + clear_indexes() + end) + end + + defp clear_indexes() do {:ok, indexes} = Snap.Indexes.list(Cluster) indexes