Skip to content

Commit b0e6bc0

Browse files
committed
Initial commit
0 parents  commit b0e6bc0

6 files changed

Lines changed: 253 additions & 0 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/composer.lock
2+
/vendor

composer.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"require": {
3+
"ratchet/rfc6455": "^0.2.3",
4+
"react/http": "dev-master"
5+
},
6+
"autoload": {
7+
"psr-4": {
8+
"Voryx\\WebSocketMiddleware\\": "src/"
9+
}
10+
}
11+
}

example/chat_ws_server.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
use Psr\Http\Message\ResponseInterface;
4+
use Psr\Http\Message\ServerRequestInterface;
5+
use Ratchet\RFC6455\Messaging\Message;
6+
use React\EventLoop\Factory;
7+
use React\Http\MiddlewareRunner;
8+
use React\Http\Response;
9+
use React\Http\Server;
10+
use React\Stream\ThroughStream;
11+
use Voryx\WebSocketMiddleware\WebSocketConnection;
12+
use Voryx\WebSocketMiddleware\WebSocketMiddleware;
13+
14+
require __DIR__ . '/../vendor/autoload.php';
15+
16+
$loop = Factory::create();
17+
18+
$frontend = file_get_contents(__DIR__ . '/test.html');
19+
20+
$broadcast = new ThroughStream();
21+
22+
$ws = new WebSocketMiddleware(['/ws'], function (WebSocketConnection $conn, ServerRequestInterface $request, ResponseInterface $response) use ($broadcast, $loop) {
23+
static $user = 0;
24+
25+
// do not send on the connection before the react http server has a chance to start listening
26+
// on the streams
27+
$loop->addTimer(0, function () use ($conn, $user, $broadcast) {
28+
$broadcast->write('user ' . $user . ' connected');
29+
$conn->send('Welcome. You are user ' . $user);
30+
});
31+
32+
$broadcastHandler = function ($data) use ($conn) {
33+
$conn->send($data);
34+
};
35+
36+
$broadcast->on('data', $broadcastHandler);
37+
38+
$conn->on('message', function (Message $message) use ($broadcast, $conn, $user) {
39+
$broadcast->write('user ' . $user . ': ' . $message->getPayload());
40+
});
41+
42+
$conn->on('error', function (Throwable $e) use ($broadcast, $user, $broadcastHandler) {
43+
$broadcast->removeListener('data', $broadcastHandler);
44+
$broadcast->write('user ' . $user . ' left because of error: ' . $e->getMessage());
45+
});
46+
47+
$conn->on('close', function () use ($broadcast, $user, $broadcastHandler) {
48+
$broadcast->removeListener('data', $broadcastHandler);
49+
$broadcast->write('user ' . $user . ' closed their connection');
50+
});
51+
52+
$user++;
53+
});
54+
55+
$server = new Server(new MiddlewareRunner([
56+
function (ServerRequestInterface $request, callable $next) use ($broadcast) {
57+
// lets let the people chatting see what requests are happening too.
58+
$broadcast->write('<i>Request: ' . $request->getUri()->getPath() . '</i>');
59+
return $next($request);
60+
},
61+
$ws,
62+
function (ServerRequestInterface $request, callable $next) {
63+
$request = $request->withHeader('Request-Time', time());
64+
return $next($request);
65+
},
66+
function (ServerRequestInterface $request, callable $next) use ($frontend) {
67+
return new Response(200, [], $frontend);
68+
},
69+
]));
70+
71+
$server->listen(new \React\Socket\Server('127.0.0.1:4321', $loop));
72+
73+
$loop->run();

example/test.html

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<title>Test WebSocket</title>
6+
</head>
7+
<body>
8+
<input id="input">
9+
<div id="messages">
10+
11+
</div>
12+
<script>
13+
var ws = new WebSocket('ws://127.0.0.1:4321/ws');
14+
15+
ws.onopen = x => console.log('opened', x);
16+
17+
var messages = document.getElementById('messages');
18+
ws.onmessage = m => {
19+
var newMsg = document.createElement('div');
20+
newMsg.innerHTML = m.data;
21+
messages.appendChild(newMsg);
22+
console.log('message', m);
23+
};
24+
25+
var input = document.getElementById('input');
26+
input.onkeydown = e => {
27+
if (e.keyCode == 13) {
28+
ws.send(input.value);
29+
input.value = '';
30+
}
31+
};
32+
</script>
33+
34+
</body>
35+
</html>

src/WebSocketConnection.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
namespace Voryx\WebSocketMiddleware;
4+
5+
use Evenement\EventEmitterInterface;
6+
use Evenement\EventEmitterTrait;
7+
use Ratchet\RFC6455\Messaging\CloseFrameChecker;
8+
use Ratchet\RFC6455\Messaging\Frame;
9+
use Ratchet\RFC6455\Messaging\Message;
10+
use Ratchet\RFC6455\Messaging\MessageBuffer;
11+
use Ratchet\RFC6455\Messaging\MessageInterface;
12+
use React\Stream\DuplexStreamInterface;
13+
14+
class WebSocketConnection implements EventEmitterInterface
15+
{
16+
use EventEmitterTrait;
17+
18+
private $stream;
19+
20+
public function __construct(DuplexStreamInterface $stream)
21+
{
22+
$this->stream = $stream;
23+
24+
$mb = new MessageBuffer(
25+
new CloseFrameChecker(),
26+
function (Message $message) {
27+
echo "Got " . $message->getPayload() . "\n";
28+
$this->emit('message', [$message, $this]);
29+
},
30+
function (Frame $frame) {
31+
switch ($frame->getOpcode()) {
32+
case Frame::OP_PING:
33+
$this->stream->write((new Frame($frame->getPayload(), true, Frame::OP_PONG))->getContents());
34+
return;
35+
case Frame::OP_CLOSE:
36+
$closeCode = 1000;
37+
if ($frame->getPayloadLength() >= 2) {
38+
list($closeCode) = array_merge(unpack('n*', substr($frame->getPayload(), 0, 2)));
39+
}
40+
41+
if ($closeCode >= 2000) {
42+
// emit close code as error
43+
$exception = new \Exception('WebSocket closed with code ' . $closeCode);
44+
$this->emit('error', [$exception, $this]);
45+
return;
46+
}
47+
48+
$this->emit('close', [$closeCode, $this]);
49+
50+
$this->stream->close();
51+
return;
52+
}
53+
},
54+
true
55+
);
56+
57+
$stream->on('data', [$mb, 'onData']);
58+
}
59+
60+
public function send($data)
61+
{
62+
if (!($data instanceof MessageInterface)) {
63+
$data = new Frame($data, true, Frame::OP_TEXT);
64+
}
65+
66+
$this->stream->write($data->getContents());
67+
}
68+
69+
public function close()
70+
{
71+
$this->stream->end((new Frame(pack('n', 1000), true, Frame::OP_CLOSE))->getContents());
72+
}
73+
}

src/WebSocketMiddleware.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Voryx\WebSocketMiddleware;
4+
5+
use Psr\Http\Message\ServerRequestInterface;
6+
use Ratchet\RFC6455\Handshake\RequestVerifier;
7+
use Ratchet\RFC6455\Handshake\ServerNegotiator;
8+
use React\Http\Response;
9+
use React\Stream\CompositeStream;
10+
use React\Stream\ThroughStream;
11+
12+
final class WebSocketMiddleware
13+
{
14+
private $paths;
15+
private $connectionHandler = null;
16+
17+
public function __construct(array $paths = [], callable $connectionHandler = null)
18+
{
19+
$this->paths = $paths;
20+
$this->connectionHandler = $connectionHandler ?: function () {};
21+
}
22+
23+
public function __invoke(ServerRequestInterface $request, callable $next)
24+
{
25+
// check path at some point - for now we just go go ws
26+
if (count($this->paths) > 0) {
27+
if (!in_array($request->getUri()->getPath(), $this->paths)) {
28+
return $next($request);
29+
}
30+
}
31+
32+
$negotiator = new ServerNegotiator(new RequestVerifier());
33+
34+
$response = $negotiator->handshake($request);
35+
36+
if ($response->getStatusCode() != '101') {
37+
// TODO: this should return an error or something not continue the chain
38+
return $next($request);
39+
}
40+
41+
$inStream = new ThroughStream();
42+
$outStream = new ThroughStream();
43+
44+
$response = new Response(
45+
$response->getStatusCode(),
46+
$response->getHeaders(),
47+
new CompositeStream(
48+
$outStream,
49+
$inStream
50+
)
51+
);
52+
53+
$conn = new WebSocketConnection(new CompositeStream($inStream, $outStream));
54+
55+
call_user_func($this->connectionHandler, $conn, $request, $response);
56+
57+
return $response;
58+
}
59+
}

0 commit comments

Comments
 (0)