2222import org .apache .iotdb .commons .path .PartialPath ;
2323import org .apache .iotdb .commons .pipe .sink .payload .thrift .response .PipeTransferFilePieceResp ;
2424import org .apache .iotdb .commons .schema .SchemaConstant ;
25+ import org .apache .iotdb .db .pipe .processor .twostage .exchange .payload .CombineRequest ;
26+ import org .apache .iotdb .db .pipe .processor .twostage .state .CountState ;
2527import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferDataNodeHandshakeV1Req ;
2628import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferPlanNodeReq ;
2729import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferSchemaSnapshotPieceReq ;
4345import org .apache .iotdb .db .queryengine .plan .statement .Statement ;
4446import org .apache .iotdb .db .queryengine .plan .statement .crud .InsertBaseStatement ;
4547import org .apache .iotdb .rpc .RpcUtils ;
48+ import org .apache .iotdb .service .rpc .thrift .TPipeTransferReq ;
4649
4750import org .apache .tsfile .common .conf .TSFileConfig ;
4851import org .apache .tsfile .enums .TSDataType ;
@@ -69,6 +72,61 @@ public class PipeDataNodeThriftRequestTest {
6972
7073 private static final String TIME_PRECISION = "ms" ;
7174
75+ @ Test
76+ public void testCombineRequest () throws Exception {
77+ final CombineRequest req =
78+ CombineRequest .toTPipeTransferReq ("pipe" , 1L , 2 , "combine" , new CountState (123L ));
79+ final CombineRequest deserializeReq = CombineRequest .fromTPipeTransferReq (req );
80+
81+ Assert .assertEquals (req .getVersion (), deserializeReq .getVersion ());
82+ Assert .assertEquals (req .getType (), deserializeReq .getType ());
83+ Assert .assertEquals ("pipe" , deserializeReq .getPipeName ());
84+ Assert .assertEquals (1L , deserializeReq .getCreationTime ());
85+ Assert .assertEquals (2 , deserializeReq .getRegionId ());
86+ Assert .assertEquals ("combine" , deserializeReq .getCombineId ());
87+ Assert .assertTrue (deserializeReq .getState () instanceof CountState );
88+ Assert .assertEquals (123L , ((CountState ) deserializeReq .getState ()).getCount ());
89+ }
90+
91+ @ Test
92+ public void testCombineRequestWithUnexpectedStateClassName () throws Exception {
93+ final CombineRequest req =
94+ CombineRequest .toTPipeTransferReq ("pipe" , 1L , 2 , "combine" , new CountState (123L ));
95+
96+ final ByteBuffer bodyBuffer = req .body .duplicate ();
97+ final String pipeName = ReadWriteIOUtils .readString (bodyBuffer );
98+ final long creationTime = ReadWriteIOUtils .readLong (bodyBuffer );
99+ final int regionId = ReadWriteIOUtils .readInt (bodyBuffer );
100+ final String combineId = ReadWriteIOUtils .readString (bodyBuffer );
101+ ReadWriteIOUtils .readString (bodyBuffer );
102+ final long count = ReadWriteIOUtils .readLong (bodyBuffer );
103+
104+ final ByteBuffer tamperedBody ;
105+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS ();
106+ final DataOutputStream outputStream = new DataOutputStream (byteArrayOutputStream )) {
107+ ReadWriteIOUtils .write (pipeName , outputStream );
108+ ReadWriteIOUtils .write (creationTime , outputStream );
109+ ReadWriteIOUtils .write (regionId , outputStream );
110+ ReadWriteIOUtils .write (combineId , outputStream );
111+ ReadWriteIOUtils .write ("java.lang.String" , outputStream );
112+ ReadWriteIOUtils .write (count , outputStream );
113+ tamperedBody =
114+ ByteBuffer .wrap (byteArrayOutputStream .getBuf (), 0 , byteArrayOutputStream .size ());
115+ }
116+
117+ final TPipeTransferReq tamperedReq = new TPipeTransferReq ();
118+ tamperedReq .version = req .version ;
119+ tamperedReq .type = req .type ;
120+ tamperedReq .body = tamperedBody ;
121+
122+ try {
123+ CombineRequest .fromTPipeTransferReq (tamperedReq );
124+ Assert .fail ("Expected IllegalArgumentException" );
125+ } catch (final IllegalArgumentException e ) {
126+ Assert .assertTrue (e .getMessage ().contains ("Unexpected state class" ));
127+ }
128+ }
129+
72130 @ Test
73131 public void testPipeTransferDataNodeHandshakeReq () throws IOException {
74132 final PipeTransferDataNodeHandshakeV1Req req =
0 commit comments