Содержание
Я изучал библиотеки phpdaemon и ratchet, они достаточно монструозны (причём используя ratchet для отправки сообщения конкретному пользователю рекомендовано дополнительно использовать wamp). Мне не совсем было понятно для чего использовать таких монстров, которые требуют установку других монстров. Почитав исходники этих, а также других библиотек, я разобрался как всё устроено и мне захотелось написать простой вебсокет-сервер на php самостоятельно. Это помогло мне закрепить изученный материал и наткнуться на некоторые подводные камни, о которых я не имел представления.
Так я решил написать необходимый для меня функционал с нуля.
Получившийся код и ссылка на демонстрационный чат в конце статьи.
Поставленные цели:
1) разобраться с серверными сокетами в php
2) разобраться с протоколом вебсокетов
3) написать с нуля простой сервер вебсокетов
1) Серверные сокеты в php
До этого момента я имел смутные представления о серверных сокетах. Почитав исходники нескольких библиотек для работы с вебсокетами я столкнулся с двумя схемами их реализаций:
используя расширение php «socket»:
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);//создаём сокет
socket_bind($socket, '127.0.0.1', 8000);//привязываем его к указанным ip и порту
socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);//разрешаем использовать один порт для нескольких соединений
socket_listen($socket);//слушаем сокет
или используя расширение php «stream»:
$socket = stream_socket_server("tcp://127.0.0.1:8000", $errno, $errstr);
Я предпочёл второй вариант ввиду его краткости.
Итак, мы создали серверный сокет и теперь хотим обрабатывать новые соединения к нему, для этого опять же есть два варианта
while ($connect = stream_socket_accept($socket, -1)) {//ожидаем новое соединение (без таймаута)
...обрабатываем $connect
}
#!/usr/bin/env php
<?php
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
die("$errstr ($errno)\n");
}
while ($connect = stream_socket_accept($socket, -1)) {
fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
fclose($connect);
}
fclose($socket);
или с использованием stream_select
$connects = array();
while (true) {
//формируем массив прослушиваемых сокетов:
$read = $connects;
$read[] = $socket;
$write = $except = null;
if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
break;
}
if (in_array($socket, $read)) {//есть новое соединение
$connect = stream_socket_accept($socket, -1);//принимаем новое соединение
$connects[] = $connect;//добавляем его в список необходимых для обработки
unset($read[ array_search($socket, $read) ]);
}
foreach($read as $connect) {//обрабатываем все соединения
...обрабатываем $connect
unset($connects[ array_search($connect, $connects) ]);
}
}
#!/usr/bin/env php
<?php
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
die("$errstr ($errno)\n");
}
$connects = array();
while (true) {
//формируем массив прослушиваемых сокетов:
$read = $connects;
$read []= $socket;
$write = $except = null;
if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
break;
}
if (in_array($socket, $read)) {//есть новое соединение
$connect = stream_socket_accept($socket, -1);//принимаем новое соединение
$connects[] = $connect;//добавляем его в список необходимых для обработки
unset($read[ array_search($socket, $read) ]);
}
foreach($read as $connect) {//обрабатываем все соединения
$headers = '';
while ($buffer = rtrim(fgets($connect))) {
$headers .= $buffer;
}
fwrite($connect, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\n\r\nПривет");
fclose($connect);
unset($connects[ array_search($connect, $connects) ]);
}
}
fclose($server);
Т.к. нам в дальнейшем нужно будет одновременно обрабатывать и серверный сокет на предмет новых соединений, и уже существующие подключения, на предмет новых сообщений, то остановимся на втором варианте.
2) Протокол вебсокетов
В этой статье хорошо описан протокол взаимодействия.
Нас интересует два момента:
«Рукопожатие» или handshake:
Считываем значение Sec-WebSocket-Key из пришедшего заголовка от клиента, рассчитываем на его основе Sec-WebSocket-Accept и отправляем итоговый ответ:
$SecWebSocketAccept = base64_encode(pack('H*', sha1($SecWebSocketKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
function handshake($connect) {
$info = array();
$line = fgets($connect);
$header = explode(' ', $line);
$info['method'] = $header[0];
$info['uri'] = $header[1];
//считываем заголовки из соединения
while ($line = rtrim(fgets($connect))) {
if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
$info[$matches[1]] = $matches[2];
} else {
break;
}
}
$address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
$info['ip'] = $address[0];
$info['port'] = $address[1];
if (empty($info['Sec-WebSocket-Key'])) {
return false;
}
//отправляем заголовок согласно протоколу вебсокета
$SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
fwrite($connect, $upgrade);
return $info;
}
обмен сообщениями
После получения данных из вебсокета нам нужно их раскодировать, а при отправке закодировать.
Всё в той же статье хорошо описано кодирование сообщений, но нам по-сути нужны только две функции: decode и encode.
function decode($data)
{
$unmaskedPayload = '';
$decodedData = array();
// estimate frame type:
$firstByteBinary = sprintf('%08b', ord($data[0]));
$secondByteBinary = sprintf('%08b', ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == '1') ? true : false;
$payloadLength = ord($data[1]) & 127;
// unmasked frame is received:
if (!$isMasked) {
return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
}
switch ($opcode) {
// text frame:
case 1:
$decodedData['type'] = 'text';
break;
case 2:
$decodedData['type'] = 'binary';
break;
// connection close frame:
case 8:
$decodedData['type'] = 'close';
break;
// ping frame:
case 9:
$decodedData['type'] = 'ping';
break;
// pong frame:
case 10:
$decodedData['type'] = 'pong';
break;
default:
return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
}
if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = '';
for ($i = 0; $i < 8; $i++) {
$tmp .= sprintf('%08b', ord($data[$i + 2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
unset($tmp);
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}
/**
* We have to check for large frames here. socket_recv cuts at 1024 bytes
* so if websocket-frame is > 1024 bytes we have to wait until whole
* data is transferd.
*/
if (strlen($data) < $dataLength) {
return false;
}
if ($isMasked) {
for ($i = $payloadOffset; $i < $dataLength; $i++) {
$j = $i - $payloadOffset;
if (isset($data[$i])) {
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
}
$decodedData['payload'] = $unmaskedPayload;
} else {
$payloadOffset = $payloadOffset - 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}
return $decodedData;
}
function encode($payload, $type = 'text', $masked = false)
{
$frameHead = array();
$payloadLength = strlen($payload);
switch ($type) {
case 'text':
// first byte indicates FIN, Text-Frame (10000001):
$frameHead[0] = 129;
break;
case 'close':
// first byte indicates FIN, Close Frame(10001000):
$frameHead[0] = 136;
break;
case 'ping':
// first byte indicates FIN, Ping frame (10001001):
$frameHead[0] = 137;
break;
case 'pong':
// first byte indicates FIN, Pong frame (10001010):
$frameHead[0] = 138;
break;
}
// set mask and payload length (using 1, 3 or 9 bytes)
if ($payloadLength > 65535) {
$payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 255 : 127;
for ($i = 0; $i < 8; $i++) {
$frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
}
// most significant bit MUST be 0
if ($frameHead[2] > 127) {
return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
}
} elseif ($payloadLength > 125) {
$payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 254 : 126;
$frameHead[2] = bindec($payloadLengthBin[0]);
$frameHead[3] = bindec($payloadLengthBin[1]);
} else {
$frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
}
// convert frame-head to string:
foreach (array_keys($frameHead) as $i) {
$frameHead[$i] = chr($frameHead[$i]);
}
if ($masked === true) {
// generate a random mask:
$mask = array();
for ($i = 0; $i < 4; $i++) {
$mask[$i] = chr(rand(0, 255));
}
$frameHead = array_merge($frameHead, $mask);
}
$frame = implode('', $frameHead);
// append payload to frame:
for ($i = 0; $i < $payloadLength; $i++) {
$frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
}
return $frame;
}
Простой сервер вебсокетов
Итак, у нас есть вся необходимая информация.
Используя код простого http сервера из первой части, а также функции handshake, decode и encode из второй мы можем собрать простой сервер вебсокетов.
#!/usr/bin/env php
<?php
$socket = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr);
if (!$socket) {
die("$errstr ($errno)\n");
}
$connects = array();
while (true) {
//формируем массив прослушиваемых сокетов:
$read = $connects;
$read []= $socket;
$write = $except = null;
if (!stream_select($read, $write, $except, null)) {//ожидаем сокеты доступные для чтения (без таймаута)
break;
}
if (in_array($socket, $read)) {//есть новое соединение
//принимаем новое соединение и производим рукопожатие:
if (($connect = stream_socket_accept($socket, -1)) && $info = handshake($connect)) {
$connects[] = $connect;//добавляем его в список необходимых для обработки
onOpen($connect, $info);//вызываем пользовательский сценарий
}
unset($read[ array_search($socket, $read) ]);
}
foreach($read as $connect) {//обрабатываем все соединения
$data = fread($connect, 100000);
if (!$data) { //соединение было закрыто
fclose($connect);
unset($connects[ array_search($connect, $connects) ]);
onClose($connect);//вызываем пользовательский сценарий
continue;
}
onMessage($connect, $data);//вызываем пользовательский сценарий
}
}
fclose($server);
function handshake($connect) {
$info = array();
$line = fgets($connect);
$header = explode(' ', $line);
$info['method'] = $header[0];
$info['uri'] = $header[1];
//считываем заголовки из соединения
while ($line = rtrim(fgets($connect))) {
if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
$info[$matches[1]] = $matches[2];
} else {
break;
}
}
$address = explode(':', stream_socket_get_name($connect, true)); //получаем адрес клиента
$info['ip'] = $address[0];
$info['port'] = $address[1];
if (empty($info['Sec-WebSocket-Key'])) {
return false;
}
//отправляем заголовок согласно протоколу вебсокета
$SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
fwrite($connect, $upgrade);
return $info;
}
function encode($payload, $type = 'text', $masked = false)
{
$frameHead = array();
$payloadLength = strlen($payload);
switch ($type) {
case 'text':
// first byte indicates FIN, Text-Frame (10000001):
$frameHead[0] = 129;
break;
case 'close':
// first byte indicates FIN, Close Frame(10001000):
$frameHead[0] = 136;
break;
case 'ping':
// first byte indicates FIN, Ping frame (10001001):
$frameHead[0] = 137;
break;
case 'pong':
// first byte indicates FIN, Pong frame (10001010):
$frameHead[0] = 138;
break;
}
// set mask and payload length (using 1, 3 or 9 bytes)
if ($payloadLength > 65535) {
$payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 255 : 127;
for ($i = 0; $i < 8; $i++) {
$frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
}
// most significant bit MUST be 0
if ($frameHead[2] > 127) {
return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
}
} elseif ($payloadLength > 125) {
$payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 254 : 126;
$frameHead[2] = bindec($payloadLengthBin[0]);
$frameHead[3] = bindec($payloadLengthBin[1]);
} else {
$frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
}
// convert frame-head to string:
foreach (array_keys($frameHead) as $i) {
$frameHead[$i] = chr($frameHead[$i]);
}
if ($masked === true) {
// generate a random mask:
$mask = array();
for ($i = 0; $i < 4; $i++) {
$mask[$i] = chr(rand(0, 255));
}
$frameHead = array_merge($frameHead, $mask);
}
$frame = implode('', $frameHead);
// append payload to frame:
for ($i = 0; $i < $payloadLength; $i++) {
$frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
}
return $frame;
}
function decode($data)
{
$unmaskedPayload = '';
$decodedData = array();
// estimate frame type:
$firstByteBinary = sprintf('%08b', ord($data[0]));
$secondByteBinary = sprintf('%08b', ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == '1') ? true : false;
$payloadLength = ord($data[1]) & 127;
// unmasked frame is received:
if (!$isMasked) {
return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
}
switch ($opcode) {
// text frame:
case 1:
$decodedData['type'] = 'text';
break;
case 2:
$decodedData['type'] = 'binary';
break;
// connection close frame:
case 8:
$decodedData['type'] = 'close';
break;
// ping frame:
case 9:
$decodedData['type'] = 'ping';
break;
// pong frame:
case 10:
$decodedData['type'] = 'pong';
break;
default:
return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
}
if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = '';
for ($i = 0; $i < 8; $i++) {
$tmp .= sprintf('%08b', ord($data[$i + 2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
unset($tmp);
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}
/**
* We have to check for large frames here. socket_recv cuts at 1024 bytes
* so if websocket-frame is > 1024 bytes we have to wait until whole
* data is transferd.
*/
if (strlen($data) < $dataLength) {
return false;
}
if ($isMasked) {
for ($i = $payloadOffset; $i < $dataLength; $i++) {
$j = $i - $payloadOffset;
if (isset($data[$i])) {
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
}
$decodedData['payload'] = $unmaskedPayload;
} else {
$payloadOffset = $payloadOffset - 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}
return $decodedData;
}
//пользовательские сценарии:
function onOpen($connect, $info) {
echo "open\n";
fwrite($connect, encode('Привет'));
}
function onClose($connect) {
echo "close\n";
}
function onMessage($connect, $data) {
echo decode($data)['payload'] . "\n";
}
В приведённом примере можно менять пользовательские сценарии onOpen, onClose и onMessage для реализации необходимого функционала.
Поставленные цели достигнуты.
Если этот материал вам покажется интересным, то в следующей статье я опишу как можно запускать несколько процессов для обработки соединений (один мастер и несколько воркеров), межпроцессное взаимодействие, интеграцию с вашим фреймворком на примере компонента yii.
демонстрационный чат с вышеописанными функциями
#!/usr/bin/env php
<?php
class WebsocketServer
{
public function __construct($config) {
$this->config = $config;
}
public function start() {
//открываем серверный сокет
$server = stream_socket_server("tcp://{$this->config['host']}:{$this->config['port']}", $errorNumber, $errorString);
if (!$server) {
die("error: stream_socket_server: $errorString ($errorNumber)\r\n");
}
list($pid, $master, $workers) = $this->spawnWorkers();//создаём дочерние процессы
if ($pid) {//мастер
fclose($server);//мастер не будет обрабатывать входящие соединения на основном сокете
$WebsocketMaster = new WebsocketMaster($workers);//мастер будет пересылать сообщения между воркерами
$WebsocketMaster->start();
} else {//воркер
$WebsocketHandler = new WebsocketHandler($server, $master);
$WebsocketHandler->start();
}
}
protected function spawnWorkers() {
$master = null;
$workers = array();
$i = 0;
while ($i < $this->config['workers']) {
$i++;
//создаём парные сокеты, через них будут связываться мастер и воркер
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
$pid = pcntl_fork();//создаём форк
if ($pid == -1) {
die("error: pcntl_fork\r\n");
} elseif ($pid) { //мастер
fclose($pair[0]);
$workers[$pid] = $pair[1];//один из пары будет в мастере
} else { //воркер
fclose($pair[1]);
$master = $pair[0];//второй в воркере
break;
}
}
return array($pid, $master, $workers);
}
}
class WebsocketMaster
{
protected $workers = array();
protected $clients = array();
public function __construct($workers) {
$this->clients = $this->workers = $workers;
}
public function start() {
while (true) {
//подготавливаем массив всех сокетов, которые нужно обработать
$read = $this->clients;
//$read[] = $service;
stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать
if ($read) {//пришли данные от подключенных клиентов
foreach ($read as $client) {
$data = fread($client, 1000);
if (!$data) { //соединение было закрыто
unset($this->clients[intval($client)]);
@fclose($client);
continue;
}
foreach ($this->workers as $worker) {//пересылаем данные во все воркеры
if ($worker !== $client) {
fwrite($worker, $data);
}
}
}
}
}
}
}
abstract class WebsocketWorker
{
protected $clients = array();
protected $server;
protected $master;
protected $pid;
protected $handshakes = array();
public function __construct($server, $master) {
$this->server = $server;
$this->master = $master;
$this->pid = posix_getpid();
}
public function start() {
while (true) {
//подготавливаем массив всех сокетов, которые нужно обработать
$read = $this->clients;
$read[] = $this->server;
$read[] = $this->master;
$write = array();
if ($this->handshakes) {
foreach ($this->handshakes as $clientId => $clientInfo) {
if ($clientInfo) {
$write[] = $this->clients[$clientId];
}
}
}
stream_select($read, $write, $except, null);//обновляем массив сокетов, которые можно обработать
if (in_array($this->server, $read)) { //на серверный сокет пришёл запрос от нового клиента
//подключаемся к нему и делаем рукопожатие, согласно протоколу вебсокета
if ($client = stream_socket_accept($this->server, -1)) {
$this->clients[intval($client)] = $client;
$this->handshakes[intval($client)] = array();//отмечаем, что нужно сделать рукопожатие
}
//удаляем сервеный сокет из массива, чтобы не обработать его в этом цикле ещё раз
unset($read[array_search($this->server, $read)]);
}
if (in_array($this->master, $read)) { //пришли данные от мастера
$data = fread($this->master, 1000);
$this->onSend($data);//вызываем пользовательский сценарий
//удаляем мастера из массива, чтобы не обработать его в этом цикле ещё раз
unset($read[array_search($this->master, $read)]);
}
if ($read) {//пришли данные от подключенных клиентов
foreach ($read as $client) {
if (isset($this->handshakes[intval($client)])) {
if ($this->handshakes[intval($client)]) {//если уже было получено рукопожатие от клиента
continue;//то до отправки ответа от сервера читать здесь пока ничего не надо
}
if (!$this->handshake($client)) {
unset($this->clients[intval($client)]);
unset($this->handshakes[intval($client)]);
@fclose($client);
}
} else {
$data = fread($client, 1000);
if (!$data) { //соединение было закрыто
unset($this->clients[intval($client)]);
unset($this->handshakes[intval($client)]);
@fclose($client);
$this->onClose($client);//вызываем пользовательский сценарий
continue;
}
$this->onMessage($client, $data);//вызываем пользовательский сценарий
}
}
}
if ($write) {
foreach ($write as $client) {
if (!$this->handshakes[intval($client)]) {//если ещё не было получено рукопожатие от клиента
continue;//то отвечать ему рукопожатием ещё рано
}
$info = $this->handshake($client);
$this->onOpen($client, $info);//вызываем пользовательский сценарий
}
}
}
}
protected function handshake($client) {
$info = $this->handshakes[intval($client)];
if (!$info) {
$info = array();
$line = fgets($client);
if (!$line) {
return false;
}
$header = explode(' ', $line);
$info['method'] = $header[0];
$info['uri'] = $header[1];
//считываем загаловки из соединения
while ($line = rtrim(fgets($client))) {
if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
$info[$matches[1]] = $matches[2];
} else {
break;
}
}
$address = explode(':', stream_socket_get_name($client, true)); //получаем адрес клиента
$info['ip'] = $address[0];
$info['port'] = $address[1];
if (empty($info['Sec-WebSocket-Key'])) {
return false;
}
$this->handshakes[intval($client)] = $info;
} else {
//отправляем заголовок согласно протоколу вебсокета
$SecWebSocketAccept = base64_encode(pack('H*', sha1($info['Sec-WebSocket-Key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
$upgrade = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Accept:$SecWebSocketAccept\r\n\r\n";
fwrite($client, $upgrade);
unset($this->handshakes[intval($client)]);
}
return $info;
}
protected function encode($payload, $type = 'text', $masked = false)
{
$frameHead = array();
$payloadLength = strlen($payload);
switch ($type) {
case 'text':
// first byte indicates FIN, Text-Frame (10000001):
$frameHead[0] = 129;
break;
case 'close':
// first byte indicates FIN, Close Frame(10001000):
$frameHead[0] = 136;
break;
case 'ping':
// first byte indicates FIN, Ping frame (10001001):
$frameHead[0] = 137;
break;
case 'pong':
// first byte indicates FIN, Pong frame (10001010):
$frameHead[0] = 138;
break;
}
// set mask and payload length (using 1, 3 or 9 bytes)
if ($payloadLength > 65535) {
$payloadLengthBin = str_split(sprintf('%064b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 255 : 127;
for ($i = 0; $i < 8; $i++) {
$frameHead[$i + 2] = bindec($payloadLengthBin[$i]);
}
// most significant bit MUST be 0
if ($frameHead[2] > 127) {
return array('type' => '', 'payload' => '', 'error' => 'frame too large (1004)');
}
} elseif ($payloadLength > 125) {
$payloadLengthBin = str_split(sprintf('%016b', $payloadLength), 8);
$frameHead[1] = ($masked === true) ? 254 : 126;
$frameHead[2] = bindec($payloadLengthBin[0]);
$frameHead[3] = bindec($payloadLengthBin[1]);
} else {
$frameHead[1] = ($masked === true) ? $payloadLength + 128 : $payloadLength;
}
// convert frame-head to string:
foreach (array_keys($frameHead) as $i) {
$frameHead[$i] = chr($frameHead[$i]);
}
if ($masked === true) {
// generate a random mask:
$mask = array();
for ($i = 0; $i < 4; $i++) {
$mask[$i] = chr(rand(0, 255));
}
$frameHead = array_merge($frameHead, $mask);
}
$frame = implode('', $frameHead);
// append payload to frame:
for ($i = 0; $i < $payloadLength; $i++) {
$frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
}
return $frame;
}
protected function decode($data)
{
$unmaskedPayload = '';
$decodedData = array();
// estimate frame type:
$firstByteBinary = sprintf('%08b', ord($data[0]));
$secondByteBinary = sprintf('%08b', ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == '1') ? true : false;
$payloadLength = ord($data[1]) & 127;
// unmasked frame is received:
if (!$isMasked) {
return array('type' => '', 'payload' => '', 'error' => 'protocol error (1002)');
}
switch ($opcode) {
// text frame:
case 1:
$decodedData['type'] = 'text';
break;
case 2:
$decodedData['type'] = 'binary';
break;
// connection close frame:
case 8:
$decodedData['type'] = 'close';
break;
// ping frame:
case 9:
$decodedData['type'] = 'ping';
break;
// pong frame:
case 10:
$decodedData['type'] = 'pong';
break;
default:
return array('type' => '', 'payload' => '', 'error' => 'unknown opcode (1003)');
}
if ($payloadLength === 126) {
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
} elseif ($payloadLength === 127) {
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = '';
for ($i = 0; $i < 8; $i++) {
$tmp .= sprintf('%08b', ord($data[$i + 2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
unset($tmp);
} else {
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}
/**
* We have to check for large frames here. socket_recv cuts at 1024 bytes
* so if websocket-frame is > 1024 bytes we have to wait until whole
* data is transferd.
*/
if (strlen($data) < $dataLength) {
return false;
}
if ($isMasked) {
for ($i = $payloadOffset; $i < $dataLength; $i++) {
$j = $i - $payloadOffset;
if (isset($data[$i])) {
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
}
$decodedData['payload'] = $unmaskedPayload;
} else {
$payloadOffset = $payloadOffset - 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}
return $decodedData;
}
abstract protected function onOpen($client, $info);
abstract protected function onClose($client);
abstract protected function onMessage($client, $data);
abstract protected function onSend($data);
abstract protected function send($data);
}
//пример реализации чата
class WebsocketHandler extends WebsocketWorker
{
protected function onOpen($client, $info) {//вызывается при соединении с новым клиентом
}
protected function onClose($client) {//вызывается при закрытии соединения клиентом
}
protected function onMessage($client, $data) {//вызывается при получении сообщения от клиента
$data = $this->decode($data);
if (!$data['payload']) {
return;
}
if (!mb_check_encoding($data['payload'], 'utf-8')) {
return;
}
//var_export($data);
//шлем всем сообщение, о том, что пишет один из клиентов
$message = 'пользователь #' . intval($client) . ' (' . $this->pid . '): ' . strip_tags($data['payload']);
$this->send($message);
$this->sendHelper($message);
}
protected function onSend($data) {//вызывается при получении сообщения от мастера
$this->sendHelper($data);
}
protected function send($message) {//отправляем сообщение на мастер, чтобы он разослал его на все воркеры
@fwrite($this->master, $message);
}
private function sendHelper($data) {
$data = $this->encode($data);
$write = $this->clients;
if (stream_select($read, $write, $except, 0)) {
foreach ($write as $client) {
@fwrite($client, $data);
}
}
}
}
$config = array(
'host' => '0.0.0.0',
'port' => 8000,
'workers' => 1,
);
$WebsocketServer = new WebsocketServer($config);
$WebsocketServer->start();