|
| 1 | +use anyhow::anyhow; |
| 2 | +use aws_smithy_async::rt::sleep::{AsyncSleep, Sleep}; |
| 3 | +use aws_smithy_runtime_api::client::http::{ |
| 4 | + HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector, |
| 5 | +}; |
| 6 | +use aws_smithy_runtime_api::client::orchestrator::HttpRequest; |
| 7 | +use aws_smithy_runtime_api::client::result::ConnectorError; |
| 8 | +use aws_smithy_runtime_api::client::retries::ErrorKind; |
| 9 | +use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; |
| 10 | +use aws_smithy_runtime_api::http::Response; |
| 11 | +use aws_smithy_types::body::SdkBody; |
| 12 | +use http_body_util::{BodyStream, StreamBody}; |
| 13 | +use std::time::Duration; |
| 14 | +use sync_wrapper::SyncStream; |
| 15 | +use wstd::http::{Body as WstdBody, BodyExt, Client}; |
| 16 | + |
| 17 | +pub fn sleep_impl() -> impl AsyncSleep + 'static { |
| 18 | + WstdSleep |
| 19 | +} |
| 20 | + |
| 21 | +#[derive(Debug)] |
| 22 | +struct WstdSleep; |
| 23 | +impl AsyncSleep for WstdSleep { |
| 24 | + fn sleep(&self, duration: Duration) -> Sleep { |
| 25 | + Sleep::new(async move { |
| 26 | + wstd::task::sleep(wstd::time::Duration::from(duration)).await; |
| 27 | + }) |
| 28 | + } |
| 29 | +} |
| 30 | + |
| 31 | +pub fn http_client() -> impl HttpClient + 'static { |
| 32 | + WstdHttpClient |
| 33 | +} |
| 34 | + |
| 35 | +#[derive(Debug)] |
| 36 | +struct WstdHttpClient; |
| 37 | + |
| 38 | +impl HttpClient for WstdHttpClient { |
| 39 | + fn http_connector( |
| 40 | + &self, |
| 41 | + settings: &HttpConnectorSettings, |
| 42 | + // afaict, none of these components are relevant to this |
| 43 | + // implementation. |
| 44 | + _components: &RuntimeComponents, |
| 45 | + ) -> SharedHttpConnector { |
| 46 | + let mut client = Client::new(); |
| 47 | + if let Some(timeout) = settings.connect_timeout() { |
| 48 | + client.set_connect_timeout(timeout); |
| 49 | + } |
| 50 | + if let Some(timeout) = settings.read_timeout() { |
| 51 | + client.set_first_byte_timeout(timeout); |
| 52 | + } |
| 53 | + SharedHttpConnector::new(WstdHttpConnector(client)) |
| 54 | + } |
| 55 | +} |
| 56 | + |
| 57 | +#[derive(Debug)] |
| 58 | +struct WstdHttpConnector(Client); |
| 59 | + |
| 60 | +impl HttpConnector for WstdHttpConnector { |
| 61 | + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { |
| 62 | + let client = self.0.clone(); |
| 63 | + HttpConnectorFuture::new(async move { |
| 64 | + let request = request |
| 65 | + .try_into_http1x() |
| 66 | + // This can only fail if the Extensions fail to convert |
| 67 | + .map_err(|e| ConnectorError::other(Box::new(e), None))?; |
| 68 | + // smithy's SdkBody Error is a non-'static boxed dyn stderror. |
| 69 | + // Anyhow can't represent that, so convert it to the debug impl. |
| 70 | + let request = |
| 71 | + request.map(|body| WstdBody::from_http_body(body.map_err(|e| anyhow!("{e:?}")))); |
| 72 | + // Any error given by send is considered a "ClientError" kind |
| 73 | + // which should prevent smithy from retrying like it would for a |
| 74 | + // throttling error |
| 75 | + let response = client |
| 76 | + .send(request) |
| 77 | + .await |
| 78 | + .map_err(|e| ConnectorError::other(e.into(), Some(ErrorKind::ClientError)))?; |
| 79 | + |
| 80 | + Response::try_from(response.map(|wstd_body| { |
| 81 | + // You'd think that an SdkBody would just be an impl Body with |
| 82 | + // the usual error type dance. |
| 83 | + let nonsync_body = wstd_body |
| 84 | + .into_boxed_body() |
| 85 | + .map_err(|e| e.into_boxed_dyn_error()); |
| 86 | + // But we have to do this weird dance: because Axum insists |
| 87 | + // bodies are not Sync, wstd settled on non-Sync bodies. |
| 88 | + // Smithy insists on Sync bodies. The SyncStream type exists |
| 89 | + // to assert, because all Stream operations are on &mut self, |
| 90 | + // all Streams are Sync. So, turn the Body into a Stream, make |
| 91 | + // it sync, then back to a Body. |
| 92 | + let nonsync_stream = BodyStream::new(nonsync_body); |
| 93 | + let sync_stream = SyncStream::new(nonsync_stream); |
| 94 | + let sync_body = StreamBody::new(sync_stream); |
| 95 | + SdkBody::from_body_1_x(sync_body) |
| 96 | + })) |
| 97 | + // This can only fail if the Extensions fail to convert |
| 98 | + .map_err(|e| ConnectorError::other(Box::new(e), None)) |
| 99 | + }) |
| 100 | + } |
| 101 | +} |
0 commit comments