tokio_stream::ReceiverStream and tokio_stream::StreamExt::skip() together behave in an unexpected way #8024
-
|
I'm not sure if this is unexpected or if my thinking is bad :), but let me try to explain my issue with this example: I post 10 backups. Each backup sends a message in a channel, when the backup is done. I post an eleventh backup, which posts a different message, because only 10 backups are allowed: I check the eleventh message with a combination of mpsc::ReceiverStream and skip(), both of tokio_stream: Now comes the problem. After this code, I execute another post request, that wants to send to the receiver, but it is closed, because, I think, skip() consumes self and makes it go out of scope after the code beforehand. My expectation was, that if ReceiverStream's purpose is to make a mpsc::channel into an useful Stream, that methods on StreamExt work on it, without consuming a single consumer, since this is a quite drastic thing to do with a mpsc. I can fix the problem by putting the code that skip()'s at the end, after all POST-requests, and with it after all sends to the channel. But this makes the code unnice. |
Beta Was this translation helpful? Give feedback.
Replies: 9 comments
-
|
The issue is that use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(16);
let mut stream = ReceiverStream::new(rx);
// Skip the first 2 items without consuming `stream`
stream.by_ref().skip(2).collect::<Vec<_>>().await;
// `stream` is still alive here — you can keep polling it
while let Some(item) = stream.next().await {
println!({item});
}
If you need to skip lazily without collecting, wrap in a pinned variable first: let mut skipping = stream.by_ref().skip(2);
// drive it until skip is satisfied
while let Some(_) = skipping.next().await {}
drop(skipping); // release the borrow
// now `stream` is usable againThis is the same pattern used with |
Beta Was this translation helpful? Give feedback.
-
|
The issue is that use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(16);
let mut stream = ReceiverStream::new(rx);
// Skip the first 2 items WITHOUT consuming stream
stream.by_ref().skip(2).collect::<Vec<_>>().await;
// stream is still alive here
while let Some(item) = stream.next().await {
println!("{item}");
}
If you need to drain lazily without collecting: {
let mut skipping = stream.by_ref().skip(2);
while let Some(_) = skipping.next().await {}
} // borrow released here
while let Some(item) = stream.next().await { /* ... */ } |
Beta Was this translation helpful? Give feedback.
-
|
The issue is that use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(16);
let mut stream = ReceiverStream::new(rx);
// Skip the first 2 items WITHOUT consuming `stream`
stream.by_ref().skip(2).collect::<Vec<_>>().await;
// stream is still alive here
while let Some(item) = stream.next().await {
println!("{item}");
}
If you need to drain lazily without collecting: {
let mut skipping = stream.by_ref().skip(2);
while let Some(_) = skipping.next().await {}
} // borrow released here — stream is usable again
while let Some(item) = stream.next().await { /* ... */ } |
Beta Was this translation helpful? Give feedback.
-
|
The issue is that use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(16);
let mut stream = ReceiverStream::new(rx);
// Skip the first 2 items WITHOUT consuming `stream`
stream.by_ref().skip(2).collect::<Vec<_>>().await;
// stream is still alive here
while let Some(item) = stream.next().await {
println!("{item}");
}
If you need to drain lazily without collecting: {
let mut skipping = stream.by_ref().skip(2);
while let Some(_) = skipping.next().await {}
} // borrow released here — stream is usable again
while let Some(item) = stream.next().await { /* ... */ } |
Beta Was this translation helpful? Give feedback.
-
|
AI slop?! |
Beta Was this translation helpful? Give feedback.
-
|
There are tokio/tokio-stream/src/wrappers/mpsc_bounded.rs Lines 92 to 98 in ad8c59a |
Beta Was this translation helpful? Give feedback.
-
Thank you, I know. And also tried that before writing this post. |
Beta Was this translation helpful? Give feedback.
-
|
Hi Erik, A reference to a stream is a stream. This should work: (&mut receiver_stream).skip(10).next() |
Beta Was this translation helpful? Give feedback.
-
|
Hey Alice, I have now learned some things:
|
Beta Was this translation helpful? Give feedback.
Hi Erik,
A reference to a stream is a stream. This should work: