|
17 | 17 | package io.cdap.plugin.snowflake.source.batch; |
18 | 18 |
|
19 | 19 | import au.com.bytecode.opencsv.CSVReader; |
| 20 | + |
20 | 21 | import org.apache.hadoop.io.NullWritable; |
21 | 22 | import org.apache.hadoop.mapreduce.InputSplit; |
22 | 23 | import org.apache.hadoop.mapreduce.RecordReader; |
23 | 24 | import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| 25 | +import org.slf4j.Logger; |
| 26 | +import org.slf4j.LoggerFactory; |
24 | 27 |
|
25 | 28 | import java.io.IOException; |
| 29 | +import java.util.Arrays; |
26 | 30 | import java.util.HashMap; |
27 | 31 | import java.util.Map; |
| 32 | +import java.util.stream.Collectors; |
28 | 33 |
|
29 | 34 | /** |
30 | 35 | * RecordReader implementation, which reads object from Snowflake. |
31 | 36 | */ |
32 | 37 | public class SnowflakeRecordReader extends RecordReader<NullWritable, Map<String, String>> { |
33 | 38 |
|
| 39 | + private static final Logger LOG = LoggerFactory.getLogger(SnowflakeRecordReader.class); |
34 | 40 | private final String stageSplit; |
35 | 41 | private final SnowflakeSourceAccessor snowflakeAccessor; |
36 | 42 | private CSVReader csvReader; |
@@ -62,7 +68,10 @@ public NullWritable getCurrentKey() { |
62 | 68 | @Override |
63 | 69 | public Map<String, String> getCurrentValue() { |
64 | 70 | Map<String, String> result = new HashMap<>(); |
65 | | - for (int i = 0; i < headers.length; i++) { |
| 71 | + if (headers.length != nextLine.length) { |
| 72 | + LOG.warn("Row with wrong data in csv -> {}", Arrays.stream(nextLine).collect(Collectors.joining(","))); |
| 73 | + } |
| 74 | + for (int i = 0; i < headers.length && i < nextLine.length; i++) { |
66 | 75 | result.put(headers[i], nextLine[i]); |
67 | 76 | } |
68 | 77 | return result; |
|
0 commit comments