Skip to content

Commit

Permalink
添加 用table控制 free 和 busy
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei5 committed Nov 24, 2014
1 parent 6b2ba90 commit 2074156
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 30 deletions.
78 changes: 54 additions & 24 deletions src/DBPool/db_server.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<?php
error_reporting(E_ALL);
ini_set('display_errors', 1);
class DBServer
{
protected $task_worker_num;
Expand Down Expand Up @@ -37,9 +39,9 @@ function __construct(array $config) {
for ($i = 0; $i < $this->task_worker_num; $i++) {
$free[] = $i;
}
$arr = array('free'=>$free,'busy'=>array());
//$arr = array('free'=>$free,'busy'=>array());

$this->free_table->set("task_id",array('task_id'=> json_encode($arr)));
$this->free_table->set("task_id",array('task_id'=> json_encode($free)));

$this->map_table = new swoole_table(1024); // 记录 fd 和 busy_id的对应关系
$this->map_table->column('busy_id',swoole_table::TYPE_INT, 4);
Expand Down Expand Up @@ -94,19 +96,25 @@ public function onConnect( $serv, $fd, $from_id ) {


private function getFreeTaskId($fd) {

$task = $this->free_table->get("task_id");
$task = json_decode($task['task_id'], true);
if ( !$this->map_table->get($fd) ) {
echo "not have key, first \n";
$task['busy'] = $worker_id = array_shift($task['free']);
$this->free_table->set("task_id",array("task_id"=>json_encode($task)));
$free = $this->_getFreeArr();
echo "map_cnt:".count($this->map_table)." free_cnt:".count($this->free_table)." task_worker_num:{$this->serv->setting['task_worker_num']}\n";
if ( !$this->map_table->get($fd) ) { // 如果不在正使用 修改map 和 free
if (count($this->map_table) == $this->serv->setting['task_worker_num']) { // 已经没有空闲链接了
return -1;
}
//echo "current session not have connection, first \n";
//$task['busy'] = $worker_id = array_shift($task['free']);

$worker_id = array_shift($free);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
//$worker_id = rand(0,$this->serv->setting['task_worker_num']-1); //第一次所以可以随机
$this->map_table->set($fd,array('busy_id'=>$worker_id));
} else {
echo " current session fd:{$fd} have connection \n";
}

$task = json_decode($this->free_table->get("task_id")['task_id'], true);
$table_data = $this->map_table->get($fd);
$table_data = $this->map_table->get($fd); //var_dump($table_data, $task);
$worker_id = $table_data['busy_id'];

return $worker_id;
Expand All @@ -125,6 +133,9 @@ private function getFreeTaskId($fd) {
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
$data = json_encode(array('fd' => $fd,'send_data' => $data));
$worker_id = $this->getFreeTaskId($fd);
if ($worker_id == -1 ) { // task_worker_id是从0开始的 所以不能返回0作为判断
echo "fd:{$fd} from_id:{$from_id} is busy \n";
}
$this->serv->task($data, $worker_id);
$this->request_cnt++;
}
Expand All @@ -145,8 +156,15 @@ public function doQuery($serv, $fd, $from_id, $data) {
$func_name = $data['func_name'];
$param = implode(',', $data['param']);
if ($func_name == "release") {
if ( $this->map_table->get($fd)) {
$current_worker_id = $this->_getBusy($fd);
if ($current_worker_id !== false) {
echo "---release worker_id:{$current_worker_id}\n";
$free = $this->_getFreeArr();
array_push($free, $current_worker_id);
//var_dump($free);
$this->free_table->set("task_id",array("task_id"=>json_encode($free)));
$this->map_table->del($fd);

}
} else { //执行一般pdo方法
if ($param != "" ) {
Expand Down Expand Up @@ -187,19 +205,19 @@ public function onClose( $serv, $fd, $from_id ) {
* @access public
*/
public function onTask($serv, $task_id, $from_id, $data) {
if ($this->pdo == null) {
// echo "Task create new pdo \n";
$this->pdo = new PDO(
"mysql:host=localhost;port=3306;dbname=test",
"root",
"",
array(
PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT => true
)
);
}
if ($this->pdo == null) {
// echo "Task create new pdo \n";
$this->pdo = new PDO(
"mysql:host=localhost;port=3306;dbname=test",
"root",
"",
array(
PDO::MYSQL_ATTR_INIT_COMMAND => "SET NAMES 'UTF8';",
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_PERSISTENT => true
)
);
}
$data = json_decode( $data , true );
$send_data = json_decode( $data['send_data'], true);
$this->doQuery($serv, $data['fd'], $from_id, $send_data);
Expand All @@ -209,6 +227,18 @@ public function onFinish($serv,$task_id, $data) {
echo "Task Id:{$task_id} On Finish, \n";
}


private function _getFreeArr() {
$task = $this->free_table->get("task_id");
$free = json_decode($task['task_id'], true);
return $free;
}

private function _getBusy($fd) {
$task = $this->map_table->get($fd);
$busy = json_decode($task['busy_id'], true);
return $busy;
}
}

?>
15 changes: 9 additions & 6 deletions src/DBPool/transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@

$db = new DB();
$db->connect($config[$key]);
// $db->beginTransaction();
for ($i = 1; $i <= 3; $i++) {
$i = 1;
/*
$sql = "insert into `test` values ({$i},'pool{$i}') ";
$db->exec($sql);
*/
$db->beginTransaction();
for ($i = 1; $i <= 2; $i++) {
$sql = "insert into `test` values ({$i},'pool{$i}') ";
$db->exec($sql);
}
//$db->commit();
// $db->rollBack();
//$db->release();


$db->rollBack();
// $db->release();

?>

0 comments on commit 2074156

Please sign in to comment.