|
16 | 16 | use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisCompatibilityInterface; |
17 | 17 | use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisJson; |
18 | 18 | use RobiNN\Pca\Dashboards\Redis\Compatibility\RedisModules; |
19 | | -use Throwable; |
20 | 19 |
|
21 | 20 | /** |
22 | 21 | * @method bool restore(string $key, int $ttl, string $value) |
@@ -202,119 +201,11 @@ public function listRem(string $key, string $value, int $count): int { |
202 | 201 | return $this->lrem($key, $count, $value); |
203 | 202 | } |
204 | 203 |
|
205 | | - private function getKeyNode(string $key): ?PredisClient { |
206 | | - $cluster = $this->getConnection(); |
207 | | - |
208 | | - if (!method_exists($cluster, 'getClusterStrategy') || !method_exists($cluster, 'getConnectionBySlot')) { |
209 | | - return null; |
210 | | - } |
211 | | - |
212 | | - $slot = $cluster->getClusterStrategy()->getSlotByKey($key); |
213 | | - |
214 | | - if ($slot === null) { |
215 | | - return null; |
216 | | - } |
217 | | - |
218 | | - try { |
219 | | - $connection = $cluster->getConnectionBySlot((string) $slot); |
220 | | - } catch (Throwable) { |
221 | | - return null; |
222 | | - } |
223 | | - |
224 | | - foreach ($this->nodes as $node) { |
225 | | - $nodeConn = $node->getConnection(); |
226 | | - if ( |
227 | | - method_exists($nodeConn, 'getParameters') && |
228 | | - $nodeConn->getParameters()->host === $connection->getParameters()->host && |
229 | | - $nodeConn->getParameters()->port === $connection->getParameters()->port |
230 | | - ) { |
231 | | - return $node; |
232 | | - } |
233 | | - } |
234 | | - |
235 | | - return null; |
236 | | - } |
237 | | - |
238 | 204 | /** |
239 | 205 | * @param array<string, string> $messages |
240 | 206 | */ |
241 | 207 | public function streamAdd(string $key, string $id, array $messages): string { |
242 | | - $args = [$key, $id]; |
243 | | - foreach ($messages as $field => $value) { |
244 | | - $args[] = $field; |
245 | | - $args[] = $value; |
246 | | - } |
247 | | - |
248 | | - $node = $this->getKeyNode($key); |
249 | | - if (!$node instanceof PredisClient) { |
250 | | - return ''; |
251 | | - } |
252 | | - |
253 | | - $raw = $node->executeRaw(array_merge(['XADD'], $args)); |
254 | | - |
255 | | - return $raw !== false && $raw !== null ? (string) $raw : ''; |
256 | | - } |
257 | | - |
258 | | - /** |
259 | | - * @return array<string, array<string, string>> |
260 | | - */ |
261 | | - public function xrange(string $key, string $start, string $end, ?int $count = null): array { |
262 | | - $args = [$key, $start, $end]; |
263 | | - if ($count !== null) { |
264 | | - $args[] = 'COUNT'; |
265 | | - $args[] = (string) $count; |
266 | | - } |
267 | | - |
268 | | - $node = $this->getKeyNode($key); |
269 | | - if (!$node instanceof PredisClient) { |
270 | | - return []; |
271 | | - } |
272 | | - |
273 | | - $raw = $node->executeRaw(array_merge(['XRANGE'], $args)); |
274 | | - |
275 | | - if (!is_array($raw)) { |
276 | | - return []; |
277 | | - } |
278 | | - |
279 | | - return $this->parseStreamEntries($raw); |
280 | | - } |
281 | | - |
282 | | - /** |
283 | | - * @param list<array{string, list<string>}> $entries |
284 | | - * |
285 | | - * @return array<string, array<string, string>> |
286 | | - */ |
287 | | - private function parseStreamEntries(array $entries): array { |
288 | | - $result = []; |
289 | | - foreach ($entries as [$id, $fields]) { |
290 | | - $assoc = []; |
291 | | - for ($i = 0, $j = count($fields); $i < $j; $i += 2) { |
292 | | - $assoc[$fields[$i]] = $fields[$i + 1]; |
293 | | - } |
294 | | - |
295 | | - $result[$id] = $assoc; |
296 | | - } |
297 | | - |
298 | | - return $result; |
299 | | - } |
300 | | - |
301 | | - /** |
302 | | - * @param string|array<int, string> ...$id |
303 | | - */ |
304 | | - public function xdel(string $key, string|array ...$id): int { |
305 | | - if (count($id) === 1 && is_array($id[0])) { |
306 | | - $id = $id[0]; |
307 | | - } |
308 | | - |
309 | | - $node = $this->getKeyNode($key); |
310 | | - if (!$node instanceof PredisClient) { |
311 | | - return 0; |
312 | | - } |
313 | | - |
314 | | - $args = array_merge([$key], $id); |
315 | | - $raw = $node->executeRaw(array_merge(['XDEL'], $args)); |
316 | | - |
317 | | - return is_int($raw) ? $raw : 0; |
| 208 | + return $this->xadd($key, $messages, $id); |
318 | 209 | } |
319 | 210 |
|
320 | 211 | public function rawcommand(string $command, mixed ...$arguments): mixed { |
|
0 commit comments