Skip to content

Commit f4d9b2e

Browse files
committed
redis驱动支持断线重连
1 parent 77b3eb6 commit f4d9b2e

2 files changed

Lines changed: 43 additions & 12 deletions

File tree

src/queue/connector/Database.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public static function __make(Db $db, $config)
6565

6666
public function size($queue = null)
6767
{
68-
return $this->db->name($this->table)
68+
return $this->db
69+
->name($this->table)
6970
->where('queue', $this->getQueue($queue))
7071
->count();
7172
}
@@ -170,7 +171,7 @@ protected function getNextAvailableJob($queue)
170171
->where(function (Query $query) {
171172
$query->where(function (Query $query) {
172173
$query->whereNull('reserve_time')
173-
->where('available_time', '<=', $this->currentTime());
174+
->where('available_time', '<=', $this->currentTime());
174175
});
175176

176177
//超时任务重试

src/queue/connector/Redis.php

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Closure;
1515
use Exception;
16+
use RedisException;
1617
use think\helper\Str;
1718
use think\queue\Connector;
1819
use think\queue\InteractsWithTime;
@@ -46,7 +47,7 @@ class Redis extends Connector
4647
*/
4748
protected $blockFor = null;
4849

49-
public function __construct(\Redis $redis, $default = 'default', $retryAfter = 60, $blockFor = null)
50+
public function __construct($redis, $default = 'default', $retryAfter = 60, $blockFor = null)
5051
{
5152
$this->redis = $redis;
5253
$this->default = $default;
@@ -60,18 +61,47 @@ public static function __make($config)
6061
throw new Exception('redis扩展未安装');
6162
}
6263

63-
$func = $config['persistent'] ? 'pconnect' : 'connect';
64+
$redis = new class($config) {
65+
protected $config;
66+
protected $client;
6467

65-
$redis = new \Redis;
66-
$redis->$func($config['host'], $config['port'], $config['timeout']);
68+
public function __construct($config)
69+
{
70+
$this->config = $config;
71+
$this->client = $this->createClient();
72+
}
6773

68-
if ('' != $config['password']) {
69-
$redis->auth($config['password']);
70-
}
74+
protected function createClient()
75+
{
76+
$config = $this->config;
77+
$func = $config['persistent'] ? 'pconnect' : 'connect';
7178

72-
if (0 != $config['select']) {
73-
$redis->select($config['select']);
74-
}
79+
$client = new \Redis;
80+
$client->$func($config['host'], $config['port'], $config['timeout']);
81+
82+
if ('' != $config['password']) {
83+
$client->auth($config['password']);
84+
}
85+
86+
if (0 != $config['select']) {
87+
$client->select($config['select']);
88+
}
89+
return $client;
90+
}
91+
92+
public function __call($name, $arguments)
93+
{
94+
try {
95+
return call_user_func_array([$this->client, $name], $arguments);
96+
} catch (RedisException $e) {
97+
if (Str::contains($e->getMessage(), 'went away')) {
98+
$this->client = $this->createClient();
99+
}
100+
101+
throw $e;
102+
}
103+
}
104+
};
75105

76106
return new self($redis, $config['queue'], $config['retry_after'] ?? 60, $config['block_for'] ?? null);
77107
}

0 commit comments

Comments
 (0)