Skip to content

Commit

Permalink
Merge remote-tracking branch 'github/pr/656' into main
Browse files Browse the repository at this point in the history
Local branch v2.0-clientoptions-aggregateconnections
  • Loading branch information
nrk committed Sep 10, 2020
2 parents d954e9f + 9e48aef commit 985343a
Show file tree
Hide file tree
Showing 11 changed files with 892 additions and 251 deletions.
38 changes: 6 additions & 32 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use Predis\Configuration\OptionsInterface;
use Predis\Connection\ConnectionInterface;
use Predis\Connection\ParametersInterface;
use Predis\Connection\Replication\SentinelReplication;
use Predis\Monitor\Consumer as MonitorConsumer;
use Predis\Pipeline\Pipeline;
use Predis\PubSub\Consumer as PubSubConsumer;
Expand Down Expand Up @@ -126,14 +125,12 @@ protected function createConnection($parameters)
if (is_array($parameters)) {
if (!isset($parameters[0])) {
return $options->connections->create($parameters);
}

if ($options->defined('cluster')) {
return $this->createAggregateConnection($parameters, 'cluster');
} elseif ($options->defined('replication')) {
return $this->createAggregateConnection($parameters, 'replication');
} elseif ($options->defined('aggregate')) {
return $this->createAggregateConnection($parameters, 'aggregate');
} elseif ($options->defined('cluster') && $initializer = $options->cluster) {
return $initializer($parameters, true);
} elseif ($options->defined('replication') && $initializer = $options->replication) {
return $initializer($parameters, true);
} elseif ($options->defined('aggregate') && $initializer = $options->aggregate) {
return $initializer($parameters, false);
} else {
throw new \InvalidArgumentException(
'Array of connection parameters requires `cluster`, `replication` or `aggregate` client option'
Expand All @@ -154,29 +151,6 @@ protected function createConnection($parameters)
throw new \InvalidArgumentException('Invalid type for connection parameters');
}

/**
* Creates an aggregate connection.
*
* @param mixed $parameters Connection parameters.
* @param string $option Option for aggregate connections (`aggregate`, `cluster`, `replication`).
*
* @return \Closure
*/
protected function createAggregateConnection($parameters, $option)
{
$options = $this->getOptions();

$initializer = $options->$option;
$connection = $initializer($parameters);

// TODO: this is dirty but we must skip the redis-sentinel backend for now.
if ($option !== 'aggregate' && !$connection instanceof SentinelReplication) {
$options->connections->aggregate($connection, $parameters);
}

return $connection;
}

/**
* {@inheritdoc}
*/
Expand Down
87 changes: 65 additions & 22 deletions src/Configuration/Option/Aggregate.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,99 @@

namespace Predis\Configuration\Option;

use InvalidArgumentException;
use Predis\Configuration\OptionInterface;
use Predis\Configuration\OptionsInterface;
use Predis\Connection\AggregateConnectionInterface;
use Predis\Connection\NodeConnectionInterface;

/**
* Configures an aggregate connection used for clustering
* multiple Redis nodes using various implementations with
* different algorithms or strategies.
* Client option for configuring generic aggregate connections.
*
* The only value accepted by this option is a callable that must return a valid
* connection instance of Predis\Connection\AggregateConnectionInterface when
* invoked by the client to create a new aggregate connection instance.
*
* Creation and configuration of the aggregate connection is up to the user.
*
* @author Daniele Alessandri <[email protected]>
*/
class Aggregate implements OptionInterface
{
/**
* Wraps a callable to ensure that the returned value is a valid connection.
*
* @param OptionsInterface $options Client options.
* @param mixed $callable Callable initializer.
*
* @return \Closure
* {@inheritdoc}
*/
protected function getConnectionInitializer(OptionsInterface $options, $callable)
public function filter(OptionsInterface $options, $value)
{
if (!is_callable($callable)) {
$class = get_called_class();

throw new \InvalidArgumentException("$class expects a valid callable");
if (!is_callable($value)) {
throw new InvalidArgumentException(sprintf(
'%s expects a callable object acting as an aggregate connection initializer',
static::class
));
}

$option = $this;
return $this->getConnectionInitializer($options, $value);
}

return function ($parameters = null) use ($callable, $options, $option) {
$connection = call_user_func($callable, $options, $parameters);
/**
* Wraps a user-supplied callable used to create a new aggregate connection.
*
* When the original callable acting as a connection initializer is executed
* by the client to create a new aggregate connection, it will receive the
* following arguments:
*
* - $parameters (same as passed to Predis\Client::__construct())
* - $options (options container, Predis\Configuration\OptionsInterface)
* - $option (current option, Predis\Configuration\OptionInterface)
*
* The original callable must return a valid aggregation connection instance
* of type Predis\Connection\AggregateConnectionInterface, this is enforced
* by the wrapper returned by this method and an exception is thrown when
* invalid values are returned.
*
* @param OptionsInterface $options Client options
* @param callable $callable Callable initializer
*
* @throws InvalidArgumentException
*
* @return callable
*/
protected function getConnectionInitializer(OptionsInterface $options, callable $callable)
{
return function ($parameters = null, $autoaggregate = false) use ($callable, $options) {
$connection = call_user_func_array($callable, [&$parameters, $options, $this]);

if (!$connection instanceof AggregateConnectionInterface) {
$class = get_class($option);
throw new InvalidArgumentException(sprintf(
'%s expects the supplied callable to return an instance of %s, but %s was returned',
static::class,
AggregateConnectionInterface::class,
is_object($connection) ? get_class($connection) : gettype($connection)
));
}

throw new \InvalidArgumentException("$class expects a valid connection type returned by callable initializer");
if ($parameters && $autoaggregate) {
static::aggregate($options, $connection, $parameters);
}

return $connection;
};
}

/**
* {@inheritdoc}
* Adds single connections to an aggregate connection instance.
*
* @param OptionsInterface $options Client options
* @param AggregateConnectionInterface $connection Target aggregate connection
* @param array $nodes List of nodes to be added to the target aggregate connection
*/
public function filter(OptionsInterface $options, $value)
public static function aggregate(OptionsInterface $options, AggregateConnectionInterface $connection, array $nodes)
{
return $this->getConnectionInitializer($options, $value);
$connections = $options->connections;

foreach ($nodes as $node) {
$connection->add($node instanceof NodeConnectionInterface ? $node : $connections->create($node));
}
}

/**
Expand Down
76 changes: 51 additions & 25 deletions src/Configuration/Option/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Predis\Configuration\Option;

use InvalidArgumentException;
use Predis\Cluster\RedisStrategy;
use Predis\Configuration\OptionsInterface;
use Predis\Connection\Cluster\PredisCluster;
Expand All @@ -25,50 +26,75 @@
*/
class Cluster extends Aggregate
{
/**
* {@inheritdoc}
*/
public function filter(OptionsInterface $options, $value)
{
if (is_string($value)) {
$value = $this->getConnectionInitializerByString($options, $value);
}

if (is_callable($value)) {
return $this->getConnectionInitializer($options, $value);
} else {
throw new InvalidArgumentException(sprintf(
'%s expects either a string or a callable value, %s given',
static::class,
is_object($value) ? get_class($value) : gettype($value)
));
}
}

/**
* Returns a connection initializer from a descriptive name.
*
* @param OptionsInterface $options Client options.
* @param string $description Identifier of a cluster backend (`predis`, `redis`)
* @param OptionsInterface $options Client options
* @param string $description Identifier of a replication backend (`predis`, `sentinel`)
*
* @return callable
*/
protected function getConnectionInitializerByDescription(OptionsInterface $options, $description)
protected function getConnectionInitializerByString(OptionsInterface $options, string $description)
{
if ($description === 'predis') {
$callback = $this->getDefault($options);
} elseif ($description === 'redis') {
$callback = function ($options) {
return new RedisCluster($options->connections, new RedisStrategy($options->crc16));
};
} else {
throw new \InvalidArgumentException(
'String value for the cluster option must be either `predis` or `redis`'
);
}
switch ($description) {
case 'redis':
case 'redis-cluster':
return function ($parameters, $options, $option) {
return new RedisCluster($options->connections, new RedisStrategy($options->crc16));
};

return $this->getConnectionInitializer($options, $callback);
case 'predis':
return $this->getDefaultConnectionInitializer();

default:
throw new InvalidArgumentException(sprintf(
'%s expects either `predis`, `redis` or `redis-cluster` as valid string values, `%s` given',
static::class,
$description
));
}
}

/**
* {@inheritdoc}
* Returns the default connection initializer.
*
* @return callable
*/
public function filter(OptionsInterface $options, $value)
protected function getDefaultConnectionInitializer()
{
if (is_string($value)) {
return $this->getConnectionInitializerByDescription($options, $value);
} else {
return $this->getConnectionInitializer($options, $value);
}
return function ($parameters, $options, $option) {
return new PredisCluster();
};
}

/**
* {@inheritdoc}
*/
public function getDefault(OptionsInterface $options)
{
return function ($options) {
return new PredisCluster();
};
return $this->getConnectionInitializer(
$options,
$this->getDefaultConnectionInitializer()
);
}
}
Loading

0 comments on commit 985343a

Please sign in to comment.