1818
1919package org .apache .paimon .python ;
2020
21+ import org .apache .paimon .arrow .ArrowUtils ;
2122import org .apache .paimon .arrow .reader .ArrowBatchReader ;
2223import org .apache .paimon .data .InternalRow ;
2324import org .apache .paimon .table .sink .TableWrite ;
2728import org .apache .arrow .memory .RootAllocator ;
2829import org .apache .arrow .vector .VectorSchemaRoot ;
2930import org .apache .arrow .vector .ipc .ArrowStreamReader ;
31+ import org .apache .arrow .vector .types .pojo .Field ;
3032
3133import java .io .ByteArrayInputStream ;
34+ import java .util .List ;
35+ import java .util .Objects ;
36+ import java .util .stream .Collectors ;
3237
3338/** Write Arrow bytes to Paimon. */
3439public class BytesWriter {
3540
3641 private final TableWrite tableWrite ;
3742 private final ArrowBatchReader arrowBatchReader ;
3843 private final BufferAllocator allocator ;
44+ private final List <Field > arrowFields ;
3945
/**
 * Creates a writer bound to the given table write and row type.
 *
 * @param tableWrite the table write kept by this writer
 * @param rowType the table's row type; used to build the Arrow batch reader and the expected
 *     Arrow field list
 */
public BytesWriter(TableWrite tableWrite, RowType rowType) {
    this.tableWrite = tableWrite;
    this.arrowBatchReader = new ArrowBatchReader(rowType);
    this.allocator = new RootAllocator();
    // Convert each table field to its Arrow field once, so incoming streams can be
    // compared against the table schema without re-deriving it.
    this.arrowFields =
            rowType.getFields().stream()
                    .map(field -> ArrowUtils.toArrowField(field.name(), field.type()))
                    .collect(Collectors.toList());
}
4555
4656 public void write (byte [] bytes ) throws Exception {
4757 ByteArrayInputStream bais = new ByteArrayInputStream (bytes );
4858 ArrowStreamReader arrowStreamReader = new ArrowStreamReader (bais , allocator );
4959 VectorSchemaRoot vsr = arrowStreamReader .getVectorSchemaRoot ();
60+ if (!checkSchema (arrowFields , vsr .getSchema ().getFields ())) {
61+ throw new RuntimeException (
62+ String .format (
63+ "Input schema isn't consistent with table schema.\n "
64+ + "\t Table schema is: %s\n "
65+ + "\t Input schema is: %s" ,
66+ arrowFields , vsr .getSchema ().getFields ()));
67+ }
68+
5069 while (arrowStreamReader .loadNextBatch ()) {
5170 Iterable <InternalRow > rows = arrowBatchReader .readBatch (vsr );
5271 for (InternalRow row : rows ) {
@@ -59,4 +78,26 @@ public void write(byte[] bytes) throws Exception {
5978 public void close () {
6079 allocator .close ();
6180 }
81+
82+ private boolean checkSchema (List <Field > expectedFields , List <Field > actualFields ) {
83+ if (expectedFields .size () != actualFields .size ()) {
84+ return false ;
85+ }
86+
87+ for (int i = 0 ; i < expectedFields .size (); i ++) {
88+ Field expectedField = expectedFields .get (i );
89+ Field actualField = actualFields .get (i );
90+ if (!checkField (expectedField , actualField )
91+ || !checkSchema (expectedField .getChildren (), actualField .getChildren ())) {
92+ return false ;
93+ }
94+ }
95+
96+ return true ;
97+ }
98+
99+ private boolean checkField (Field expected , Field actual ) {
100+ return Objects .equals (expected .getName (), actual .getName ())
101+ && Objects .equals (expected .getType (), actual .getType ());
102+ }
62103}
0 commit comments