Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
phroggyy committed Jan 8, 2019
0 parents commit 1d040a6
Show file tree
Hide file tree
Showing 12 changed files with 392 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
vendor
composer.lock
32 changes: 32 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "decahedron/laravel-app-events",
"description": "Manage application-wide events for SOAs with Google Cloud PubSub",
"type": "library",
"license": "MIT",
"authors": [
{
"name": "Leo Sjöberg",
"email": "[email protected]"
}
],
"minimum-stability": "stable",
"require": {
"google/cloud": "^0.88.0",
"illuminate/config": "5.7.*",
"illuminate/console": "5.7.*",
"kainxspirits/laravel-pubsub-queue": "^0.1.0",
"google/protobuf": "^3.6.1"
},
"autoload": {
"psr-4": {
"Decahedron\\AppEvents\\": "src"
}
},
"extra": {
"laravel": {
"providers": [
"Decahedron\\AppEvents\\AppEventsProvider"
]
}
}
}
11 changes: 11 additions & 0 deletions config/app-events.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

return [
'project_id' => 'your-google-project',
'topic' => 'app-events',
'subscription' => 'your-service',

'mappings' => [],

'handlers' => [],
];
5 changes: 5 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Laravel App Events for Google Cloud PubSub

## Installation

1. `composer require decahedron/laravel-app-events`
46 changes: 46 additions & 0 deletions src/AppEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Decahedron\AppEvents;

use Google\Protobuf\Internal\Message;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class AppEvent implements ShouldQueue
{
use Dispatchable, Queueable;

/**
* @var string
*/
public $event;

/**
* @var Message
*/
public $payload;

/**
* Event constructor.
* @param string $event
* @param $payload
*/
public function __construct(string $event, Message $payload)
{
$this->onConnection('app-events');
$this->payload = $payload;
$this->event = $event;
}

public function handle()
{
foreach (config('app-events.handlers') as $event => $handler) {
if ($this->event !== $event) {
continue;
}

app()->make($handler)->handle($this->payload);
}
}
}
30 changes: 30 additions & 0 deletions src/AppEventFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Decahedron\AppEvents;

use Google\Cloud\PubSub\Message;
use Google\Protobuf\Internal\Message as ProtobufMessage;

class AppEventFactory
{

public static function fromMessage(Message $message): AppEvent
{
return new AppEvent($message->attribute('event_type'), static::resolveProtobufInstance($message));
}

protected static function resolveProtobufInstance(Message $message): ProtobufMessage
{
$rawData = json_decode(base64_decode($message->data()), JSON_OBJECT_AS_ARRAY);

if (! ($protobufClass = config('app-events.mappings.'.$rawData['proto']))) {
throw new UnserializableProtoException;
}

/** @var ProtobufMessage $proto */
$proto = new $protobufClass;
$proto->mergeFromString($rawData['payload']);

return $proto;
}
}
33 changes: 33 additions & 0 deletions src/AppEventsProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Decahedron\AppEvents;

use Decahedron\AppEvents\Commands\AppEventsListener;
use Illuminate\Support\ServiceProvider;

class AppEventsProvider extends ServiceProvider
{
public function boot()
{
$this->app['queue']->addConnector('pubsub', function () {
return new PubSubConnector;
});

$this->app['config']->set('queue.connections.app-events', [
'driver' => 'pubsub',
'queue' => $this->app['config']->get('app-events.topic'),
'project_id' => $this->app['config']->get('app-events.project_id'),
]);

$this->publishes([
__DIR__.'/../config/app-events.php' => config_path('app-events.php'),
]);
}

public function register()
{
$this->commands([
AppEventsListener::class,
]);
}
}
78 changes: 78 additions & 0 deletions src/Commands/AppEventsListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?php

namespace Decahedron\AppEvents\Commands;

use Decahedron\AppEvents\AppEvent;
use Decahedron\AppEvents\AppEventFactory;
use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;
use Illuminate\Console\Command;

class AppEventsListener extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app-events:listen';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Listen for notifications across all services of your application';

/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}

/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$pubSub = new PubSubClient([
'projectId' => config('app-events.project_id'),
]);

$topic = $pubSub->topic(config('app-events.topic'));
if (!$topic->exists()) {
$topic->create();
}

$subscription = $topic->subscription(config('app-events.subscription'));
if (!$subscription->exists()) {
$subscription->create();
}

$this->info('Starting to listen for events');
while (true) {
$messages = $subscription->pull([
'maxMessages' => 500,
]);

if (count($messages) === 0) {
continue;
}

foreach ($messages as $message) {
$job = AppEventFactory::fromMessage($message);
$this->info('Handling message: '.$job->event);

$job->handle();
}

$subscription->acknowledgeBatch($messages);
}
}
}
19 changes: 19 additions & 0 deletions src/PubSubConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Decahedron\AppEvents;

use Google\Cloud\PubSub\PubSubClient;
use Kainxspirits\PubSubQueue\Connectors\PubSubConnector as BaseConnector;

class PubSubConnector extends BaseConnector
{
public function connect(array $config)
{
$gcp_config = $this->transformConfig($config);

return new PubSubQueue(
new PubSubClient($gcp_config),
$config['queue'] ?? $this->default_queue
);
}
}
116 changes: 116 additions & 0 deletions src/PubSubQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php

namespace Decahedron\AppEvents;

use Google\Cloud\PubSub\Message;
use Kainxspirits\PubSubQueue\Jobs\PubSubJob;
use Kainxspirits\PubSubQueue\PubSubQueue as BaseQueue;

class PubSubQueue extends BaseQueue
{
protected function createPayload($job, $queue, $data = '')
{
$protoMappings = config('app-events.mappings');
$payloadClass = get_class($job->payload);

$payload = [
'proto' => array_flip($protoMappings)[$payloadClass] ?? $payloadClass,
'payload' => $job->payload->serializeToString(),
'id' => $this->getRandomId(),
];

return base64_encode(json_encode($payload));
}

public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue, [
'event_type' => $job->event,
]);
}

/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
*
* @return array
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$topic = $this->getTopic($queue, true);

$this->subscribeToTopic($topic);

$publish = ['data' => $payload];

if (! empty($options)) {
$publish['attributes'] = $options;
}

$topic->publish($publish);

$decoded_payload = json_decode(base64_decode($payload), true);

return $decoded_payload['id'];
}


public function pop($queue = null)
{
$topic = $this->getTopic($this->getQueue($queue));

if (! $topic->exists()) {
return;
}

$subscription = $topic->subscription($this->getSubscriberName());
$messages = $subscription->pull([
'returnImmediately' => true,
'maxMessages' => 1,
]);

if (! empty($messages) && count($messages) > 0) {
return new PubSubJob(
$this->container,
$this,
$this->transformMessage($messages[0]),
$this->connectionName,
$this->getQueue($queue)
);
}
}

private function transformMessage(Message $message)
{
$job = AppEventFactory::fromMessage($message);

$payload = [
'job' => 'Illuminate\\Queue\\CallQueuedHandler@call',
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job)
]
];

return new Message(
[
'data' => base64_encode(json_encode($payload)),
'messageId' => $message->id(),
'publishTime' => $message->publishTime(),
'attributes' => $message->attributes(),
],
[
'ackId' => $message->ackId(),
'subscription' => $message->subscription(),
]
);
}

public function getSubscriberName()
{
return $this->container['config']->get('app-events.subscription');
}
}
10 changes: 10 additions & 0 deletions src/UnserializableProtoException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Decahedron\AppEvents;

use Exception;

class UnserializableProtoException extends Exception
{

}
Loading

0 comments on commit 1d040a6

Please sign in to comment.