Skip to content

Commit 7afedfe

Browse files
authored
Pipe: Refactor AirGap receiver with configurable payload size control (#17443)
* Pipe: add hot-reloadable AirGap payload size guard to mitigate DoS risk. Introduce a dedicated AirGap receiver payload limit in the pipe config and enforce it before request buffer allocation, so oversized payloads are rejected early and memory pressure is bounded under malicious or malformed inputs. Made-with: Cursor * update * update * update * update * update * spotless
1 parent b5199bd commit 7afedfe

5 files changed

Lines changed: 132 additions & 3 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,14 +222,22 @@ private boolean checkSum(byte[] bytes) {
222222
}
223223
}
224224

225-
private byte[] readData(final InputStream inputStream) throws IOException {
225+
byte[] readData(final InputStream inputStream) throws IOException {
226226
final int length = readLength(inputStream);
227227

228228
if (length <= 0) {
229229
// Will fail() after checkSum()
230230
return new byte[0];
231231
}
232232

233+
final int maxLength = PipeConfig.getInstance().getPipeAirGapReceiverMaxPayloadSizeInBytes();
234+
if (length > maxLength) {
235+
throw new IOException(
236+
String.format(
237+
"AirGap payload length (%d) exceeds maximum allowed (%d). Closing connection from %s",
238+
length, maxLength, socket.getRemoteSocketAddress()));
239+
}
240+
233241
final byte[] resultBuffer = new byte[length];
234242
readTillFull(inputStream, resultBuffer);
235243
if (isELanguagePayload) {
@@ -238,11 +246,16 @@ private byte[] readData(final InputStream inputStream) throws IOException {
238246
return resultBuffer;
239247
}
240248

249+
private int readLength(final InputStream inputStream) throws IOException {
250+
return readLength(inputStream, false);
251+
}
252+
241253
/**
242254
* Read the length of the following data. The thread may typically block here when there is no
243255
* data to read.
244256
*/
245-
private int readLength(final InputStream inputStream) throws IOException {
257+
private int readLength(final InputStream inputStream, final boolean isELanguage)
258+
throws IOException {
246259
final byte[] doubleIntLengthBytes = new byte[2 * INT_LEN];
247260
readTillFull(inputStream, doubleIntLengthBytes);
248261

@@ -251,10 +264,16 @@ private int readLength(final InputStream inputStream) throws IOException {
251264
if (Arrays.equals(
252265
doubleIntLengthBytes,
253266
BytesUtils.subBytes(AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, 2 * INT_LEN))) {
267+
if (isELanguage) {
268+
throw new IOException(
269+
String.format(
270+
"Detected suspicious nested E-Language prefix. Closing connection from %s",
271+
socket.getRemoteSocketAddress()));
272+
}
254273
isELanguagePayload = true;
255274
skipTillEnough(
256275
inputStream, (long) AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - 2 * INT_LEN);
257-
return readLength(inputStream);
276+
return readLength(inputStream, true);
258277
}
259278

260279
final byte[] dataLengthBytes = BytesUtils.subBytes(doubleIntLengthBytes, 0, INT_LEN);
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
21+
22+
import org.apache.iotdb.commons.conf.CommonConfig;
23+
import org.apache.iotdb.commons.conf.CommonDescriptor;
24+
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
25+
26+
import org.apache.tsfile.utils.BytesUtils;
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import java.io.ByteArrayInputStream;
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.net.Socket;
34+
35+
public class IoTDBAirGapReceiverTest {
36+
37+
@Test
38+
public void testRejectOversizedAirGapPayload() throws Exception {
39+
final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
40+
final int originalMaxPayload = commonConfig.getPipeAirGapReceiverMaxPayloadSizeInBytes();
41+
42+
try {
43+
commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(16);
44+
final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 1L);
45+
46+
final byte[] oversizedLength = BytesUtils.intToBytes(32);
47+
final InputStream inputStream =
48+
new ByteArrayInputStream(BytesUtils.concatByteArray(oversizedLength, oversizedLength));
49+
50+
final IOException exception =
51+
Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));
52+
Assert.assertTrue(exception.getMessage().contains("payload length"));
53+
} finally {
54+
commonConfig.setPipeAirGapReceiverMaxPayloadSizeInBytes(originalMaxPayload);
55+
}
56+
}
57+
58+
@Test
59+
public void testRejectNestedELanguagePrefix() throws Exception {
60+
final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(new Socket(), 2L);
61+
62+
final InputStream inputStream =
63+
new ByteArrayInputStream(
64+
BytesUtils.concatByteArray(
65+
AirGapELanguageConstant.E_LANGUAGE_PREFIX,
66+
AirGapELanguageConstant.E_LANGUAGE_PREFIX));
67+
68+
final IOException exception =
69+
Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream));
70+
Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix"));
71+
}
72+
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,11 @@ public class CommonConfig {
311311
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
312312

313313
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
314+
// Align with default thrift frame size calculation.
315+
private int pipeAirGapReceiverMaxPayloadSizeInBytes =
316+
Math.min(
317+
64 * 1024 * 1024,
318+
(int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE));
314319
private boolean pipeReceiverLoadConversionEnabled = false;
315320
private volatile long pipePeriodicalLogMinIntervalSeconds = 60;
316321
private volatile long pipeLoggerCacheMaxSizeInBytes = 16 * MB;
@@ -1702,6 +1707,23 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes(
17021707
pipeReceiverReqDecompressedMaxLengthInBytes);
17031708
}
17041709

1710+
public void setPipeAirGapReceiverMaxPayloadSizeInBytes(
1711+
int pipeAirGapReceiverMaxPayloadSizeInBytes) {
1712+
if (pipeAirGapReceiverMaxPayloadSizeInBytes <= 0) {
1713+
logger.info(
1714+
"Ignore invalid pipeAirGapReceiverMaxPayloadSizeInBytes {}, because it must be greater than 0.",
1715+
pipeAirGapReceiverMaxPayloadSizeInBytes);
1716+
return;
1717+
}
1718+
if (this.pipeAirGapReceiverMaxPayloadSizeInBytes == pipeAirGapReceiverMaxPayloadSizeInBytes) {
1719+
return;
1720+
}
1721+
this.pipeAirGapReceiverMaxPayloadSizeInBytes = pipeAirGapReceiverMaxPayloadSizeInBytes;
1722+
logger.info(
1723+
"pipeAirGapReceiverMaxPayloadSizeInBytes is set to {}.",
1724+
pipeAirGapReceiverMaxPayloadSizeInBytes);
1725+
}
1726+
17051727
public boolean isPipeReceiverLoadConversionEnabled() {
17061728
return pipeReceiverLoadConversionEnabled;
17071729
}
@@ -1743,6 +1765,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
17431765
return pipeReceiverReqDecompressedMaxLengthInBytes;
17441766
}
17451767

1768+
public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
1769+
return pipeAirGapReceiverMaxPayloadSizeInBytes;
1770+
}
1771+
17461772
public double getPipeMetaReportMaxLogNumPerRound() {
17471773
return pipeMetaReportMaxLogNumPerRound;
17481774
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
361361
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
362362
}
363363

364+
public int getPipeAirGapReceiverMaxPayloadSizeInBytes() {
365+
return COMMON_CONFIG.getPipeAirGapReceiverMaxPayloadSizeInBytes();
366+
}
367+
364368
public boolean isPipeReceiverLoadConversionEnabled() {
365369
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
366370
}
@@ -627,6 +631,9 @@ public void printAllConfigs() {
627631
LOGGER.info(
628632
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
629633
getPipeReceiverReqDecompressedMaxLengthInBytes());
634+
LOGGER.info(
635+
"PipeAirGapReceiverMaxPayloadSizeInBytes: {}",
636+
getPipeAirGapReceiverMaxPayloadSizeInBytes());
630637
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
631638
LOGGER.info(
632639
"PipePeriodicalLogMinIntervalSeconds: {}", getPipePeriodicalLogMinIntervalSeconds());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
464464
properties.getProperty(
465465
"pipe_receiver_req_decompressed_max_length_in_bytes",
466466
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
467+
config.setPipeAirGapReceiverMaxPayloadSizeInBytes(
468+
Integer.parseInt(
469+
properties.getProperty(
470+
"pipe_air_gap_receiver_max_payload_size_in_bytes",
471+
String.valueOf(config.getPipeAirGapReceiverMaxPayloadSizeInBytes()))));
467472
config.setPipeReceiverLoadConversionEnabled(
468473
Boolean.parseBoolean(
469474
properties.getProperty(

0 commit comments

Comments
 (0)