Skip to content

Commit

Permalink
Provide higher level API into Multi Search
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtaylor committed Sep 6, 2022
1 parent e21033c commit c6d2afe
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ See the full [API docs](https://hexdocs.pm/snap).
- Streaming bulk operations
- Connection pooling
- Telemetry events
- High level interface over the [Multi Search API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html)

## Installation

Expand Down
94 changes: 94 additions & 0 deletions lib/snap/multi/multi.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
defmodule Snap.Multi do
@moduledoc """
Provides a high level abstraction over the [Multi Search
API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html)
which allows the user to perform multiple searches in a single request.
Example usage:
Multi.new()
|> Multi.add(query_1, id: "query-1")
|> Multi.add(query_2, id: "query-2")
|> Multi.run(Cluster, index)
This returns a `Snap.Multi.Response`, with a `searches` field containing a
list of responses.
Each query can be named, using any value you like, by passing an `id: "foo"`
into `Multi.add`. The list in `Snap.Multi.Response` contains tuple pairs where
the first value is the ID and the second is the result of the query.
If you choose not to name a query the ID in the tuple will be `nil`.
If your query IDs are unique you can convert this to a `Map` for easy lookup
later using `Enum.into(response.searches, %{})`.
"""

defstruct searches: []

@type t :: %__MODULE__{searches: list()}

alias Snap.Multi.Response
alias Snap.Multi.Search

@doc """
Build a `Snap.Multi` request.
"""
@spec new() :: t()
def new() do
%__MODULE__{}
end

@doc """
Append to a `Snap.Multi` request. The `body` is required. If you pass an `id`
into the headers, this will be used to name the query in the responses list
and won't be passed through as a header in the request.
"""
@spec add(t(), map(), Keyword.t()) :: t()
def add(%__MODULE__{} = multi, body, headers \\ []) do
{id, headers} = Keyword.pop(headers, :id)
search = %Search{id: id, body: body, headers: headers}

%__MODULE__{multi | searches: multi.searches ++ [search]}
end

@doc """
Perform the `Snap.Multi` request. This returns `{:ok, Snap.Multi.Response}` or
an error.
"""
@spec run(t(), atom(), String.t(), Keyword.t(), Keyword.t(), Keyword.t()) ::
{:ok, Snap.Multi.Response.t()} | {:error, Snap.Cluster.error()}
def run(%__MODULE__{} = multi, cluster, index_or_alias, params \\ [], headers \\ [], opts \\ []) do
ids = build_ids(multi.searches)
body = encode(multi)
headers = headers ++ [{"content-type", "application/x-ndjson"}]

case cluster.post("/#{index_or_alias}/_msearch", body, params, headers, opts) do
{:ok, response} -> {:ok, Response.new(response, ids)}
err -> err
end
end

defp encode(%__MODULE__{} = multi) do
multi.searches
|> Enum.flat_map(&encode_search/1)
end

defp encode_search(%Search{headers: headers, body: body}) do
[encode_headers(headers), "\n", encode_body(body), "\n"]
end

defp encode_headers(headers) do
headers
|> Enum.into(%{})
|> Jason.encode!(pretty: false)
end

defp encode_body(body) do
Jason.encode!(body, pretty: false)
end

defp build_ids(searches) do
Enum.map(searches, & &1.id)
end
end
22 changes: 22 additions & 0 deletions lib/snap/multi/response.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Snap.Multi.Response do
@moduledoc """
Represents a successful response for a `Snap.Multi` request.
"""
defstruct [:searches, :took]

@type t :: %__MODULE__{
searches: list({atom(), Snap.SearchResponse.t()})
}

alias Snap.SearchResponse

@doc false
def new(body, ids) do
responses = Enum.map(body["responses"], &SearchResponse.new/1)
took = body["took"]

searches = Enum.zip(ids, responses)

%__MODULE__{searches: searches, took: took}
end
end
4 changes: 4 additions & 0 deletions lib/snap/multi/search.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Snap.Multi.Search do
@moduledoc false
defstruct [:id, :headers, :body]
end
54 changes: 54 additions & 0 deletions test/multi/multi_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Snap.MultiTest do
use Snap.IntegrationCase

alias Snap.Bulk.Action
alias Snap.Multi
alias Snap.Test.Cluster

setup_all do
{:ok, _} = Snap.Indexes.create(Cluster, @test_index, %{})

1..5
|> Enum.map(fn i ->
doc = %{"title" => "Document #{i}"}

%Action.Index{_id: i, doc: doc}
end)
|> Snap.Bulk.perform(Cluster, @test_index, refresh: :wait_for)
end

test "simple multi search" do
{:ok, %Multi.Response{} = response} =
Multi.new()
|> Multi.add(%{query: %{query_string: %{query: "Document 1"}}})
|> Multi.add(%{query: %{query_string: %{query: "Document 2"}}})
|> Multi.run(Cluster, @test_index)

assert length(response.searches) == 2
end

test "multi search with headers" do
{:ok, %Multi.Response{} = response} =
Multi.new()
|> Multi.add(%{query: %{query_string: %{query: "Document 1"}}}, request_cache: false)
|> Multi.add(%{query: %{query_string: %{query: "Document 2"}}})
|> Multi.run(Cluster, @test_index)

assert length(response.searches) == 2
end

test "multi search with named queries" do
{:ok, %Multi.Response{} = response} =
Multi.new()
|> Multi.add(%{query: %{query_string: %{query: "Document 1"}}}, id: "foo")
|> Multi.add(%{query: %{query_string: %{query: "Document 2"}}}, id: "bar")
|> Multi.run(Cluster, @test_index)

assert length(response.searches) == 2

searches_map = Enum.into(response.searches, %{})

assert Map.has_key?(searches_map, "foo")
assert Map.has_key?(searches_map, "bar")
end
end
10 changes: 10 additions & 0 deletions test/support/integration_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ defmodule Snap.IntegrationCase do

@prefix "snap-test"

setup_all do
clear_indexes()
end

# Clean out any indexes remaining after each test run
setup do
on_exit(fn ->
clear_indexes()
end)
end

defp clear_indexes() do
{:ok, indexes} = Snap.Indexes.list(Cluster)

indexes
Expand Down

0 comments on commit c6d2afe

Please sign in to comment.