Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
tomtaylor committed Dec 29, 2020
0 parents commit d9771f7
Show file tree
Hide file tree
Showing 13 changed files with 354 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
32 changes: 32 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
elasticsearcher-*.tar

# Temporary files for e.g. tests
/tmp

# VSCode workspace
/.vscode

# Elixir Language Server
/.elixir_ls
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Elasticsearcher

**TODO: Add description**

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `elasticsearcher` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:elasticsearcher, "~> 0.1.0"}
]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/elasticsearcher](https://hexdocs.pm/elasticsearcher).

18 changes: 18 additions & 0 deletions lib/elasticsearcher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Elasticsearcher do
def get(cluster, path) do
cluster.with_connection(fn pid ->
Elasticsearcher.Connection.request(pid, "GET", path, [], nil)
end)
|> parse_response()
end

defp parse_response(response) do
case response do
{:ok, %{data: data}} ->
Jason.decode(data)

err ->
err
end
end
end
34 changes: 34 additions & 0 deletions lib/elasticsearcher/cluster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Elasticsearcher.Cluster do
defmacro __using__(opts) do
quote do
def init(config) do
{:ok, config}
end

defoverridable init: 1

def with_connection(fun) do
Elasticsearcher.Cluster.Supervisor.with_connection(__MODULE__, fun)
end

def config() do
Elasticsearcher.Cluster.Supervisor.config(__MODULE__)
end

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor
}
end

def start_link(config \\ []) do
otp_app = unquote(opts[:opt_app])
config = Application.get_env(otp_app, __MODULE__, config)

Elasticsearcher.Cluster.Supervisor.start_link(__MODULE__, otp_app, config)
end
end
end
end
21 changes: 21 additions & 0 deletions lib/elasticsearcher/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Elasticsearcher.Config do
use GenServer

def start_link({name, config}) do
GenServer.start_link(__MODULE__, config, name: name)
end

def get(pid) do
GenServer.call(pid, :get)
end

## Callbacks

def init(config) do
{:ok, config}
end

def handle_call(:get, _from, config) do
{:reply, config, config}
end
end
92 changes: 92 additions & 0 deletions lib/elasticsearcher/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
defmodule Elasticsearcher.Connection do
use GenServer

require Logger

defstruct [:conn, requests: %{}]

def start_link(opts) do
scheme = Keyword.fetch!(opts, :scheme)
host = Keyword.fetch!(opts, :host)
port = Keyword.fetch!(opts, :port)

GenServer.start_link(__MODULE__, {scheme, host, port})
end

def request(pid, method, path, headers, body) do
GenServer.call(pid, {:request, method, path, headers, body})
end

## Callbacks

@impl true
def init({scheme, host, port}) do
case Mint.HTTP.connect(scheme, host, port) do
{:ok, conn} ->
state = %__MODULE__{conn: conn}
{:ok, state}

{:error, reason} ->
{:stop, reason}
end
end

@impl true
def handle_call({:request, method, path, headers, body}, from, state) do
# In both the successful case and the error case, we make sure to update the connection
# struct in the state since the connection is an immutable data structure.
case Mint.HTTP.request(state.conn, method, path, headers, body) do
{:ok, conn, request_ref} ->
state = put_in(state.conn, conn)
# We store the caller this request belongs to and an empty map as the response.
# The map will be filled with status code, headers, and so on.
state = put_in(state.requests[request_ref], %{from: from, response: %{}})
{:noreply, state}

{:error, conn, reason} ->
state = put_in(state.conn, conn)
{:reply, {:error, reason}, state}
end
end

@impl true
def handle_info(message, state) do
# We should handle the error case here as well, but we're omitting it for brevity.
case Mint.HTTP.stream(state.conn, message) do
:unknown ->
_ = Logger.error(fn -> "Received unknown message: " <> inspect(message) end)
{:noreply, state}

{:ok, conn, responses} ->
state = put_in(state.conn, conn)
state = Enum.reduce(responses, state, &process_response/2)
{:noreply, state}
end
end

defp process_response({:status, request_ref, status}, state) do
put_in(state.requests[request_ref].response[:status], status)
end

defp process_response({:headers, request_ref, headers}, state) do
put_in(state.requests[request_ref].response[:headers], headers)
end

defp process_response({:data, request_ref, new_data}, state) do
update_in(state.requests[request_ref].response[:data], fn data -> (data || "") <> new_data end)
end

# When the request is done, we use GenServer.reply/2 to reply to the caller that was
# blocked waiting on this request.
defp process_response({:done, request_ref}, state) do
{%{response: response, from: from}, state} = pop_in(state.requests[request_ref])
GenServer.reply(from, {:ok, response})
state
end

defp process_response({:error, request_ref, reason}, state) do
{%{from: from}, state} = pop_in(state.requests[request_ref])
GenServer.reply(from, {:error, reason})
state
end
end
69 changes: 69 additions & 0 deletions lib/elasticsearcher/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule Elasticsearcher.Cluster.Supervisor do
use Supervisor
@timeout 60_000

def start_link(cluster, otp_app, config) do
Supervisor.start_link(__MODULE__, {cluster, otp_app, config}, name: cluster)
end

def with_connection(cluster, fun) do
:poolboy.transaction(
pool_name(cluster),
fun,
@timeout
)
end

def config(cluster) do
Elasticsearcher.Config.get(config_name(cluster))
end

## Callbacks

@doc false
@impl Supervisor
def init({cluster, _otp_app, config}) do
children = [
:poolboy.child_spec(
pool_name(cluster),
poolboy_child_spec(cluster, config),
poolboy_worker_args(config)
),
{Elasticsearcher.Config, {config_name(cluster), config}}
]

Supervisor.init(children, strategy: :one_for_one)
end

defp poolboy_child_spec(cluster, config) do
size = Map.get(config, :pool_size, 5)
overflow = Map.get(config, :pool_overflow, 2)

[
name: {:local, pool_name(cluster)},
worker_module: Elasticsearcher.Connection,
size: size,
overflow: overflow
]
end

defp poolboy_worker_args(config) do
url = Map.get(config, :url, "http://localhost:9200")
%URI{host: host, port: port, scheme: scheme} = URI.parse(url)
scheme = String.to_atom(scheme)

[
scheme: scheme,
host: host,
port: port
]
end

defp pool_name(cluster) do
Module.concat(cluster, Pool)
end

defp config_name(cluster) do
Module.concat(cluster, Config)
end
end
34 changes: 34 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Elasticsearcher.MixProject do
use Mix.Project

def project do
[
app: :elasticsearcher,
version: "0.1.0",
elixir: "~> 1.11",
start_permanent: Mix.env() == :prod,
elixirc_paths: elixirc_paths(Mix.env()),
deps: deps()
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
]
end

defp elixirc_paths(env) when env in ~w(test dev)a, do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:mint, "~> 1.0"},
{:castore, "~> 0.1.0"},
{:poolboy, "~> 1.5.1"},
{:jason, "~> 1.0"}
]
end
end
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
%{
"castore": {:hex, :castore, "0.1.8", "1b61eaba71bb755b756ac42d4741f4122f8beddb92456a84126d6177ec0af1fc", [:mix], [], "hexpm", "23ab8305baadb057bc689adc0088309f808cb2247dc9a48b87849bb1d242bb6c"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"mint": {:hex, :mint, "1.2.0", "65e9d75c60c456a5fb1b800febb88f061f56157d103d755b99fcaeaeb3e956f3", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "19cbb3a5be91b7df4a35377ba94b26199481a541add055cf5d1d4299b55125ab"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
}
15 changes: 15 additions & 0 deletions test/elasticsearcher_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule ElasticsearcherTest do
use ExUnit.Case

alias Elasticsearcher.Test.Cluster

test "getting server status" do
{:ok, %{"status" => status}} = Elasticsearcher.get(Cluster, "/_cluster/health")
assert not is_nil(status)
end

test "getting config" do
%{url: url} = Cluster.config()
assert url == "http://localhost:9200"
end
end
3 changes: 3 additions & 0 deletions test/support/cluster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Elasticsearcher.Test.Cluster do
use Elasticsearcher.Cluster, otp_app: :elasticsearcher
end
5 changes: 5 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ExUnit.start()

url = "http://localhost:9200"

{:ok, _} = Elasticsearcher.Test.Cluster.start_link(%{url: url})

0 comments on commit d9771f7

Please sign in to comment.