Skip to content

Commit 107ab37

Browse files
authored
Add base implementation framework (#3)
1 parent 8e07985 commit 107ab37

23 files changed

Lines changed: 504 additions & 3 deletions

File tree

.github/workflows/paimon-python-checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ on:
2323
pull_request:
2424
paths-ignore:
2525
- 'dev/**'
26-
- 'java-based-implementation/paimon-python-java-bridge/**'
26+
- 'java_based_implementation/paimon-python-java-bridge/**'
2727
- '**/*.md'
2828

2929
concurrency:

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ We can use `py4j` to leverage Java code to read Paimon data. This section descri
3131
### Build paimon-python-java-bridge
3232

3333
```bash
34-
cd java-based-implementation/paimon-python-java-bridge/
34+
cd java_based_implementation/paimon-python-java-bridge/
3535
mvn clean install -DskipTests
3636
```
3737
The built target is java-based-implementation/paimon-python-java-bridge/target/paimon-python-java-bridge-<version>.jar

dev/dev-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@
1919
pip>=20.3
2020
setuptools>=18.0
2121
wheel
22+
py4j==0.10.9.7
23+
pyarrow>=5.0.0
2224
pytest~=7.0
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
from java_based_implementation.java_gateway import get_gateway
20+
from java_based_implementation.util.java_utils import to_j_catalog_context
21+
from paimon_python_api import catalog, read_builder, table_scan, split, table_read
22+
from paimon_python_api import table
23+
from pyarrow import RecordBatchReader
24+
from typing import List
25+
from typing_extensions import Self
26+
27+
28+
class Catalog(catalog.Catalog):
29+
30+
def __init__(self, j_catalog):
31+
self._j_catalog = j_catalog
32+
33+
@staticmethod
34+
def create(catalog_context: dict) -> 'Catalog':
35+
j_catalog_context = to_j_catalog_context(catalog_context)
36+
gateway = get_gateway()
37+
j_catalog = gateway.jvm.CatalogFactory.createCatalog(j_catalog_context)
38+
return Catalog(j_catalog)
39+
40+
def get_table(self, identifier: tuple) -> 'Table':
41+
gateway = get_gateway()
42+
j_identifier = gateway.jvm.Identifier.fromString(identifier)
43+
j_table = self._j_catalog.getTable(j_identifier)
44+
return Table(j_table)
45+
46+
47+
class Table(table.Table):
48+
49+
def __init__(self, j_table):
50+
self._j_table = j_table
51+
52+
def new_read_builder(self) -> 'ReadBuilder':
53+
j_read_builder = self._j_table.newReadBuilder()
54+
return ReadBuilder(j_read_builder)
55+
56+
57+
class ReadBuilder(read_builder.ReadBuilder):
58+
59+
def __init__(self, j_read_builder):
60+
self._j_read_builder = j_read_builder
61+
62+
def with_projection(self, projection: List[List[int]]) -> Self:
63+
self._j_read_builder.withProjection(projection)
64+
return self
65+
66+
def with_limit(self, limit: int) -> Self:
67+
self._j_read_builder.withLimit(limit)
68+
return self
69+
70+
def new_scan(self) -> 'TableScan':
71+
j_table_scan = self._j_read_builder.newScan()
72+
return TableScan(j_table_scan)
73+
74+
def new_read(self) -> 'TableRead':
75+
# TODO
76+
pass
77+
78+
79+
class TableScan(table_scan.TableScan):
80+
81+
def __init__(self, j_table_scan):
82+
self._j_table_scan = j_table_scan
83+
84+
def plan(self) -> 'Plan':
85+
j_plan = self._j_table_scan.plan()
86+
j_splits = j_plan.splits()
87+
return Plan(j_splits)
88+
89+
90+
class Plan(table_scan.Plan):
91+
92+
def __init__(self, j_splits):
93+
self._j_splits = j_splits
94+
95+
def splits(self) -> List['Split']:
96+
return list(map(lambda s: Split(s), self._j_splits))
97+
98+
99+
class Split(split.Split):
100+
101+
def __init__(self, j_split):
102+
self._j_split = j_split
103+
104+
def to_j_split(self):
105+
return self._j_split
106+
107+
108+
class TableRead(table_read.TableRead):
109+
110+
def create_reader(self, split: Split) -> RecordBatchReader:
111+
# TODO
112+
pass
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import os
20+
import platform
21+
import signal
22+
23+
from subprocess import Popen, PIPE
24+
from java_based_implementation.util.constants import (PYPAIMON_JVM_ARGS, PYPAIMON_JAVA_CLASSPATH,
25+
PYPAIMON_MAIN_ARGS, PYPAIMON_MAIN_CLASS)
26+
27+
28+
def on_windows():
29+
return platform.system() == "Windows"
30+
31+
32+
def find_java_executable():
33+
java_executable = "java.exe" if on_windows() else "java"
34+
java_home = None
35+
36+
if java_home is None and "JAVA_HOME" in os.environ:
37+
java_home = os.environ["JAVA_HOME"]
38+
39+
if java_home is not None:
40+
java_executable = os.path.join(java_home, "bin", java_executable)
41+
42+
return java_executable
43+
44+
45+
def launch_gateway_server_process(env):
46+
java_executable = find_java_executable()
47+
# TODO construct Java module log settings
48+
log_settings = []
49+
jvm_args = env.get(PYPAIMON_JVM_ARGS, '').split()
50+
classpath = env.get(PYPAIMON_JAVA_CLASSPATH)
51+
main_args = env.get(PYPAIMON_MAIN_ARGS, '').split()
52+
command = [
53+
java_executable,
54+
*jvm_args,
55+
# default jvm args
56+
"-XX:+IgnoreUnrecognizedVMOptions",
57+
"--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
58+
*log_settings,
59+
"-cp",
60+
classpath,
61+
"-c",
62+
PYPAIMON_MAIN_CLASS,
63+
*main_args
64+
]
65+
66+
if not on_windows():
67+
def preexec_func():
68+
# ignore ctrl-c / SIGINT
69+
signal.signal(signal.SIGINT, signal.SIG_IGN)
70+
71+
preexec_fn = preexec_func
72+
return Popen(list(filter(lambda c: len(c) != 0, command)),
73+
stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import os
20+
import shutil
21+
import struct
22+
import tempfile
23+
import time
24+
from logging import WARN
25+
from py4j.java_gateway import (java_import, logger, JavaGateway, GatewayParameters,
26+
CallbackServerParameters)
27+
from threading import RLock
28+
29+
from java_based_implementation.gateway_server import launch_gateway_server_process
30+
from java_based_implementation.util.constants import PYPAIMON_CONN_INFO_PATH
31+
from java_based_implementation.util.exceptions import install_py4j_hooks
32+
33+
_gateway = None
34+
_lock = RLock()
35+
36+
37+
def get_gateway():
38+
# type: () -> JavaGateway
39+
global _gateway
40+
global _lock
41+
with _lock:
42+
if _gateway is None:
43+
# Set the level to WARN to mute the noisy INFO level logs
44+
logger.level = WARN
45+
_gateway = launch_gateway()
46+
47+
callback_server = _gateway.get_callback_server()
48+
callback_server_listening_address = callback_server.get_listening_address()
49+
callback_server_listening_port = callback_server.get_listening_port()
50+
_gateway.jvm.org.apache.paimon.python.PythonEnvUtils.resetCallbackClient(
51+
_gateway.java_gateway_server,
52+
callback_server_listening_address,
53+
callback_server_listening_port)
54+
# import the paimon view
55+
import_paimon_view(_gateway)
56+
# TODO add exception handler for better exception stacktrace
57+
install_py4j_hooks()
58+
_gateway.entry_point.put("Watchdog", Watchdog())
59+
return _gateway
60+
61+
62+
def launch_gateway():
63+
# type: () -> JavaGateway
64+
"""
65+
launch jvm gateway
66+
"""
67+
68+
# Create a temporary directory where the gateway server should write the connection information.
69+
conn_info_dir = tempfile.mkdtemp()
70+
try:
71+
fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
72+
os.close(fd)
73+
os.unlink(conn_info_file)
74+
75+
env = dict(os.environ)
76+
env[PYPAIMON_CONN_INFO_PATH] = conn_info_file
77+
78+
p = launch_gateway_server_process(env)
79+
80+
while not p.poll() and not os.path.isfile(conn_info_file):
81+
time.sleep(0.1)
82+
83+
if not os.path.isfile(conn_info_file):
84+
stderr_info = p.stderr.read().decode('utf-8')
85+
raise RuntimeError(
86+
"Java gateway process exited before sending its port number.\nStderr:\n"
87+
+ stderr_info
88+
)
89+
90+
with open(conn_info_file, "rb") as info:
91+
gateway_port = struct.unpack("!I", info.read(4))[0]
92+
finally:
93+
shutil.rmtree(conn_info_dir)
94+
95+
# Connect to the gateway
96+
gateway = JavaGateway(
97+
gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
98+
callback_server_parameters=CallbackServerParameters(
99+
port=0, daemonize=True, daemonize_connections=True))
100+
101+
return gateway
102+
103+
104+
# TODO: import more
105+
def import_paimon_view(gateway):
106+
java_import(gateway.jvm, "org.apache.paimon.table.*")
107+
108+
109+
class Watchdog(object):
110+
"""
111+
Used to provide to Java side to check whether its parent process is alive.
112+
"""
113+
114+
def ping(self):
115+
time.sleep(10)
116+
return True
117+
118+
class Java:
119+
implements = ["org.apache.paimon.python.PythonGatewayServer$Watchdog"]

java-based-implementation/paimon-python-java-bridge/copyright.txt renamed to java_based_implementation/paimon-python-java-bridge/copyright.txt

File renamed without changes.

java-based-implementation/paimon-python-java-bridge/pom.xml renamed to java_based_implementation/paimon-python-java-bridge/pom.xml

File renamed without changes.

java-based-implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java renamed to java_based_implementation/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java

File renamed without changes.

0 commit comments

Comments
 (0)