|
20 | 20 | # Provisions Paimon tables into the warehouse (file:/tmp/paimon-warehouse) |
21 | 21 | # for paimon-rust integration tests to read. |
22 | 22 |
|
| 23 | +import shutil |
| 24 | +from pathlib import Path |
| 25 | +from urllib.parse import unquote, urlparse |
| 26 | + |
23 | 27 | from pyspark.sql import SparkSession |
24 | 28 |
|
25 | 29 |
|
def _warehouse_path_from_spark_conf(spark: SparkSession) -> Path:
    """Return the local filesystem path of the configured Paimon warehouse.

    Reads ``spark.sql.catalog.paimon.warehouse`` from the Spark runtime
    configuration and accepts either a plain path or a ``file:`` URI.

    Args:
        spark: Session whose runtime configuration holds the warehouse
            setting.

    Returns:
        The absolute warehouse path exactly as configured (percent-decoded,
        but not symlink-resolved).

    Raises:
        ValueError: If the URI scheme is not ``file``, the URI names a host
            other than ``localhost``, the path is not absolute, or the path
            denotes the filesystem root.
    """
    warehouse_uri = spark.conf.get("spark.sql.catalog.paimon.warehouse")
    parsed = urlparse(warehouse_uri)

    if parsed.scheme not in ("", "file"):
        raise ValueError(
            f"Unsupported Paimon warehouse URI scheme {parsed.scheme!r}: {warehouse_uri}"
        )

    if parsed.netloc not in ("", "localhost"):
        raise ValueError(
            f"Unsupported remote Paimon warehouse location {parsed.netloc!r}: {warehouse_uri}"
        )

    # A scheme-less value is a plain path, so keep the whole string rather
    # than only the component urlparse labels as "path".
    warehouse_path = Path(unquote(parsed.path if parsed.scheme else warehouse_uri))
    if not warehouse_path.is_absolute():
        raise ValueError(f"Refusing to clear unsafe warehouse path: {warehouse_path}")

    # Judge the resolved location, not the raw string: "/x/..", "//" and a
    # symlink to "/" all denote the root without being spelled "/".
    # Only the root is its own parent.
    resolved = warehouse_path.resolve()
    if resolved.parent == resolved:
        raise ValueError(f"Refusing to clear unsafe warehouse path: {warehouse_path}")

    return warehouse_path
| 49 | + |
| 50 | + |
def _reset_warehouse_dir(warehouse_path: Path) -> None:
    """Ensure *warehouse_path* exists as a directory and remove its contents.

    The directory itself is kept; only its direct children are removed.

    Args:
        warehouse_path: Directory to create (with parents) if missing and
            then empty.
    """
    warehouse_path.mkdir(parents=True, exist_ok=True)

    for child in warehouse_path.iterdir():
        # Recurse only into real directories. Symlinks (even to directories)
        # are unlinked so their targets stay untouched, and special files
        # such as FIFOs or sockets are unlinked too, since shutil.rmtree
        # raises on anything that is not a directory.
        if child.is_dir() and not child.is_symlink():
            shutil.rmtree(child)
        else:
            child.unlink()
| 59 | + |
| 60 | + |
26 | 61 | def main(): |
27 | 62 | spark = SparkSession.builder.getOrCreate() |
28 | 63 |
|
| 64 | + warehouse_path = _warehouse_path_from_spark_conf(spark) |
| 65 | + _reset_warehouse_dir(warehouse_path) |
| 66 | + |
29 | 67 | # Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse) |
30 | 68 | spark.sql("USE paimon.default") |
31 | 69 |
|
|
0 commit comments