Mercurial > lbo > hg > analyrics
changeset 26:b1850e6f4d9a
Split up source code
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Thu, 14 Jul 2022 20:35:14 -0700 |
parents | 390a448dc8c7 |
children | 792eb8ac3d93 |
files | src/configdb.rs src/db.rs src/guards.rs src/logsdb.rs src/main.rs |
diffstat | 5 files changed, 267 insertions(+), 219 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/configdb.rs Thu Jul 14 20:35:14 2022 -0700 @@ -0,0 +1,59 @@ + +use crate::db::{PoolType, DBType}; + +use anyhow::Error; +use log::{debug, error, info, warn, Level}; + +use rocket::futures::StreamExt; +use rocket_db_pools::sqlx::{Executor, Row, Sqlite, SqlitePool}; +use rocket_db_pools::{Connection, Database, Pool}; +use sqlx::prelude::FromRow; + +// TO DO: use other databases? +#[derive(Database)] +#[database("sqlite_main")] +pub struct ConfigDB(pub PoolType); + +#[derive(Default, Clone, Debug, FromRow)] +pub struct UserRecord { + pub name: String, + pub tz_offset: i64, +} + +pub struct ConfigDBSession<'p, DB: sqlx::Database>(pub &'p mut sqlx::pool::PoolConnection<DB>); + +impl<'p> ConfigDBSession<'p, Sqlite> { + pub async fn check_user_password<S: AsRef<str>>( + &mut self, + user: S, + password: S, + ) -> Result<bool, Error> { + // TODO: salt passwords. + let salt: String = match sqlx::query("SELECT salt FROM users WHERE username = ? LIMIT 1;") + .bind(user.as_ref()) + .fetch_one(&mut *self.0) + .await + { + Ok(r) => r.get(0), + Err(e) => { + warn!("Error querying salt: {}", e); + return Ok(false); + } + }; + let pwdhash = sha256::digest(format!("{}{}", salt, password.as_ref())); + let q = sqlx::query("SELECT username FROM users WHERE username = ? AND password_hash = ?;") + .bind(user.as_ref()) + .bind(pwdhash); + let result = self.0.fetch_all(q).await?; + Ok(result.len() == 1) + } + + pub async fn get_user_details(&mut self, user: &str) -> Result<UserRecord, Error> { + let entry = sqlx::query("SELECT name, tz_offset FROM users WHERE username = ? LIMIT 1") + .bind(user) + .fetch_one(&mut *self.0) + .await?; + Ok(UserRecord::from_row(&entry)?) + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/db.rs Thu Jul 14 20:35:14 2022 -0700 @@ -0,0 +1,19 @@ + +use rocket_db_pools::sqlx::{Executor, Row}; +use rocket_db_pools::{Connection, Database, Pool}; +use sqlx::prelude::FromRow; + +#[cfg(feature = "sqlite")] +use rocket_db_pools::sqlx::{Sqlite, SqlitePool}; +#[cfg(feature = "sqlite")] +pub type DBType = Sqlite; +#[cfg(feature = "sqlite")] +pub type PoolType = SqlitePool; + +// Current SQL queries don't work with postgres. +#[cfg(feature = "postgres")] +use rocket_db_pools::sqlx::{PgPool, Postgres}; +#[cfg(feature = "postgres")] +pub type DBType = Postgres; +#[cfg(feature = "postgres")] +pub type PoolType = PgPool;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/guards.rs Thu Jul 14 20:35:14 2022 -0700 @@ -0,0 +1,35 @@ + +use anyhow::{self, Context, Error}; + +use rocket::http::{HeaderMap}; +use rocket::request::{self, FlashMessage, FromRequest, Outcome, Request}; + +pub const USER_ID_COOKIE_KEY: &str = "user_id"; + +pub struct LoggedInGuard(pub String); + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for LoggedInGuard { + type Error = Error; + + async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { + let cookies = req.cookies(); + if let Some(uc) = cookies.get_private(USER_ID_COOKIE_KEY) { + Outcome::Success(LoggedInGuard(uc.value().to_string())) + } else { + Outcome::Forward(()) + } + } +} + +pub struct HeadersGuard<'h>(pub HeaderMap<'h>); + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for HeadersGuard<'r> { + type Error = Error; + + async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { + Outcome::Success(HeadersGuard(req.headers().clone())) + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/logsdb.rs Thu Jul 14 20:35:14 2022 -0700 @@ -0,0 +1,143 @@ + +use crate::db::{PoolType, DBType}; + +use anyhow::{Context, Error}; +use log::{debug, error, info, warn, Level}; +use time::{Duration, OffsetDateTime}; + +use rocket::futures::StreamExt; +use rocket_db_pools::sqlx::{Executor, Row, Sqlite, SqlitePool}; +use rocket_db_pools::{Connection, Database, Pool}; +use sqlx::prelude::FromRow; + + +#[derive(Database)] +#[database("sqlite_logs")] +pub struct LogsDB(pub PoolType); + +pub struct LogsDBSession<'p, DB: sqlx::Database>(pub &'p mut sqlx::pool::PoolConnection<DB>); + +impl<'p> LogsDBSession<'p, Sqlite> { + pub async fn log_request< + S1: AsRef<str>, + S2: AsRef<str>, + S3: AsRef<str>, + S4: AsRef<str>, + S5: AsRef<str>, + S6: AsRef<str>, + >( + &mut self, + session: Option<u32>, + ip: S1, + domain: S2, + path: S3, + status: u32, + page: Option<S4>, + refer: Option<S5>, + ua: S6, + ntags: u32, + ) -> Result<u32, Error> { + let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id"); + let q = q + .bind(session) + .bind(ip.as_ref()) + .bind(domain.as_ref()) + .bind(path.as_ref()) + .bind(status) + .bind(page.map(|s| s.as_ref().to_string())) + .bind(refer.map(|s| s.as_ref().to_string())) + .bind(ua.as_ref()) + .bind(ntags); + let row: u32 = q.fetch_one(&mut *self.0).await?.get(0); + Ok(row) + } + + pub async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>( + &mut self, + requestid: u32, + tags: I, + ) -> Result<usize, Error> { + let mut ntags = 0; + for tag in tags { + let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), "")); + sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)") + .bind(requestid) + .bind(k) + .bind(v) + .execute(&mut *self.0) + .await + .map_err(|e| error!("Couldn't insert tag {}={}: {}", k, v, e)) + .unwrap(); + ntags += 1; + } + + Ok(ntags) + } + + pub async fn start_session(&mut self, domain: &str) -> Result<u32, Error> { + Ok(sqlx::query("INSERT INTO Sessions (start, last, domain) VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?) RETURNING id") + .bind(domain) + .fetch_one(&mut *self.0) + .await?.get(0)) + } + + pub async fn update_session_time(&mut self, id: u32) -> Result<(), Error> { + sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?") + .bind(id) + .execute(&mut *self.0) + .await?; + Ok(()) + } + + pub async fn query_visits_sessions_counts<S: AsRef<str>>( + &mut self, + from: OffsetDateTime, + to: OffsetDateTime, + tz_offset: Option<i64>, + domainpattern: Option<S>, + ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> { + let domain = domainpattern.as_ref().map(AsRef::as_ref).unwrap_or("%"); + let mut results = sqlx::query( + r#" +SELECT DATE(atime + ?, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount +FROM requestlog +JOIN ( + SELECT DATE(start + ?, 'unixepoch') AS sessdate, COUNT(*) AS sesscount + FROM sessions WHERE sessions.domain LIKE ? GROUP BY sessdate) +AS sc ON (rqdate = sessdate) +WHERE atime > ? AND atime < ? AND requestlog.domain LIKE ? +GROUP BY rqdate +ORDER BY rqdate ASC;"#, + ) + .bind(tz_offset.unwrap_or(0)) + .bind(tz_offset.unwrap_or(0)) + .bind(domain) + .bind(from.unix_timestamp()) + .bind(to.unix_timestamp()) + .bind(domain) + .fetch(&mut *self.0); + + // Result table: date / visit count / session count + let mut dates = vec![]; //Vec::<String>::with_capacity(results.len()); + let mut visits = vec![]; //Vec::<u32>::with_capacity(results.len()); + let mut sessions = vec![]; //Vec::<u32>::with_capacity(results.len()); + + loop { + match results.next().await { + Some(Ok(row)) => { + dates.push(row.get(0)); + visits.push(row.get(1)); + sessions.push(row.get(2)); + } + None => break, + Some(Err(e)) => { + error!("Error querying visits/sessions: {}", e); + return Err(e).context("query visits/session counts"); + } + } + } + + Ok((dates, visits, sessions)) + } +} +
--- a/src/main.rs Wed Jul 13 21:49:41 2022 -0700 +++ b/src/main.rs Thu Jul 14 20:35:14 2022 -0700 @@ -1,3 +1,14 @@ + +mod db; +mod configdb; +mod logsdb; + +mod guards; + +use crate::configdb::{ConfigDB, ConfigDBSession}; +use crate::guards::{USER_ID_COOKIE_KEY, HeadersGuard, LoggedInGuard}; +use crate::logsdb::{LogsDB, LogsDBSession}; + use anyhow::{self, Context, Error}; use either::Either; use log::{debug, error, info, warn, Level}; @@ -23,225 +34,6 @@ use std::path::{Path, PathBuf}; use std::time::Instant; -#[cfg(feature = "sqlite")] -type DBType = Sqlite; -#[cfg(feature = "sqlite")] -type PoolType = SqlitePool; - -// Current SQL queries don't work with postgres. -#[cfg(feature = "postgres")] -use rocket_db_pools::sqlx::{PgPool, Postgres}; -#[cfg(feature = "postgres")] -type DBType = Postgres; -#[cfg(feature = "postgres")] -type PoolType = PgPool; - -// TO DO: use other databases? -#[derive(Database)] -#[database("sqlite_main")] -struct ConfigDB(PoolType); - -#[derive(Default, Clone, Debug, FromRow)] -struct UserRecord { - name: String, - tz_offset: i64, -} - -struct ConfigDBSession<'p, DB: sqlx::Database>(&'p mut sqlx::pool::PoolConnection<DB>); - -impl<'p> ConfigDBSession<'p, Sqlite> { - async fn check_user_password<S: AsRef<str>>( - &mut self, - user: S, - password: S, - ) -> Result<bool, Error> { - // TODO: salt passwords. - let salt: String = match sqlx::query("SELECT salt FROM users WHERE username = ? LIMIT 1;") - .bind(user.as_ref()) - .fetch_one(&mut *self.0) - .await - { - Ok(r) => r.get(0), - Err(e) => { - warn!("Error querying salt: {}", e); - return Ok(false); - } - }; - let pwdhash = sha256::digest(format!("{}{}", salt, password.as_ref())); - let q = sqlx::query("SELECT username FROM users WHERE username = ? AND password_hash = ?;") - .bind(user.as_ref()) - .bind(pwdhash); - let result = self.0.fetch_all(q).await?; - Ok(result.len() == 1) - } - - async fn get_user_details(&mut self, user: &str) -> Result<UserRecord, Error> { - let entry = sqlx::query("SELECT name, tz_offset FROM users WHERE username = ? LIMIT 1") - .bind(user) - .fetch_one(&mut *self.0) - .await?; - Ok(UserRecord::from_row(&entry)?) - } -} - -#[derive(Database)] -#[database("sqlite_logs")] -struct LogsDB(PoolType); - -struct LogsDBSession<'p, DB: sqlx::Database>(&'p mut sqlx::pool::PoolConnection<DB>); - -impl<'p> LogsDBSession<'p, Sqlite> { - async fn log_request< - S1: AsRef<str>, - S2: AsRef<str>, - S3: AsRef<str>, - S4: AsRef<str>, - S5: AsRef<str>, - S6: AsRef<str>, - >( - &mut self, - session: Option<u32>, - ip: S1, - domain: S2, - path: S3, - status: u32, - page: Option<S4>, - refer: Option<S5>, - ua: S6, - ntags: u32, - ) -> Result<u32, Error> { - let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id"); - let q = q - .bind(session) - .bind(ip.as_ref()) - .bind(domain.as_ref()) - .bind(path.as_ref()) - .bind(status) - .bind(page.map(|s| s.as_ref().to_string())) - .bind(refer.map(|s| s.as_ref().to_string())) - .bind(ua.as_ref()) - .bind(ntags); - let row: u32 = q.fetch_one(&mut *self.0).await?.get(0); - Ok(row) - } - - async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>( - &mut self, - requestid: u32, - tags: I, - ) -> Result<usize, Error> { - let mut ntags = 0; - for tag in tags { - let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), "")); - sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)") - .bind(requestid) - .bind(k) - .bind(v) - .execute(&mut *self.0) - .await - .map_err(|e| error!("Couldn't insert tag {}={}: {}", k, v, e)) - .unwrap(); - ntags += 1; - } - - Ok(ntags) - } - - async fn start_session(&mut self, domain: &str) -> Result<u32, Error> { - Ok(sqlx::query("INSERT INTO Sessions (start, last, domain) VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?) RETURNING id") - .bind(domain) - .fetch_one(&mut *self.0) - .await?.get(0)) - } - - async fn update_session_time(&mut self, id: u32) -> Result<(), Error> { - sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?") - .bind(id) - .execute(&mut *self.0) - .await?; - Ok(()) - } - - async fn query_visits_sessions_counts<S: AsRef<str>>( - &mut self, - from: OffsetDateTime, - to: OffsetDateTime, - tz_offset: Option<i64>, - domainpattern: Option<S>, - ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> { - let domain = domainpattern.as_ref().map(AsRef::as_ref).unwrap_or("%"); - let mut results = sqlx::query( - r#" -SELECT DATE(atime + ?, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount -FROM requestlog -JOIN ( - SELECT DATE(start + ?, 'unixepoch') AS sessdate, COUNT(*) AS sesscount - FROM sessions WHERE sessions.domain LIKE ? GROUP BY sessdate) -AS sc ON (rqdate = sessdate) -WHERE atime > ? AND atime < ? AND requestlog.domain LIKE ? -GROUP BY rqdate -ORDER BY rqdate ASC;"#, - ) - .bind(tz_offset.unwrap_or(0)) - .bind(tz_offset.unwrap_or(0)) - .bind(domain) - .bind(from.unix_timestamp()) - .bind(to.unix_timestamp()) - .bind(domain) - .fetch(&mut *self.0); - - // Result table: date / visit count / session count - let mut dates = vec![]; //Vec::<String>::with_capacity(results.len()); - let mut visits = vec![]; //Vec::<u32>::with_capacity(results.len()); - let mut sessions = vec![]; //Vec::<u32>::with_capacity(results.len()); - - loop { - match results.next().await { - Some(Ok(row)) => { - dates.push(row.get(0)); - visits.push(row.get(1)); - sessions.push(row.get(2)); - } - None => break, - Some(Err(e)) => { - error!("Error querying visits/sessions: {}", e); - return Err(e).context("query visits/session counts"); - } - } - } - - Ok((dates, visits, sessions)) - } -} - -const USER_ID_COOKIE_KEY: &str = "user_id"; - -struct LoggedInGuard(String); - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for LoggedInGuard { - type Error = Error; - - async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { - let cookies = req.cookies(); - if let Some(uc) = cookies.get_private(USER_ID_COOKIE_KEY) { - Outcome::Success(LoggedInGuard(uc.value().to_string())) - } else { - Outcome::Forward(()) - } - } -} - -struct HeadersGuard<'h>(HeaderMap<'h>); - -#[rocket::async_trait] -impl<'r> FromRequest<'r> for HeadersGuard<'r> { - type Error = Error; - - async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { - Outcome::Success(HeadersGuard(req.headers().clone())) - } -} #[derive(Responder)] enum LoginResponse {