Skip to content

Commit 1b199c7

Browse files
committed
Integrate configVars & timeUnitToMill()
1 parent 394e245 commit 1b199c7

8 files changed

Lines changed: 296 additions & 44 deletions

File tree

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.common;
19+
20+
import java.time.Duration;
21+
22+
/**
23+
* Utility for parsing time values from configuration strings.
24+
* Consolidates the logic previously duplicated in
25+
* {@code ZeppelinConfiguration.timeUnitToMill()} and
26+
* {@code TimeoutLifecycleManager.parseTimeValue()}.
27+
*/
28+
public final class ConfigTimeUtils {
29+
private ConfigTimeUtils() {}
30+
31+
/**
32+
* Parses a configuration time string to milliseconds.
33+
*
34+
* <p>Supported formats:
35+
* <ul>
36+
* <li>Plain integer: treated as milliseconds (e.g. {@code "60000"})</li>
37+
* <li>{@code "ms"} suffix: parsed as milliseconds (e.g. {@code "500ms"})</li>
38+
* <li>ISO 8601 duration component: H, M, S or combinations (e.g. {@code "1H"}, {@code "30M"},
39+
* {@code "1H30M"})</li>
40+
* </ul>
41+
*
42+
* @param value the time string from a configuration property
43+
* @return the equivalent duration in milliseconds
44+
* @throws IllegalArgumentException if {@code value} is null or empty
45+
* @throws java.time.format.DateTimeParseException if the value is not a recognised format
46+
*/
47+
public static long parseTimeValueToMillis(String value) {
48+
if (value == null || value.trim().isEmpty()) {
49+
throw new IllegalArgumentException("Time value must not be null or empty");
50+
}
51+
try {
52+
return Long.parseLong(value);
53+
} catch (NumberFormatException e) {
54+
if (value.endsWith("ms")) {
55+
return Long.parseLong(value.substring(0, value.length() - 2));
56+
}
57+
return Duration.parse("PT" + value).toMillis();
58+
}
59+
}
60+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.common;
19+
20+
/**
21+
* Canonical string constants for interpreter-related configuration keys.
22+
* Use these instead of plain string literals to catch typos at compile time.
23+
*/
24+
public final class InterpreterConfigKeys {
25+
private InterpreterConfigKeys() {}
26+
27+
public static final String INTERPRETER_CONNECTION_POOL_SIZE =
28+
"zeppelin.interpreter.connection.poolsize";
29+
public static final String INTERPRETER_LIFECYCLE_MANAGER_CLASS =
30+
"zeppelin.interpreter.lifecyclemanager.class";
31+
public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL =
32+
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval";
33+
public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD =
34+
"zeppelin.interpreter.lifecyclemanager.timeout.threshold";
35+
public static final String PROXY_URL = "zeppelin.proxy.url";
36+
public static final String PROXY_USER = "zeppelin.proxy.user";
37+
public static final String PROXY_PASSWORD = "zeppelin.proxy.password";
38+
public static final String INTERPRETER_DEP_MVNREPO = "zeppelin.interpreter.dep.mvnRepo";
39+
public static final String SCHEDULER_THREADPOOL_SIZE = "zeppelin.scheduler.threadpool.size";
40+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.common;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertThrows;
24+
25+
class ConfigTimeUtilsTest {
26+
27+
@Test
28+
void plainNumberReturnedAsMillis() {
29+
assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000"));
30+
}
31+
32+
@Test
33+
void msSuffixParsed() {
34+
assertEquals(500L, ConfigTimeUtils.parseTimeValueToMillis("500ms"));
35+
}
36+
37+
@Test
38+
void hourUnitParsed() {
39+
assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("1H"));
40+
}
41+
42+
@Test
43+
void minuteUnitParsed() {
44+
assertEquals(1800000L, ConfigTimeUtils.parseTimeValueToMillis("30M"));
45+
}
46+
47+
@Test
48+
void secondUnitParsed() {
49+
assertEquals(10000L, ConfigTimeUtils.parseTimeValueToMillis("10S"));
50+
}
51+
52+
@Test
53+
void compoundDurationParsed() {
54+
assertEquals(5400000L, ConfigTimeUtils.parseTimeValueToMillis("1H30M"));
55+
}
56+
57+
@Test
58+
void defaultCheckIntervalCompatible() {
59+
assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000"));
60+
}
61+
62+
@Test
63+
void defaultThresholdCompatible() {
64+
assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("3600000"));
65+
}
66+
67+
@Test
68+
void nullInputThrows() {
69+
assertThrows(IllegalArgumentException.class,
70+
() -> ConfigTimeUtils.parseTimeValueToMillis(null));
71+
}
72+
73+
@Test
74+
void emptyInputThrows() {
75+
assertThrows(IllegalArgumentException.class,
76+
() -> ConfigTimeUtils.parseTimeValueToMillis(""));
77+
}
78+
79+
@Test
80+
void invalidFormatThrows() {
81+
assertThrows(Exception.class,
82+
() -> ConfigTimeUtils.parseTimeValueToMillis("abc"));
83+
}
84+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.zeppelin.common;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
24+
class InterpreterConfigKeysTest {
25+
26+
@Test
27+
void interpreterConnectionPoolSizeKey() {
28+
assertEquals("zeppelin.interpreter.connection.poolsize",
29+
InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE);
30+
}
31+
32+
@Test
33+
void interpreterLifecycleManagerClassKey() {
34+
assertEquals("zeppelin.interpreter.lifecyclemanager.class",
35+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS);
36+
}
37+
38+
@Test
39+
void interpreterLifecycleManagerTimeoutCheckIntervalKey() {
40+
assertEquals("zeppelin.interpreter.lifecyclemanager.timeout.checkinterval",
41+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
42+
}
43+
44+
@Test
45+
void interpreterLifecycleManagerTimeoutThresholdKey() {
46+
assertEquals("zeppelin.interpreter.lifecyclemanager.timeout.threshold",
47+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
48+
}
49+
50+
@Test
51+
void proxyUrlKey() {
52+
assertEquals("zeppelin.proxy.url", InterpreterConfigKeys.PROXY_URL);
53+
}
54+
55+
@Test
56+
void proxyUserKey() {
57+
assertEquals("zeppelin.proxy.user", InterpreterConfigKeys.PROXY_USER);
58+
}
59+
60+
@Test
61+
void proxyPasswordKey() {
62+
assertEquals("zeppelin.proxy.password", InterpreterConfigKeys.PROXY_PASSWORD);
63+
}
64+
65+
@Test
66+
void interpreterDepMvnRepoKey() {
67+
assertEquals("zeppelin.interpreter.dep.mvnRepo",
68+
InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO);
69+
}
70+
71+
@Test
72+
void schedulerThreadpoolSizeKey() {
73+
assertEquals("zeppelin.scheduler.threadpool.size",
74+
InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE);
75+
}
76+
}

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
package org.apache.zeppelin.interpreter.lifecycle;
1919

2020
import org.apache.thrift.TException;
21+
import org.apache.zeppelin.common.ConfigTimeUtils;
22+
import org.apache.zeppelin.common.InterpreterConfigKeys;
2123
import org.apache.zeppelin.interpreter.LifecycleManager;
2224
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
2325
import org.apache.zeppelin.scheduler.ExecutorFactory;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

27-
import java.time.Duration;
2829
import java.util.Properties;
2930
import java.util.concurrent.ScheduledExecutorService;
3031
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -50,11 +51,11 @@ public class TimeoutLifecycleManager extends LifecycleManager {
5051
public TimeoutLifecycleManager(Properties properties,
5152
RemoteInterpreterServer remoteInterpreterServer) {
5253
super(properties, remoteInterpreterServer);
53-
long checkInterval = parseTimeValue(properties.getProperty(
54-
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval",
55-
String.valueOf(DEFAULT_CHECK_INTERVAL)));
56-
long timeoutThreshold = parseTimeValue(properties.getProperty(
57-
"zeppelin.interpreter.lifecyclemanager.timeout.threshold",
54+
long checkInterval = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty(
55+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL,
56+
String.valueOf(DEFAULT_CHECK_INTERVAL)));
57+
long timeoutThreshold = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty(
58+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD,
5859
String.valueOf(DEFAULT_TIMEOUT_THRESHOLD)));
5960
ScheduledExecutorService checkScheduler = ExecutorFactory.singleton()
6061
.createOrGetScheduled("TimeoutLifecycleManager", 1);
@@ -74,17 +75,6 @@ public TimeoutLifecycleManager(Properties properties,
7475
timeoutThreshold);
7576
}
7677

77-
static long parseTimeValue(String value) {
78-
try {
79-
return Long.parseLong(value);
80-
} catch (NumberFormatException e) {
81-
if (value.endsWith("ms")) {
82-
return Long.parseLong(value.substring(0, value.length() - 2));
83-
}
84-
return Duration.parse("PT" + value).toMillis();
85-
}
86-
}
87-
8878
@Override
8979
public void onInterpreterProcessStarted(String interpreterGroupId) {
9080
LOGGER.info("Interpreter process: {} is started", interpreterGroupId);

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.zeppelin.helium.ApplicationLoader;
3636
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
3737
import org.apache.zeppelin.helium.HeliumPackage;
38+
import org.apache.zeppelin.common.InterpreterConfigKeys;
3839
import org.apache.zeppelin.interpreter.Constants;
3940
import org.apache.zeppelin.interpreter.Interpreter;
4041
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -211,7 +212,7 @@ public void init(Map<String, String> properties) throws InterpreterRPCException,
211212

212213
if (!isTest) {
213214
int connectionPoolSize = Integer.parseInt(
214-
zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100"));
215+
zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100"));
215216
LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}",
216217
connectionPoolSize);
217218
intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
@@ -268,8 +269,8 @@ public boolean isRunning() {
268269

269270
private LifecycleManager createLifecycleManager() throws Exception {
270271
String lifecycleManagerClass = zProperties.getProperty(
271-
"zeppelin.interpreter.lifecyclemanager.class",
272-
"org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager");
272+
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS,
273+
"org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager");
273274
Class<?> clazz = Class.forName(lifecycleManagerClass);
274275
LOGGER.info("Creating interpreter lifecycle manager: {}", lifecycleManagerClass);
275276
return (LifecycleManager) clazz.getConstructor(Properties.class, RemoteInterpreterServer.class)
@@ -335,10 +336,10 @@ public void createInterpreter(String interpreterGroupId, String sessionId, Strin
335336
}
336337

337338
depLoader = new DependencyResolver(localRepoPath,
338-
zProperties.getProperty("zeppelin.proxy.url"),
339-
zProperties.getProperty("zeppelin.proxy.user"),
340-
zProperties.getProperty("zeppelin.proxy.password"),
341-
zProperties.getProperty("zeppelin.interpreter.dep.mvnRepo"));
339+
zProperties.getProperty(InterpreterConfigKeys.PROXY_URL),
340+
zProperties.getProperty(InterpreterConfigKeys.PROXY_USER),
341+
zProperties.getProperty(InterpreterConfigKeys.PROXY_PASSWORD),
342+
zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO));
342343
appLoader = new ApplicationLoader(resourcePool, depLoader);
343344

344345
resultCacheInSeconds =
@@ -486,7 +487,7 @@ public void reconnect(String host, int port) throws InterpreterRPCException, TEx
486487
this.intpEventServerHost = host;
487488
this.intpEventServerPort = port;
488489
intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
489-
Integer.parseInt(zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100")));
490+
Integer.parseInt(zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100")));
490491
intpEventClient.setIntpGroupId(interpreterGroupId);
491492

492493
this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);

zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.zeppelin.scheduler;
1919

20+
import org.apache.zeppelin.common.InterpreterConfigKeys;
2021
import org.apache.zeppelin.util.ExecutorUtil;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ private static int getSchedulerPoolSize() {
5960
if (envValue != null) {
6061
return Integer.parseInt(envValue);
6162
}
62-
String propValue = System.getProperty("zeppelin.scheduler.threadpool.size");
63+
String propValue = System.getProperty(InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE);
6364
if (propValue != null) {
6465
return Integer.parseInt(propValue);
6566
}

0 commit comments

Comments
 (0)