Skip to content

Commit 4130c45

Browse files
authored
Merge pull request #99 from data-integrations/bugfix/CDAP-17330-handle-tinyint
CDAP-17330 Handle TINYINT in mysql.
2 parents 1cb63ac + 974797d commit 4130c45

2 files changed

Lines changed: 47 additions & 1 deletion

File tree

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/Records.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,13 @@ private static Object convert(org.apache.kafka.connect.data.Schema schema, Objec
211211
case BYTES:
212212
case STRING:
213213
case INT8:
214-
case INT16:
215214
case INT32:
216215
case INT64:
217216
case FLOAT32:
218217
case FLOAT64:
219218
return val;
219+
case INT16:
220+
return ((Short) val).intValue();
220221
case ARRAY:
221222
return ((List<?>) val).stream()
222223
.map(o -> convert(schema.valueSchema(), o))
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright © 2020 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.delta.plugin.common;
18+
19+
import io.cdap.cdap.api.data.format.StructuredRecord;
20+
import org.apache.kafka.connect.data.Schema;
21+
import org.apache.kafka.connect.data.SchemaBuilder;
22+
import org.apache.kafka.connect.data.Struct;
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
26+
/**
27+
* Test case for {@link Records} class.
28+
*/
29+
public class RecordsTest {
30+
31+
@Test
32+
public void testConvert() {
33+
String fieldName = "priority";
34+
Schema dataSchema = SchemaBuilder.struct().name("TinyIntSchema").field(fieldName, Schema.INT16_SCHEMA).build();
35+
Struct struct = new Struct(dataSchema);
36+
Short val = 1;
37+
struct.put(fieldName, val);
38+
StructuredRecord convert = Records.convert(struct);
39+
io.cdap.cdap.api.data.schema.Schema.Field priority = convert.getSchema().getField(fieldName);
40+
Assert.assertNotNull(priority);
41+
Assert.assertEquals(priority.getSchema(), io.cdap.cdap.api.data.schema.Schema.of(
42+
io.cdap.cdap.api.data.schema.Schema.Type.INT));
43+
Assert.assertEquals(val.intValue(), (int) convert.get(fieldName));
44+
}
45+
}

0 commit comments

Comments
 (0)