Skip to content

feat(mq): relay messages from microservices to clients via request.client.notify#1094

Merged
Brutus5000 merged 9 commits into
developfrom
feat/client-message-queue
Jun 15, 2026
Merged

feat(mq): relay messages from microservices to clients via request.client.notify#1094
Brutus5000 merged 9 commits into
developfrom
feat/client-message-queue

Conversation

@Brutus5000

@Brutus5000 Brutus5000 commented Jun 14, 2026

Copy link
Copy Markdown
Member

Summary

  • Adds ClientMessageQueueService which consumes from the existing faf-lobby topic exchange on routing key request.client.notify and forwards the JSON body verbatim to connected clients.
  • Addressing lives in AMQP message headers: user-id (int) targets a single connected player; absent header broadcasts to everyone on this instance. channel (str) is recognised but reserved for future per-channel pub/sub.
  • Extends MessageQueueService with a declare_queue_and_consume() helper so the new service does not need to reach into private channel state. Returns (queue, consumer_tag) so callers can cancel cleanly on shutdown.

Why

Right now only the lobby server itself can push live events over the client connection — every other microservice has to expose data over HTTP and rely on the client polling. With this change any trusted internal service can publish a JSON message to RabbitMQ and have it delivered to the right player (or all players) in real time. Messages are forwarded as-is; we trust the publisher because the broker is only reachable from internal services.

Producer contract

  • Exchange: MQ_EXCHANGE_NAME (default faf-lobby)
  • Routing key: request.client.notify — follows the <state>.<topic>.<action> convention from RFC: Take matchmaking requests from RabbitMQ #675
  • Headers: user-id: <int> for direct delivery, omit for broadcast
  • Body: UTF-8 JSON object — delivered to the client verbatim

Broadcast example

exchange:    faf-lobby
routing_key: request.client.notify
headers:     {}
body:        {"command":"notice","style":"info","text":"Server maintenance in 10 minutes"}

Targeted example

exchange:    faf-lobby
routing_key: request.client.notify
headers:     {"user-id": 42}
body:        {"command":"notice","style":"info","text":"Welcome back!"}

Topology

Each lobby instance declares its own queue named faf-lobby.lobby.client.notify.<hostname> (pod name on k8s, dev's hostname locally) — matches the <exchange>.<service>.<routing-key> pattern used by e.g. faf-lobby.api.event.update, with a hostname suffix because the queue is per-pod, not shared. Queues are exclusive=True, auto_delete=True so a pod restart cleans up. Every lobby instance receives every push and forwards to whichever connections it owns locally; a user-id not connected here is logged at INFO and acked.

Test plan

  • Unit tests cover: connected user, disconnected user, broadcast (no headers, empty headers), malformed JSON body, non-object JSON body, invalid user-id, reserved channel header, consumer cancel on shutdown, no-op shutdown when broker was unavailable, declare_queue_and_consume success + unknown-exchange + broker-not-ready.
  • Local end-to-end against a built image (faforever/faf-python-server:client-mq-test): publishing with user-id=<my_id> delivers the JSON verbatim to that client; publishing without the header broadcasts.
  • Multi-instance soak: two lobby instances, broadcast hits both clients exactly once; user-targeted hits only the instance the user is on.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added notification delivery system that routes messages to specific connected users or broadcasts to all authenticated clients based on message headers.
    • Validates message payloads and handles invalid or malformed messages gracefully.
  • Tests

    • Added comprehensive unit tests for message routing, delivery, and validation logic.

Trusted microservices can now push arbitrary JSON to connected clients
without going through HTTP polling. They publish to the existing
faf-lobby exchange with routing key `client.push`; the new
ClientMessageQueueService consumes the queue and forwards the body
verbatim. Addressing lives in AMQP headers (`user-id` for a single
player, none for broadcast; `channel` is reserved for future pub/sub).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 14, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 010790ac-6631-4c6a-96ab-b27efe90dfb6

📥 Commits

Reviewing files that changed from the base of the PR and between 17f2467 and 2ff834a.

📒 Files selected for processing (1)
  • server/client_message_queue_service.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/client_message_queue_service.py

📝 Walkthrough

Walkthrough

Adds ClientMessageQueueService, a new service that consumes request.client.notify messages from RabbitMQ and routes them to connected lobby clients based on AMQP headers (user-id, channel, or broadcast). MessageQueueService gains a declare_queue_and_consume helper to handle queue binding and consumer lifecycle. The new service is exported from the server package and covered by comprehensive unit tests.

Changes

RabbitMQ client.notify consumer

Layer / File(s) Summary
MessageQueueService.declare_queue_and_consume
server/message_queue_service.py
Adds Awaitable/Callable typing imports and aio_pika.abc interface imports for better type annotations. Implements declare_queue_and_consume which validates exchange existence in the internal cache, declares and binds a queue with configurable parameters, starts consuming with a callback, and returns the queue handle and consumer tag (or None when broker is not ready).
ClientMessageQueueService implementation and export
server/client_message_queue_service.py, server/__init__.py
Adds module docstring for the request.client.notify wire contract describing AMQP header semantics. Defines CLIENT_NOTIFY_ROUTING_KEY constant and implements the service class: initialize() registers the queue consumer using the message queue service; shutdown() cancels it and clears internal state; _on_message() parses JSON and routes by user-id header (calls _dispatch_to_user), channel header (logs not implemented and drops), or neither (broadcasts); _dispatch_to_user() validates the integer id, looks up the player via PlayerService, and calls player.write_message if connected. Imports and re-exports the class from server/__init__.py.
MessageQueueService unit tests
tests/unit_tests/test_message_queue_service.py
Adds three tests: successful queue declaration and message consumption through the callback, KeyError raised for undeclared exchanges, and None return when the service cannot connect.
ClientMessageQueueService unit tests
tests/unit_tests/test_client_message_queue_service.py
Adds make_incoming_message helper to construct mock AMQP messages, fixtures for mocked ServerInstance and in-memory fake_player_service, and comprehensive tests covering consumer lifecycle (initialize declares with correct args, shutdown cancels and clears state, shutdown is no-op when broker unavailable), message routing (connected user receives direct dispatch, disconnected user is dropped with log), broadcast behavior (missing or empty headers trigger broadcast), and input validation (malformed JSON, non-object JSON, invalid user-id, and channel header all drop with appropriate log messages).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 A message hops through the RabbitMQ gate,
parsed as JSON—oh, what a fate!
By user-id it finds the right player to greet,
or broadcasts to all when the headers are sweet.
The channel awaits its turn yet to come—
but for now, this rabbit's work is done! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.76% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(mq): relay messages from microservices to clients via request.client.notify' directly and clearly summarizes the primary change—introducing a new message relay service from microservices to clients via the request.client.notify RabbitMQ routing key.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/client-message-queue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
server/client_message_queue_service.py (2)

109-109: 💤 Low value

Consider adding type hint for user_id parameter.

The user_id parameter lacks a type annotation. Since it comes from AMQP headers and can be any type (validated at lines 110-112), consider annotating it as user_id: Any for clarity.

📝 Suggested type hint
+from typing import Any, TYPE_CHECKING, ClassVar, Optional
 
-    def _dispatch_to_user(self, user_id, payload: dict) -> None:
+    def _dispatch_to_user(self, user_id: Any, payload: dict) -> None:
         try:
             player_id = int(user_id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/client_message_queue_service.py` at line 109, Add a type annotation to
the user_id parameter in the _dispatch_to_user method signature. Since user_id
comes from AMQP headers and can be any type (as validated in the subsequent
lines 110-112), annotate it as Any to clarify its type and improve code
documentation.

98-107: 💤 Low value

Clarify header precedence in the wire contract.

Lines 98-107 implement header routing with user-id taking precedence over channel when both are present. The module docstring (lines 9-15) documents each header separately but doesn't specify behavior when multiple headers are present.

Consider adding a note to the docstring clarifying that user-id takes precedence, or explicitly validate that only one routing header is present.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/client_message_queue_service.py` around lines 98 - 107, The module
docstring does not clarify the behavior when both user-id and channel headers
are present in a message, but the if/elif/else routing logic in the message
handling block shows that user-id takes precedence over channel. Update the
module docstring (around lines 9-15) to explicitly document that user-id header
takes precedence over channel header when both are present in the wire contract,
ensuring consumers understand the routing precedence and avoiding confusion
about which header will be used for routing.
tests/unit_tests/test_client_message_queue_service.py (1)

122-166: 💤 Low value

Consider verifying message acknowledgment in tests.

The tests correctly mock message.process() but don't verify it's called with requeue=False. Adding assertions like:

msg.process.assert_called_once_with(requeue=False)

would strengthen the tests by confirming messages are properly acknowledged even when dropped.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit_tests/test_client_message_queue_service.py` around lines 122 -
166, The test functions test_malformed_json_body_is_dropped,
test_non_object_json_body_is_dropped, test_invalid_user_id_header_is_dropped,
and test_channel_header_currently_drops verify that invalid messages are dropped
and not broadcast, but they do not verify that the messages are properly
acknowledged. Add an assertion at the end of each of these four test functions
to verify that msg.process was called exactly once with requeue=False, which
confirms that dropped messages are properly acknowledged to the message queue
system.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@server/message_queue_service.py`:
- Around line 196-237: The consumer tag returned by queue.consume() is not being
captured, which breaks proper queue cancellation. Fix this across all three
sites: In server/message_queue_service.py lines 196-237, modify the
declare_queue_and_consume method to capture the consumer tag returned by the
queue.consume(callback) call at line 231 and return it as a tuple (queue,
consumer_tag) instead of just the queue. In
server/client_message_queue_service.py lines 50-59, add a new field
self._consumer_tag: Optional[str] = None to store the consumer tag. In
server/client_message_queue_service.py lines 61-66, unpack both the queue and
consumer tag from the declare_queue_and_consume return value by changing the
assignment to self._queue, self._consumer_tag = await
declare_queue_and_consume(...). In server/client_message_queue_service.py lines
68-76, update the queue cancellation logic to use self._consumer_tag instead of
self._queue.name when calling await queue.cancel(), and add a null check to
ensure self._consumer_tag is not None before performing the cancellation.

---

Nitpick comments:
In `@server/client_message_queue_service.py`:
- Line 109: Add a type annotation to the user_id parameter in the
_dispatch_to_user method signature. Since user_id comes from AMQP headers and
can be any type (as validated in the subsequent lines 110-112), annotate it as
Any to clarify its type and improve code documentation.
- Around line 98-107: The module docstring does not clarify the behavior when
both user-id and channel headers are present in a message, but the if/elif/else
routing logic in the message handling block shows that user-id takes precedence
over channel. Update the module docstring (around lines 9-15) to explicitly
document that user-id header takes precedence over channel header when both are
present in the wire contract, ensuring consumers understand the routing
precedence and avoiding confusion about which header will be used for routing.

In `@tests/unit_tests/test_client_message_queue_service.py`:
- Around line 122-166: The test functions test_malformed_json_body_is_dropped,
test_non_object_json_body_is_dropped, test_invalid_user_id_header_is_dropped,
and test_channel_header_currently_drops verify that invalid messages are dropped
and not broadcast, but they do not verify that the messages are properly
acknowledged. Add an assertion at the end of each of these four test functions
to verify that msg.process was called exactly once with requeue=False, which
confirms that dropped messages are properly acknowledged to the message queue
system.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0c51ff7d-a649-4b5d-aa79-6b72e229d0b9

📥 Commits

Reviewing files that changed from the base of the PR and between dc132e0 and b677155.

📒 Files selected for processing (4)
  • server/__init__.py
  • server/client_message_queue_service.py
  • server/message_queue_service.py
  • tests/unit_tests/test_client_message_queue_service.py

Comment thread server/message_queue_service.py Outdated
- declare_queue_and_consume now returns (queue, consumer_tag) so the
  caller can cancel by tag instead of by queue name.
- ClientMessageQueueService stores the tag and uses it on shutdown.
- Add unit tests for declare_queue_and_consume (success, unknown
  exchange, broker not ready) and ClientMessageQueueService shutdown
  paths, lifting diff coverage.
- Add Any type hint on _dispatch_to_user(user_id).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Brutus5000 Brutus5000 force-pushed the feat/client-message-queue branch from 84e3e8b to 3a23aab Compare June 14, 2026 20:10
D205/D212/D215/D200/D203/D107 nits flagged by Codacy on the new
client_message_queue_service and its test module.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Brutus5000 Brutus5000 force-pushed the feat/client-message-queue branch from c11ff0c to d8e7253 Compare June 14, 2026 20:16
Brutus5000 and others added 3 commits June 14, 2026 22:18
Revert to multi-line summary on second line and no blank line before
class docstring, matching message_queue_service.py et al. Codacy fires
both D212/D213 and D203/D211 (mutually exclusive PEP 257 conventions);
pick the one the rest of the repo uses.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
One-line docstrings don't trip the multi-line summary rules at all,
which lets us satisfy the codebase's D211 convention without re-firing
D212. D203 notice for the class remains (unavoidable without Codacy
config changes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Match the #675 <state>.<topic>.<action> convention.
The flat `client.push` name didn't fit; the relay is a fire-and-forget
"ask the lobby to notify a client" → `request.client.notify`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
server/client_message_queue_service.py (1)

40-42: 💤 Low value

Consider adding a blank line before the class docstring per PEP 257.

The static analysis tool flagged D203, which requires one blank line between decorators and class docstrings for PEP 257 compliance.

📝 Proposed fix
 `@with_logger`
 class ClientMessageQueueService(Service):
+
     """Consume `request.client.notify` messages and forward to local clients."""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/client_message_queue_service.py` around lines 40 - 42, Add a blank
line between the `@with_logger` decorator and the ClientMessageQueueService class
definition to comply with PEP 257 formatting requirements. The class docstring
should start after this blank line, ensuring proper spacing between decorators
and class definitions.

Source: Linters/SAST tools

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@server/client_message_queue_service.py`:
- Around line 40-42: Add a blank line between the `@with_logger` decorator and the
ClientMessageQueueService class definition to comply with PEP 257 formatting
requirements. The class docstring should start after this blank line, ensuring
proper spacing between decorators and class definitions.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 502ff50b-d140-4dcf-b0ad-e2e05f1beac7

📥 Commits

Reviewing files that changed from the base of the PR and between 3a23aab and d99c189.

📒 Files selected for processing (2)
  • server/client_message_queue_service.py
  • tests/unit_tests/test_client_message_queue_service.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/unit_tests/test_client_message_queue_service.py

Brutus5000 and others added 2 commits June 15, 2026 00:04
…name

Previously the queue was server-named (amq.gen-...), which is opaque in
the RabbitMQ UI. Use `faf-lobby.client-notify.<hostname>` instead. On
k8s `socket.gethostname()` returns the pod name (e.g.
`faf-lobby-server-6d9c4588ff-lzdcr`); on a dev machine it's the local
hostname.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`<exchange>.<service>.<routing-key>` like the API's
`faf-lobby.api.event.update`, plus the hostname suffix because each
lobby pod has its own queue (shared-queue services omit the suffix).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Brutus5000 Brutus5000 changed the title feat(mq): forward RabbitMQ messages to clients via client.push feat(mq): relay messages from microservices to clients via request.client.notify Jun 14, 2026
Default LOG_LEVEL filtering can drop INFO. The event is worth seeing:
in single-instance mode it means 'user offline' and the producer's
message is being discarded.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Brutus5000 Brutus5000 merged commit 1b98cda into develop Jun 15, 2026
9 of 10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant