@@ -69,11 +69,13 @@ public void setup() throws IOException {
6969 TestMessage message3 = TestMessage .newBuilder ().setOrderNumber ("test-order-3" ).setOrderDetails ("ORDER-DETAILS-3" ).build ();
7070 TestMessage message4 = TestMessage .newBuilder ().setOrderNumber ("test-order-4" ).setOrderDetails ("ORDER-DETAILS-4" ).build ();
7171 TestMessage message5 = TestMessage .newBuilder ().setOrderNumber ("test-order-5" ).setOrderDetails ("ORDER-DETAILS-5" ).build ();
72+ TestMessage message6 = TestMessage .newBuilder ().setOrderNumber ("test-order-6" ).setOrderDetails ("ORDER-DETAILS-6" ).build ();
7273 messages .add (new OdpfMessage (null , message1 .toByteArray ()));
7374 messages .add (new OdpfMessage (null , message2 .toByteArray ()));
7475 messages .add (new OdpfMessage (null , message3 .toByteArray ()));
7576 messages .add (new OdpfMessage (null , message4 .toByteArray ()));
7677 messages .add (new OdpfMessage (null , message5 .toByteArray ()));
78+ messages .add (new OdpfMessage (null , message6 .toByteArray ()));
7779 }
7880
7981 public void setupParserResponse () throws IOException {
@@ -96,14 +98,15 @@ public void shouldConvertOdpfMessageToRedisRecords() throws IOException {
9698 Map <Boolean , List <RedisRecord >> splitterRecords = parsedRecords .stream ().collect (Collectors .partitioningBy (RedisRecord ::isValid ));
9799 List <RedisRecord > invalidRecords = splitterRecords .get (Boolean .FALSE );
98100 List <RedisRecord > validRecords = splitterRecords .get (Boolean .TRUE );
99- assertEquals (5 , validRecords .size ());
101+ assertEquals (6 , validRecords .size ());
100102 assertTrue (invalidRecords .isEmpty ());
101103 List <RedisRecord > expectedRecords = new ArrayList <>();
102104 expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-1" , null ), 0L , null , "{}" , true ));
103105 expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-2" , null ), 1L , null , "{}" , true ));
104106 expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-3" , null ), 2L , null , "{}" , true ));
105107 expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-4" , null ), 3L , null , "{}" , true ));
106108 expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-5" , null ), 4L , null , "{}" , true ));
109+ expectedRecords .add (new RedisRecord (new RedisKeyValueEntry ("test-key" , "test-order-6" , null ), 5L , null , "{}" , true ));
107110 IntStream .range (0 , expectedRecords .size ()).forEach (index -> assertEquals (expectedRecords .get (index ).toString (), parsedRecords .get (index ).toString ()));
108111 }
109112
@@ -113,14 +116,16 @@ public void shouldReportValidAndInvalidRecords() throws IOException {
113116 when (odpfMessageParser .parse (messages .get (2 ), SinkConnectorSchemaMessageMode .LOG_MESSAGE , schemaClass )).thenThrow (new IOException ("Error while parsing protobuf" ));
114117 when (odpfMessageParser .parse (messages .get (3 ), SinkConnectorSchemaMessageMode .LOG_MESSAGE , schemaClass )).thenThrow (new ConfigurationException ("Invalid field config : INVALID" ));
115118 when (odpfMessageParser .parse (messages .get (4 ), SinkConnectorSchemaMessageMode .LOG_MESSAGE , schemaClass )).thenThrow (new IllegalArgumentException ("Config REDIS_CONFIG is empty" ));
119+ when (odpfMessageParser .parse (messages .get (5 ), SinkConnectorSchemaMessageMode .LOG_MESSAGE , schemaClass )).thenThrow (new UnsupportedOperationException ("some message" ));
116120 List <RedisRecord > parsedRecords = redisParser .convert (messages );
117121 Map <Boolean , List <RedisRecord >> splitterRecords = parsedRecords .stream ().collect (Collectors .partitioningBy (RedisRecord ::isValid ));
118122 List <RedisRecord > invalidRecords = splitterRecords .get (Boolean .FALSE );
119123 List <RedisRecord > validRecords = splitterRecords .get (Boolean .TRUE );
120124 assertEquals (2 , validRecords .size ());
121- assertEquals (3 , invalidRecords .size ());
125+ assertEquals (4 , invalidRecords .size ());
122126 assertEquals (ErrorType .DESERIALIZATION_ERROR , parsedRecords .get (2 ).getErrorInfo ().getErrorType ());
123127 assertEquals (ErrorType .UNKNOWN_FIELDS_ERROR , parsedRecords .get (3 ).getErrorInfo ().getErrorType ());
124128 assertEquals (ErrorType .DEFAULT_ERROR , parsedRecords .get (4 ).getErrorInfo ().getErrorType ());
129+ assertEquals (ErrorType .INVALID_MESSAGE_ERROR , parsedRecords .get (5 ).getErrorInfo ().getErrorType ());
125130 }
126131}
0 commit comments