changeset 92:eb7dcf74f9fd

Implement resumable upload for AsyncRead
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Oct 2020 22:11:06 +0200
parents 6d501f826123
children d9fe171d660b
files async-google-apis-common/src/http.rs
diffstat 1 files changed, 109 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/async-google-apis-common/src/http.rs	Sat Oct 24 21:46:34 2020 +0200
+++ b/async-google-apis-common/src/http.rs	Sat Oct 24 22:11:06 2020 +0200
@@ -261,6 +261,106 @@
         self.max_chunksize = size;
     }
 
+    /// Upload data from a reader; use only if the reader cannot be seeked. Memory usage is higher,
+    /// because data needs to be cached if the server hasn't accepted all data.
+    pub async fn upload<R: tokio::io::AsyncRead + std::marker::Unpin>(
+        &self,
+        mut f: R,
+        size: usize,
+    ) -> Result<Response> {
+        use tokio::io::AsyncReadExt;
+
+        // Cursor to current position in stream.
+        let mut current = 0;
+        // Buffer portion that we couldn't send previously.
+        let mut previously_unsent = None;
+        loop {
+            let chunksize = if (size - current) > self.max_chunksize {
+                self.max_chunksize
+            } else {
+                size - current
+            };
+
+            let mut buf: Vec<u8>;
+            let read_from_stream;
+            if let Some(buf2) = previously_unsent.take() {
+                buf = buf2;
+                read_from_stream = buf.len();
+            } else {
+                buf = vec![0 as u8; chunksize];
+                // Move buffer into body.
+                read_from_stream = f.read_exact(&mut buf).await?;
+                buf.resize(read_from_stream, 0);
+            }
+
+            let reqb = hyper::Request::builder()
+                .uri(self.dest.clone())
+                .method(hyper::Method::PUT)
+                .header(hyper::header::CONTENT_LENGTH, read_from_stream)
+                .header(
+                    hyper::header::CONTENT_RANGE,
+                    format_content_range(current, current + read_from_stream - 1, size),
+                )
+                .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
+            let request = reqb.body(hyper::Body::from(buf[..].to_vec()))?;
+            debug!("upload_file: Launching HTTP request: {:?}", request);
+
+            let response = self.cl.request(request).await?;
+            debug!("upload_file: Received response: {:?}", response);
+
+            let status = response.status();
+            // 308 means: continue upload.
+            if !status.is_success() && status.as_u16() != 308 {
+                debug!("upload_file: Encountered error: {}", status);
+                return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
+                    body_to_str(hyper::body::to_bytes(response.into_body()).await?),
+                );
+            }
+
+            let sent;
+            if let Some(rng) = response.headers().get(hyper::header::RANGE) {
+                if let Some((_, to)) = parse_response_range(rng.to_str()?) {
+                    sent = to + 1 - current;
+                    if sent < read_from_stream {
+                        previously_unsent = Some(buf.split_off(sent));
+                    }
+                    current = to + 1;
+                } else {
+                    sent = read_from_stream;
+                    current += read_from_stream;
+                }
+            } else {
+                sent = read_from_stream;
+                current += read_from_stream;
+            }
+
+            debug!(
+                "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
+                chunksize, sent, size, self.dest
+            );
+
+            if current >= size {
+                let headers = response.headers().clone();
+                let response_body = hyper::body::to_bytes(response.into_body()).await?;
+
+                if !status.is_success() {
+                    return Err(Error::from(ApiError::HTTPResponseError(
+                        status,
+                        body_to_str(response_body),
+                    ))
+                    .context(format!("{:?}", headers)));
+                } else {
+                    return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
+                        anyhow::Error::from(e)
+                            .context(body_to_str(response_body))
+                            .context(format!("{:?}", headers))
+                    });
+                }
+            }
+        }
+    }
+    /// Upload content from a file. This is most efficient if you have an actual file, as seek can
+    /// be used in case the server didn't accept all data.
     pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> {
         use tokio::io::AsyncReadExt;
 
@@ -277,16 +377,16 @@
 
             let mut buf = vec![0 as u8; chunksize];
             // Move buffer into body.
-            let n = f.read_exact(&mut buf).await?;
-            buf.resize(n, 0);
+            let read_from_stream = f.read_exact(&mut buf).await?;
+            buf.resize(read_from_stream, 0);
 
             let reqb = hyper::Request::builder()
                 .uri(self.dest.clone())
                 .method(hyper::Method::PUT)
-                .header(hyper::header::CONTENT_LENGTH, n)
+                .header(hyper::header::CONTENT_LENGTH, read_from_stream)
                 .header(
                     hyper::header::CONTENT_RANGE,
-                    format_content_range(current, current + n - 1, len),
+                    format_content_range(current, current + read_from_stream - 1, len),
                 )
                 .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
             let request = reqb.body(hyper::Body::from(buf))?;
@@ -310,12 +410,13 @@
                     sent = to + 1 - current;
                     current = to + 1;
                 } else {
-                    sent = n;
-                    current += n;
+                    sent = read_from_stream;
+                    current += read_from_stream;
                 }
             } else {
-                sent = n;
-                current += n;
+                // This can also happen if response code is 200.
+                sent = read_from_stream;
+                current += read_from_stream;
             }
 
             debug!(