Skip to content

Commit a68a422

Browse files
authored
refactor(runtime): make duplex_stream_rx optional in RunOptions (#680)
1 parent 20a8587 commit a68a422

2 files changed

Lines changed: 6 additions & 25 deletions

File tree

crates/base/src/runtime/mod.rs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ pub enum WillTerminateReason {
299299
#[derive(Debug)]
300300
pub struct RunOptions {
301301
wait_termination_request_token: bool,
302-
duplex_stream_rx: mpsc::UnboundedReceiver<DuplexStreamEntry>,
302+
duplex_stream_rx: Option<mpsc::UnboundedReceiver<DuplexStreamEntry>>,
303303
maybe_cpu_usage_metrics_tx: Option<mpsc::UnboundedSender<CPUUsageMetrics>>,
304304
}
305305

@@ -352,11 +352,6 @@ impl RunOptionsBuilder {
352352
maybe_cpu_usage_metrics_tx,
353353
} = self;
354354

355-
// TODO(Nyannyacha): Make this as optional.
356-
let Some(duplex_stream_rx) = duplex_stream_rx else {
357-
return Err(anyhow!("stream_rx can't be empty"));
358-
};
359-
360355
Ok(RunOptions {
361356
wait_termination_request_token,
362357
duplex_stream_rx,
@@ -1291,8 +1286,10 @@ where
12911286
let op_state_rc = self.js_runtime.op_state();
12921287
let mut op_state = op_state_rc.borrow_mut();
12931288

1294-
op_state
1295-
.put::<mpsc::UnboundedReceiver<DuplexStreamEntry>>(duplex_stream_rx);
1289+
if let Some(duplex_stream_rx) = duplex_stream_rx {
1290+
op_state
1291+
.put::<mpsc::UnboundedReceiver<DuplexStreamEntry>>(duplex_stream_rx);
1292+
}
12961293

12971294
if self.conf.is_main_worker() {
12981295
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(
@@ -2338,7 +2335,6 @@ mod test {
23382335

23392336
use crate::runtime::DenoRuntime;
23402337
use crate::runtime::JsRuntimeLockerGuard;
2341-
use crate::worker::DuplexStreamEntry;
23422338
use crate::worker::WorkerBuilder;
23432339

23442340
use super::GetRuntimeContext;
@@ -3201,13 +3197,10 @@ mod test {
32013197
.build()
32023198
.await;
32033199

3204-
let (_tx, duplex_stream_rx) =
3205-
mpsc::unbounded_channel::<DuplexStreamEntry>();
32063200
let (result, _) = user_rt
32073201
.run(
32083202
RunOptionsBuilder::new()
32093203
.wait_termination_request_token(false)
3210-
.stream_rx(duplex_stream_rx)
32113204
.build()
32123205
.unwrap(),
32133206
)
@@ -3231,13 +3224,10 @@ mod test {
32313224
.build()
32323225
.await;
32333226

3234-
let (_tx, duplex_stream_rx) =
3235-
mpsc::unbounded_channel::<DuplexStreamEntry>();
32363227
let (result, _) = user_rt
32373228
.run(
32383229
RunOptionsBuilder::new()
32393230
.wait_termination_request_token(false)
3240-
.stream_rx(duplex_stream_rx)
32413231
.build()
32423232
.unwrap(),
32433233
)
@@ -3259,8 +3249,6 @@ mod test {
32593249
memory_limit_mb: u64,
32603250
worker_timeout_ms: u64,
32613251
) {
3262-
let (_duplex_stream_tx, duplex_stream_rx) =
3263-
mpsc::unbounded_channel::<DuplexStreamEntry>();
32643252
let (callback_tx, mut callback_rx) = mpsc::unbounded_channel::<()>();
32653253
let mut user_rt = create_basic_user_runtime_builder(
32663254
path,
@@ -3286,7 +3274,6 @@ mod test {
32863274
.run(
32873275
RunOptionsBuilder::new()
32883276
.wait_termination_request_token(false)
3289-
.stream_rx(duplex_stream_rx)
32903277
.build()
32913278
.unwrap(),
32923279
)
@@ -3393,13 +3380,10 @@ mod test {
33933380
.build()
33943381
.await;
33953382

3396-
let (_tx, duplex_stream_rx) = mpsc::unbounded_channel();
3397-
33983383
user_rt
33993384
.run(
34003385
RunOptionsBuilder::new()
34013386
.wait_termination_request_token(false)
3402-
.stream_rx(duplex_stream_rx)
34033387
.build()
34043388
.unwrap(),
34053389
)
@@ -3425,13 +3409,10 @@ mod test {
34253409
.build()
34263410
.await;
34273411

3428-
let (_tx, duplex_stream_rx) = mpsc::unbounded_channel();
3429-
34303412
user_rt
34313413
.run(
34323414
RunOptionsBuilder::new()
34333415
.wait_termination_request_token(false)
3434-
.stream_rx(duplex_stream_rx)
34353416
.build()
34363417
.unwrap(),
34373418
)

ext/runtime/ops/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ pub async fn op_net_accept(
158158
};
159159

160160
let Some(rx) = rx else {
161-
return Err(bad_resource("duplex stream receiver is already used"));
161+
return Err(bad_resource("duplex stream receiver is not available"));
162162
};
163163

164164
let mut rx = scopeguard::guard(rx, {

0 commit comments

Comments
 (0)