-
Notifications
You must be signed in to change notification settings - Fork 123
/
DqClient.php
153 lines (132 loc) · 4.49 KB
/
DqClient.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
<?php
include_once 'DqLoader.php';
class DqClient{
private $serverList = array();
private $fd = NULL;
public function addServer($server){
if(is_string($server)){
$this->serverList[] = $server;
}
if(is_array($server) && !empty($server)){
$this->serverList = array_merge($this->serverList,$server);
}
$this->serverList = array_unique($this->serverList);
}
public function connect(){
if(!empty($this->fd)){
return $this->fd;
}
if (empty($this->serverList)){
DqLog::writeLog('empty server list');
return false;
}
$serverList=$this->serverList;
while(count($serverList)){
try {
$idx = rand(0,count($serverList)-1);
$server = $serverList[$idx];
list($host,$port)= explode(':',$server);
$fd = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
//1s内没处理完直接返回
socket_set_option($fd,SOL_SOCKET,SO_RCVTIMEO,array("sec"=>1, "usec"=>0));
if (!is_resource($fd)) {
$strMsg = 'socket_create error:' . socket_strerror(socket_last_error());
throw new DqException($strMsg);
}
if (!socket_connect($fd, $host, $port)) {
$strMsg = 'socket_create error:' . socket_strerror(socket_last_error().' ip='.$host.' port='.$port);
throw new DqException($strMsg);
}
$this->fd = $fd;
return $fd;
} catch (DqException $e) {
unset($this->serverList[$idx]); //删除无用的配置
unset($serverList[$idx]);
$serverList = array_values($serverList);
DqLog::writeLog($e->getDqMessage(), DqLog::LOG_TYPE_EXCEPTION);
}
}
return false;
}
public function parse_result($ret){
if($ret['code']==1){
return true;
}else{
return false;
}
}
public function add($topic,$data){
try{
$fd = $this->connect();
if($fd===false){
throw new DqException(' connect server faild');
}
$data['cmd']='add';
$data['topic'] = $topic;
if(DqComm::socket_wirte($fd,$data)){
$ret = DqComm::socket_read($fd);
return $this->parse_result($ret);
}else{
throw new DqException('add error,data='.json_encode($data));
}
}catch (DqException $e){
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
}
return false;
}
public function del($topic,$id){
try{
$fd = $this->connect();
if($fd===false){
throw new DqException(' connect server faild');
}
$data['cmd']='del';
$data['topic'] = $topic;
$data['id'] = $id;
if(DqComm::socket_wirte($fd,$data)){
$ret = DqComm::socket_read($fd);
return $this->parse_result($ret);
}else{
throw new DqException('add error,data='.json_encode($data));
}
}catch (DqException $e){
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
}
return false;
}
public function get($topic,$id){
try{
$fd = $this->connect();
if($fd===false){
throw new DqException(' connect server faild');
}
$data['cmd']='get';
$data['topic'] = $topic;
$data['id'] = $id;
if(DqComm::socket_wirte($fd,$data)){
$ret = DqComm::socket_read($fd);
return $ret;
}else{
throw new DqException('add error,data='.json_encode($data));
}
}catch (DqException $e){
DqLog::writeLog($e->getDqMessage(),DqLog::LOG_TYPE_EXCEPTION);
return false;
}
}
}
//$server=array('10.210.234.203:6879');
//
//$topic ='order_comment';
//$id = rand(1,10000);
//$data=array(
// 'id'=>$id, //topic+业务唯一id的组合
// 'body'=>array('a'=>1,'b'=>2,'c'=>3),
//);
//
//$dqClient = new DqClient();
//$dqClient->addServer($server);
//
//var_dump($dqClient->add($topic,$data));
//var_dump($dqClient->get($topic,$id));
//var_dump($dqClient->del($topic,$id));