PHP Socket 服务端与客户端的通信(IO阻塞/复用)

PHP Socket 编程提供了两套API:
socket_* 系列, 需要安装 socket 扩展(更底层)
stream_socket_* 系列, 不需要安装扩展(推荐使用, 更方便)

ps:分别以 socket 扩展 和 stream_socket 实现了服务端与客户端的通信,测试方式一样


socket 扩展 - 服务端与客户端的通信

服务端 server.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?php
$ip = '127.0.0.1';
$port = 7000;

// 建立TCP连接
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die('socket_create fail:' . socket_strerror(socket_last_error()));
// 绑定IP地址和端口
socket_bind($socket, $ip, $port) or die('socket_bind fail:' . socket_strerror(socket_last_error()));
// 监听客户端连接
socket_listen($socket, 2) or die('socket_listen fail:' . socket_strerror(socket_last_error()));

while (true) {
// 接受一个客户端的连接请求,并返回一个新的套接字
$newSocket = socket_accept($socket);
if ($newSocket === false) {
echo 'socket_accept fail:' . socket_strerror(socket_last_error());
} else {
// 向客户端发送数据, 告知客户端ID
$clientId = (int)$newSocket;
$response = [
'type' => 'login',
'message' => $clientId,
];
socket_write($newSocket, json_encode($response) . "\n");
// 输出客户端信息
socket_getpeername($newSocket, $address, $port);
echo "Client #{$clientId} Connected, IP={$address}, PORT={$port}." . PHP_EOL;

while (true) {
// 读取客户端数据
$data = @socket_read($newSocket, 2048);
if ($data !== '') {
$data = trim($data);
// 发送内容给客户端
$response = [
'message' => strrev($data),
'time' => date('Y-m-d H:i:s'),
];
socket_write($read, json_encode($response) . "\n");
} else {
socket_close($newSocket);
// 输出客户端断开连接
echo "Client #{$clientId} Disconnect." . PHP_EOL;
break;
}
}
}
}

// 关闭TCP连接
socket_close($socket);
客户端 client.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php
$ip = '127.0.0.1';
$port = 7000;

// 建立TCP连接
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die('socket_create fail:' . socket_strerror(socket_last_error()));
// 连接服务端
socket_connect($socket, $ip, $port) or die('socket_connect fail:' . socket_strerror(socket_last_error()));

// 读取服务端数据
$buffer = socket_read($socket, 2048, PHP_NORMAL_READ);
$data = json_decode($buffer, true);
echo "Connection succeeded, ClientId = #{$data['message']}." . PHP_EOL;

// 客户端与服务端交互
interaction($socket);

function interaction($socket)
{
// 获取客户端输入
$content = trim(fgets(STDIN));
// 发送数据给服务端
socket_write($socket, $content);
// 读取服务端
$buffer = socket_read($socket, 2048, PHP_NORMAL_READ);
$data = json_decode($buffer, true);
echo "{$data['message']} - {$data['time']}" . PHP_EOL;

interaction($socket);
}

// 关闭TCP连接
socket_close($socket);
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#1. 启动服务端
php server.php

#2. 运行客户端, 并输入 'hello', 然后断开连接
[root@192 private]# php client.php
Connection succeeded, ClientId = #18.
hello
olleh - 2018-10-17 17:59:20

#3. 服务端输出
[root@192 private]# php socket.php
Client #17 Connected, IP=127.0.0.1, PORT=33994.
Client #17 Disconnect.

#4. 客户端也可以使用 telnet
[root@192 ~]# telnet 127.0.0.1 7000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
{"type":"login","message":17}
hello
{"message":"olleh","time":"2018-10-17 17:58:06"}
^]
telnet> quit
Connection closed.

问题:只能处理一个客户端的连接和数据传输, 只有断开后才能处理新的客户端连接
解决:IO 复用

IO 复用 - 服务端 server.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<?php
$ip = '127.0.0.1';
$port = 7000;

// 建立TCP连接
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die('socket_create fail:' . socket_strerror(socket_last_error()));
// 绑定IP地址和端口
socket_bind($socket, $ip, $port) or die('socket_bind fail:' . socket_strerror(socket_last_error()));
// 监听客户端连接
socket_listen($socket, 2) or die('socket_listen fail:' . socket_strerror(socket_last_error()));
// 客户端集合
$clients[(int)$socket] = $socket;

while (true) {
// socket_select() 的 $read、$write、$except 三个参数都是引用地址类型, 所以需要将 $clients 赋值给 $reads, 保证 $reads 发生改动后不影响 $clients
$reads = $clients;

// 接受套接字数组并等待它们改变状态
if (socket_select($reads, $writes, $excepts, null) === false) {
echo 'socket_select fail:' . socket_strerror(socket_last_error()) . PHP_EOL;
continue;
}

// $socket 发生变化, 说明有新的客户端连接
if (in_array($socket, $reads)) {
// 接受一个客户端的连接请求,并返回一个新的套接字
$newSocket = socket_accept($socket);
if ($newSocket === false) {
echo 'socket_accept fail:' . socket_strerror(socket_last_error()) . PHP_EOL;
} else {
// 向客户端发送数据, 告知客户端ID
$clientId = (int)$newSocket;
$response = [
'type' => 'login',
'message' => $clientId,
];
socket_write($newSocket, json_encode($response) . "\n");
// 将客户端 $newSocket 加入到 $clients
$clients[$clientId] = $newSocket;
// 移除对 $socket 的监听
unset($reads[array_search($socket, $reads)]);

// 输出客户端信息
socket_getpeername($newSocket, $address, $port);
$content = "Client #{$clientId} Connected, IP={$address}, PORT={$port}." . PHP_EOL;
echo $content;

// 通知所有客户端,不包含当前客户端
/*foreach ($clients as $client) {
if ($client != $socket && $client != $newSocket) {
@socket_write($client, $content);
}
}*/
}
}

foreach ($reads as $key => $read) {
// 读取客户端数据
$data = @socket_read($read, 2048);
$clientId = (int)$read;

if ($data !== '') {
$data = trim($data);
// 发送内容给客户端
$response = [
'message' => strrev($data),
'time' => date('Y-m-d H:i:s'),
];
socket_write($read, json_encode($response) . "\n");
} else {
// 移除对 $read 的监听
unset($clients[$clientId]);
// 关闭 $read 连接
socket_close($read);
// 输出客户端断开连接
echo "Client #{$clientId} Disconnect." . PHP_EOL;
}
}
}

// 关闭TCP连接
socket_close($socket);
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#1. 启动服务端
php server.php

#2. 运行多个客户端, 并输入 'hello', 然后断开连接
[root@192 private]# php client.php
Connection succeeded, ClientId = #17.
hello
olleh - 2018-10-17 17:59:20

[root@192 private]# php client.php
Connection succeeded, ClientId = #18.
hello
olleh - 2018-10-17 17:59:24

#3. 服务端输出
[root@192 private]# php socket.php
Client #17 Connected, IP=127.0.0.1, PORT=34138.
Client #18 Connected, IP=127.0.0.1, PORT=34141.
Client #17 Disconnect.
Client #18 Disconnect.

stream_socket - 服务端与客户端的通信

服务端 server.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?php
$socketName = 'tcp://127.0.0.1:7000';

// 建立连接
$socket = stream_socket_server($socketName, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if (!$socket) {
die("stream_socket_server fail:{$errstr} ({$errno})");
}

// 接受由 stream_socket_server() 创建的套接字连接,返回接受套接之后的资源流
while ($conn = @stream_socket_accept($socket, -1, $remoteAddress)) {
// 向客户端发送数据, 告知客户端ID
$clientId = (int)$conn;
$response = [
'type' => 'login',
'message' => $clientId,
];
stream_socket_sendto($conn, json_encode($response) . "\n");
// 输出客户端信息
list ($address, $port) = explode(':', $remoteAddress);
echo "Client #{$clientId} Connected, IP={$address}, PORT={$port}." . PHP_EOL;

while (true) {
// 读取客户端数据
$data = stream_socket_recvfrom($conn, 2048);
if ($data !== '' && $data !== false) {
$data = trim($data);
// 发送内容给客户端
$response = [
'message' => strrev($data),
'time' => date('Y-m-d H:i:s'),
];
stream_socket_sendto($conn, json_encode($response) . "\n");
} else {
fclose($conn);
// 输出客户端断开连接
echo "Client #{$clientId} Disconnect." . PHP_EOL;
break;
}
}
}

// 关闭连接
fclose($socket);
客户端 client.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php
$socketName = 'tcp://127.0.0.1:7000';

// 建立连接
$socket = stream_socket_client($socketName, $errno, $errstr, 30);
if (!$socket) {
die("stream_socket_client fail:{$errstr} ({$errno})");
}

// 读取服务端数据
$buffer = stream_socket_recvfrom($socket, 2048);
$data = json_decode($buffer, true);
echo "Connection succeeded, ClientId = #{$data['message']}." . PHP_EOL;

// 客户端与服务端交互
interaction($socket);

function interaction($socket)
{
// 获取客户端输入
$content = trim(fgets(STDIN));
// 发送数据给服务端
stream_socket_sendto($socket, $content);
// 读取服务端
$buffer = stream_socket_recvfrom($socket, 2048);
$data = json_decode($buffer, true);
echo "{$data['message']} - {$data['time']}" . PHP_EOL;

interaction($socket);
}

// 关闭连接
fclose($socket);
IO 复用 - 服务端 server.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?php
$socketName = 'tcp://127.0.0.1:7000';

// 建立连接
$socket = stream_socket_server($socketName, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
if (!$socket) {
die("stream_socket_server fail:{$errstr} ({$errno})");
}

// 客户端集合
$clients = [(int)$socket => $socket];

while (true) {
$reads = $clients;
if (stream_select($reads, $writes, $excepts, null) === false) {
continue;
}

foreach ($reads as $read) {
// 新客户端连接
if ($read === $socket) {
// 接受由 stream_socket_server() 创建的套接字连接,返回接受套接之后的资源流
$conn = stream_socket_accept($socket, 0, $remoteAddress);
// 向客户端发送数据, 告知客户端ID
$clientId = (int)$conn;
$response = [
'type' => 'login',
'message' => $clientId,
];
stream_socket_sendto($conn, json_encode($response) . "\n");
// 将客户端 $conn 加入到 $clients
$clients[$clientId] = $conn;
// 移除对 $socket 的监听
unset($reads[array_search($socket, $reads)]);

// 输出客户端信息
list ($address, $port) = explode(':', $remoteAddress);
$content = "Client #{$clientId} Connected, IP={$address}, PORT={$port}." . PHP_EOL;
echo $content;

// 通知所有客户端,不包含当前客户端
/*foreach ($clients as $client) {
if ($client != $socket && $client != $conn) {
stream_socket_sendto($client, $content);
}
}*/
} else {
// 读取客户端数据
$data = stream_socket_recvfrom($read, 2048);
$clientId = (int)$read;

if ($data !== '' && $data !== false) {
$data = trim($data);
// 发送内容给客户端
$response = [
'message' => strrev($data),
'time' => date('Y-m-d H:i:s'),
];
stream_socket_sendto($read, json_encode($response) . "\n");
} else {
// 移除对 $read 的监听
unset($clients[$clientId]);
// 关闭连接
fclose($read);
// 输出客户端断开连接
echo "Client #{$clientId} Disconnect." . PHP_EOL;
}
}
}
}

// 关闭连接
fclose($socket);

相关说明:

  1. socket_create 创建并返回一个套接字(用于监听[listen]和接受[accept]客户端的连接请求,不能用于与客户端之间发送和接收数据)
  2. socket_accept 接受一个客户端的连接请求, 并返回一个新的套接字, 这个套接字与socket_create 返回的用于监听和接受客户端的连接请求的套接字不是同一个套接字,与本次接受的客户端的通信是通过在这个新的套接字上发送和接收数据来完成的
  3. 再次调用 socket_accept 可以接受下一个客户端的连接请求, 并再次返回一个新的套接字(与socket_create 返回的套接字、之前 socket_accept 返回的套接字都不同的新的套接字), 这个新的套接字用于与这次接受的客户端之间的通信
  4. 假设一共有3个客户端连接到服务器端, 那么在服务器端就一共有4个套接字, 第1个是 socket_create 返回的(用于监听的套接字),其余3个是分别调用3次 socket_accept 返回的不同的套接字
  5. 如果已经有客户端连接到服务器端, 不再需要监听和接受更多的客户端连接的时候, 可以关闭由 socket_create 返回的套接字, 而不会影响与客户端之间的通信
  6. 当某个客户端断开连接、或者是与某个客户端的通信完成之后, 服务器端需要关闭用于与该客户端通信的套接字
  7. socket_listen 说明:每个 connect 都是往 listen 队列填装连接, 每一个 accept 操作都会从 listen 队列取连接转成 socket, 等待这个socket close 掉之后再继续从 listen 队列中取连接, 而 listen 队列长度就是 socket_listen 函数的第二个参数 backlog
  8. socket_select 接受三个套接字数组, 分别检查数组中的套接字是否处于可以操作的状态(返回时只保留可操作的套接字), 使用最多的是 $read, 因此以读为例.
    在套接字数组 $read 中最初应保有一个服务端监听套接字, 每当该套接字可读时, 就表示有一个用户发起了连接, 此时你需要对该连接创建一个套接字, 并加入到 $read 数组中;
    当然, 并不只是服务端监听的套接字会变成可读的, 用户套接字也会变成可读的, 此时你就可以读取用户发来的数据了;
    socket_select 只在套接字数组发生了变化时才返回, 也就是说, 一旦执行到 socket_select 的下一条语句, 则必有一个套接字是需要你操作的
  9. stream_socket_server($local_socket, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN) 相当于 socket_create()、socket_bind()、socket_listen()
  10. stream_socket_accept 第二个参数 $timeout, 设置为 0, 连接将立即超时, 设置为 -1, 无限期等待连接
  11. stream_select 与 socket_select 用法相似
0%