Skip to content

Commit 0ad0993

Browse files
KAFKA-20246: Add clusterId and nodeId to ApiVersionsRequest (2/N)
1 parent 579a4e0 commit 0ad0993

4 files changed

Lines changed: 52 additions & 0 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@
3535
*/
3636
public interface MetadataUpdater extends Closeable {
3737

38+
/**
39+
* Gets the current cluster id without blocking.
40+
* @return the cluster id, or null if unknown
41+
*/
42+
default String clusterId() {
43+
return null;
44+
}
45+
3846
/**
3947
* Gets the current cluster info without blocking.
4048
*/

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.clients;
1818

1919
import org.apache.kafka.common.Cluster;
20+
import org.apache.kafka.common.ClusterResource;
2021
import org.apache.kafka.common.KafkaException;
2122
import org.apache.kafka.common.Node;
2223
import org.apache.kafka.common.TopicPartition;
@@ -1107,6 +1108,13 @@ private void handleInitiateApiVersionRequests(long now) {
11071108
// not before ready.
11081109
this.connectionStates.checkingApiVersions(node);
11091110
ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
1111+
String clusterId = this.metadataUpdater.clusterId();
1112+
int nodeId = Integer.parseInt(node);
1113+
if (clusterId != null && nodeId < Integer.MAX_VALUE/2) {
1114+
System.out.println("CLUSTER_ID(" + clusterId + "), NODE_ID(" + nodeId + ")");
1115+
apiVersionRequestBuilder.setClusterId(clusterId);
1116+
apiVersionRequestBuilder.setNodeId(Integer.parseInt(node));
1117+
}
11101118
ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
11111119
doSend(clientRequest, true, now);
11121120
iter.remove();
@@ -1193,6 +1201,15 @@ class DefaultMetadataUpdater implements MetadataUpdater {
11931201
this.inProgress = null;
11941202
}
11951203

1204+
@Override
1205+
public String clusterId() {
1206+
ClusterResource clusterResource = metadata.fetch().clusterResource();
1207+
if (clusterResource != null) {
1208+
return clusterResource.clusterId();
1209+
}
1210+
return null;
1211+
}
1212+
11961213
@Override
11971214
public List<Node> fetchNodes() {
11981215
return metadata.fetch().nodes();
@@ -1296,6 +1313,7 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
12961313

12971314
if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == Errors.REBOOTSTRAP_REQUIRED) {
12981315
log.info("Rebootstrap requested by server.");
1316+
log.error("REBOOTSTRAP REQUESTED BY SERVER.");
12991317
initiateRebootstrap();
13001318
} else if (response.brokers().isEmpty()) {
13011319
// When talking to the startup phase of a broker, it is possible to receive an empty metadata set, which

clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public Builder(
5656
this.data = data.duplicate();
5757
}
5858

59+
public void setClusterId(String clusterId) {
60+
this.data.setClusterId(clusterId);
61+
}
62+
63+
public void setNodeId(int nodeId) {
64+
this.data.setNodeId(nodeId);
65+
}
66+
5967
@Override
6068
public ApiVersionsRequest build(short version) {
6169
return new ApiVersionsRequest(data, version);
@@ -94,6 +102,12 @@ public boolean hasUnsupportedRequestVersion() {
94102
}
95103

96104
public boolean isValid() {
105+
if (version() >= 5) {
106+
// Either cluster ID and node ID are both specified, or neither is.
107+
if ((data.clusterId() == null && data.nodeId() != -1) || (data.clusterId() != null && data.nodeId() == -1)) {
108+
return false;
109+
}
110+
}
97111
if (version() >= 3) {
98112
return SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareName()).matches() &&
99113
SOFTWARE_NAME_VERSION_PATTERN.matcher(data.clientSoftwareVersion()).matches();

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,10 +1529,22 @@ class KafkaApis(val requestChannel: RequestChannel,
15291529
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
15301530
} else if (!apiVersionRequest.isValid) {
15311531
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
1532+
} else if (clusterIdOrNodeIdIsInvalid(apiVersionRequest)) {
1533+
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.REBOOTSTRAP_REQUIRED.exception)
15321534
} else {
15331535
apiVersionManager.apiVersionResponse(requestThrottleMs, request.header.apiVersion() < 4)
15341536
}
15351537
}
1538+
1539+
// KIP-1242 checks the cluster ID and node ID in the request if provided to ensure the
1540+
// client is connecting to the correct broker. If both are specified, they must match
1541+
// the expected values for this broker.
1542+
def clusterIdOrNodeIdIsInvalid(apiVersionRequest: ApiVersionsRequest): Boolean = {
1543+
apiVersionRequest.version >= 5 &&
1544+
apiVersionRequest.data.clusterId != null &&
1545+
(!apiVersionRequest.data.clusterId.equals(clusterId) || apiVersionRequest.data.nodeId != brokerId)
1546+
}
1547+
15361548
requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
15371549
}
15381550

0 commit comments

Comments
 (0)