Mercurial > lbo > hg > async-google-apis
changeset 89:c260d92db27f
implement resumable uploads (works for Drive, at least!)
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 24 Oct 2020 21:45:48 +0200 |
parents | cc6bd5604788 |
children | e031af86de5b |
files | async-google-apis-common/Cargo.lock async-google-apis-common/Cargo.toml async-google-apis-common/src/http.rs |
diffstat | 3 files changed, 169 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/async-google-apis-common/Cargo.lock Sat Oct 24 19:32:49 2020 +0200 +++ b/async-google-apis-common/Cargo.lock Sat Oct 24 21:45:48 2020 +0200 @@ -18,6 +18,7 @@ dependencies = [ "anyhow", "chrono", + "futures", "hyper", "hyper-rustls", "log",
--- a/async-google-apis-common/Cargo.toml Sat Oct 24 19:32:49 2020 +0200 +++ b/async-google-apis-common/Cargo.toml Sat Oct 24 21:45:48 2020 +0200 @@ -16,6 +16,7 @@ [dependencies] anyhow = "~1.0" chrono = "~0.4" +futures = "~0.3" hyper = "~0.13" hyper-rustls = "~0.20" log = "~0.4"
--- a/async-google-apis-common/src/http.rs Sat Oct 24 19:32:49 2020 +0200 +++ b/async-google-apis-common/src/http.rs Sat Oct 24 21:45:48 2020 +0200 @@ -10,14 +10,39 @@ #[derive(Debug, Serialize)] pub struct EmptyRequest {} +/// This type is used as type parameter for when no response is expected. +#[derive(Debug, Deserialize, Clone, Default)] +pub struct EmptyResponse {} + /// The Content-Type header is set automatically to application/json. -pub async fn do_request<Req: Serialize + std::fmt::Debug, Resp: DeserializeOwned + Clone>( +pub async fn do_request< + Req: Serialize + std::fmt::Debug, + Resp: DeserializeOwned + Clone + Default, +>( cl: &TlsClient, path: &str, headers: &[(String, String)], http_method: &str, rq: Option<Req>, ) -> Result<Resp> { + use futures::future::FutureExt; + do_request_with_headers(cl, path, headers, http_method, rq) + .map(|r| r.map(|t| t.0)) + .await +} + +/// The Content-Type header is set automatically to application/json. Also returns response +/// headers. +pub async fn do_request_with_headers< + Req: Serialize + std::fmt::Debug, + Resp: DeserializeOwned + Clone + Default, +>( + cl: &TlsClient, + path: &str, + headers: &[(String, String)], + http_method: &str, + rq: Option<Req>, +) -> Result<(Resp, hyper::HeaderMap)> { let mut reqb = hyper::Request::builder().uri(path).method(http_method); for (k, v) in headers { reqb = reqb.header(k, v); @@ -49,18 +74,27 @@ status, http_response ); + let headers = http_response.headers().clone(); let response_body = hyper::body::to_bytes(http_response.into_body()).await?; if !status.is_success() { Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into()) } else { // Evaluate body_to_str lazily - serde_json::from_reader(response_body.as_ref()) - .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body))) + if response_body.len() > 0 { + serde_json::from_reader(response_body.as_ref()) + .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body))) + .map(|r| (r, headers)) + } else { + Ok((Default::default(), headers)) + } } } /// The Content-Length header is set automatically. -pub async fn do_upload_multipart<Req: Serialize + std::fmt::Debug, Resp: DeserializeOwned + Clone>( +pub async fn do_upload_multipart< + Req: Serialize + std::fmt::Debug, + Resp: DeserializeOwned + Clone, +>( cl: &TlsClient, path: &str, headers: &[(String, String)], @@ -181,3 +215,132 @@ } Ok(()) } + +/// A resumable upload in progress, useful for sending large objects. +pub struct ResumableUpload<'client, Response: DeserializeOwned> { + dest: hyper::Uri, + cl: &'client TlsClient, + max_chunksize: usize, + _resp: std::marker::PhantomData<Response>, +} + +fn format_content_range(from: usize, to: usize, total: usize) -> String { + format!("bytes {}-{}/{}", from, to, total) +} + +fn parse_response_range(rng: &str) -> Option<(usize, usize)> { + if let Some(main) = rng.strip_prefix("bytes=") { + let mut parts = main.split("-"); + let (first, second) = (parts.next(), parts.next()); + if first.is_none() || second.is_none() { + return None; + } + Some(( + usize::from_str_radix(first.unwrap(), 10).unwrap_or(0), + usize::from_str_radix(second.unwrap(), 10).unwrap_or(0), + )) + } else { + None + } +} + +impl<'client, Response: DeserializeOwned> ResumableUpload<'client, Response> { + pub fn new( + to: hyper::Uri, + cl: &'client TlsClient, + max_chunksize: usize, + ) -> ResumableUpload<'client, Response> { + ResumableUpload { + dest: to, + cl: cl, + max_chunksize: max_chunksize, + _resp: Default::default(), + } + } + pub fn set_max_chunksize(&mut self, size: usize) { + self.max_chunksize = size; + } + + pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> { + use tokio::io::AsyncReadExt; + + let len = f.metadata().await?.len() as usize; + let mut current = 0; + loop { + let chunksize = if (len - current) > self.max_chunksize { + self.max_chunksize + } else { + len - current + }; + + f.seek(std::io::SeekFrom::Start(current as u64)).await?; + + let mut buf = vec![0 as u8; chunksize]; + // Move buffer into body. + let n = f.read_exact(&mut buf).await?; + buf.resize(n, 0); + + let reqb = hyper::Request::builder() + .uri(self.dest.clone()) + .method(hyper::Method::PUT) + .header(hyper::header::CONTENT_LENGTH, n) + .header( + hyper::header::CONTENT_RANGE, + format_content_range(current, current + n - 1, len), + ) + .header(hyper::header::CONTENT_TYPE, "application/octet-stream"); + let request = reqb.body(hyper::Body::from(buf))?; + debug!("upload_file: Launching HTTP request: {:?}", request); + + let response = self.cl.request(request).await?; + debug!("upload_file: Received response: {:?}", response); + + let status = response.status(); + // 308 means: continue upload. + if !status.is_success() && status.as_u16() != 308 { + debug!("upload_file: Encountered error: {}", status); + return Err(ApiError::HTTPResponseError(status, status.to_string())).context( + body_to_str(hyper::body::to_bytes(response.into_body()).await?), + ); + } + + let sent; + if let Some(rng) = response.headers().get(hyper::header::RANGE) { + if let Some((_, to)) = parse_response_range(rng.to_str()?) { + sent = to + 1 - current; + current = to + 1; + } else { + sent = n; + current += n; + } + } else { + sent = n; + current += n; + } + + debug!( + "upload_file: Sent {} bytes (successful: {}) of total {} to {}", + chunksize, sent, len, self.dest + ); + + if current >= len { + let headers = response.headers().clone(); + let response_body = hyper::body::to_bytes(response.into_body()).await?; + + if !status.is_success() { + return Err(Error::from(ApiError::HTTPResponseError( + status, + body_to_str(response_body), + )) + .context(format!("{:?}", headers))); + } else { + return serde_json::from_reader(response_body.as_ref()).map_err(|e| { + anyhow::Error::from(e) + .context(body_to_str(response_body)) + .context(format!("{:?}", headers)) + }); + } + } + } + } +}