Skip to content

Commit 8de60f1

Browse files
Fix async stream cancel corrupting read/write state (#12704)
When `stream.cancel-read` or `stream.cancel-write` is called with the `async` option and the cancel cannot complete immediately (returns BLOCKED), the code was unconditionally transitioning the read/write state from GuestReady to Open. This destroyed the buffer address/count info stored in GuestReady, causing incorrect behavior when the host producer/consumer later tried to access the stream state. Guard the GuestReady -> Open state transition with a check that the cancel did not return BLOCKED. When blocked, the cancel is still in-flight and the read/write state must be preserved until the cancel completes. Adds a regression test that creates a host StreamProducer, starts an async read (BLOCKED), then async-cancels (BLOCKED), and waits for cancel completion.
1 parent 0dc25e6 commit 8de60f1

2 files changed

Lines changed: 156 additions & 14 deletions

File tree

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3925,14 +3925,16 @@ impl Instance {
39253925
ReturnCode::Cancelled(0)
39263926
};
39273927

3928-
let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
3928+
if !matches!(code, ReturnCode::Blocked) {
3929+
let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
39293930

3930-
match &transmit.write {
3931-
WriteState::GuestReady { .. } => {
3932-
transmit.write = WriteState::Open;
3931+
match &transmit.write {
3932+
WriteState::GuestReady { .. } => {
3933+
transmit.write = WriteState::Open;
3934+
}
3935+
WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
3936+
WriteState::Open | WriteState::Dropped => {}
39333937
}
3934-
WriteState::HostReady { .. } => bail_bug!("support host write cancellation"),
3935-
WriteState::Open | WriteState::Dropped => {}
39363938
}
39373939

39383940
log::trace!("cancelled write {transmit_id:?}: {code:?}");
@@ -4010,16 +4012,18 @@ impl Instance {
40104012
ReturnCode::Cancelled(0)
40114013
};
40124014

4013-
let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
4015+
if !matches!(code, ReturnCode::Blocked) {
4016+
let transmit = store.concurrent_state_mut().get_mut(transmit_id)?;
40144017

4015-
match &transmit.read {
4016-
ReadState::GuestReady { .. } => {
4017-
transmit.read = ReadState::Open;
4018-
}
4019-
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4020-
bail_bug!("support host read cancellation")
4018+
match &transmit.read {
4019+
ReadState::GuestReady { .. } => {
4020+
transmit.read = ReadState::Open;
4021+
}
4022+
ReadState::HostReady { .. } | ReadState::HostToHost { .. } => {
4023+
bail_bug!("support host read cancellation")
4024+
}
4025+
ReadState::Open | ReadState::Dropped => {}
40214026
}
4022-
ReadState::Open | ReadState::Dropped => {}
40234027
}
40244028

40254029
log::trace!("cancelled read {transmit_id:?}: {code:?}");

tests/all/component_model/async.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,3 +1039,141 @@ async fn sync_lower_async_host_does_not_leak() -> Result<()> {
10391039

10401040
Ok(())
10411041
}
1042+
1043+
/// Regression test: `stream.cancel-read` with `async` option must not corrupt
1044+
/// the read state when it returns BLOCKED.
1045+
///
1046+
/// Bug: cancel_read/cancel_write unconditionally transitioned the read/write
1047+
/// state from GuestReady to Open after the cancel, even when the cancel
1048+
/// returned BLOCKED. This destroyed the buffer address/count info, causing
1049+
/// an error when the host later tried to access the stream state.
1050+
#[tokio::test]
1051+
#[cfg_attr(miri, ignore)]
1052+
async fn stream_cancel_read_async_does_not_corrupt_state() -> Result<()> {
1053+
_ = env_logger::try_init();
1054+
1055+
let mut config = Config::new();
1056+
config.wasm_component_model_async(true);
1057+
config.wasm_component_model_async_builtins(true);
1058+
config.wasm_component_model_async_stackful(true);
1059+
let engine = Engine::new(&config)?;
1060+
1061+
let component = Component::new(
1062+
&engine,
1063+
r#"
1064+
(component
1065+
(core module $libc (memory (export "memory") 1))
1066+
(core instance $libc (instantiate $libc))
1067+
(core module $m
1068+
(import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1069+
(import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
1070+
(import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
1071+
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
1072+
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
1073+
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
1074+
(import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
1075+
(memory (export "memory") 1)
1076+
1077+
(func (export "run") (param $sr i32)
1078+
(local $cancel_result i32)
1079+
(local $ws i32)
1080+
1081+
;; Async read into buffer at 0x100, length 4.
1082+
;; Should return BLOCKED (-1) since the host producer never writes.
1083+
(call $stream.read (local.get $sr) (i32.const 0x100) (i32.const 4))
1084+
i32.const -1 ;; BLOCKED
1085+
i32.ne
1086+
if unreachable end
1087+
1088+
;; Async cancel-read. The host write end is HostReady, so this returns
1089+
;; BLOCKED. Bug: the cancel unconditionally transitions GuestReady -> Open,
1090+
;; destroying the buffer info.
1091+
(local.set $cancel_result (call $stream.cancel-read (local.get $sr)))
1092+
1093+
;; If cancel returned BLOCKED (-1), wait for the cancel to complete.
1094+
;; This is where the bug manifests: when the host processes the cancel,
1095+
;; it accesses the read state which was corrupted from GuestReady to Open.
1096+
(if (i32.eq (local.get $cancel_result) (i32.const -1))
1097+
(then
1098+
(local.set $ws (call $waitable-set.new))
1099+
(call $waitable.join (local.get $sr) (local.get $ws))
1100+
;; Wait for the stream event (cancel completion). Event buffer at 0x200.
1101+
(drop (call $waitable-set.wait (local.get $ws) (i32.const 0x200)))
1102+
;; Unjoin stream from waitable-set (join to 0 = unjoin)
1103+
(call $waitable.join (local.get $sr) (i32.const 0))
1104+
(call $waitable-set.drop (local.get $ws))
1105+
)
1106+
)
1107+
1108+
;; Drop the stream
1109+
(call $stream.drop-readable (local.get $sr))
1110+
)
1111+
)
1112+
1113+
(type $s (stream u8))
1114+
(core func $stream.read (canon stream.read $s async (memory $libc "memory")))
1115+
(core func $stream.cancel-read (canon stream.cancel-read $s async))
1116+
(core func $stream.drop-readable (canon stream.drop-readable $s))
1117+
(canon waitable.join (core func $waitable.join))
1118+
(canon waitable-set.new (core func $waitable-set.new))
1119+
(canon waitable-set.wait (memory $libc "memory") (core func $waitable-set.wait))
1120+
(canon waitable-set.drop (core func $waitable-set.drop))
1121+
1122+
(core instance $i (instantiate $m
1123+
(with "" (instance
1124+
(export "stream.read" (func $stream.read))
1125+
(export "stream.cancel-read" (func $stream.cancel-read))
1126+
(export "stream.drop-readable" (func $stream.drop-readable))
1127+
(export "waitable.join" (func $waitable.join))
1128+
(export "waitable-set.new" (func $waitable-set.new))
1129+
(export "waitable-set.wait" (func $waitable-set.wait))
1130+
(export "waitable-set.drop" (func $waitable-set.drop))
1131+
))
1132+
))
1133+
1134+
(func (export "run") async (param "s" (stream u8))
1135+
(canon lift
1136+
(core func $i "run")
1137+
(memory $libc "memory")
1138+
)
1139+
)
1140+
)
1141+
"#,
1142+
)?;
1143+
1144+
let mut store = Store::new(&engine, ());
1145+
let instance = Linker::new(&engine)
1146+
.instantiate_async(&mut store, &component)
1147+
.await?;
1148+
let func = instance.get_typed_func::<(StreamReader<u8>,), ()>(&mut store, "run")?;
1149+
1150+
// Create a host-side stream that never produces data (always Pending).
1151+
// When cancel is requested (finish=true), it acknowledges the cancellation.
1152+
let reader = StreamReader::new(&mut store, NeverWriteStreamProducer)?;
1153+
func.call_async(&mut store, (reader,)).await?;
1154+
1155+
return Ok(());
1156+
1157+
struct NeverWriteStreamProducer;
1158+
1159+
impl StreamProducer<()> for NeverWriteStreamProducer {
1160+
type Item = u8;
1161+
type Buffer = Option<u8>;
1162+
1163+
fn poll_produce<'a>(
1164+
self: Pin<&mut Self>,
1165+
_cx: &mut Context<'_>,
1166+
_store: StoreContextMut<'a, ()>,
1167+
_destination: Destination<'a, Self::Item, Self::Buffer>,
1168+
finish: bool,
1169+
) -> Poll<Result<StreamResult>> {
1170+
if finish {
1171+
// Cancel requested — acknowledge it.
1172+
Poll::Ready(Ok(StreamResult::Cancelled))
1173+
} else {
1174+
// Never produce data.
1175+
Poll::Pending
1176+
}
1177+
}
1178+
}
1179+
}

0 commit comments

Comments
 (0)