Skip to content

Commit bfacd80

Browse files
SiyaoIsHidingabsurdfarce
authored andcommitted
CASSJAVA-104: Fix Flaky tests
patch by Jane He; review by Bret McGuire and Andy Tolbert
1 parent bf2b36f commit bfacd80

8 files changed

Lines changed: 178 additions & 71 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareAsyncProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
3535
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
3636
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
37-
import com.datastax.oss.driver.shaded.guava.common.base.Functions;
3837
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
3938
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
4039
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
@@ -64,14 +63,15 @@ public CqlPrepareAsyncProcessor() {
6463
}
6564

6665
public CqlPrepareAsyncProcessor(@NonNull Optional<? extends DefaultDriverContext> context) {
67-
this(context, Functions.identity());
66+
// Use weakValues to evict prepared statements from the cache as soon are they are
67+
// no longer referenced elsewhere.
68+
this(context, CacheBuilder::weakValues);
6869
}
6970

7071
protected CqlPrepareAsyncProcessor(
7172
Optional<? extends DefaultDriverContext> context,
7273
Function<CacheBuilder<Object, Object>, CacheBuilder<Object, Object>> decorator) {
73-
74-
CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder().weakValues();
74+
CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder();
7575
this.cache = decorator.apply(baseCache).build();
7676
context.ifPresent(
7777
(ctx) -> {

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCachingIT.java

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ private void invalidationResultSetTest(
194194
Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
195195
invalidationTestInner(
196196
setupTestSchema,
197-
"select f from test_table_1 where e = ?",
198-
"select h from test_table_2 where g = ?",
197+
"select f from test_table_caching_1 where e = ?",
198+
"select h from test_table_caching_2 where g = ?",
199199
expectedChangedTypes);
200200
}
201201

@@ -206,8 +206,8 @@ private void invalidationVariableDefsTest(
206206
String condition = isCollection ? "contains ?" : "= ?";
207207
invalidationTestInner(
208208
setupTestSchema,
209-
String.format("select e from test_table_1 where f %s allow filtering", condition),
210-
String.format("select g from test_table_2 where h %s allow filtering", condition),
209+
String.format("select e from test_table_caching_1 where f %s allow filtering", condition),
210+
String.format("select g from test_table_caching_2 where h %s allow filtering", condition),
211211
expectedChangedTypes);
212212
}
213213

@@ -263,16 +263,18 @@ private void invalidationTestInner(
263263
preparedStmtCacheRemoveLatch.countDown();
264264
});
265265

266-
// alter test_type_2 to trigger cache invalidation and above events
267-
session.execute("ALTER TYPE test_type_2 add i blob");
266+
// alter test_type_caching_2 to trigger cache invalidation and above events
267+
session.execute("ALTER TYPE test_type_caching_2 add i blob");
268+
269+
session.checkSchemaAgreement();
268270

269271
// wait for latches and fail if they don't reach zero before timeout
270272
assertThat(
271273
Uninterruptibles.awaitUninterruptibly(
272-
preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS))
274+
preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS))
273275
.withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout")
274276
.isTrue();
275-
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS))
277+
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS))
276278
.withFailMessage("typeChangeEventLatch did not trigger before timeout")
277279
.isTrue();
278280

@@ -295,17 +297,20 @@ private void invalidationTestInner(
295297

296298
Consumer<CqlSession> setupCacheEntryTestBasic =
297299
(session) -> {
298-
session.execute("CREATE TYPE test_type_1 (a text, b int)");
299-
session.execute("CREATE TYPE test_type_2 (c int, d text)");
300-
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
301-
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_2>)");
300+
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
301+
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
302+
session.execute(
303+
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_1>)");
304+
session.execute(
305+
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_2>)");
302306
};
303307

304308
@Test
305309
public void should_invalidate_cache_entry_on_basic_udt_change_result_set() {
306310
SchemaChangeSynchronizer.withLock(
307311
() -> {
308-
invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2"));
312+
invalidationResultSetTest(
313+
setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2"));
309314
});
310315
}
311316

@@ -314,25 +319,26 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() {
314319
SchemaChangeSynchronizer.withLock(
315320
() -> {
316321
invalidationVariableDefsTest(
317-
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2"));
322+
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2"));
318323
});
319324
}
320325

321326
Consumer<CqlSession> setupCacheEntryTestCollection =
322327
(session) -> {
323-
session.execute("CREATE TYPE test_type_1 (a text, b int)");
324-
session.execute("CREATE TYPE test_type_2 (c int, d text)");
328+
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
329+
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
325330
session.execute(
326-
"CREATE TABLE test_table_1 (e int primary key, f list<frozen<test_type_1>>)");
331+
"CREATE TABLE test_table_caching_1 (e int primary key, f list<frozen<test_type_caching_1>>)");
327332
session.execute(
328-
"CREATE TABLE test_table_2 (g int primary key, h list<frozen<test_type_2>>)");
333+
"CREATE TABLE test_table_caching_2 (g int primary key, h list<frozen<test_type_caching_2>>)");
329334
};
330335

331336
@Test
332337
public void should_invalidate_cache_entry_on_collection_udt_change_result_set() {
333338
SchemaChangeSynchronizer.withLock(
334339
() -> {
335-
invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2"));
340+
invalidationResultSetTest(
341+
setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2"));
336342
});
337343
}
338344

@@ -341,25 +347,26 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs
341347
SchemaChangeSynchronizer.withLock(
342348
() -> {
343349
invalidationVariableDefsTest(
344-
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2"));
350+
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2"));
345351
});
346352
}
347353

348354
Consumer<CqlSession> setupCacheEntryTestTuple =
349355
(session) -> {
350-
session.execute("CREATE TYPE test_type_1 (a text, b int)");
351-
session.execute("CREATE TYPE test_type_2 (c int, d text)");
356+
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
357+
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
352358
session.execute(
353-
"CREATE TABLE test_table_1 (e int primary key, f tuple<int, test_type_1, text>)");
359+
"CREATE TABLE test_table_caching_1 (e int primary key, f tuple<int, test_type_caching_1, text>)");
354360
session.execute(
355-
"CREATE TABLE test_table_2 (g int primary key, h tuple<text, test_type_2, int>)");
361+
"CREATE TABLE test_table_caching_2 (g int primary key, h tuple<text, test_type_caching_2, int>)");
356362
};
357363

358364
@Test
359365
public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() {
360366
SchemaChangeSynchronizer.withLock(
361367
() -> {
362-
invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2"));
368+
invalidationResultSetTest(
369+
setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2"));
363370
});
364371
}
365372

@@ -368,26 +375,29 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() {
368375
SchemaChangeSynchronizer.withLock(
369376
() -> {
370377
invalidationVariableDefsTest(
371-
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2"));
378+
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2"));
372379
});
373380
}
374381

375382
Consumer<CqlSession> setupCacheEntryTestNested =
376383
(session) -> {
377-
session.execute("CREATE TYPE test_type_1 (a text, b int)");
378-
session.execute("CREATE TYPE test_type_2 (c int, d text)");
379-
session.execute("CREATE TYPE test_type_3 (e frozen<test_type_1>, f int)");
380-
session.execute("CREATE TYPE test_type_4 (g int, h frozen<test_type_2>)");
381-
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_3>)");
382-
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_4>)");
384+
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
385+
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
386+
session.execute("CREATE TYPE test_type_caching_3 (e frozen<test_type_caching_1>, f int)");
387+
session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen<test_type_caching_2>)");
388+
session.execute(
389+
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_3>)");
390+
session.execute(
391+
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_4>)");
383392
};
384393

385394
@Test
386395
public void should_invalidate_cache_entry_on_nested_udt_change_result_set() {
387396
SchemaChangeSynchronizer.withLock(
388397
() -> {
389398
invalidationResultSetTest(
390-
setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4"));
399+
setupCacheEntryTestNested,
400+
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
391401
});
392402
}
393403

@@ -396,7 +406,9 @@ public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() {
396406
SchemaChangeSynchronizer.withLock(
397407
() -> {
398408
invalidationVariableDefsTest(
399-
setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4"));
409+
setupCacheEntryTestNested,
410+
false,
411+
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
400412
});
401413
}
402414

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementCancellationIT.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,34 @@
2121
import static org.junit.Assert.fail;
2222

2323
import com.datastax.oss.driver.api.core.CqlSession;
24+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
25+
import com.datastax.oss.driver.api.core.context.DriverContext;
2426
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
2527
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
28+
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
29+
import com.datastax.oss.driver.api.core.session.SessionBuilder;
2630
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
2731
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
2832
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
2933
import com.datastax.oss.driver.categories.IsolatedTests;
3034
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
3135
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
36+
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
37+
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
38+
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
39+
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
3240
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
3341
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
3442
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
43+
import edu.umd.cs.findbugs.annotations.NonNull;
44+
import java.util.List;
45+
import java.util.Optional;
3546
import java.util.concurrent.CompletableFuture;
47+
import java.util.function.Function;
3648
import org.junit.After;
49+
import org.junit.AfterClass;
3750
import org.junit.Before;
51+
import org.junit.BeforeClass;
3852
import org.junit.Rule;
3953
import org.junit.Test;
4054
import org.junit.experimental.categories.Category;
@@ -50,6 +64,69 @@ public class PreparedStatementCancellationIT {
5064

5165
@Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
5266

67+
private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor {
68+
69+
public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
70+
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
71+
// to prevent cache entries from unexpectedly disappearing mid-test.
72+
super(context, Function.identity());
73+
}
74+
}
75+
76+
private static class TestDefaultDriverContext extends DefaultDriverContext {
77+
public TestDefaultDriverContext(
78+
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
79+
super(configLoader, programmaticArguments);
80+
}
81+
82+
@Override
83+
protected RequestProcessorRegistry buildRequestProcessorRegistry() {
84+
// Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong
85+
// prepared statement cache, see JAVA-3062
86+
List<RequestProcessor<?, ?>> processors =
87+
BuiltInRequestProcessors.createDefaultProcessors(this);
88+
processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor);
89+
processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor);
90+
CqlPrepareAsyncProcessor asyncProcessor =
91+
new PreparedStatementCancellationIT.TestCqlPrepareAsyncProcessor(Optional.of(this));
92+
processors.add(2, asyncProcessor);
93+
processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor));
94+
return new RequestProcessorRegistry(
95+
getSessionName(), processors.toArray(new RequestProcessor[0]));
96+
}
97+
}
98+
99+
private static class TestSessionBuilder extends SessionBuilder {
100+
101+
@Override
102+
protected Object wrap(@NonNull CqlSession defaultSession) {
103+
return defaultSession;
104+
}
105+
106+
@Override
107+
protected DriverContext buildContext(
108+
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
109+
return new PreparedStatementCancellationIT.TestDefaultDriverContext(
110+
configLoader, programmaticArguments);
111+
}
112+
}
113+
114+
@BeforeClass
115+
public static void setupBeforeClass() {
116+
System.setProperty(
117+
SessionUtils.SESSION_BUILDER_CLASS_PROPERTY,
118+
PreparedStatementCancellationIT.class.getName());
119+
}
120+
121+
@AfterClass
122+
public static void teardownAfterClass() {
123+
System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY);
124+
}
125+
126+
public static SessionBuilder builder() {
127+
return new PreparedStatementCancellationIT.TestSessionBuilder();
128+
}
129+
53130
@Before
54131
public void setup() {
55132

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/reactive/DefaultReactiveResultSetIT.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
3232
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3333
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
34+
import com.datastax.oss.driver.api.core.cql.Statement;
3435
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
3536
import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer;
3637
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
@@ -67,19 +68,15 @@ public static void initialize() {
6768
CqlSession session = sessionRule.session();
6869
SchemaChangeSynchronizer.withLock(
6970
() -> {
70-
session.execute("DROP TABLE IF EXISTS test_reactive_read");
71-
session.execute("DROP TABLE IF EXISTS test_reactive_write");
71+
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read"));
72+
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write"));
7273
session.checkSchemaAgreement();
7374
session.execute(
74-
SimpleStatement.builder(
75-
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
76-
.setExecutionProfile(sessionRule.slowProfile())
77-
.build());
75+
createSlowStatement(
76+
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
7877
session.execute(
79-
SimpleStatement.builder(
80-
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
81-
.setExecutionProfile(sessionRule.slowProfile())
82-
.build());
78+
createSlowStatement(
79+
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
8380
session.checkSchemaAgreement();
8481
});
8582
for (int i = 0; i < 1000; i++) {
@@ -92,6 +89,12 @@ public static void initialize() {
9289
}
9390
}
9491

92+
static Statement<?> createSlowStatement(String statement) {
93+
return SimpleStatement.builder(statement)
94+
.setExecutionProfile(sessionRule.slowProfile())
95+
.build();
96+
}
97+
9598
@Before
9699
public void truncateTables() throws Exception {
97100
CqlSession session = sessionRule.session();

integration-tests/src/test/java/com/datastax/oss/driver/core/metadata/SchemaIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,11 @@ public void should_disable_schema_programmatically_when_enabled_in_config() {
151151
sessionRule
152152
.session()
153153
.execute(
154-
SimpleStatement.builder("CREATE TABLE foo(k int primary key)")
154+
SimpleStatement.builder("CREATE TABLE foo_schema_it(k int primary key)")
155155
.setExecutionProfile(slowProfile)
156156
.build());
157157
assertThat(session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
158-
.doesNotContainKey(CqlIdentifier.fromInternal("foo"));
158+
.doesNotContainKey(CqlIdentifier.fromInternal("foo_schema_it"));
159159

160160
// Reset to config value (true), should refresh and load the new table
161161
session.setSchemaMetadataEnabled(null);
@@ -167,7 +167,7 @@ public void should_disable_schema_programmatically_when_enabled_in_config() {
167167
() ->
168168
assertThat(
169169
session.getMetadata().getKeyspace(sessionRule.keyspace()).get().getTables())
170-
.containsKey(CqlIdentifier.fromInternal("foo")));
170+
.containsKey(CqlIdentifier.fromInternal("foo_schema_it")));
171171
}
172172

173173
@Test

0 commit comments

Comments
 (0)