Skip to content

Commit 6ec3a51

Browse files
authored
Pipe: Configured the air gap timeout to avoid packet loss (#17231)
* good-game * ifx * fxi * xif
1 parent 19d7670 commit 6ec3a51

4 files changed

Lines changed: 37 additions & 0 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
9999
final AirGapSocket socket = sockets.get(socketIndex);
100100

101101
try {
102+
// When receiver encountered packet loss, the transfer will time out
103+
// We need to restore the transfer quickly by retry under this circumstance
104+
socket.setSoTimeout(PIPE_CONFIG.getPipeAirGapSinkTabletTimeoutMs());
102105
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
103106
doTransferWrapper(socket, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
104107
} else {
@@ -112,6 +115,8 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
112115
"Network error when transfer tablet insertion event %s, because %s.",
113116
((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()),
114117
e);
118+
} finally {
119+
socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
115120
}
116121
}
117122

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ public class CommonConfig {
270270
private long pipeSourceMatcherCacheSize = 1024;
271271

272272
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
273+
private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
273274
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
274275
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
275276
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
@@ -1077,6 +1078,26 @@ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
10771078
}
10781079
}
10791080

1081+
public int getPipeAirGapSinkTabletTimeoutMs() {
1082+
return pipeAirGapSinkTabletTimeoutMs;
1083+
}
1084+
1085+
public void setPipeAirGapSinkTabletTimeoutMs(long pipeAirGapSinkTabletTimeoutMs) {
1086+
final int fPipeAirGapSinkTabletTimeoutMs = this.pipeAirGapSinkTabletTimeoutMs;
1087+
try {
1088+
this.pipeAirGapSinkTabletTimeoutMs = Math.toIntExact(pipeAirGapSinkTabletTimeoutMs);
1089+
} catch (ArithmeticException e) {
1090+
this.pipeAirGapSinkTabletTimeoutMs = Integer.MAX_VALUE;
1091+
logger.warn(
1092+
"Given pipe air gap sink tablet timeout is too large, set to {} ms.", Integer.MAX_VALUE);
1093+
} finally {
1094+
if (fPipeAirGapSinkTabletTimeoutMs != this.pipeAirGapSinkTabletTimeoutMs) {
1095+
logger.info(
1096+
"pipeAirGapSinkTabletTimeoutMs is set to {}.", this.pipeAirGapSinkTabletTimeoutMs);
1097+
}
1098+
}
1099+
}
1100+
10801101
public int getPipeSinkTransferTimeoutMs() {
10811102
return pipeSinkTransferTimeoutMs;
10821103
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ public int getPipeSinkHandshakeTimeoutMs() {
179179
return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
180180
}
181181

182+
public int getPipeAirGapSinkTabletTimeoutMs() {
183+
return COMMON_CONFIG.getPipeAirGapSinkTabletTimeoutMs();
184+
}
185+
182186
public int getPipeSinkTransferTimeoutMs() {
183187
return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
184188
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,13 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
360360
properties.getProperty(
361361
"pipe_connector_handshake_timeout_ms",
362362
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
363+
config.setPipeAirGapSinkTabletTimeoutMs(
364+
Long.parseLong(
365+
Optional.ofNullable(properties.getProperty("pipe_air_gap_sink_tablet_timeout_ms"))
366+
.orElse(
367+
properties.getProperty(
368+
"pipe_air_gap_connector_tablet_timeout_ms",
369+
String.valueOf(config.getPipeAirGapSinkTabletTimeoutMs())))));
363370
config.setPipeSinkReadFileBufferSize(
364371
Integer.parseInt(
365372
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))

0 commit comments

Comments
 (0)