|
1 | 1 | # SPDX-License-Identifier: Apache-2.0 |
2 | 2 | # Copyright 2025 Atlan Pte. Ltd. |
| 3 | +import os |
| 4 | +from pathlib import Path |
3 | 5 | from typing import Any |
4 | 6 |
|
5 | 7 | from pyatlan.client.constants import ( |
|
13 | 15 | from pyatlan.errors import ErrorCode |
14 | 16 | from pyatlan.model.file import CloudStorageIdentifier, PresignedURLRequest |
15 | 17 |
|
# Absolute path prefixes of system locations that uploads must never read from.
# Every entry ends with a slash so that a ``str.startswith`` check only matches
# paths located inside the directory (e.g. "/etc/passwd" but not "/etcetera").
# NOTE(review): on macOS the default temp directory usually resolves under
# /private/var/folders/, so "/private/var/" presumably rejects temp files as
# well -- confirm this is intended.
_SENSITIVE_SYSTEM_PREFIXES = (
    "/etc/",
    "/proc/",
    "/sys/",
    "/dev/",
    "/root/",
    "/private/etc/",  # macOS: /etc is a symlink to /private/etc
    "/private/var/",  # macOS
)

# Names of hidden credential/config directories; a path is rejected when any of
# its components equals one of these names.
_SENSITIVE_DIR_NAMES = frozenset((".aws", ".ssh", ".gnupg"))

# File-name prefixes identifying environment/secret files
# (matches ".env" as well as variants such as ".env.local").
_SENSITIVE_FILE_PREFIXES = (".env",)
| 34 | + |
| 35 | + |
def _parse_env_list(env_var: str) -> list:
    """
    Read a comma-separated environment variable as a list of entries.

    Whitespace around each entry is stripped and entries that are empty
    after stripping are dropped.

    :param env_var: name of the environment variable to read
    :returns: the stripped, non-empty entries in their original order;
        an empty list when the variable is unset or empty
    """
    raw_value = os.environ.get(env_var, "")
    entries = []
    for chunk in raw_value.split(","):
        entry = chunk.strip()
        # Skip blanks produced by an empty value, doubled or trailing commas.
        if entry:
            entries.append(entry)
    return entries
| 39 | + |
16 | 40 |
|
17 | 41 | class FilePresignedUrl: |
18 | 42 | """ |
@@ -54,10 +78,52 @@ def validate_file_path(file_path: str) -> Any: |
54 | 78 |
|
55 | 79 | :param file_path: path to the file to upload |
56 | 80 | :returns: opened file object |
| 81 | + :raises INVALID_UPLOAD_FILE_PATH_TRAVERSAL: if path traversal is detected |
| 82 | + :raises INVALID_UPLOAD_FILE_PATH_SENSITIVE: if path points to a sensitive location |
57 | 83 | :raises INVALID_UPLOAD_FILE_PATH: if file not found |
58 | 84 | """ |
| 85 | + path = Path(file_path) |
| 86 | + |
| 87 | + # Block directory traversal via '..' |
| 88 | + if ".." in path.parts: |
| 89 | + raise ErrorCode.INVALID_UPLOAD_FILE_PATH_TRAVERSAL.exception_with_parameters( |
| 90 | + file_path |
| 91 | + ) |
| 92 | + |
| 93 | + resolved = path.resolve() |
| 94 | + resolved_str = str(resolved) |
| 95 | + |
| 96 | + # Block sensitive system directories (e.g. /etc/, /proc/, /dev/) |
| 97 | + if resolved_str.startswith(_SENSITIVE_SYSTEM_PREFIXES): |
| 98 | + raise ErrorCode.INVALID_UPLOAD_FILE_PATH_SENSITIVE.exception_with_parameters( |
| 99 | + file_path |
| 100 | + ) |
| 101 | + |
| 102 | + # Block credential/config hidden directories (e.g. .aws, .ssh, .gnupg) |
| 103 | + if any(part in _SENSITIVE_DIR_NAMES for part in resolved.parts): |
| 104 | + raise ErrorCode.INVALID_UPLOAD_FILE_PATH_SENSITIVE.exception_with_parameters( |
| 105 | + file_path |
| 106 | + ) |
| 107 | + |
| 108 | + # Block environment/secret files (e.g. .env, .env.local, .env.production) |
| 109 | + if resolved.name.startswith(_SENSITIVE_FILE_PREFIXES): |
| 110 | + raise ErrorCode.INVALID_UPLOAD_FILE_PATH_SENSITIVE.exception_with_parameters( |
| 111 | + file_path |
| 112 | + ) |
| 113 | + |
| 114 | + # Block user-defined paths via PYATLAN_UPLOAD_FILE_BLOCKED_PATHS (comma-separated). |
| 115 | + # Each entry is matched as a substring against the full resolved path, so it |
| 116 | + # can express system prefixes ("/vault/"), dir names (".vault"), or |
| 117 | + # file prefixes (".credentials"). |
| 118 | + # e.g. PYATLAN_UPLOAD_FILE_BLOCKED_PATHS="/custom/secrets/,.vault,.credentials" |
| 119 | + user_blocked = _parse_env_list("PYATLAN_UPLOAD_FILE_BLOCKED_PATHS") |
| 120 | + if any(pattern in resolved_str for pattern in user_blocked): |
| 121 | + raise ErrorCode.INVALID_UPLOAD_FILE_PATH_SENSITIVE.exception_with_parameters( |
| 122 | + file_path |
| 123 | + ) |
| 124 | + |
59 | 125 | try: |
60 | | - return open(file_path, "rb") |
| 126 | + return open(resolved, "rb") |
61 | 127 | except FileNotFoundError as err: |
62 | 128 | raise ErrorCode.INVALID_UPLOAD_FILE_PATH.exception_with_parameters( |
63 | 129 | str(err.strerror), file_path |
|
0 commit comments