feat(mq): relay messages from microservices to clients via request.client.notify#1094
Conversation
Trusted microservices can now push arbitrary JSON to connected clients without going through HTTP polling. They publish to the existing faf-lobby exchange with routing key `client.push`; the new ClientMessageQueueService consumes the queue and forwards the body verbatim. Addressing lives in AMQP headers (`user-id` for a single player, none for broadcast; `channel` is reserved for future pub/sub). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds ChangesRabbitMQ client.notify consumer
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
server/client_message_queue_service.py (2)
109-109: 💤 Low valueConsider adding type hint for
user_idparameter.The
user_idparameter lacks a type annotation. Since it comes from AMQP headers and can be any type (validated at lines 110-112), consider annotating it asuser_id: Anyfor clarity.📝 Suggested type hint
+from typing import Any, TYPE_CHECKING, ClassVar, Optional - def _dispatch_to_user(self, user_id, payload: dict) -> None: + def _dispatch_to_user(self, user_id: Any, payload: dict) -> None: try: player_id = int(user_id)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/client_message_queue_service.py` at line 109, Add a type annotation to the user_id parameter in the _dispatch_to_user method signature. Since user_id comes from AMQP headers and can be any type (as validated in the subsequent lines 110-112), annotate it as Any to clarify its type and improve code documentation.
98-107: 💤 Low valueClarify header precedence in the wire contract.
Lines 98-107 implement header routing with
user-idtaking precedence overchannelwhen both are present. The module docstring (lines 9-15) documents each header separately but doesn't specify behavior when multiple headers are present.Consider adding a note to the docstring clarifying that
user-idtakes precedence, or explicitly validate that only one routing header is present.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/client_message_queue_service.py` around lines 98 - 107, The module docstring does not clarify the behavior when both user-id and channel headers are present in a message, but the if/elif/else routing logic in the message handling block shows that user-id takes precedence over channel. Update the module docstring (around lines 9-15) to explicitly document that user-id header takes precedence over channel header when both are present in the wire contract, ensuring consumers understand the routing precedence and avoiding confusion about which header will be used for routing.tests/unit_tests/test_client_message_queue_service.py (1)
122-166: 💤 Low valueConsider verifying message acknowledgment in tests.
The tests correctly mock
message.process()but don't verify it's called withrequeue=False. Adding assertions like:msg.process.assert_called_once_with(requeue=False)would strengthen the tests by confirming messages are properly acknowledged even when dropped.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit_tests/test_client_message_queue_service.py` around lines 122 - 166, The test functions test_malformed_json_body_is_dropped, test_non_object_json_body_is_dropped, test_invalid_user_id_header_is_dropped, and test_channel_header_currently_drops verify that invalid messages are dropped and not broadcast, but they do not verify that the messages are properly acknowledged. Add an assertion at the end of each of these four test functions to verify that msg.process was called exactly once with requeue=False, which confirms that dropped messages are properly acknowledged to the message queue system.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@server/message_queue_service.py`:
- Around line 196-237: The consumer tag returned by queue.consume() is not being
captured, which breaks proper queue cancellation. Fix this across all three
sites: In server/message_queue_service.py lines 196-237, modify the
declare_queue_and_consume method to capture the consumer tag returned by the
queue.consume(callback) call at line 231 and return it as a tuple (queue,
consumer_tag) instead of just the queue. In
server/client_message_queue_service.py lines 50-59, add a new field
self._consumer_tag: Optional[str] = None to store the consumer tag. In
server/client_message_queue_service.py lines 61-66, unpack both the queue and
consumer tag from the declare_queue_and_consume return value by changing the
assignment to self._queue, self._consumer_tag = await
declare_queue_and_consume(...). In server/client_message_queue_service.py lines
68-76, update the queue cancellation logic to use self._consumer_tag instead of
self._queue.name when calling await queue.cancel(), and add a null check to
ensure self._consumer_tag is not None before performing the cancellation.
---
Nitpick comments:
In `@server/client_message_queue_service.py`:
- Line 109: Add a type annotation to the user_id parameter in the
_dispatch_to_user method signature. Since user_id comes from AMQP headers and
can be any type (as validated in the subsequent lines 110-112), annotate it as
Any to clarify its type and improve code documentation.
- Around line 98-107: The module docstring does not clarify the behavior when
both user-id and channel headers are present in a message, but the if/elif/else
routing logic in the message handling block shows that user-id takes precedence
over channel. Update the module docstring (around lines 9-15) to explicitly
document that user-id header takes precedence over channel header when both are
present in the wire contract, ensuring consumers understand the routing
precedence and avoiding confusion about which header will be used for routing.
In `@tests/unit_tests/test_client_message_queue_service.py`:
- Around line 122-166: The test functions test_malformed_json_body_is_dropped,
test_non_object_json_body_is_dropped, test_invalid_user_id_header_is_dropped,
and test_channel_header_currently_drops verify that invalid messages are dropped
and not broadcast, but they do not verify that the messages are properly
acknowledged. Add an assertion at the end of each of these four test functions
to verify that msg.process was called exactly once with requeue=False, which
confirms that dropped messages are properly acknowledged to the message queue
system.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0c51ff7d-a649-4b5d-aa79-6b72e229d0b9
📒 Files selected for processing (4)
server/__init__.pyserver/client_message_queue_service.pyserver/message_queue_service.pytests/unit_tests/test_client_message_queue_service.py
- declare_queue_and_consume now returns (queue, consumer_tag) so the caller can cancel by tag instead of by queue name. - ClientMessageQueueService stores the tag and uses it on shutdown. - Add unit tests for declare_queue_and_consume (success, unknown exchange, broker not ready) and ClientMessageQueueService shutdown paths, lifting diff coverage. - Add Any type hint on _dispatch_to_user(user_id). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
84e3e8b to
3a23aab
Compare
D205/D212/D215/D200/D203/D107 nits flagged by Codacy on the new client_message_queue_service and its test module. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
c11ff0c to
d8e7253
Compare
Revert to multi-line summary on second line and no blank line before class docstring, matching message_queue_service.py et al. Codacy fires both D212/D213 and D203/D211 (mutually exclusive PEP 257 conventions); pick the one the rest of the repo uses. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One-line docstrings don't trip the multi-line summary rules at all, which lets us satisfy the codebase's D211 convention without re-firing D212. D203 notice for the class remains (unavoidable without Codacy config changes). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Match the #675 <state>.<topic>.<action> convention. The flat `client.push` name didn't fit; the relay is a fire-and-forget "ask the lobby to notify a client" → `request.client.notify`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
server/client_message_queue_service.py (1)
40-42: 💤 Low valueConsider adding a blank line before the class docstring per PEP 257.
The static analysis tool flagged D203, which requires one blank line between decorators and class docstrings for PEP 257 compliance.
📝 Proposed fix
`@with_logger` class ClientMessageQueueService(Service): + """Consume `request.client.notify` messages and forward to local clients."""🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@server/client_message_queue_service.py` around lines 40 - 42, Add a blank line between the `@with_logger` decorator and the ClientMessageQueueService class definition to comply with PEP 257 formatting requirements. The class docstring should start after this blank line, ensuring proper spacing between decorators and class definitions.Source: Linters/SAST tools
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@server/client_message_queue_service.py`:
- Around line 40-42: Add a blank line between the `@with_logger` decorator and the
ClientMessageQueueService class definition to comply with PEP 257 formatting
requirements. The class docstring should start after this blank line, ensuring
proper spacing between decorators and class definitions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 502ff50b-d140-4dcf-b0ad-e2e05f1beac7
📒 Files selected for processing (2)
server/client_message_queue_service.pytests/unit_tests/test_client_message_queue_service.py
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit_tests/test_client_message_queue_service.py
…name Previously the queue was server-named (amq.gen-...), which is opaque in the RabbitMQ UI. Use `faf-lobby.client-notify.<hostname>` instead. On k8s `socket.gethostname()` returns the pod name (e.g. `faf-lobby-server-6d9c4588ff-lzdcr`); on a dev machine it's the local hostname. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`<exchange>.<service>.<routing-key>` like the API's `faf-lobby.api.event.update`, plus the hostname suffix because each lobby pod has its own queue (shared-queue services omit the suffix). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Default LOG_LEVEL filtering can drop INFO. The event is worth seeing: in single-instance mode it means 'user offline' and the producer's message is being discarded. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
ClientMessageQueueServicewhich consumes from the existingfaf-lobbytopic exchange on routing keyrequest.client.notifyand forwards the JSON body verbatim to connected clients.user-id(int) targets a single connected player; absent header broadcasts to everyone on this instance.channel(str) is recognised but reserved for future per-channel pub/sub.MessageQueueServicewith adeclare_queue_and_consume()helper so the new service does not need to reach into private channel state. Returns(queue, consumer_tag)so callers can cancel cleanly on shutdown.Why
Right now only the lobby server itself can push live events over the client connection — every other microservice has to expose data over HTTP and rely on the client polling. With this change any trusted internal service can publish a JSON message to RabbitMQ and have it delivered to the right player (or all players) in real time. Messages are forwarded as-is; we trust the publisher because the broker is only reachable from internal services.
Producer contract
MQ_EXCHANGE_NAME(defaultfaf-lobby)request.client.notify— follows the<state>.<topic>.<action>convention from RFC: Take matchmaking requests from RabbitMQ #675user-id: <int>for direct delivery, omit for broadcastBroadcast example
Targeted example
Topology
Each lobby instance declares its own queue named
faf-lobby.lobby.client.notify.<hostname>(pod name on k8s, dev's hostname locally) — matches the<exchange>.<service>.<routing-key>pattern used by e.g.faf-lobby.api.event.update, with a hostname suffix because the queue is per-pod, not shared. Queues areexclusive=True, auto_delete=Trueso a pod restart cleans up. Every lobby instance receives every push and forwards to whichever connections it owns locally; auser-idnot connected here is logged at INFO and acked.Test plan
user-id, reservedchannelheader, consumer cancel on shutdown, no-op shutdown when broker was unavailable,declare_queue_and_consumesuccess + unknown-exchange + broker-not-ready.faforever/faf-python-server:client-mq-test): publishing withuser-id=<my_id>delivers the JSON verbatim to that client; publishing without the header broadcasts.🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Tests