[ISSUE #10197] Add traffic size distribution per requestCode in RemotingCodeDistributionHandler #10198
RongtongJin wants to merge 3 commits into apache:develop from
Conversation
Codecov Report ❌ Patch coverage is

Additional details and impacted files

```
@@ Coverage Diff @@
##             develop   #10198      +/-   ##
=============================================
- Coverage      48.93%   48.86%    -0.08%
+ Complexity     13384    13369       -15
=============================================
  Files           1373     1373
  Lines          99924    99940       +16
  Branches       12908    12913        +5
=============================================
- Hits           48898    48835       -63
- Misses         45098    45161       +63
- Partials        5928     5944       +16
```
Wubabalala
left a comment
Opened this one up looking to learn how RocketMQ's Netty pipeline handles observability, and ended up spending most of an hour on RemotingCodeDistributionHandler.java. The refactor from ChannelDuplexHandler into a plain POJO called from NettyDecoder / NettyEncoder reads really clean — and measuring actual ByteBuf positions (readableBytes() + 4 on decode, writerIndex() - beginIndex on encode) sidesteps the whole "how do I estimate command size without iterating extFields" problem. Nice.
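For anyone who hasn't traced the framing: a minimal sketch of the two measurements mentioned above (helper names are mine, not RocketMQ's; the real code reads these values off Netty ByteBuf indices, with the frame decoder having already consumed the 4-byte length field before decode sees the buffer):

```java
// Sketch of the two wire-size measurements (hypothetical helper names).
public class WireSizeSketch {

    // Decode side: the 4-byte length field was stripped by the frame decoder,
    // so the full wire size is the remaining readable bytes plus those 4.
    static int decodeWireSize(int readableBytes) {
        return readableBytes + 4;
    }

    // Encode side: bytes actually written for this command, measured as the
    // writer-index delta across the encode call.
    static int encodeWireSize(int writerIndexAfter, int beginIndex) {
        return writerIndexAfter - beginIndex;
    }

    public static void main(String[] args) {
        // A frame whose payload after the length field is 128 bytes:
        System.out.println(decodeWireSize(128));      // 132
        System.out.println(encodeWireSize(500, 368)); // 132
    }
}
```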
Four things came up while reading. Two I think are worth changing before merge, one is a description/code mismatch I couldn't explain, and one is a test suggestion.
1. Count and traffic snapshots don't share the same time window [P2]
printRemotingCodeDistribution() makes four separate getter calls:
```java
// NettyRemotingServer.java:418-435
String inBoundSnapshotString = distributionHandler.getInBoundSnapshotString();
String outBoundSnapshotString = distributionHandler.getOutBoundSnapshotString();
String inBoundTrafficSnapshotString = distributionHandler.getInBoundTrafficSnapshotString();
String outBoundTrafficSnapshotString = distributionHandler.getOutBoundTrafficSnapshotString();
```

Each one runs its own `for (entry : statsMap.entrySet())` loop inside getSnapshot() and calls sumThenReset() on either count or trafficSize. Between the two inbound calls — which can be microseconds, or longer if the logging thread is contended — new records can land on the same TrafficStats entry, incrementing count (already reset) and adding to trafficSize (not yet reset).
End result: the count line and the traffic line in the log describe slightly different time windows. If anyone downstream computes avg_bytes_per_req = traffic / count, it's biased upward by whatever arrived in the gap. In steady state the skew is probably small, but it breaks the invariant I'd instinctively assume reading the log — that a count / traffic pair for the same requestCode in the same log cycle describes the same traffic.
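To make the skew concrete, here is a self-contained, deterministic simulation of the gap (the `TrafficStats` shape is assumed from the PR description, not copied from the diff):

```java
import java.util.concurrent.atomic.LongAdder;

// Deterministic simulation of the window mismatch: a record lands between
// the count reset and the traffic reset, so the logged pair disagrees.
public class SnapshotSkewDemo {

    static class TrafficStats {
        final LongAdder count = new LongAdder();
        final LongAdder trafficSize = new LongAdder();
        void record(int bytes) { count.increment(); trafficSize.add(bytes); }
    }

    // Returns { countSnapshot, trafficSnapshot } for one simulated log cycle.
    static long[] simulate() {
        TrafficStats stats = new TrafficStats();
        stats.record(100);
        stats.record(100);

        long countSnapshot = stats.count.sumThenReset();         // sees 2 requests
        stats.record(100);                                       // slips into the gap
        long trafficSnapshot = stats.trafficSize.sumThenReset(); // sees 300 bytes

        return new long[] { countSnapshot, trafficSnapshot };
    }

    public static void main(String[] args) {
        long[] r = simulate();
        // avg_bytes_per_req computed from the log: 300 / 2 = 150, not 100.
        System.out.println(r[0] + " requests, " + r[1] + " bytes");
    }
}
```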
I'd fold both reads into one pass so a given entry's two LongAdders reset within a few instructions of each other:
```java
public static final class Snapshot {
    public final Map<Integer, Long> counts;
    public final Map<Integer, Long> traffic;

    private Snapshot(Map<Integer, Long> c, Map<Integer, Long> t) {
        this.counts = c;
        this.traffic = t;
    }
}

private Snapshot snapshotAndReset(ConcurrentMap<Integer, TrafficStats> statsMap) {
    Map<Integer, Long> counts = new HashMap<>(statsMap.size());
    Map<Integer, Long> traffic = new HashMap<>(statsMap.size());
    for (Map.Entry<Integer, TrafficStats> entry : statsMap.entrySet()) {
        // Reset traffic before count. If a recorder slips in between these
        // two lines, we undercount traffic rather than overcount — the
        // safer direction of skew.
        traffic.put(entry.getKey(), entry.getValue().trafficSize.sumThenReset());
        counts.put(entry.getKey(), entry.getValue().count.sumThenReset());
    }
    return new Snapshot(counts, traffic);
}
```

Still not strictly atomic for a single entry (a record can arrive between the two sumThenReset() calls), but the gap shrinks from "one getter call" to "a few field loads", and printRemotingCodeDistribution drops from four getter calls to two.
2. No-arg NettyDecoder / NettyEncoder constructors silently record nothing [P2]
```java
// NettyDecoder.java:25-27
public NettyDecoder() {
    this(null);
}
```

Same pattern in NettyEncoder.java:59-61. Both decode() and encode() then guard with `if (distributionHandler != null)` before calling the recorder.
The two call sites touched in this PR (NettyRemotingServer, RemotingProtocolHandler) are updated to pass the handler, but remoting is public API and NettyDecoder has been around long enough that I'd expect some external extension to be instantiating new NettyDecoder() by hand. Those callers keep compiling, keep running, and quietly stop producing any traffic distribution for their pipelines — no warning, no log line, just missing metrics.
A few ways to make the failure louder without outright breaking them:
- `@Deprecated` the no-arg constructors, with Javadoc noting "does not record traffic distribution; use the handler-aware constructor"
- Log a one-time WARN on first null-handler construction so it at least shows up in startup logs
- Expose a `RemotingCodeDistributionHandler.NOOP` sentinel and use it as the no-arg default — the contract becomes explicit at the type level

My preference is the first. Lowest friction for existing consumers and gives them a clear signal on the next build.
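For completeness, a sketch of what the third option could look like (all names and shapes here are assumptions for illustration, not the actual RocketMQ source):

```java
// Sketch of a NOOP-sentinel default for the no-arg decoder/encoder
// constructors. The handler shape below is a stand-in, not the real class.
public class NoopSentinelSketch {

    static class RemotingCodeDistributionHandler {
        // Shared do-nothing instance: "records nothing" becomes explicit at
        // the type level, and decode()/encode() can drop their null checks.
        static final RemotingCodeDistributionHandler NOOP = new RemotingCodeDistributionHandler() {
            @Override void recordInbound(int code, int wireSize)  { /* no-op */ }
            @Override void recordOutbound(int code, int wireSize) { /* no-op */ }
        };

        long inboundBytes;
        long outboundBytes;

        void recordInbound(int code, int wireSize)  { inboundBytes += wireSize; }
        void recordOutbound(int code, int wireSize) { outboundBytes += wireSize; }
    }

    public static void main(String[] args) {
        // The no-arg constructor would pass NOOP instead of null:
        RemotingCodeDistributionHandler h = RemotingCodeDistributionHandler.NOOP;
        h.recordInbound(1, 100); // safe: no NPE, no state mutated
        System.out.println(h.inboundBytes);
    }
}
```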
3. PR description doesn't match the code I read [P3]
This one tripped me up on the first pass and I want to flag it because the next reviewer will hit the same wall:
| PR body says | What the diff actually does |
|---|---|
| `calcCommandSize()` with 29-byte fixed overhead + body length | No `calcCommandSize()`; wire size measured directly via ByteBuf positions |
| `enableDetailedTrafficSize` flag in NettyServerConfig, togglable at runtime | No such flag; NettyServerConfig isn't touched in this PR |
| Added RemotingCodeDistributionBenchmark (JMH) comparing detail on/off | No benchmark file in the changeset |
| 10 unit test cases | 6 new `@Test` methods |
FWIW I think the actual implementation is a better design than the one described — measuring the real ByteBuf positions sidesteps the whole "what exactly does the 29 include" question and handles remark / extFields correctly for free. But if the next person opens the PR expecting a togglable flag, they'll be looking for something that isn't there. Could you update the body to reflect the current approach?
4. testConcurrentAccess only exercises inbound [nit]
All four threads call handler.recordInbound(1, wireSize) in a loop. Since inboundStats and outboundStats are separate ConcurrentHashMaps, four inbound-only threads don't really test the concurrency scenario that happens at runtime — in a live Netty server, inbound and outbound are on different threads hammering different maps at the same time. Half the threads on recordInbound and half on recordOutbound, then asserting both final counts and traffic, would cost a few extra lines and cover the contract I'd actually worry about.
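The suggested shape, as a runnable standalone sketch (the handler below is a simplified stand-in for `RemotingCodeDistributionHandler`, whose real per-code maps I'm not reproducing):

```java
import java.util.concurrent.atomic.LongAdder;

// Sketch of the mixed concurrency test: half the threads record inbound,
// half outbound, then both directions' totals are asserted after join.
public class MixedConcurrencySketch {

    static class Handler {
        final LongAdder inCount = new LongAdder(),  inBytes = new LongAdder();
        final LongAdder outCount = new LongAdder(), outBytes = new LongAdder();
        void recordInbound(int code, int size)  { inCount.increment();  inBytes.add(size); }
        void recordOutbound(int code, int size) { outCount.increment(); outBytes.add(size); }
    }

    // Returns { inCount, inBytes, outCount, outBytes } after all threads join.
    static long[] run(int threads, int iterations, int wireSize) throws InterruptedException {
        Handler handler = new Handler();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final boolean inbound = t % 2 == 0; // half inbound, half outbound
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    if (inbound) handler.recordInbound(1, wireSize);
                    else handler.recordOutbound(1, wireSize);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) w.join();
        return new long[] { handler.inCount.sum(), handler.inBytes.sum(),
                            handler.outCount.sum(), handler.outBytes.sum() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] totals = run(4, 10_000, 64);
        // 2 inbound threads * 10_000 records = 20_000; same on the outbound side.
        System.out.println(totals[0] + " in, " + totals[2] + " out");
    }
}
```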
#1 and #2 I'd want fixed before merge; #3 is doc-only; #4 is a nice-to-have. Let me know if #1 or #2 were intentional trade-offs I missed — there might be a reason I didn't pick up on from the diff alone.
Which Issue(s) This PR Fixes

Fixes #10197
Brief Description
Add per-requestCode traffic size (bytes) distribution to RemotingCodeDistributionHandler, in addition to the existing request count distribution.

Key changes:

- 2 `ConcurrentHashMap`s, each using a composite `TrafficStats` class (`LongAdder count` + `LongAdder trafficSize`)
- `calcCommandSize()` default path: fixed overhead (29 bytes) + body length — O(1), no iteration on hot path
- `enableDetailedTrafficSize` in NettyServerConfig: when enabled, also counts remark + extFields byte lengths. Togglable at runtime via `updateBrokerConfig`
- `NettyRemotingServer.printRemotingCodeDistribution()` now logs inbound/outbound traffic snapshots

How Did You Test This Change?
- `RemotingCodeDistributionHandlerTest` with 10 unit test cases covering: count, traffic with/without body, detailed mode on/off, snapshot reset, multiple requestCodes, non-RemotingCommand passthrough, concurrent correctness, and runtime toggle
- `RemotingCodeDistributionBenchmark` (JMH) comparing detail-off vs detail-on across 1/4/8 threads. `recordInbound()` is called directly (package-private) to isolate recording overhead from Netty pipeline cost
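A sketch of the O(1) size estimate this description outlines (hypothetical: the review above notes the actual diff measures ByteBuf positions instead, so every name and the 29-byte figure here come only from this description; `String.length()` stands in for byte length and undercounts multi-byte UTF-8):

```java
import java.util.Map;

// Hypothetical calcCommandSize() per the PR description, not the merged code.
public class CalcCommandSizeSketch {

    static final int FIXED_OVERHEAD = 29; // fixed header cost per the description

    // Default path: O(1), no iteration on the hot path.
    static int calcCommandSize(byte[] body) {
        return FIXED_OVERHEAD + (body == null ? 0 : body.length);
    }

    // Detailed path (enableDetailedTrafficSize): also count remark + extFields.
    static int calcCommandSizeDetailed(byte[] body, String remark, Map<String, String> extFields) {
        int size = calcCommandSize(body);
        if (remark != null) {
            size += remark.length();
        }
        if (extFields != null) {
            for (Map.Entry<String, String> e : extFields.entrySet()) {
                size += e.getKey().length() + e.getValue().length();
            }
        }
        return size;
    }

    public static void main(String[] args) {
        System.out.println(calcCommandSize(new byte[100])); // 129
        System.out.println(calcCommandSize(null));          // 29
    }
}
```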