socket 即时通讯

看了 workerman 源码后, 实现的一个简单DEMO
前置知识:守护进程以及信号 posix/pcntl、socket 编程 socket/stream_socket
ps:若想多进程, 多执行几次 forkProcess 就行, 但是像进程间通信, 尚未实现

Timer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Timer
{
/**
* Tasks that based on ALARM signal.
*
* @var array
*/
private static $tasks = [];

/**
* Init.
*/
public static function init()
{
// 安装一个信号处理器
pcntl_signal(SIGALRM, ['Timer', 'signalHandle'], false);
}

/**
* ALARM signal handler.
*/
public static function signalHandle()
{
pcntl_alarm(1);
self::tick();
}

/**
* 添加定时任务
*
* @param float $timeInterval
* @param callable $func
* @param bool $persistent
*/
public static function add($timeInterval, $func, $persistent = true)
{
// 未设置定时任务
if (empty(self::$tasks)) {
// 创建一个计时器,该计时器在给定的秒数后向进程发送SIGALRM信号
pcntl_alarm(1);
}

// 添加定时任务
$nowTime = time();
$runTime = $nowTime + $timeInterval;
if (!isset(self::$tasks[$runTime])) {
self::$tasks[$runTime] = [];
}

self::$tasks[$runTime][] = [$timeInterval, $func, $persistent];
}

/**
* Tick.
*/
public static function tick()
{
// 定时任务为空,取消闹钟信号
if (empty(self::$tasks)) {
pcntl_alarm(0);
return;
}

$nowTime = time();
foreach (self::$tasks as $runTime => $taskData) {
if ($nowTime >= $runTime) {
foreach ($taskData as $index => $task) {
call_user_func_array($task[1], []);
if ($task[2]) {
self::add($task[0], $task[1], $task[2]);
}
}
// 删除任务
unset(self::$tasks[$runTime]);
}
}
}
}

/*// 初始化(安装闹钟信号处理器/处理定时任务)
Timer::init();
// 添加定时任务
Timer::add(2, function () {
echo date('Y-m-d H:i:s') . PHP_EOL;
});
while (true) {
// 调用等待信号的处理器
pcntl_signal_dispatch();
}*/
socket.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
require 'Timer.php';

class Socket
{
/**
* Process title.
*
* @var string
*/
const PROCESS_TITLE = 'Socket';

/**
* All clients.
*
* @var array
*/
public $clients = [];

/**
* Emitted when processes start.
*
* @var callback
*/
public $onStart = null;

/**
* Emitted when a socket connection is successfully established.
*
* @var callback
*/
public $onConnect = null;

/**
* Emitted when data is received.
*
* @var callback
*/
public $onMessage = null;

/**
* Emitted when the other end of the socket sends a FIN packet.
*
* @var callback
*/
public $onClose = null;

/**
* Socket name.
*
* @var string
*/
private $socketName = '';

/**
* Listening socket.
*
* @var resource
*/
private $socket = null;

/**
* All listeners for read event.
*
* @var array
*/
private $allEvents = [];

public function __construct($socketName)
{
$this->socketName = $socketName;
}

/**
* Run.
*/
public function run()
{
$this->daemonize();
$this->listenSocket();
$this->forkProcess();
$this->resetStd();
$this->monitorProcess();
}

/**
* 以守护进程模式运行
*/
public function daemonize()
{
$pid = pcntl_fork();
if ($pid === -1) {
exit('fork fail');
}
// 退出父进程
elseif ($pid > 0) {
exit(0);
}

// 创建新会话,并自任该会话的组长,脱离终端
if (posix_setsid() === -1) {
exit('setsid fail');
}

// 再次fork,并让父进程退出,使子进程不再成为会话组长来禁止进程重新打开控制终端
$pid = pcntl_fork();
if ($pid === -1) {
exit('fork fail');
}
// 结束父进程
elseif ($pid !== 0) {
exit(0);
}

// 切换工作目录
chdir('/');
// 设置文件权限掩码
umask(0);
// 设置进程标题
cli_set_process_title(self::PROCESS_TITLE);
}

/**
* 监听Socket
*/
public function listenSocket()
{
// 服务端收到客户端的请求后,会将比连接存放在连接请求队列里,而backlog就是此队列的数量限制,超过此值,则不再接收新的连接,而accet会从此队列中取出连接,相当于 socket_listen 的第二个参数
$context = stream_context_create([
'socket' => ['backlog' => 102400]
]);

// 多个进程监听相同端口(端口复用),并且由系统内核做负载均衡,决定将socket连接交给哪个进程处理,避免了惊群效应,可以提升多进程短连接应用的性能
// stream_context_set_option($context, 'socket', 'so_reuseport', 1);

// 创建套接字
$this->socket = stream_socket_server($this->socketName, $errno, $errmsg, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
if (function_exists('socket_import_stream')) {
$socket = socket_import_stream($this->socket);
// 打开TCP的KEEPALIVE,当连接断开时,会将SIGPIPE信号通知写入该套接字的进程
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
// 禁用 Nagle 算法
socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}

// 非阻塞模式
stream_set_blocking($this->socket, 0);
}

/**
* 创建子进程并运行
*/
private function forkProcess()
{
$pid = pcntl_fork();
if ($pid === 0) {
// 设置进程标题
cli_set_process_title(self::PROCESS_TITLE);
// 当子进程创建后
call_user_func($this->onStart, $this);

// 将socket添加至clients
$this->clients[] = $this->socket;
// 添加事件
$this->allEvents[(int)$this->socket] = [
[$this, 'acceptConnection'],
$this->socket
];

$write = [];
$except = [];
while (true) {
// 调用等待信号的处理器
pcntl_signal_dispatch();

$read = $this->clients;
$ret = @stream_select($read, $write, $except, 0, null);
if (!$ret) {
continue;
}

if ($read) {
foreach ($read as $fd) {
$fdKey = (int)$fd;
if (isset($this->allEvents[$fdKey])) {
call_user_func_array($this->allEvents[$fdKey][0], [$this->allEvents[$fdKey][1]]);
}
}
}
}

exit(250);
}
}

/**
* 重定向标准输入和输出
*/
private function resetStd()
{
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
$stdoutFile = '/dev/null';
fopen($stdoutFile, 'r');
fopen($stdoutFile, 'a');
fopen($stdoutFile, 'a');
}

/**
* 监听所有子进程
*/
private function monitorProcess()
{
while (true) {
$status = 0;
// 等待或返回fork的子进程状态
$pid = pcntl_wait($status, WUNTRACED);
// 子进程已退出
if ($pid > 0) {
// 捕获进程退出事件,exit(250) 对应的$status值为64000
if ($status == 64000) {
echo 'status=' . $status . '-' . $pid . PHP_EOL;
}
}
}
}

/**
* 接受连接
*
* @param $socket
*/
private function acceptConnection($socket)
{
$newSocket = stream_socket_accept($socket, 0, $remoteAddress);
if (!$newSocket) {
return;
}

// 非阻塞模式
stream_set_blocking($newSocket, 0);
// 设置读缓冲区
stream_set_read_buffer($newSocket, 0);

// 添加新连接到clients
$this->clients[] = $newSocket;
// 添加新事件
$this->allEvents[(int)$newSocket] = [
[$this, 'read'],
$newSocket
];

call_user_func($this->onConnect, $newSocket);
}

/**
* 读取消息
*
* @param $socket
*/
private function read($socket)
{
$buffer = fread($socket, 65535);
if ($buffer === '' || $buffer === false) {
if ((feof($socket) || !is_resource($socket) || $buffer === false)) {
fclose($socket);
call_user_func($this->onClose, $socket);
return;
}
}

call_user_func($this->onMessage, $socket, $this->decode($buffer));
}

/**
* 发送消息给指定客户端
*
* @param $socket
* @param $string
*/
public function send($socket, $string)
{
@fwrite($socket, $this->encode($string));
}

private function decode($buffer)
{
return trim($buffer);
}

private function encode($buffer)
{
return $buffer . "\n";
}
}

// 0.0.0.0 监听本机所有网卡,包括内网ip和外网ip及本地回环127.0.0.1
// 127.0.0.1 表示监听本地回环,只能本机访问,外部无法访问
// 内网IP 192.168.xx.xx, 只监听内网ip, 外网无法访问
$worker = new Socket('tcp://0.0.0.0:7000');
// 当子进程创建后
$worker->onStart = function ($worker) {
// 创建一个定时器,定时发送消息给客户端
Timer::init();
Timer::add(5, function () use ($worker) {
foreach ($worker->clients as $client) {
$worker->send($client, 'Server Time: ' . date('Y-m-d H:i:s'));
}
});
};
// 客户端连接后的处理
$worker->onConnect = function ($socket) use ($worker) {
foreach ($worker->clients as $client) {
if ($client !== $socket) {
$worker->send($client, 'Client #' . (int)$socket . ' is connected');
}
}
};
// 处理客户端发送的消息
$worker->onMessage = function ($socket, $string) use ($worker) {
foreach ($worker->clients as $client) {
if ($client === $socket) {
$worker->send($client, 'You: ' . $string);
} else {
$worker->send($client, 'Client #' . (int)$socket . ': ' . $string);
}
}
};
// 客户端断开后的处理
$worker->onClose = function ($socket) use ($worker) {
foreach ($worker->clients as $client) {
if ($client !== $socket) {
$worker->send($client, 'Client #' . (int)$socket . ' is close');
}
}
};
$worker->run();
测试
1
2
3
4
#启动服务端
php socket.php
#客户端进行连接
telnet 127.0.0.1 7000
0%