Skip to content

Commit 21b18fd

Browse files
committed
Added stream config and stuff.
1 parent 8168ff7 commit 21b18fd

4 files changed

Lines changed: 490 additions & 11 deletions

File tree

python/natsrpy/_inner/js/stream.pyi

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,61 @@
1+
from datetime import timedelta
2+
13
class StorageType:
24
FILE: "StorageType"
35
MEMORY: "StorageType"
46

7+
class DiscardPolicy:
8+
OLD: "DiscardPolicy"
9+
NEW: "DiscardPolicy"
10+
11+
class RetentionPolicy:
12+
LIMITS: "RetentionPolicy"
13+
INTEREST: "RetentionPolicy"
14+
WORKQUEUE: "RetentionPolicy"
15+
16+
class Compression:
17+
S2: "Compression"
18+
NONE: "Compression"
19+
20+
class PersistenceMode:
21+
Default: "PersistenceMode"
22+
Async: "PersistenceMode"
23+
24+
class ConsumerLimits:
25+
inactive_threshold: timedelta
26+
max_ack_pending: int
27+
28+
def __init__(self, inactive_threshold: timedelta, max_ack_pending: int) -> None: ...
29+
530
class External:
31+
api_prefix: str
32+
delivery_prefix: str | None
33+
634
def __init__(
735
self,
836
api_prefix: str,
937
delivery_prefix: str | None = None,
1038
) -> None: ...
1139

1240
class SubjectTransform:
41+
source: str
42+
destination: str
43+
1344
def __init__(
1445
self,
1546
source: str,
1647
destination: str,
1748
) -> None: ...
1849

1950
class Source:
51+
name: str
52+
filter_subject: str | None = None
53+
external: External | None = None
54+
start_sequence: int | None = None
55+
start_time: int | None = None
56+
domain: str | None = None
57+
subject_transforms: SubjectTransform | None = None
58+
2059
def __init__(
2160
self,
2261
name: str,
@@ -29,17 +68,67 @@ class Source:
2968
) -> None: ...
3069

3170
class Placement:
71+
cluster: str | None
72+
tags: list[str] | None
73+
3274
def __init__(
3375
self,
3476
cluster: str | None = None,
3577
tags: list[str] | None = None,
3678
) -> None: ...
3779

3880
class Republish:
39-
pass
81+
source: str
82+
destination: str
83+
headers_only: bool
84+
4085
def __init__(
4186
self,
42-
source: str | None = None,
43-
destination: str | None = None,
44-
headers_only: str | None = None,
87+
source: str,
88+
destination: str,
89+
headers_only: bool,
90+
) -> None: ...
91+
92+
class StreamConfig:
93+
def __init__(
94+
self,
95+
name: str,
96+
subjects,
97+
max_bytes=None,
98+
max_messages=None,
99+
max_messages_per_subject=None,
100+
discard=None,
101+
discard_new_per_subject=None,
102+
retention=None,
103+
max_consumers=None,
104+
max_age=None,
105+
max_message_size=None,
106+
storage=None,
107+
num_replicas=None,
108+
no_ack=None,
109+
duplicate_window=None,
110+
template_owner=None,
111+
sealed=None,
112+
description=None,
113+
allow_rollup=None,
114+
deny_delete=None,
115+
deny_purge=None,
116+
republish=None,
117+
allow_direct=None,
118+
mirror_direct=None,
119+
mirror=None,
120+
sources=None,
121+
metadata=None,
122+
subject_transform=None,
123+
compression=None,
124+
consumer_limits=None,
125+
first_sequence=None,
126+
placement=None,
127+
persist_mode=None,
128+
pause_until=None,
129+
allow_message_ttl=None,
130+
subject_delete_marker_ttl=None,
131+
allow_atomic_publish=None,
132+
allow_message_schedules=None,
133+
allow_message_counter=None,
45134
) -> None: ...

src/exceptions/rust_err.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ pub enum NatsrpyError {
4848
KVUpdateError(#[from] async_nats::jetstream::context::UpdateKeyValueError),
4949
#[error(transparent)]
5050
DeleteError(#[from] async_nats::jetstream::kv::DeleteError),
51+
#[error(transparent)]
52+
CreateStreamError(#[from] async_nats::jetstream::context::CreateStreamError),
5153
}
5254

5355
impl From<NatsrpyError> for pyo3::PyErr {

src/js/jetstream.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tokio::sync::RwLock;
99

1010
use crate::{
1111
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
12-
js::kv::{KVConfig, KeyValue},
12+
js::{
13+
kv::{KVConfig, KeyValue},
14+
stream::StreamConfig,
15+
},
1316
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
1417
};
1518

@@ -117,15 +120,18 @@ impl JetStream {
117120
})
118121
}
119122

120-
pub fn get_consumer<'py>(
123+
pub fn create_stream<'py>(
121124
&self,
122125
py: Python<'py>,
123-
bucket: String,
126+
config: StreamConfig,
124127
) -> NatsrpyResult<Bound<'py, PyAny>> {
125128
let ctx = self.ctx.clone();
126129
natsrpy_future(py, async move {
127130
let js = ctx.read().await;
128-
Ok(KeyValue::new(js.get_key_value(bucket).await?))
131+
Ok(super::stream::Stream::new(
132+
js.create_stream(async_nats::jetstream::stream::Config::try_from(config)?)
133+
.await?,
134+
))
129135
})
130136
}
131137
}

0 commit comments

Comments
 (0)