Skip to content

Commit 2c521ad

Browse files
committed
Add QUIC and HTTP/3 netty transport prototypes
Signed-off-by: jeroen.veltman <jeroen.veltman@nextend.nl>
1 parent fc64b43 commit 2c521ad

16 files changed

Lines changed: 1814 additions & 0 deletions

rsocket-transport-netty/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,19 @@ if (osdetector.classifier in ["linux-x86_64", "linux-aarch_64", "osx-x86_64", "o
2626
os_suffix = "::" + osdetector.classifier
2727
}
2828

29+
def netty_incubator_quic_version = "0.0.72.Final"
30+
def netty_incubator_http3_version = "0.0.29.Final"
31+
2932
dependencies {
3033
api project(':rsocket-core')
3134
api "io.projectreactor.netty:reactor-netty-core"
3235
api "io.projectreactor.netty:reactor-netty-http"
36+
implementation "io.netty.incubator:netty-incubator-codec-classes-quic:${netty_incubator_quic_version}"
37+
implementation "io.netty.incubator:netty-incubator-codec-http3:${netty_incubator_http3_version}"
38+
runtimeOnly "io.netty.incubator:netty-incubator-codec-native-quic:${netty_incubator_quic_version}"
39+
if (os_suffix) {
40+
runtimeOnly(group: "io.netty.incubator", name: "netty-incubator-codec-native-quic", version: netty_incubator_quic_version, classifier: osdetector.classifier)
41+
}
3342
api 'org.slf4j:slf4j-api'
3443

3544
testImplementation project(':rsocket-test')
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright 2015-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelFutureListener;
23+
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
24+
import io.netty.incubator.codec.http3.Http3DataFrame;
25+
import io.netty.incubator.codec.http3.Http3HeadersFrame;
26+
import io.netty.incubator.codec.quic.QuicChannel;
27+
import io.netty.incubator.codec.quic.QuicStreamChannel;
28+
import io.rsocket.DuplexConnection;
29+
import io.rsocket.RSocketErrorException;
30+
import io.rsocket.frame.ErrorFrameCodec;
31+
import io.rsocket.internal.BaseDuplexConnection;
32+
import io.rsocket.internal.UnboundedProcessor;
33+
import java.nio.channels.ClosedChannelException;
34+
import java.net.SocketAddress;
35+
import java.util.Objects;
36+
import reactor.core.publisher.Flux;
37+
import reactor.core.publisher.Mono;
38+
39+
/** An implementation of {@link DuplexConnection} that connects via HTTP/3. */
40+
public final class Http3DuplexConnection extends BaseDuplexConnection {
41+
private final String side;
42+
private final Channel connection;
43+
private final UnboundedProcessor inbound;
44+
45+
public Http3DuplexConnection(Channel connection) {
46+
this("unknown", connection);
47+
}
48+
49+
public Http3DuplexConnection(String side, Channel connection) {
50+
this.connection = Objects.requireNonNull(connection, "connection must not be null");
51+
this.side = side;
52+
this.inbound = new UnboundedProcessor();
53+
54+
Flux.from(sender)
55+
.concatMap(
56+
frame ->
57+
Mono.<Void>create(
58+
sink ->
59+
this.connection
60+
.writeAndFlush(new DefaultHttp3DataFrame(frame))
61+
.addListener(
62+
future -> {
63+
if (future.isSuccess()) {
64+
sink.success();
65+
} else {
66+
sink.error(future.cause());
67+
}
68+
})),
69+
1)
70+
.doOnError(
71+
throwable -> {
72+
inbound.onError(throwable);
73+
onClose.tryEmitError(throwable);
74+
this.connection.close();
75+
})
76+
.doFinally(
77+
__ -> {
78+
if (this.connection instanceof QuicStreamChannel) {
79+
((QuicStreamChannel) this.connection)
80+
.shutdownOutput()
81+
.addListener(ChannelFutureListener.CLOSE);
82+
} else {
83+
this.connection.close();
84+
}
85+
})
86+
.subscribe();
87+
}
88+
89+
@Override
90+
public ByteBufAllocator alloc() {
91+
return connection.alloc();
92+
}
93+
94+
@Override
95+
public SocketAddress remoteAddress() {
96+
if (connection.parent() instanceof QuicChannel) {
97+
return ((QuicChannel) connection.parent()).remoteSocketAddress();
98+
}
99+
return connection.remoteAddress();
100+
}
101+
102+
@Override
103+
protected void doOnClose() {
104+
connection.close();
105+
}
106+
107+
@Override
108+
public Mono<Void> onClose() {
109+
return super.onClose();
110+
}
111+
112+
@Override
113+
public Flux<ByteBuf> receive() {
114+
return inbound;
115+
}
116+
117+
@Override
118+
public void sendErrorAndClose(RSocketErrorException e) {
119+
final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e);
120+
sender.tryEmitFinal(errorFrame);
121+
}
122+
123+
public void handleHeaders(Http3HeadersFrame frame) {
124+
}
125+
126+
public void handleData(Http3DataFrame frame) {
127+
ByteBuf byteBuf = frame.content().retain();
128+
frame.release();
129+
inbound.onNext(byteBuf);
130+
}
131+
132+
public void handleError(Throwable cause) {
133+
inbound.onError(cause);
134+
onClose.tryEmitError(cause);
135+
}
136+
137+
public void handleInputClosed() {
138+
inbound.onError(new ClosedChannelException());
139+
onClose.tryEmitEmpty();
140+
}
141+
142+
@Override
143+
public String toString() {
144+
return "Http3DuplexConnection{" + "side='" + side + '\'' + ", connection=" + connection + '}';
145+
}
146+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.transport.netty;
18+
19+
import java.net.URI;
20+
import java.util.Objects;
21+
22+
/** Immutable configuration for HTTP/3 based transports. */
23+
public final class Http3TransportConfig {
24+
25+
public static final int DEFAULT_PORT = 443;
26+
public static final String DEFAULT_PATH = "/rsocket";
27+
public static final String DEFAULT_METHOD = "POST";
28+
public static final String DEFAULT_CONTENT_TYPE = "application/rsocket";
29+
30+
private final QuicTransportConfig quicConfig;
31+
private final String path;
32+
private final String method;
33+
private final String contentType;
34+
35+
private Http3TransportConfig(Builder builder) {
36+
this.quicConfig = builder.quicConfig;
37+
this.path = builder.path;
38+
this.method = builder.method;
39+
this.contentType = builder.contentType;
40+
}
41+
42+
public static Http3TransportConfig create() {
43+
return builder().build();
44+
}
45+
46+
public static Builder builder() {
47+
return new Builder();
48+
}
49+
50+
public static Http3TransportConfig from(URI uri) {
51+
final String uriPath = uri.getPath();
52+
return builder()
53+
.path(uriPath == null || uriPath.isEmpty() ? DEFAULT_PATH : uriPath)
54+
.build();
55+
}
56+
57+
public QuicTransportConfig quicConfig() {
58+
return quicConfig;
59+
}
60+
61+
public String path() {
62+
return path;
63+
}
64+
65+
public String method() {
66+
return method;
67+
}
68+
69+
public String contentType() {
70+
return contentType;
71+
}
72+
73+
public Builder mutate() {
74+
return builder()
75+
.quicConfig(quicConfig)
76+
.path(path)
77+
.method(method)
78+
.contentType(contentType);
79+
}
80+
81+
public static final class Builder {
82+
private QuicTransportConfig quicConfig =
83+
QuicTransportConfig.builder().alpn("h3").secure(true).build();
84+
private String path = DEFAULT_PATH;
85+
private String method = DEFAULT_METHOD;
86+
private String contentType = DEFAULT_CONTENT_TYPE;
87+
88+
public Builder quicConfig(QuicTransportConfig quicConfig) {
89+
this.quicConfig = Objects.requireNonNull(quicConfig, "quicConfig must not be null");
90+
return this;
91+
}
92+
93+
public Builder path(String path) {
94+
Objects.requireNonNull(path, "path must not be null");
95+
this.path = path.startsWith("/") ? path : "/" + path;
96+
return this;
97+
}
98+
99+
public Builder method(String method) {
100+
this.method = Objects.requireNonNull(method, "method must not be null");
101+
return this;
102+
}
103+
104+
public Builder contentType(String contentType) {
105+
this.contentType = Objects.requireNonNull(contentType, "contentType must not be null");
106+
return this;
107+
}
108+
109+
public Http3TransportConfig build() {
110+
return new Http3TransportConfig(this);
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)