Skip to content

Commit 00245e5

Browse files
author
webman-php
authored
Commit
1 parent 50e9573 commit 00245e5

4 files changed

Lines changed: 225 additions & 0 deletions

File tree

composer.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"name": "webman/stomp",
3+
"description": "Stomp client written in PHP for webman.",
4+
"require": {
5+
"workerman/stomp": "^1.0"
6+
},
7+
"autoload": {
8+
"psr-4": {"Webman\\Stomp\\": "./src"}
9+
}
10+
}

src/Client.php

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
<?php
2+
/**
3+
* This file is part of webman.
4+
*
5+
* Licensed under The MIT License
6+
* For full copyright and license information, please see the MIT-LICENSE.txt
7+
* Redistributions of files must retain the above copyright notice.
8+
*
9+
* @author walkor<walkor@workerman.net>
10+
* @copyright walkor<walkor@workerman.net>
11+
* @link http://www.workerman.net/
12+
* @license http://www.opensource.org/licenses/mit-license.php MIT License
13+
*/
14+
namespace Webman\Stomp;
15+
16+
use Workerman\Stomp\Client as StompClient;
17+
18+
/**
19+
* Class Stomp
20+
* @package support
21+
*
22+
* Strings methods
23+
* @method static void send($queue, $body, array $headers = [])
24+
*/
25+
class Client
26+
{
27+
28+
/**
29+
* @var Client[]
30+
*/
31+
protected static $_connections = null;
32+
33+
/**
34+
* @var array
35+
*/
36+
protected $_queue = [];
37+
38+
/**
39+
* @var StompClient
40+
*/
41+
protected $_client;
42+
43+
/**
44+
* Client constructor.
45+
* @param $host
46+
* @param array $options
47+
*/
48+
public function __construct($host, $options = [])
49+
{
50+
$this->_client = new StompClient($host, $options);
51+
$this->_client->onConnect = function ($client) {
52+
foreach ($this->_queue as $item) {
53+
$client->{$item[0]}(... $item[1]);
54+
}
55+
$this->_queue = [];
56+
};
57+
$this->_client->connect();
58+
}
59+
60+
/**
61+
* @param $name
62+
* @param $arguments
63+
*
64+
* @return mixed
65+
*/
66+
public function __call($name, $arguments)
67+
{
68+
if ($this->_client->getState() != StompClient::STATE_ESTABLISHED) {
69+
if (in_array($name, [
70+
'subscribe',
71+
'subscribeWithAck',
72+
'unsubscribe',
73+
'send',
74+
'ack',
75+
'nack',
76+
'disconnect'])) {
77+
$this->_queue[] = [$name, $arguments];
78+
return null;
79+
}
80+
}
81+
return $this->_client->{$name}(...$arguments);
82+
}
83+
84+
/**
85+
* @param string $name
86+
* @return Client
87+
*/
88+
public static function connection($name = 'default') {
89+
if (!isset(static::$_connections[$name])) {
90+
$config = config('stomp', []);
91+
if (!isset($config[$name])) {
92+
throw new \RuntimeException("RedisQueue connection $name not found");
93+
}
94+
$host = $config[$name]['host'];
95+
$options = $config[$name]['options'];
96+
$client = new static($host, $options);
97+
static::$_connections[$name] = $client;
98+
}
99+
return static::$_connections[$name];
100+
}
101+
102+
/**
103+
* @param $name
104+
* @param $arguments
105+
* @return mixed
106+
*/
107+
public static function __callStatic($name, $arguments)
108+
{
109+
return static::connection('default')->{$name}(... $arguments);
110+
}
111+
}

src/Consumer.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
/**
3+
* This file is part of webman.
4+
*
5+
* Licensed under The MIT License
6+
* For full copyright and license information, please see the MIT-LICENSE.txt
7+
* Redistributions of files must retain the above copyright notice.
8+
*
9+
* @author walkor<walkor@workerman.net>
10+
* @copyright walkor<walkor@workerman.net>
11+
* @link http://www.workerman.net/
12+
* @license http://www.opensource.org/licenses/mit-license.php MIT License
13+
*/
14+
15+
namespace Webman\Stomp;
16+
17+
18+
/**
19+
* Interface Consumer
20+
* @package Webman\Stomp
21+
*/
22+
interface Consumer
23+
{
24+
public function consume($data);
25+
}

src/Process/Consumer.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
/**
3+
* This file is part of webman.
4+
*
5+
* Licensed under The MIT License
6+
* For full copyright and license information, please see the MIT-LICENSE.txt
7+
* Redistributions of files must retain the above copyright notice.
8+
*
9+
* @author walkor<walkor@workerman.net>
10+
* @copyright walkor<walkor@workerman.net>
11+
* @link http://www.workerman.net/
12+
* @license http://www.opensource.org/licenses/mit-license.php MIT License
13+
*/
14+
15+
namespace Webman\Stomp\Process;
16+
17+
use support\bootstrap\Container;
18+
use Workerman\Stomp\Client as StompClient;
19+
use Webman\Stomp\Client;
20+
21+
/**
22+
* Class StompConsumer
23+
* @package process
24+
*/
25+
class Consumer
26+
{
27+
/**
28+
* @var string
29+
*/
30+
protected $_consumerDir = '';
31+
32+
/**
33+
* StompConsumer constructor.
34+
* @param string $consumer_dir
35+
*/
36+
public function __construct($consumer_dir = '')
37+
{
38+
$this->_consumerDir = $consumer_dir;
39+
}
40+
41+
/**
42+
* onWorkerStart.
43+
*/
44+
public function onWorkerStart()
45+
{
46+
$dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir);
47+
$iterator = new \RecursiveIteratorIterator($dir_iterator);
48+
foreach ($iterator as $file) {
49+
if (is_dir($file)) {
50+
continue;
51+
}
52+
$fileinfo = new \SplFileInfo($file);
53+
$ext = $fileinfo->getExtension();
54+
if ($ext === 'php') {
55+
$class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4));
56+
if (!is_a($class, 'Webman\Stomp\Consumer', true)) {
57+
continue;
58+
}
59+
$consumer = Container::get($class);
60+
$connection_name = $consumer->connection ?? 'default';
61+
$queue = $consumer->queue;
62+
$ack = $consumer->ack ?? 'auto';
63+
$connection = Client::connection($connection_name);
64+
$cb = function ($client, $package, $ack) use ($consumer) {
65+
\call_user_func([$consumer, 'consume'], $package['body'], $ack, $client);
66+
};
67+
$connection->subscribe($queue, $cb, ['ack' => $ack]);
68+
/*if ($connection->getState() == StompClient::STATE_ESTABLISHED) {
69+
$connection->subscribe($queue, $cb, ['ack' => $ack]);
70+
} else {
71+
$connection->onConnect = function (Client $connection) use ($queue, $ack, $cb) {
72+
$connection->subscribe($queue, $cb, ['ack' => $ack]);
73+
};
74+
}*/
75+
}
76+
}
77+
78+
}
79+
}

0 commit comments

Comments
 (0)