|
18 | 18 |
|
19 | 19 | from java_based_implementation.java_gateway import get_gateway |
20 | 20 | from java_based_implementation.util.java_utils import to_j_catalog_context |
21 | | -from paimon_python_api import catalog, read_builder, table_scan, split, table_read |
22 | | -from paimon_python_api import table |
23 | | -from pyarrow import RecordBatchReader |
| 21 | +from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, |
| 22 | + write_builder, table_write, commit_message, table_commit) |
| 23 | +from pyarrow import RecordBatchReader, RecordBatch |
24 | 24 | from typing import List |
25 | 25 | from typing_extensions import Self |
26 | 26 |
|
@@ -53,6 +53,10 @@ def new_read_builder(self) -> 'ReadBuilder': |
53 | 53 | j_read_builder = self._j_table.newReadBuilder() |
54 | 54 | return ReadBuilder(j_read_builder) |
55 | 55 |
|
| 56 | + def new_batch_write_builder(self) -> 'BatchWriteBuilder': |
| 57 | + j_batch_write_builder = self._j_table.newBatchWriteBuilder() |
| 58 | + return BatchWriteBuilder(j_batch_write_builder) |
| 59 | + |
56 | 60 |
|
57 | 61 | class ReadBuilder(read_builder.ReadBuilder): |
58 | 62 |
|
@@ -110,3 +114,54 @@ class TableRead(table_read.TableRead): |
110 | 114 | def create_reader(self, split: Split) -> RecordBatchReader: |
111 | 115 | # TODO |
112 | 116 | pass |
| 117 | + |
| 118 | + |
| 119 | +class BatchWriteBuilder(write_builder.BatchWriteBuilder): |
| 120 | + |
| 121 | + def __init__(self, j_batch_write_builder): |
| 122 | + self._j_batch_write_builder = j_batch_write_builder |
| 123 | + |
| 124 | + def with_overwrite(self, static_partition: dict) -> Self: |
| 125 | + self._j_batch_write_builder.withOverwrite(static_partition) |
| 126 | + return self |
| 127 | + |
| 128 | + def new_write(self) -> 'BatchTableWrite': |
| 129 | + j_batch_table_write = self._j_batch_write_builder.newWrite() |
| 130 | + return BatchTableWrite(j_batch_table_write) |
| 131 | + |
| 132 | + def new_commit(self) -> 'BatchTableCommit': |
| 133 | + j_batch_table_commit = self._j_batch_write_builder.newCommit() |
| 134 | + return BatchTableCommit(j_batch_table_commit) |
| 135 | + |
| 136 | + |
| 137 | +class BatchTableWrite(table_write.BatchTableWrite): |
| 138 | + |
| 139 | + def __init__(self, j_batch_table_write): |
| 140 | + self._j_batch_table_write = j_batch_table_write |
| 141 | + |
| 142 | + def write(self, record_batch: RecordBatch): |
| 143 | + # TODO |
| 144 | + pass |
| 145 | + |
| 146 | + def prepare_commit(self) -> List['CommitMessage']: |
| 147 | + j_commit_messages = self._j_batch_table_write.prepareCommit() |
| 148 | + return list(map(lambda cm: CommitMessage(cm), j_commit_messages)) |
| 149 | + |
| 150 | + |
| 151 | +class CommitMessage(commit_message.CommitMessage): |
| 152 | + |
| 153 | + def __init__(self, j_commit_message): |
| 154 | + self._j_commit_message = j_commit_message |
| 155 | + |
| 156 | + def to_j_commit_message(self): |
| 157 | + return self._j_commit_message |
| 158 | + |
| 159 | + |
| 160 | +class BatchTableCommit(table_commit.BatchTableCommit): |
| 161 | + |
| 162 | + def __init__(self, j_batch_table_commit): |
| 163 | + self._j_batch_table_commit = j_batch_table_commit |
| 164 | + |
| 165 | + def commit(self, commit_messages: List[CommitMessage]): |
| 166 | + j_commit_messages = list(map(lambda cm: cm.to_j_commit_message(), commit_messages)) |
| 167 | + self._j_batch_table_commit.commit(j_commit_messages) |
0 commit comments