Mercurial > lbo > hg > async-google-apis
changeset 92:eb7dcf74f9fd
Implement resumable upload for AsyncRead
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sat, 24 Oct 2020 22:11:06 +0200 |
parents | 6d501f826123 |
children | d9fe171d660b |
files | async-google-apis-common/src/http.rs |
diffstat | 1 files changed, 109 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
--- a/async-google-apis-common/src/http.rs Sat Oct 24 21:46:34 2020 +0200 +++ b/async-google-apis-common/src/http.rs Sat Oct 24 22:11:06 2020 +0200 @@ -261,6 +261,106 @@ self.max_chunksize = size; } + /// Upload data from a reader; use only if the reader cannot be seeked. Memory usage is higher, + /// because data needs to be cached if the server hasn't accepted all data. + pub async fn upload<R: tokio::io::AsyncRead + std::marker::Unpin>( + &self, + mut f: R, + size: usize, + ) -> Result<Response> { + use tokio::io::AsyncReadExt; + + // Cursor to current position in stream. + let mut current = 0; + // Buffer portion that we couldn't send previously. + let mut previously_unsent = None; + loop { + let chunksize = if (size - current) > self.max_chunksize { + self.max_chunksize + } else { + size - current + }; + + let mut buf: Vec<u8>; + let read_from_stream; + if let Some(buf2) = previously_unsent.take() { + buf = buf2; + read_from_stream = buf.len(); + } else { + buf = vec![0 as u8; chunksize]; + // Move buffer into body. + read_from_stream = f.read_exact(&mut buf).await?; + buf.resize(read_from_stream, 0); + } + + let reqb = hyper::Request::builder() + .uri(self.dest.clone()) + .method(hyper::Method::PUT) + .header(hyper::header::CONTENT_LENGTH, read_from_stream) + .header( + hyper::header::CONTENT_RANGE, + format_content_range(current, current + read_from_stream - 1, size), + ) + .header(hyper::header::CONTENT_TYPE, "application/octet-stream"); + let request = reqb.body(hyper::Body::from(buf[..].to_vec()))?; + debug!("upload_file: Launching HTTP request: {:?}", request); + + let response = self.cl.request(request).await?; + debug!("upload_file: Received response: {:?}", response); + + let status = response.status(); + // 308 means: continue upload. + if !status.is_success() && status.as_u16() != 308 { + debug!("upload_file: Encountered error: {}", status); + return Err(ApiError::HTTPResponseError(status, status.to_string())).context( + body_to_str(hyper::body::to_bytes(response.into_body()).await?), + ); + } + + let sent; + if let Some(rng) = response.headers().get(hyper::header::RANGE) { + if let Some((_, to)) = parse_response_range(rng.to_str()?) { + sent = to + 1 - current; + if sent < read_from_stream { + previously_unsent = Some(buf.split_off(sent)); + } + current = to + 1; + } else { + sent = read_from_stream; + current += read_from_stream; + } + } else { + sent = read_from_stream; + current += read_from_stream; + } + + debug!( + "upload_file: Sent {} bytes (successful: {}) of total {} to {}", + chunksize, sent, size, self.dest + ); + + if current >= size { + let headers = response.headers().clone(); + let response_body = hyper::body::to_bytes(response.into_body()).await?; + + if !status.is_success() { + return Err(Error::from(ApiError::HTTPResponseError( + status, + body_to_str(response_body), + )) + .context(format!("{:?}", headers))); + } else { + return serde_json::from_reader(response_body.as_ref()).map_err(|e| { + anyhow::Error::from(e) + .context(body_to_str(response_body)) + .context(format!("{:?}", headers)) + }); + } + } + } + } + /// Upload content from a file. This is most efficient if you have an actual file, as seek can + /// be used in case the server didn't accept all data. pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> { use tokio::io::AsyncReadExt; @@ -277,16 +377,16 @@ let mut buf = vec![0 as u8; chunksize]; // Move buffer into body. - let n = f.read_exact(&mut buf).await?; - buf.resize(n, 0); + let read_from_stream = f.read_exact(&mut buf).await?; + buf.resize(read_from_stream, 0); let reqb = hyper::Request::builder() .uri(self.dest.clone()) .method(hyper::Method::PUT) - .header(hyper::header::CONTENT_LENGTH, n) + .header(hyper::header::CONTENT_LENGTH, read_from_stream) .header( hyper::header::CONTENT_RANGE, - format_content_range(current, current + n - 1, len), + format_content_range(current, current + read_from_stream - 1, len), ) .header(hyper::header::CONTENT_TYPE, "application/octet-stream"); let request = reqb.body(hyper::Body::from(buf))?; @@ -310,12 +410,13 @@ sent = to + 1 - current; current = to + 1; } else { - sent = n; - current += n; + sent = read_from_stream; + current += read_from_stream; } } else { - sent = n; - current += n; + // This can also happen if response code is 200. + sent = read_from_stream; + current += read_from_stream; } debug!(