forked from mailerlite/laravel-elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFactory.php
183 lines (163 loc) · 7.12 KB
/
Factory.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
<?php namespace Cviebrock\LaravelElasticsearch;
use Elasticsearch\Client;
use Elasticsearch\ClientBuilder;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Uri;
use GuzzleHttp\Ring\Future\CompletedFutureArray;
use Illuminate\Support\Arr;
use Illuminate\Support\Reflector;
use Psr\Http\Message\ResponseInterface;
use Psr\Log\LoggerInterface;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
class Factory
{
    /**
     * Map configuration array keys with ES ClientBuilder setters.
     *
     * buildClient() reads each key from the client configuration and, when a
     * non-null value is present, passes it to the ClientBuilder method named
     * here. Array values are applied one element per call.
     *
     * @var array
     */
    protected $configMappings = [
        'sslVerification'    => 'setSSLVerification',
        'sniffOnStart'       => 'setSniffOnStart',
        'retries'            => 'setRetries',
        'httpHandler'        => 'setHandler',
        'connectionPool'     => 'setConnectionPool',
        'connectionSelector' => 'setSelector',
        'serializer'         => 'setSerializer',
        'connectionFactory'  => 'setConnectionFactory',
        'endpoint'           => 'setEndpoint',
        'namespaces'         => 'registerNamespace',
    ];

    /**
     * Make the Elasticsearch client for the given configuration array.
     *
     * @param array $config
     *
     * @return \Elasticsearch\Client
     */
    public function make(array $config): Client
    {
        return $this->buildClient($config);
    }

    /**
     * Build and configure an Elasticsearch client.
     *
     * Applies, in order: hosts, optional logging, optional tracer, the
     * setters listed in $configMappings, and finally a request-signing
     * handler for every host flagged with a truthy "aws" entry.
     *
     * @param array $config
     *
     * @return \Elasticsearch\Client
     */
    protected function buildClient(array $config): Client
    {
        $clientBuilder = ClientBuilder::create();

        // Configure hosts
        $clientBuilder->setHosts($config['hosts']);

        // Configure logging
        if (Arr::get($config, 'logging')) {
            $logObject = Arr::get($config, 'logObject');
            $logPath = Arr::get($config, 'logPath');
            $logLevel = Arr::get($config, 'logLevel');

            if ($logObject instanceof LoggerInterface) {
                // A ready-made PSR-3 logger takes precedence over logPath/logLevel
                $clientBuilder->setLogger($logObject);
            } elseif ($logPath && $logLevel) {
                $logger = new Logger('log');
                $logger->pushHandler(new StreamHandler($logPath, $logLevel));
                $clientBuilder->setLogger($logger);
            }
        }

        // Configure tracer (resolved through the application container)
        if ($tracer = Arr::get($config, 'tracer')) {
            $clientBuilder->setTracer(app($tracer));
        }

        // Set additional client configuration
        foreach ($this->configMappings as $key => $method) {
            $value = Arr::get($config, $key);

            if (is_array($value)) {
                // One setter call per element
                foreach ($value as $vItem) {
                    $clientBuilder->$method($vItem);
                }
            } elseif ($value !== null) {
                $clientBuilder->$method($value);
            }
        }

        // Configure handlers for any AWS hosts
        // NOTE(review): setHandler() is invoked once per AWS host, so presumably
        // only the handler of the last AWS host stays active - confirm.
        foreach ($config['hosts'] as $host) {
            if (!empty($host['aws'])) {
                $clientBuilder->setHandler($this->makeAwsHandler($host));
            }
        }

        // Build and return the client
        return $clientBuilder->build();
    }

    /**
     * Create the handler that signs each request with AWS Signature V4 before
     * sending it, and converts the PSR-7 response to a RingPHP response.
     *
     * @param array $host Host configuration; reads "aws_region", "port" and the
     *                    credential keys documented on resolveAwsCredentials().
     *
     * @return \Closure Handler taking the request array and returning a CompletedFutureArray
     */
    private function makeAwsHandler(array $host): \Closure
    {
        return function (array $request) use ($host) {
            $psr7Handler = \Aws\default_http_handler();
            $signer = new \Aws\Signature\SignatureV4('es', $host['aws_region']);

            // Reduce the Host header to its bare host component when it parses as a URL
            $request['headers']['Host'][0] = parse_url($request['headers']['Host'][0])['host'] ?? $request['headers']['Host'][0];

            // Create a PSR-7 request from the array passed to the handler.
            // A missing "port" is passed as null, which PSR-7 treats as "no explicit port".
            $psr7Request = new Request(
                $request['http_method'],
                (new Uri($request['uri']))
                    ->withScheme($request['scheme'])
                    ->withPort($host['port'] ?? null)
                    ->withHost($request['headers']['Host'][0]),
                $request['headers'],
                $request['body']
            );

            // Sign the PSR-7 request
            $signedRequest = $signer->signRequest(
                $psr7Request,
                $this->resolveAwsCredentials($host)
            );

            // Get curl stats
            $httpStats = new class {
                public $data = [];

                public function __invoke(...$args)
                {
                    $this->data = $args[0];
                }
            };

            // Send the signed request to Amazon ES
            $response = $psr7Handler($signedRequest, ['http_stats_receiver' => $httpStats])
                ->then(function (ResponseInterface $response) {
                    return $response;
                }, function ($error) {
                    // An HTTP error response is still a response: hand it back as-is
                    $response = is_array($error) ? ($error['response'] ?? null) : null;
                    if ($response instanceof ResponseInterface) {
                        return $response;
                    }

                    // No response at all (e.g. a connection failure): surface the
                    // underlying failure instead of calling methods on null below
                    $exception = is_array($error) ? ($error['exception'] ?? null) : $error;
                    if ($exception instanceof \Throwable) {
                        throw $exception;
                    }

                    throw new \RuntimeException('The AWS Elasticsearch request failed without returning a response.');
                })
                ->wait();

            // Convert the PSR-7 response to a RingPHP response
            return new CompletedFutureArray([
                'status'         => $response->getStatusCode(),
                'headers'        => $response->getHeaders(),
                'body'           => $response->getBody()->detach(),
                'transfer_stats' => [
                    'total_time'   => $httpStats->data['total_time'] ?? 0,
                    'primary_port' => $httpStats->data['primary_port'] ?? '',
                ],
                'effective_url'  => (string) $psr7Request->getUri(),
            ]);
        };
    }

    /**
     * Resolve the credentials used to sign requests for an AWS host.
     *
     * "aws_credentials" may be a Credentials instance, a closure whose result
     * exposes wait(), or a [class, method] array callable returning either of
     * those. Anything else falls back to static credentials built from
     * "aws_key", "aws_secret" and the optional "aws_session_token".
     *
     * @param array $host
     *
     * @return mixed Credentials instance, or whatever the closure's wait() yields
     */
    private function resolveAwsCredentials(array $host)
    {
        $configured = $host['aws_credentials'] ?? null;

        // If the aws_credentials is an array try using it as a static method of the class
        if (
            !empty($configured)
            && is_array($configured)
            && Reflector::isCallable($configured, true)
        ) {
            $configured = call_user_func([$configured[0], $configured[1]]);
        }

        // If it contains a closure you can obtain the credentials by invoking it
        if ($configured instanceof \Closure) {
            return $configured()->wait();
        }

        // A Credentials instance, given directly or returned by the callable above
        if ($configured instanceof \Aws\Credentials\Credentials) {
            return $configured;
        }

        // Fall back to the static key/secret from the host configuration
        return new \Aws\Credentials\Credentials(
            $host['aws_key'] ?? null,
            $host['aws_secret'] ?? null,
            $host['aws_session_token'] ?? null
        );
    }
}