Skip to content

Commit 3edcfd6

Browse files
authored
Split put and put_file methods. (#53)
1 parent 7240154 commit 3edcfd6

5 files changed

Lines changed: 120 additions & 21 deletions

File tree

examples/object_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def main() -> None:
2121
),
2222
)
2323
await store.put("test2.py", Path(__file__).read_bytes())
24-
await store.put("test.py", str(Path(__file__)))
24+
await store.put_file("test.py", Path(__file__))
2525

2626
async for obj in await store.list():
2727
print(obj) # noqa: T201

python/natsrpy/_natsrpy_rs/js/object_store.pyi

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from asyncio import Future
22
from datetime import datetime, timedelta
3+
from os import PathLike
34
from typing import Any, final
45

56
from typing_extensions import Self, Writer
@@ -130,23 +131,43 @@ class ObjectStore:
130131
def put(
131132
self,
132133
name: str,
133-
value: bytes | str,
134+
value: bytes | bytearray | memoryview,
134135
chunk_size: int = ..., # 24MB
135136
description: str | None = None,
136137
headers: dict[str, str | list[str]] | None = None,
137138
metadata: dict[str, str] | None = None,
138139
) -> Future[None]:
139-
"""Upload an object to the store.
140+
"""Upload an object to the store from in-memory bytes.
140141
141142
:param name: name for the stored object.
142-
:param value: object content.
143+
:param value: object content as bytes.
143144
:param chunk_size: size of upload chunks in bytes,
144145
defaults to 24 MB.
145146
:param description: human-readable object description.
146147
:param headers: optional NATS headers.
147148
:param metadata: optional custom key-value metadata.
148149
"""
149150

151+
def put_file(
152+
self,
153+
name: str,
154+
path: str | PathLike[str],
155+
chunk_size: int | None = None,
156+
description: str | None = None,
157+
headers: dict[str, str | list[str]] | None = None,
158+
metadata: dict[str, str] | None = None,
159+
) -> Future[None]:
160+
"""Upload an object to the store by streaming from a file.
161+
162+
:param name: name for the stored object.
163+
:param path: path to the file to upload.
164+
:param chunk_size: size of read and upload chunks in bytes,
165+
defaults to 200 KB.
166+
:param description: human-readable object description.
167+
:param headers: optional NATS headers.
168+
:param metadata: optional custom key-value metadata.
169+
"""
170+
150171
def delete(self, name: str) -> Future[None]:
151172
"""Delete an object from the store.
152173

python/natsrpy/instrumentation/object_store.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(
5656
def instrument(self) -> None:
5757
"""Setup instrumentation for all ObjectStore methods."""
5858
self._instrument_put()
59+
self._instrument_put_file()
5960
self._instrument_get()
6061
self._instrument_delete()
6162
self._instrument_seal()
@@ -71,6 +72,7 @@ def uninstrument() -> None:
7172
"""Remove instrumentation from all ObjectStore methods."""
7273
for method in (
7374
"put",
75+
"put_file",
7476
"get",
7577
"delete",
7678
"seal",
@@ -117,6 +119,35 @@ def decorator(
117119

118120
wrap_function_wrapper(_OS_MODULE, "ObjectStore.put", decorator)
119121

122+
def _instrument_put_file(self) -> None:
123+
tracer = self.tracer
124+
125+
async def _wrapped(
126+
wrapper: Any,
127+
args: tuple[Any, ...],
128+
kwargs: dict[str, Any],
129+
) -> Any:
130+
if not is_instrumentation_enabled():
131+
return await wrapper(*args, **kwargs)
132+
name: str = args[0]
133+
span = (
134+
SpanBuilder(tracer, SpanKind.PRODUCER, SpanAction.OBJ_PUT)
135+
.with_object_name(name)
136+
.build()
137+
)
138+
with trace.use_span(span, end_on_exit=True):
139+
return await wrapper(*args, **kwargs)
140+
141+
def decorator(
142+
wrapper: Any,
143+
_: ObjectStore,
144+
args: tuple[Any, ...],
145+
kwargs: dict[str, Any],
146+
) -> Any:
147+
return _wrapped(wrapper, args, kwargs)
148+
149+
wrap_function_wrapper(_OS_MODULE, "ObjectStore.put_file", decorator)
150+
120151
def _instrument_get(self) -> None:
121152
tracer = self.tracer
122153

python/tests/test_object_store.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ async def test_object_store_put_with_metadata(js: JetStream) -> None:
511511
await js.object_store.delete(bucket)
512512

513513

514-
async def test_object_store_put_from_file(js: JetStream) -> None:
514+
async def test_object_store_put_file_str_path(js: JetStream) -> None:
515515
bucket = f"test-os-putfile-{uuid.uuid4().hex[:8]}"
516516
config = ObjectStoreConfig(bucket=bucket)
517517
store = await js.object_store.create(config)
@@ -522,7 +522,7 @@ async def test_object_store_put_from_file(js: JetStream) -> None:
522522
tmp_path = tmp.name
523523

524524
try:
525-
await store.put("file-object", tmp_path)
525+
await store.put_file("file-object", tmp_path)
526526
writer = io.BytesIO()
527527
await store.get("file-object", writer)
528528
assert writer.getvalue() == file_content
@@ -532,6 +532,27 @@ async def test_object_store_put_from_file(js: JetStream) -> None:
532532
await js.object_store.delete(bucket)
533533

534534

535+
async def test_object_store_put_file_path_object(js: JetStream) -> None:
536+
bucket = f"test-os-putfilepath-{uuid.uuid4().hex[:8]}"
537+
config = ObjectStoreConfig(bucket=bucket)
538+
store = await js.object_store.create(config)
539+
try:
540+
file_content = b"file content via pathlib"
541+
with tempfile.NamedTemporaryFile(delete=False) as tmp:
542+
tmp.write(file_content)
543+
tmp_path = Path(tmp.name)
544+
545+
try:
546+
await store.put_file("file-object", tmp_path)
547+
writer = io.BytesIO()
548+
await store.get("file-object", writer)
549+
assert writer.getvalue() == file_content
550+
finally:
551+
tmp_path.unlink()
552+
finally:
553+
await js.object_store.delete(bucket)
554+
555+
535556
async def test_object_store_get_with_chunk_size(js: JetStream) -> None:
536557
bucket = f"test-os-getchunk-{uuid.uuid4().hex[:8]}"
537558
config = ObjectStoreConfig(bucket=bucket)

src/js/object_store.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717
utils::{
1818
headers::NatsrpyHeadermapExt,
1919
natsrpy_future,
20-
py_types::{SendableValue, TimeValue, ToPyDate},
20+
py_types::{TimeValue, ToPyDate},
2121
streamer::Streamer,
2222
},
2323
};
@@ -253,7 +253,7 @@ impl ObjectStore {
253253
&self,
254254
py: Python<'py>,
255255
name: String,
256-
value: SendableValue,
256+
value: Vec<u8>,
257257
chunk_size: Option<usize>,
258258
description: Option<String>,
259259
headers: Option<Bound<'py, PyDict>>,
@@ -269,19 +269,45 @@ impl ObjectStore {
269269
headers,
270270
};
271271
natsrpy_future(py, async move {
272-
match value {
273-
SendableValue::Bytes(data) => {
274-
let mut reader = tokio::io::BufReader::new(&*data);
275-
ctx_guard.read().await.put(meta, &mut reader).await?;
276-
}
277-
SendableValue::String(filename) => {
278-
let mut reader = tokio::io::BufReader::with_capacity(
279-
chunk_size.unwrap_or(200 * 1024),
280-
tokio::fs::File::open(filename).await?,
281-
);
282-
ctx_guard.read().await.put(meta, &mut reader).await?;
283-
}
284-
}
272+
let mut reader = tokio::io::BufReader::new(value.as_slice());
273+
ctx_guard.read().await.put(meta, &mut reader).await?;
274+
Ok(())
275+
})
276+
}
277+
278+
#[pyo3(signature=(
279+
name,
280+
path,
281+
chunk_size=None,
282+
description=None,
283+
headers=None,
284+
metadata=None,
285+
))]
286+
pub fn put_file<'py>(
287+
&self,
288+
py: Python<'py>,
289+
name: String,
290+
path: std::path::PathBuf,
291+
chunk_size: Option<usize>,
292+
description: Option<String>,
293+
headers: Option<Bound<'py, PyDict>>,
294+
metadata: Option<HashMap<String, String>>,
295+
) -> NatsrpyResult<Bound<'py, PyAny>> {
296+
let ctx_guard = self.object_store.clone();
297+
let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?;
298+
let meta = async_nats::jetstream::object_store::ObjectMetadata {
299+
name,
300+
chunk_size,
301+
description,
302+
metadata: metadata.unwrap_or_default(),
303+
headers,
304+
};
305+
natsrpy_future(py, async move {
306+
let mut reader = tokio::io::BufReader::with_capacity(
307+
chunk_size.unwrap_or(200 * 1024),
308+
tokio::fs::File::open(path).await?,
309+
);
310+
ctx_guard.read().await.put(meta, &mut reader).await?;
285311
Ok(())
286312
})
287313
}

0 commit comments

Comments
 (0)