Skip to content

Commit

Permalink
将busy 和 free都用 table来实现,不过压测效果不太理想
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei5 committed Nov 25, 2014
1 parent 666a8ce commit f97a3b2
Showing 1 changed file with 101 additions and 41 deletions.
142 changes: 101 additions & 41 deletions src/DBPool/db_server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class DBServer

protected $busy_table;
protected $wait_queue = array(); //等待队列
protected $wait_queue_max = 100; //等待队列的最大长度,超过后将拒绝新的请求
//protected $wait_queue_max = 100; //等待队列的最大长度,超过后将拒绝新的请求
protected $db_host;
protected $db_user;
protected $db_pwd;
Expand All @@ -33,19 +33,18 @@ function __construct(array $config) {
$this->db_port = isset($config['db_port']) ? $config['db_port'] : 3306;

$this->free_table = new swoole_table(1024);
$this->free_table->column('task_id',swoole_table::TYPE_STRING, 100);
$this->free_table->column('task_id',swoole_table::TYPE_STRING, 1000);
$this->free_table->create();

for ($i = 0; $i < $this->task_worker_num; $i++) {
$free[] = $i;
}
//$arr = array('free'=>$free,'busy'=>array());

$this->free_table->set("task_id",array('task_id'=> json_encode($free)));

$this->map_table = new swoole_table(1024); // 记录 fd 和 busy_id的对应关系
$this->map_table->column('busy_id',swoole_table::TYPE_INT, 4);
$this->map_table->column('busy_id',swoole_table::TYPE_STRING, 1000);
$this->map_table->create();
$this->map_table->set("busy_id",array('busy_id'=> json_encode(array())));
}

function run() {
Expand All @@ -54,6 +53,8 @@ function run() {
'worker_num'=>$this->worker_num,
'task_worker_num' => $this->task_worker_num,
'task_max_request' => 0,
'max_request' => 0,
'log_file' => '/home/dev/git/swoole-doc/src/DBPool/swoole.log',
'dispatch_mode' => 2,
));
$this->serv->on('Start', array($this, 'onStart'));
Expand All @@ -69,7 +70,7 @@ function run() {
}

public function onStart($serv) {
echo "master_pid:{$serv->master_pid} manager_pid:{$serv->manager_pid} \n";
//echo "master_pid:{$serv->master_pid} manager_pid:{$serv->manager_pid} \n";
cli_set_process_title("php5 master {$serv->master_pid}");
}

Expand All @@ -81,42 +82,42 @@ public function onWorkerStart( $serv , $worker_id) {

// 判定是否为Task进程
if( $worker_id >= $serv->setting['worker_num'] ) {
echo "----onTaskStart worker_id: {$worker_id} \n";
//echo "----onTaskStart worker_id: {$worker_id} \n";
cli_set_process_title("php5 task_id {$worker_id}");
} else {
echo "--onWorkerStart worker_id: {$worker_id} \n";
//echo "--onWorkerStart worker_id: {$worker_id} \n";
cli_set_process_title("php5 worker {$worker_id}");
}
}


public function onConnect( $serv, $fd, $from_id ) {
echo "Client {$fd} from:{$from_id} connect\n";
//echo "Client {$fd} from:{$from_id} connect\n";
}


private function getFreeTaskId($fd) {
$free = $this->_getFreeArr();
echo "map_cnt:".count($this->map_table)." free_cnt:".count($this->free_table)." task_worker_num:{$this->serv->setting['task_worker_num']}\n";
if ( !$this->map_table->get($fd) ) { // 如果不在正使用 修改map 和 free
if (count($this->map_table) == $this->serv->setting['task_worker_num']) { // 已经没有空闲链接了
$busy_arr = $this->_getBusy();
////echo "map_cnt:".count($this->map_table)." free_cnt:".count($this->free_table)." task_worker_num:{$this->serv->setting['task_worker_num']}\n";
if ( !isset($busy_arr[$fd]) ) { // 如果不在正使用 修改map 和 free
if (count($busy_arr) == $this->serv->setting['task_worker_num']) { // 已经没有空闲链接了
return -1;
}
//echo "current session not have connection, first \n";
////echo "current session not have connection, first \n";
//$task['busy'] = $worker_id = array_shift($task['free']);

$worker_id = array_shift($free);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
//$worker_id = rand(0,$this->serv->setting['task_worker_num']-1); //第一次所以可以随机
$this->map_table->set($fd,array('busy_id'=>$worker_id));
} else {
echo " current session fd:{$fd} have connection \n";
$worker_id = $this->_popFree();
//var_dump($busy_arr);
$this->_addBusy($fd, $worker_id);
}
/*
else {
//echo " current session fd:{$fd} have connection \n";
}
*/

$task = json_decode($this->free_table->get("task_id")['task_id'], true);
$table_data = $this->map_table->get($fd); //var_dump($table_data, $task);
$worker_id = $table_data['busy_id'];

//$task = json_decode($this->free_table->get("task_id")['task_id'], true);
$worker_id = $this->_getBusy($fd);
return $worker_id;
}

Expand All @@ -133,21 +134,33 @@ private function getFreeTaskId($fd) {
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
$data = json_encode(array('fd' => $fd,'send_data' => $data));
$worker_id = $this->getFreeTaskId($fd);
//echo "----------{$worker_id}---------\n";
// 代表没有可用连接 进入等待队列
if ($worker_id == -1 ) { // task_worker_id是从0开始的 所以不能返回0作为判断
echo "fd:{$fd} from_id:{$from_id} is busy \n";
$this->wait_queue[] = array('fd'=>$fd, 'data' =>$data);
//echo "fd:{$fd} from_id:{$from_id} is busy \n";
//$serv->send($fd, '-1' , $from_id);
} else { // 获得可用连接
$this->serv->task($data, $worker_id);
}
$this->serv->task($data, $worker_id);
$this->request_cnt++;
$free = $this->_getFreeArr();
//echo "cnt:{$this->request_cnt} fd:{$fd} from_id:{$from_id} data:".json_encode($data)."\n";
$cur_link = $this->_getBusy($fd);
$busy = $this->_getBusy();
//echo "----onReceive cur_link:{$cur_link} follow is free and busy wait_queue\n";
//var_dump($free, $busy, $this->wait_queue);
//$this->process();
}

public function process() {
while (count($this->wait_queue) > 0) {
$wait_data = array_shift($this->wait_queue);
foreach ($wait_data as $row) {
$this->doQuery($fd, $row);
}
do {
$worker_id = $this->getFreeTaskId($wait_data['fd']);
} while ($worker_id < 0);
$this->serv->task($wait_data['data'], $worker_id);
}

}

public function doQuery($serv, $fd, $from_id, $data) {
Expand All @@ -158,16 +171,16 @@ public function doQuery($serv, $fd, $from_id, $data) {
if ($func_name == "release") {
$current_worker_id = $this->_getBusy($fd);
if ($current_worker_id !== false) {
echo "---release worker_id:{$current_worker_id}\n";
//echo "---- doQuery release worker_id:{$current_worker_id} busy_cnt:".count($this->map_table)." before \n";
$free = $this->_getFreeArr();
array_push($free, $current_worker_id);
$this->_addFree($current_worker_id);
//var_dump($free);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
$this->map_table->del($fd);

$this->_delBusy($fd);
//echo "---- doQuery release worker_id:{$current_worker_id} busy_cnt:".count($this->map_table)." after \n";
}
} else { //执行一般pdo方法
if ($param != "" ) {
//echo "---- doQuery func:{$func_name} busy_cnt:".count($this->map_table)." before \n";
$rs = $st = $this->pdo->$func_name($param);
} else {
$rs = $st = $this->pdo->$func_name();
Expand All @@ -192,7 +205,7 @@ public function doQuery($serv, $fd, $from_id, $data) {


public function onClose( $serv, $fd, $from_id ) {
//echo "Client {$fd} from {$from_id} close connection\n";
////echo "Client {$fd} from {$from_id} close connection\n";
}

/**
Expand All @@ -206,7 +219,7 @@ public function onClose( $serv, $fd, $from_id ) {
*/
public function onTask($serv, $task_id, $from_id, $data) {
if ($this->pdo == null) {
// echo "Task create new pdo \n";
// //echo "Task create new pdo \n";
$this->pdo = new PDO(
"mysql:host=localhost;port=3306;dbname=test",
"root",
Expand All @@ -224,7 +237,7 @@ public function onTask($serv, $task_id, $from_id, $data) {
}

public function onFinish($serv,$task_id, $data) {
echo "Task Id:{$task_id} On Finish, \n";
//echo "Task Id:{$task_id} On Finish, \n";
}


Expand All @@ -233,11 +246,58 @@ private function _getFreeArr() {
$free = json_decode($task['task_id'], true);
return $free;
}

/**
* _addFree
* 使用完后 释放链接实际上是放回到空闲数组里来
*
* @param mixed $current_worker_id
* @access private
* @return void
*/
private function _addFree($current_worker_id) {
$free = $this->_getFreeArr();
array_push($free, $current_worker_id);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
}

private function _getBusy($fd) {
$task = $this->map_table->get($fd);
/**
* _popFree
* 从空闲连接里获得一个连接id(用来指定到task进程)
* @access private
* @return int
*/
private function _popFree() {
$free = $this->_getFreeArr();
$cur_id = array_shift($free);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
return $cur_id;
}

private function _getBusy($fd = NULL) {
$task = $this->map_table->get("busy_id");
$busy = json_decode($task['busy_id'], true);
return $busy;
if ($fd == NULL) {
return $busy;
} else {
if (isset($busy[$fd])) {
return $busy[$fd];
}
}
}



private function _delBusy($fd) {
$busy_arr = $this->_getBusy();
unset($busy_arr[$fd]);
return $this->map_table->set("busy_id", array('busy_id'=>json_encode($busy_arr)));
}

private function _addBusy($fd, $worker_id) {
$busy_arr = $this->_getBusy();
$busy_arr[$fd] = $worker_id;
return $this->map_table->set("busy_id", array('busy_id'=>json_encode($busy_arr)));
}
}

Expand Down

0 comments on commit f97a3b2

Please sign in to comment.