|
22 | 22 | import unittest |
23 | 23 | import pandas as pd |
24 | 24 | import pyarrow as pa |
| 25 | +from py4j.protocol import Py4JJavaError |
25 | 26 |
|
26 | 27 | from paimon_python_api import Schema |
27 | 28 | from paimon_python_java import Catalog |
@@ -371,3 +372,76 @@ def test_overwrite(self): |
371 | 372 | df2['f0'] = df2['f0'].astype('int32') |
372 | 373 | pd.testing.assert_frame_equal( |
373 | 374 | actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) |
| 375 | + |
| 376 | + def testWriteWrongSchema(self): |
| 377 | + schema = Schema(self.simple_pa_schema) |
| 378 | + self.catalog.create_table('default.test_wrong_schema', schema, False) |
| 379 | + table = self.catalog.get_table('default.test_wrong_schema') |
| 380 | + |
| 381 | + data = { |
| 382 | + 'f0': [1, 2, 3], |
| 383 | + 'f1': ['a', 'b', 'c'], |
| 384 | + } |
| 385 | + df = pd.DataFrame(data) |
| 386 | + schema = pa.schema([ |
| 387 | + ('f0', pa.int64()), |
| 388 | + ('f1', pa.string()) |
| 389 | + ]) |
| 390 | + record_batch = pa.RecordBatch.from_pandas(df, schema) |
| 391 | + |
| 392 | + write_builder = table.new_batch_write_builder() |
| 393 | + table_write = write_builder.new_write() |
| 394 | + |
| 395 | + with self.assertRaises(Py4JJavaError) as e: |
| 396 | + table_write.write_arrow_batch(record_batch) |
| 397 | + self.assertEqual( |
| 398 | + str(e.exception.java_exception), |
| 399 | + '''java.lang.RuntimeException: Input schema isn't consistent with table schema. |
| 400 | +\tTable schema is: [f0: Int(32, true), f1: Utf8] |
| 401 | +\tInput schema is: [f0: Int(64, true), f1: Utf8]''') |
| 402 | + |
| 403 | + def testIgnoreNullable(self): |
| 404 | + pa_schema1 = pa.schema([ |
| 405 | + ('f0', pa.int32(), False), |
| 406 | + ('f1', pa.string()) |
| 407 | + ]) |
| 408 | + |
| 409 | + pa_schema2 = pa.schema([ |
| 410 | + ('f0', pa.int32()), |
| 411 | + ('f1', pa.string()) |
| 412 | + ]) |
| 413 | + |
| 414 | + # write nullable to non-null |
| 415 | + self._testIgnoreNullableImpl('test_ignore_nullable1', pa_schema1, pa_schema2) |
| 416 | + |
| 417 | + # write non-null to nullable |
| 418 | + self._testIgnoreNullableImpl('test_ignore_nullable2', pa_schema2, pa_schema1) |
| 419 | + |
| 420 | + def _testIgnoreNullableImpl(self, table_name, table_schema, data_schema): |
| 421 | + schema = Schema(table_schema) |
| 422 | + self.catalog.create_table(f'default.{table_name}', schema, False) |
| 423 | + table = self.catalog.get_table(f'default.{table_name}') |
| 424 | + |
| 425 | + data = { |
| 426 | + 'f0': [1, 2, 3], |
| 427 | + 'f1': ['a', 'b', 'c'], |
| 428 | + } |
| 429 | + df = pd.DataFrame(data) |
| 430 | + record_batch = pa.RecordBatch.from_pandas(pd.DataFrame(data), data_schema) |
| 431 | + |
| 432 | + write_builder = table.new_batch_write_builder() |
| 433 | + table_write = write_builder.new_write() |
| 434 | + table_commit = write_builder.new_commit() |
| 435 | + table_write.write_arrow_batch(record_batch) |
| 436 | + table_commit.commit(table_write.prepare_commit()) |
| 437 | + |
| 438 | + table_write.close() |
| 439 | + table_commit.close() |
| 440 | + |
| 441 | + read_builder = table.new_read_builder() |
| 442 | + table_scan = read_builder.new_scan() |
| 443 | + table_read = read_builder.new_read() |
| 444 | + actual_df = table_read.to_pandas(table_scan.plan().splits()) |
| 445 | + df['f0'] = df['f0'].astype('int32') |
| 446 | + pd.testing.assert_frame_equal( |
| 447 | + actual_df.reset_index(drop=True), df.reset_index(drop=True)) |
0 commit comments