Skip to content

Commit e11a53b

Browse files
Oleh DokukaOlegDokuka
authored andcommitted
merge 1.0.x into master
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
2 parents cad188e + 61652c3 commit e11a53b

4 files changed

Lines changed: 248 additions & 87 deletions

File tree

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ void drainRegular(long expectedState, Subscriber<? super ByteBuf> a) {
227227
}
228228

229229
if (checkTerminated(empty, a)) {
230+
if (!empty) {
231+
release(t);
232+
}
230233
return;
231234
}
232235

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1362,6 +1362,51 @@ public void errorFragmentTooSmall(
13621362
leaksTrackingByteBufAllocator.assertHasNoLeaks();
13631363
}
13641364

1365+
@ParameterizedTest
1366+
@ValueSource(strings = {"stream", "channel"})
1367+
// see https://github.com/rsocket/rsocket-java/issues/959
1368+
public void testWorkaround959(String type) {
1369+
for (int i = 1; i < 20000; i += 2) {
1370+
ByteBuf buffer = rule.alloc().buffer();
1371+
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
1372+
1373+
final AssertSubscriber<Payload> assertSubscriber = new AssertSubscriber<>(3);
1374+
if (type.equals("stream")) {
1375+
rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber);
1376+
} else if (type.equals("channel")) {
1377+
rule.socket
1378+
.requestChannel(Flux.just(ByteBufPayload.create(buffer)))
1379+
.subscribe(assertSubscriber);
1380+
}
1381+
1382+
final ByteBuf payloadFrame =
1383+
PayloadFrameCodec.encode(
1384+
rule.alloc(), i, false, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER);
1385+
1386+
RaceTestUtils.race(
1387+
() -> {
1388+
rule.connection.addToReceivedBuffer(payloadFrame.copy());
1389+
rule.connection.addToReceivedBuffer(payloadFrame.copy());
1390+
rule.connection.addToReceivedBuffer(payloadFrame);
1391+
},
1392+
() -> {
1393+
assertSubscriber.request(1);
1394+
assertSubscriber.request(1);
1395+
assertSubscriber.request(1);
1396+
});
1397+
1398+
Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);
1399+
1400+
Assertions.assertThat(rule.socket.isDisposed()).isFalse();
1401+
1402+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
1403+
assertSubscriber.assertNoError();
1404+
1405+
rule.connection.clearSendReceiveBuffers();
1406+
rule.assertHasNoLeaks();
1407+
}
1408+
}
1409+
13651410
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
13661411
@Override
13671412
protected RSocketRequester newRSocket() {
Lines changed: 132 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-present the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,44 +16,45 @@
1616

1717
package io.rsocket.internal;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.ByteBufAllocator;
2023
import io.netty.buffer.Unpooled;
2124
import io.netty.util.CharsetUtil;
22-
import java.util.concurrent.CountDownLatch;
23-
import org.junit.Assert;
24-
import org.junit.Test;
25+
import io.netty.util.ReferenceCountUtil;
26+
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
27+
import io.rsocket.internal.subscriber.AssertSubscriber;
28+
import java.time.Duration;
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.RepeatedTest;
32+
import org.junit.jupiter.api.Timeout;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.ValueSource;
35+
import reactor.core.Fuseable;
36+
import reactor.core.publisher.Hooks;
37+
import reactor.core.publisher.Operators;
38+
import reactor.core.scheduler.Schedulers;
39+
import reactor.test.StepVerifier;
40+
import reactor.test.util.RaceTestUtils;
2541

2642
public class UnboundedProcessorTest {
27-
@Test
28-
public void testOnNextBeforeSubscribe_10() {
29-
testOnNextBeforeSubscribeN(10);
30-
}
31-
32-
@Test
33-
public void testOnNextBeforeSubscribe_100() {
34-
testOnNextBeforeSubscribeN(100);
35-
}
36-
37-
@Test
38-
public void testOnNextBeforeSubscribe_10_000() {
39-
testOnNextBeforeSubscribeN(10_000);
40-
}
41-
42-
@Test
43-
public void testOnNextBeforeSubscribe_100_000() {
44-
testOnNextBeforeSubscribeN(100_000);
45-
}
4643

47-
@Test
48-
public void testOnNextBeforeSubscribe_1_000_000() {
49-
testOnNextBeforeSubscribeN(1_000_000);
44+
@BeforeAll
45+
public static void setup() {
46+
Hooks.onErrorDropped(__ -> {});
5047
}
5148

52-
@Test
53-
public void testOnNextBeforeSubscribe_10_000_000() {
54-
testOnNextBeforeSubscribeN(10_000_000);
49+
@AfterAll
50+
public static void teardown() {
51+
Hooks.resetOnErrorDropped();
5552
}
5653

54+
@ParameterizedTest(
55+
name =
56+
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
57+
@ValueSource(ints = {10, 100, 10_000, 100_000, 1_000_000, 10_000_000})
5758
public void testOnNextBeforeSubscribeN(int n) {
5859
UnboundedProcessor processor = new UnboundedProcessor();
5960

@@ -63,68 +64,127 @@ public void testOnNextBeforeSubscribeN(int n) {
6364

6465
processor.onComplete();
6566

66-
long count = processor.count().block();
67-
68-
Assert.assertEquals(n, count);
69-
}
70-
71-
@Test
72-
public void testOnNextAfterSubscribe_10() throws Exception {
73-
testOnNextAfterSubscribeN(10);
67+
StepVerifier.create(processor.count()).expectNext(Long.valueOf(n)).verifyComplete();
7468
}
7569

76-
@Test
77-
public void testOnNextAfterSubscribe_100() throws Exception {
78-
testOnNextAfterSubscribeN(100);
79-
}
80-
81-
@Test
82-
public void testOnNextAfterSubscribe_1000() throws Exception {
83-
testOnNextAfterSubscribeN(1000);
84-
}
85-
86-
@Test
87-
public void testPrioritizedSending() {
70+
@ParameterizedTest(
71+
name =
72+
"Test that emitting {0} onNext after subscribe and requestN should deliver all the signals")
73+
@ValueSource(ints = {10, 100, 10_000})
74+
public void testOnNextAfterSubscribeN(int n) {
8875
UnboundedProcessor processor = new UnboundedProcessor();
76+
AssertSubscriber<ByteBuf> assertSubscriber = AssertSubscriber.create();
8977

90-
for (int i = 0; i < 1000; i++) {
78+
processor.subscribe(assertSubscriber);
79+
80+
for (int i = 0; i < n; i++) {
9181
processor.onNext(Unpooled.EMPTY_BUFFER);
9282
}
9383

94-
processor.onNextPrioritized(Unpooled.wrappedBuffer("test".getBytes(CharsetUtil.UTF_8)));
95-
96-
ByteBuf byteBuf = processor.next().block();
97-
98-
Assert.assertEquals(byteBuf.toString(CharsetUtil.UTF_8), "test");
84+
assertSubscriber.awaitAndAssertNextValueCount(n);
9985
}
10086

101-
@Test
102-
public void testPrioritizedFused() {
87+
@ParameterizedTest(
88+
name =
89+
"Test that prioritized value sending deliver prioritized signals before the others mode[fusionEnabled={0}]")
90+
@ValueSource(booleans = {true, false})
91+
public void testPrioritizedSending(boolean fusedCase) {
10392
UnboundedProcessor processor = new UnboundedProcessor();
10493

10594
for (int i = 0; i < 1000; i++) {
10695
processor.onNext(Unpooled.EMPTY_BUFFER);
10796
}
10897

109-
processor.onNextPrioritized(Unpooled.wrappedBuffer("test".getBytes(CharsetUtil.UTF_8)));
110-
111-
ByteBuf byteBuf = processor.poll();
98+
processor.onNextPrioritized(Unpooled.copiedBuffer("test", CharsetUtil.UTF_8));
11299

113-
Assert.assertEquals(byteBuf.toString(CharsetUtil.UTF_8), "test");
100+
assertThat(fusedCase ? processor.poll() : processor.next().block())
101+
.isNotNull()
102+
.extracting(bb -> bb.toString(CharsetUtil.UTF_8))
103+
.isEqualTo("test");
114104
}
115105

116-
public void testOnNextAfterSubscribeN(int n) throws Exception {
117-
CountDownLatch latch = new CountDownLatch(n);
118-
UnboundedProcessor processor = new UnboundedProcessor();
119-
processor.log().doOnNext(integer -> latch.countDown()).subscribe();
120-
121-
for (int i = 0; i < n; i++) {
122-
System.out.println("onNexting -> " + i);
123-
processor.onNext(Unpooled.EMPTY_BUFFER);
106+
@ParameterizedTest(
107+
name =
108+
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks; mode[fusionEnabled={0}]")
109+
@ValueSource(booleans = {true, false})
110+
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
111+
final LeaksTrackingByteBufAllocator allocator =
112+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
113+
for (int i = 0; i < 100000; i++) {
114+
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
115+
116+
final ByteBuf buffer1 = allocator.buffer(1);
117+
final ByteBuf buffer2 = allocator.buffer(2);
118+
119+
final AssertSubscriber<ByteBuf> assertSubscriber =
120+
new AssertSubscriber<ByteBuf>(0)
121+
.requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
122+
123+
unboundedProcessor.subscribe(assertSubscriber);
124+
125+
RaceTestUtils.race(
126+
() ->
127+
RaceTestUtils.race(
128+
() ->
129+
RaceTestUtils.race(
130+
() -> {
131+
unboundedProcessor.onNext(buffer1);
132+
unboundedProcessor.onNext(buffer2);
133+
},
134+
unboundedProcessor::dispose,
135+
Schedulers.elastic()),
136+
assertSubscriber::cancel,
137+
Schedulers.elastic()),
138+
() -> {
139+
assertSubscriber.request(1);
140+
assertSubscriber.request(1);
141+
},
142+
Schedulers.elastic());
143+
144+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
145+
146+
allocator.assertHasNoLeaks();
124147
}
148+
}
125149

126-
processor.drain();
127-
128-
latch.await();
150+
@RepeatedTest(
151+
name =
152+
"Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks",
153+
value = 100000)
154+
@Timeout(60)
155+
public void ensuresAsyncFusionAndDisposureHasNoDeadlock() {
156+
// TODO: enable leaks tracking
157+
// final LeaksTrackingByteBufAllocator allocator =
158+
// LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
159+
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor();
160+
161+
// final ByteBuf buffer1 = allocator.buffer(1);
162+
// final ByteBuf buffer2 = allocator.buffer(2);
163+
164+
final AssertSubscriber<ByteBuf> assertSubscriber =
165+
new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease));
166+
167+
unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber);
168+
169+
RaceTestUtils.race(
170+
() -> {
171+
// unboundedProcessor.onNext(buffer1);
172+
// unboundedProcessor.onNext(buffer2);
173+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
174+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
175+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
176+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
177+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
178+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
179+
unboundedProcessor.dispose();
180+
},
181+
unboundedProcessor::dispose);
182+
183+
assertSubscriber
184+
.await(Duration.ofSeconds(50))
185+
.values()
186+
.forEach(ReferenceCountUtil::safeRelease);
187+
188+
// allocator.assertHasNoLeaks();
129189
}
130190
}

0 commit comments

Comments
 (0)