[ISSUE #10197] Add traffic size distribution per requestCode in RemotingCodeDistributionHandler#10198

Open
RongtongJin wants to merge 3 commits into apache:develop from RongtongJin:develop-0324

Conversation

@RongtongJin
Contributor

Which Issue(s) This PR Fixes

Brief Description

Add per-requestCode traffic size (bytes) distribution to RemotingCodeDistributionHandler, in addition to the existing request count distribution.

Key changes:

  • Replace 4 ConcurrentHashMaps with 2 using a composite TrafficStats class (LongAdder count + LongAdder trafficSize)
  • calcCommandSize() default path: fixed overhead (29 bytes) + body length — O(1), no iteration on hot path
  • enableDetailedTrafficSize in NettyServerConfig: when enabled, also counts remark + extFields byte lengths. Togglable at runtime via updateBrokerConfig
  • NettyRemotingServer.printRemotingCodeDistribution() now logs inbound/outbound traffic snapshots
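Sketched out, the composite class from the first bullet could look like the following. This is a minimal sketch based on this description only; the `record()` helper and its exact shape are assumptions, while the `count` and `trafficSize` `LongAdder` fields are the ones named above.

```java
import java.util.concurrent.atomic.LongAdder;

// Sketch of the composite per-requestCode entry: two LongAdders so
// recording stays lock-free and O(1) on the hot path. The record()
// helper is illustrative; only the two fields are named in the PR.
final class TrafficStats {
    final LongAdder count = new LongAdder();
    final LongAdder trafficSize = new LongAdder();

    void record(int bytes) {
        count.increment();      // one request observed
        trafficSize.add(bytes); // its wire size in bytes
    }
}
```

Keeping both adders in one entry is what lets a single `ConcurrentHashMap<Integer, TrafficStats>` replace the previous pair of maps per direction.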

How Did You Test This Change?

  • Added RemotingCodeDistributionHandlerTest with 10 unit test cases covering: count, traffic with/without body, detailed mode on/off, snapshot reset, multiple requestCodes, non-RemotingCommand passthrough, concurrent correctness, and runtime toggle
  • Added RemotingCodeDistributionBenchmark (JMH) comparing detail-off vs detail-on across 1/4/8 threads. recordInbound() is called directly (package-private) to isolate recording overhead from Netty pipeline cost

@codecov-commenter

codecov-commenter commented Mar 24, 2026

Codecov Report

❌ Patch coverage is 96.42857% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.86%. Comparing base (e100743) to head (dd2bb4b).
⚠️ Report is 2 commits behind head on develop.

Files with missing lines Patch % Lines
...ing/protocol/remoting/RemotingProtocolHandler.java 0.00% 1 Missing ⚠️
...emoting/netty/RemotingCodeDistributionHandler.java 96.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10198      +/-   ##
=============================================
- Coverage      48.93%   48.86%   -0.08%     
+ Complexity     13384    13369      -15     
=============================================
  Files           1373     1373              
  Lines          99924    99940      +16     
  Branches       12908    12913       +5     
=============================================
- Hits           48898    48835      -63     
- Misses         45098    45161      +63     
- Partials        5928     5944      +16     

☔ View full report in Codecov by Sentry.

Contributor

@guyinyou guyinyou left a comment


lgtm

Contributor

@fuyou001 fuyou001 left a comment


LGTM


@Wubabalala Wubabalala left a comment


Opened this one up looking to learn how RocketMQ's Netty pipeline handles observability, and ended up spending most of an hour on RemotingCodeDistributionHandler.java. The refactor from ChannelDuplexHandler into a plain POJO called from NettyDecoder / NettyEncoder reads really clean — and measuring actual ByteBuf positions (readableBytes() + 4 on decode, writerIndex() - beginIndex on encode) sidesteps the whole "how do I estimate command size without iterating extFields" problem. Nice.

Four things came up while reading. Two I think are worth changing before merge, one is a description/code mismatch I couldn't explain, and one is a test suggestion.

1. Count and traffic snapshots don't share the same time window [P2]

printRemotingCodeDistribution() makes four separate getter calls:

// NettyRemotingServer.java:418-435
String inBoundSnapshotString         = distributionHandler.getInBoundSnapshotString();
String outBoundSnapshotString        = distributionHandler.getOutBoundSnapshotString();
String inBoundTrafficSnapshotString  = distributionHandler.getInBoundTrafficSnapshotString();
String outBoundTrafficSnapshotString = distributionHandler.getOutBoundTrafficSnapshotString();

Each one runs its own for (entry : statsMap.entrySet()) loop inside getSnapshot() and calls sumThenReset() on either count or trafficSize. Between the two inbound calls — which can be microseconds or longer if the logging thread is contended — new records can land on the same TrafficStats entry, incrementing count (already reset) and adding to trafficSize (not yet reset).

End result: the count line and the traffic line in the log describe slightly different time windows. If anyone downstream computes avg_bytes_per_req = traffic / count, it's biased upward by whatever arrived in the gap. In steady state the skew is probably small, but it breaks the invariant I'd instinctively assume reading the log — that a count / traffic pair for the same requestCode in the same log cycle describes the same traffic.

I'd fold both reads into one pass so a given entry's two LongAdders reset within a few instructions of each other:

public static final class Snapshot {
    public final Map<Integer, Long> counts;
    public final Map<Integer, Long> traffic;
    private Snapshot(Map<Integer, Long> c, Map<Integer, Long> t) {
        this.counts = c;
        this.traffic = t;
    }
}

private Snapshot snapshotAndReset(ConcurrentMap<Integer, TrafficStats> statsMap) {
    Map<Integer, Long> counts  = new HashMap<>(statsMap.size());
    Map<Integer, Long> traffic = new HashMap<>(statsMap.size());
    for (Map.Entry<Integer, TrafficStats> entry : statsMap.entrySet()) {
        // Reset count before traffic. If a recorder slips in between these
        // two lines, we undercount traffic rather than overcount — the
        // safer direction of skew.
        counts.put(entry.getKey(),  entry.getValue().count.sumThenReset());
        traffic.put(entry.getKey(), entry.getValue().trafficSize.sumThenReset());
    }
    return new Snapshot(counts, traffic);
}

Still not strictly atomic for a single entry (a record can arrive between the two sumThenReset() calls), but the gap shrinks from "one getter call" to "a few field loads", and printRemotingCodeDistribution drops from four getter calls to two.

2. No-arg NettyDecoder / NettyEncoder constructors silently record nothing [P2]

// NettyDecoder.java:25-27
public NettyDecoder() {
    this(null);
}

Same pattern in NettyEncoder.java:59-61. Both decode() and encode() then guard with if (distributionHandler != null) before calling the recorder.

The two call sites touched in this PR (NettyRemotingServer, RemotingProtocolHandler) are updated to pass the handler, but remoting is public API and NettyDecoder has been around long enough that I'd expect some external extension to be instantiating new NettyDecoder() by hand. Those callers keep compiling, keep running, and quietly stop producing any traffic distribution for their pipelines — no warning, no log line, just missing metrics.

A few ways to make the failure louder without outright breaking them:

  • @Deprecated the no-arg constructors with Javadoc noting "does not record traffic distribution; use the handler-aware constructor"
  • Log a one-time WARN on first null-handler construction so it at least shows up in startup logs
  • Expose a RemotingCodeDistributionHandler.NOOP sentinel and use it as the no-arg default — the contract becomes explicit at the type level

My preference is the first. Lowest friction for existing consumers and gives them a clear signal on the next build.
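To make the first option concrete, here is a standalone sketch combining the `@Deprecated` constructor with the one-time WARN. Everything here is illustrative: `DecoderSketch` stands in for `NettyDecoder`, `Object` stands in for `RemotingCodeDistributionHandler`, and the log wording is invented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a decoder-like class combining option 1
// (@Deprecated no-arg constructor) with option 2 (one-time WARN on
// first null-handler construction). Object stands in for the real
// RemotingCodeDistributionHandler to keep the sketch self-contained.
final class DecoderSketch {
    private static final AtomicBoolean WARNED = new AtomicBoolean();

    final Object distributionHandler; // may be null

    /**
     * @deprecated does not record traffic distribution; use the
     * handler-aware constructor instead.
     */
    @Deprecated
    DecoderSketch() {
        this(null);
    }

    DecoderSketch(Object distributionHandler) {
        this.distributionHandler = distributionHandler;
        if (distributionHandler == null && WARNED.compareAndSet(false, true)) {
            System.err.println("WARN: DecoderSketch created without a "
                + "distribution handler; traffic metrics will not be recorded");
        }
    }
}
```

The `AtomicBoolean` guard keeps the WARN to one line per JVM, so a pipeline that legitimately constructs many decoders doesn't flood the startup log.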

3. PR description doesn't match the code I read [P3]

This one tripped me up on the first pass and I want to flag it because the next reviewer will hit the same wall:

What the PR body says vs. what the diff actually does:

  • Claimed: calcCommandSize() with 29-byte fixed overhead + body length. Actual: no calcCommandSize(); wire size is measured directly via ByteBuf positions.
  • Claimed: enableDetailedTrafficSize flag in NettyServerConfig, togglable at runtime. Actual: no such flag; NettyServerConfig isn't touched in this PR.
  • Claimed: added RemotingCodeDistributionBenchmark (JMH) comparing detail on/off. Actual: no benchmark file in the changeset.
  • Claimed: 10 unit test cases. Actual: 6 new @Test methods.

FWIW I think the actual implementation is a better design than the one described — measuring the real ByteBuf positions sidesteps the whole "what exactly does the 29 include" question and handles remark / extFields correctly for free. But if the next person opens the PR expecting a togglable flag, they'll be looking for something that isn't there. Could you update the body to reflect the current approach?

4. testConcurrentAccess only exercises inbound [nit]

All four threads call handler.recordInbound(1, wireSize) in a loop. Since inboundStats and outboundStats are separate ConcurrentHashMaps, four inbound-only threads don't really test the concurrency scenario that happens at runtime — in a live Netty server, inbound and outbound are on different threads hammering different maps at the same time. Half the threads on recordInbound and half on recordOutbound, then asserting both final counts and traffic, would cost a few extra lines and cover the contract I'd actually worry about.
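A standalone model of that mixed-direction test might look like the following. `TrafficStats` and the two maps mirror the handler's fields as described in this review; all names are assumed, not copied from the actual test code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Model of the suggested test: half the threads record inbound, half
// outbound, then both counts AND both traffic totals are checked.
final class MixedTrafficDemo {
    static final class TrafficStats {
        final LongAdder count = new LongAdder();
        final LongAdder trafficSize = new LongAdder();
    }

    final ConcurrentMap<Integer, TrafficStats> inbound = new ConcurrentHashMap<>();
    final ConcurrentMap<Integer, TrafficStats> outbound = new ConcurrentHashMap<>();

    void record(ConcurrentMap<Integer, TrafficStats> map, int code, int bytes) {
        TrafficStats s = map.computeIfAbsent(code, k -> new TrafficStats());
        s.count.increment();
        s.trafficSize.add(bytes);
    }

    // Returns {inCount, inBytes, outCount, outBytes} for requestCode 1.
    long[] run(int threads, int perThread, int bytes) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final boolean in = t % 2 == 0; // alternate directions
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    record(in ? inbound : outbound, 1, bytes);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new long[] {
            inbound.get(1).count.sum(), inbound.get(1).trafficSize.sum(),
            outbound.get(1).count.sum(), outbound.get(1).trafficSize.sum()
        };
    }
}
```

With four threads at 1000 records of 10 bytes each, both directions should end at count 2000 and 20000 bytes; asserting all four values covers the two-maps-under-contention contract described above.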


#1 and #2 I'd want fixed before merge; #3 is doc-only; #4 is a nice-to-have. Let me know if #1 or #2 were intentional trade-offs I missed — there might be a reason I didn't pick up on from the diff alone.


Development

Successfully merging this pull request may close these issues.

[Enhancement] Add traffic size distribution per requestCode in RemotingCodeDistributionHandler

6 participants