changeset 89:c260d92db27f

implement resumable uploads (works for Drive, at least!)
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 24 Oct 2020 21:45:48 +0200
parents cc6bd5604788
children e031af86de5b
files async-google-apis-common/Cargo.lock async-google-apis-common/Cargo.toml async-google-apis-common/src/http.rs
diffstat 3 files changed, 169 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/async-google-apis-common/Cargo.lock	Sat Oct 24 19:32:49 2020 +0200
+++ b/async-google-apis-common/Cargo.lock	Sat Oct 24 21:45:48 2020 +0200
@@ -18,6 +18,7 @@
 dependencies = [
  "anyhow",
  "chrono",
+ "futures",
  "hyper",
  "hyper-rustls",
  "log",
--- a/async-google-apis-common/Cargo.toml	Sat Oct 24 19:32:49 2020 +0200
+++ b/async-google-apis-common/Cargo.toml	Sat Oct 24 21:45:48 2020 +0200
@@ -16,6 +16,7 @@
 [dependencies]
 anyhow = "~1.0"
 chrono = "~0.4"
+futures = "~0.3"
 hyper = "~0.13"
 hyper-rustls = "~0.20"
 log = "~0.4"
--- a/async-google-apis-common/src/http.rs	Sat Oct 24 19:32:49 2020 +0200
+++ b/async-google-apis-common/src/http.rs	Sat Oct 24 21:45:48 2020 +0200
@@ -10,14 +10,39 @@
 #[derive(Debug, Serialize)]
 pub struct EmptyRequest {}
 
+/// This type is used as type parameter for when no response is expected.
+#[derive(Debug, Deserialize, Clone, Default)]
+pub struct EmptyResponse {}
+
 /// The Content-Type header is set automatically to application/json.
-pub async fn do_request<Req: Serialize + std::fmt::Debug, Resp: DeserializeOwned + Clone>(
+pub async fn do_request<
+    Req: Serialize + std::fmt::Debug,
+    Resp: DeserializeOwned + Clone + Default,
+>(
     cl: &TlsClient,
     path: &str,
     headers: &[(String, String)],
     http_method: &str,
     rq: Option<Req>,
 ) -> Result<Resp> {
+    use futures::future::FutureExt;
+    do_request_with_headers(cl, path, headers, http_method, rq)
+        .map(|r| r.map(|t| t.0))
+        .await
+}
+
+/// The Content-Type header is set automatically to application/json. Also returns response
+/// headers.
+pub async fn do_request_with_headers<
+    Req: Serialize + std::fmt::Debug,
+    Resp: DeserializeOwned + Clone + Default,
+>(
+    cl: &TlsClient,
+    path: &str,
+    headers: &[(String, String)],
+    http_method: &str,
+    rq: Option<Req>,
+) -> Result<(Resp, hyper::HeaderMap)> {
     let mut reqb = hyper::Request::builder().uri(path).method(http_method);
     for (k, v) in headers {
         reqb = reqb.header(k, v);
@@ -49,18 +74,27 @@
         status, http_response
     );
 
+    let headers = http_response.headers().clone();
     let response_body = hyper::body::to_bytes(http_response.into_body()).await?;
     if !status.is_success() {
         Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into())
     } else {
         // Evaluate body_to_str lazily
-        serde_json::from_reader(response_body.as_ref())
-            .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
+        if response_body.len() > 0 {
+            serde_json::from_reader(response_body.as_ref())
+                .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
+                .map(|r| (r, headers))
+        } else {
+            Ok((Default::default(), headers))
+        }
     }
 }
 
 /// The Content-Length header is set automatically.
-pub async fn do_upload_multipart<Req: Serialize + std::fmt::Debug, Resp: DeserializeOwned + Clone>(
+pub async fn do_upload_multipart<
+    Req: Serialize + std::fmt::Debug,
+    Resp: DeserializeOwned + Clone,
+>(
     cl: &TlsClient,
     path: &str,
     headers: &[(String, String)],
@@ -181,3 +215,132 @@
     }
     Ok(())
 }
+
+/// A resumable upload in progress, useful for sending large objects.
+pub struct ResumableUpload<'client, Response: DeserializeOwned> {
+    dest: hyper::Uri,
+    cl: &'client TlsClient,
+    max_chunksize: usize,
+    _resp: std::marker::PhantomData<Response>,
+}
+
+fn format_content_range(from: usize, to: usize, total: usize) -> String {
+    format!("bytes {}-{}/{}", from, to, total)
+}
+
+fn parse_response_range(rng: &str) -> Option<(usize, usize)> {
+    if let Some(main) = rng.strip_prefix("bytes=") {
+        let mut parts = main.split("-");
+        let (first, second) = (parts.next(), parts.next());
+        if first.is_none() || second.is_none() {
+            return None;
+        }
+        Some((
+            usize::from_str_radix(first.unwrap(), 10).unwrap_or(0),
+            usize::from_str_radix(second.unwrap(), 10).unwrap_or(0),
+        ))
+    } else {
+        None
+    }
+}
+
+impl<'client, Response: DeserializeOwned> ResumableUpload<'client, Response> {
+    pub fn new(
+        to: hyper::Uri,
+        cl: &'client TlsClient,
+        max_chunksize: usize,
+    ) -> ResumableUpload<'client, Response> {
+        ResumableUpload {
+            dest: to,
+            cl: cl,
+            max_chunksize: max_chunksize,
+            _resp: Default::default(),
+        }
+    }
+    pub fn set_max_chunksize(&mut self, size: usize) {
+        self.max_chunksize = size;
+    }
+
+    pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> {
+        use tokio::io::AsyncReadExt;
+
+        let len = f.metadata().await?.len() as usize;
+        let mut current = 0;
+        loop {
+            let chunksize = if (len - current) > self.max_chunksize {
+                self.max_chunksize
+            } else {
+                len - current
+            };
+
+            f.seek(std::io::SeekFrom::Start(current as u64)).await?;
+
+            let mut buf = vec![0 as u8; chunksize];
+            // Move buffer into body.
+            let n = f.read_exact(&mut buf).await?;
+            buf.resize(n, 0);
+
+            let reqb = hyper::Request::builder()
+                .uri(self.dest.clone())
+                .method(hyper::Method::PUT)
+                .header(hyper::header::CONTENT_LENGTH, n)
+                .header(
+                    hyper::header::CONTENT_RANGE,
+                    format_content_range(current, current + n - 1, len),
+                )
+                .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
+            let request = reqb.body(hyper::Body::from(buf))?;
+            debug!("upload_file: Launching HTTP request: {:?}", request);
+
+            let response = self.cl.request(request).await?;
+            debug!("upload_file: Received response: {:?}", response);
+
+            let status = response.status();
+            // 308 means: continue upload.
+            if !status.is_success() && status.as_u16() != 308 {
+                debug!("upload_file: Encountered error: {}", status);
+                return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
+                    body_to_str(hyper::body::to_bytes(response.into_body()).await?),
+                );
+            }
+
+            let sent;
+            if let Some(rng) = response.headers().get(hyper::header::RANGE) {
+                if let Some((_, to)) = parse_response_range(rng.to_str()?) {
+                    sent = to + 1 - current;
+                    current = to + 1;
+                } else {
+                    sent = n;
+                    current += n;
+                }
+            } else {
+                sent = n;
+                current += n;
+            }
+
+            debug!(
+                "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
+                chunksize, sent, len, self.dest
+            );
+
+            if current >= len {
+                let headers = response.headers().clone();
+                let response_body = hyper::body::to_bytes(response.into_body()).await?;
+
+                if !status.is_success() {
+                    return Err(Error::from(ApiError::HTTPResponseError(
+                        status,
+                        body_to_str(response_body),
+                    ))
+                    .context(format!("{:?}", headers)));
+                } else {
+                    return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
+                        anyhow::Error::from(e)
+                            .context(body_to_str(response_body))
+                            .context(format!("{:?}", headers))
+                    });
+                }
+            }
+        }
+    }
+}