Skip to content

Commit

Permalink
add on method
Browse files Browse the repository at this point in the history
  • Loading branch information
walkor committed Feb 8, 2016
1 parent eacfa64 commit 5d4daff
Showing 1 changed file with 47 additions and 9 deletions.
56 changes: 47 additions & 9 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class Client
protected static $_remotePort = null;

protected static $_timer = null;

protected static $_events = array();

public static function connect($ip = '127.0.0.1', $port = 2206)
{
Expand All @@ -27,49 +29,85 @@ public static function connect($ip = '127.0.0.1', $port = 2206)
self::$_remoteConnection->onMessage = function($connection, $data)
{
$data = unserialize($data);
call_user_func(Client::$onMessage, $data['channel'], $data['data']);
$event = $data['channel'];
$event_data = $data['data'];
if(!empty(self::$_events[$event]))
{
call_user_func(self::$_events[$event], $event_data);
}
else
{
call_user_func(Client::$onMessage, $event, $event_data);
}
};
self::$_remoteConnection->connect();
}
}

public static function onRemoteClose()
{
echo "Waring channel connection closed and try to reconnect\n";
self::clearTimer();
self::$_timer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
}

public static function onRemoteConnect()
{
$all_event_names = array_keys(self::$_events);
if($all_event_names)
{
self::subscribe($all_event_names);
}
self::clearTimer();
}

public static function clearTimer()
{

if(self::$_timer)
{
Timer::del(self::$_timer);
self::$_timer = null;
}
}

public static function on($event, $callback)
{
if(!is_callable($callback))
{
throw new \Exception('callback is not callable');
}
self::$_events[$event] = $callback;
self::subscribe(array($event));
}

public static function subscribe($channels)
public static function subscribe($events)
{
self::connect();
self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$channels)));
$events = (array)$events;
foreach($events as $event)
{
if(!isset(self::$_events[$event]))
{
self::$_events[$event] = null;
}
}
self::$_remoteConnection->send(serialize(array('type' => 'subscribe', 'channels'=>(array)$events)));
}

public static function unsubscribe($channels)
public static function unsubscribe($events)
{
self::connect();
self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>(array)$channels)));
$events = (array)$events;
foreach($events as $event)
{
unset(self::$_events[$event]);
}
self::$_remoteConnection->send(serialize(array('type' => 'unsubscribe', 'channels'=>$events)));
}

public static function publish($channels, $data)
public static function publish($events, $data)
{
self::connect();
self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels'=>(array)$channels, 'data' => $data)));
self::$_remoteConnection->send(serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data)));
}

}

0 comments on commit 5d4daff

Please sign in to comment.