Mercurial > lbo > hg > scrapeprice
changeset 21:e4c4a7c00fbd
Use a new trait system
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Tue, 22 Sep 2020 19:18:20 +0200 |
parents | b16039ffcb17 |
children | 0e7b6f3050d0 |
files | src/driver.rs src/err.rs |
diffstat | 2 files changed, 21 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/src/driver.rs Tue Sep 22 19:08:29 2020 +0200 +++ b/src/driver.rs Tue Sep 22 19:18:20 2020 +0200 @@ -17,61 +17,58 @@ async fn store(&mut self, d: Box<dyn Iterator<Item=T> + Send>) ->Result<(), err::HTTPError>; } -/// Return Uris to explore, both as initial set and for every fetched page. -#[async_trait::async_trait] -pub trait Explorer { - /// Return pages to fetch in any case, e.g. time-based. Called on every iteration of the - /// driver. All returned Uris are appended to the queue. - async fn idle(&mut self) -> Vec<Uri>; - /// Return pages to fetch based on a fetched document. - fn next(&mut self, uri: &Uri, doc: &extract::Document) -> Vec<Uri>; -} - /// An Extractor retrieves information from a Document. pub trait Extractor<T: Send> { fn extract(&mut self, uri: &Uri, doc: &extract::Document) -> Vec<T> { vec![] } + /// Return pages to fetch based on a fetched document. + fn next_sites(&mut self, uri: &Uri, doc: &extract::Document) -> Vec<Uri>; +} + +/// The Queue manages and prioritizes order and volume of sites to fetch. +#[async_trait::async_trait] +pub trait Queue { + /// Add a site to the queue. + async fn add(&mut self, uris: &[Uri]) -> Result<(), err::HTTPError>; + /// Returns a site to scrape next. + async fn next(&mut self) -> Result<Option<Uri>, err::HTTPError>; } /// DriverLogic holds the driven implementation. The members tell the driver what to fetch, and /// what and how to store it. pub struct DriverLogic<T> { - pub explore: Box<dyn Explorer>, pub store: Box<dyn Storage<T>>, pub extract: Box<dyn Extractor<T>>, + pub queue: Box<dyn Queue>, } pub struct Driver<T> { https: http::HTTPS, logic: DriverLogic<T>, - - // This could be made into a more elaborate scheduler. - queue: Vec<Uri>, } impl<T: 'static + Send> Driver<T> { /// Create a new Driver instance. pub fn new(logic: DriverLogic<T>, https: Option<http::HTTPS>) -> Driver<T> { - Driver { https: https.unwrap_or(http::HTTPS::new()), logic: logic, queue: Vec::with_capacity(64) } + Driver { https: https.unwrap_or(http::HTTPS::new()), logic: logic } } /// Run Driver a single step, i.e. first explore, then process one page. Returns true if a page /// was processed. pub async fn drive(&mut self) -> Result<bool, err::HTTPError> { - let new = self.logic.explore.idle().await; - info!("Appended URIs to queue: {:?}", new); - self.queue.extend(new.into_iter()); + let next = self.logic.queue.next().await?; + info!("Next URL: {:?}", next); - if let Some(uri) = self.queue.pop() { + if let Some(uri) = next { info!("Starting fetch of {}", uri); let resp = self.https.get(&uri).await?; let doc = extract::parse_response(resp)?; let extracted = self.logic.extract.extract(&uri, &doc); self.logic.store.store(Box::new(extracted.into_iter())); - let next = self.logic.explore.next(&uri, &doc); - info!("Appended URIs after fetch: {:?}", next); - self.queue.extend(next); + let next_urls = self.logic.extract.next_sites(&uri, &doc); + info!("Appended URIs after fetch: {:?}", next_urls); + self.logic.queue.add(&next_urls); return Ok(true); } else { Ok(false)
--- a/src/err.rs Tue Sep 22 19:08:29 2020 +0200 +++ b/src/err.rs Tue Sep 22 19:18:20 2020 +0200 @@ -16,6 +16,7 @@ HyperError(hyper::Error), LogicError(String), StatusError(hyper::StatusCode), + StorageError(String), } impl fmt::Display for HTTPError { @@ -26,6 +27,7 @@ HTTPError::HttpError(he) => e = format!("{}", he), HTTPError::HyperError(he) => e = format!("{}", he), HTTPError::LogicError(s) => e = s.clone(), + HTTPError::StorageError(s) => e = s.clone(), HTTPError::StatusError(sc) => e = format!("{}", sc), } write!(f, "HTTPError({})", e)?;