Mercurial > lbo > hg > analyrics
view src/logsdb.rs @ 26:b1850e6f4d9a
Split up source code
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Thu, 14 Jul 2022 20:35:14 -0700 |
parents | |
children | 792eb8ac3d93 |
line wrap: on
line source
use crate::db::{PoolType, DBType}; use anyhow::{Context, Error}; use log::{debug, error, info, warn, Level}; use time::{Duration, OffsetDateTime}; use rocket::futures::StreamExt; use rocket_db_pools::sqlx::{Executor, Row, Sqlite, SqlitePool}; use rocket_db_pools::{Connection, Database, Pool}; use sqlx::prelude::FromRow; #[derive(Database)] #[database("sqlite_logs")] pub struct LogsDB(pub PoolType); pub struct LogsDBSession<'p, DB: sqlx::Database>(pub &'p mut sqlx::pool::PoolConnection<DB>); impl<'p> LogsDBSession<'p, Sqlite> { pub async fn log_request< S1: AsRef<str>, S2: AsRef<str>, S3: AsRef<str>, S4: AsRef<str>, S5: AsRef<str>, S6: AsRef<str>, >( &mut self, session: Option<u32>, ip: S1, domain: S2, path: S3, status: u32, page: Option<S4>, refer: Option<S5>, ua: S6, ntags: u32, ) -> Result<u32, Error> { let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id"); let q = q .bind(session) .bind(ip.as_ref()) .bind(domain.as_ref()) .bind(path.as_ref()) .bind(status) .bind(page.map(|s| s.as_ref().to_string())) .bind(refer.map(|s| s.as_ref().to_string())) .bind(ua.as_ref()) .bind(ntags); let row: u32 = q.fetch_one(&mut *self.0).await?.get(0); Ok(row) } pub async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>( &mut self, requestid: u32, tags: I, ) -> Result<usize, Error> { let mut ntags = 0; for tag in tags { let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), "")); sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)") .bind(requestid) .bind(k) .bind(v) .execute(&mut *self.0) .await .map_err(|e| error!("Couldn't insert tag {}={}: {}", k, v, e)) .unwrap(); ntags += 1; } Ok(ntags) } pub async fn start_session(&mut self, domain: &str) -> Result<u32, Error> { Ok(sqlx::query("INSERT INTO Sessions (start, last, domain) VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?) RETURNING id") .bind(domain) .fetch_one(&mut *self.0) .await?.get(0)) } pub async fn update_session_time(&mut self, id: u32) -> Result<(), Error> { sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?") .bind(id) .execute(&mut *self.0) .await?; Ok(()) } pub async fn query_visits_sessions_counts<S: AsRef<str>>( &mut self, from: OffsetDateTime, to: OffsetDateTime, tz_offset: Option<i64>, domainpattern: Option<S>, ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> { let domain = domainpattern.as_ref().map(AsRef::as_ref).unwrap_or("%"); let mut results = sqlx::query( r#" SELECT DATE(atime + ?, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount FROM requestlog JOIN ( SELECT DATE(start + ?, 'unixepoch') AS sessdate, COUNT(*) AS sesscount FROM sessions WHERE sessions.domain LIKE ? GROUP BY sessdate) AS sc ON (rqdate = sessdate) WHERE atime > ? AND atime < ? AND requestlog.domain LIKE ? GROUP BY rqdate ORDER BY rqdate ASC;"#, ) .bind(tz_offset.unwrap_or(0)) .bind(tz_offset.unwrap_or(0)) .bind(domain) .bind(from.unix_timestamp()) .bind(to.unix_timestamp()) .bind(domain) .fetch(&mut *self.0); // Result table: date / visit count / session count let mut dates = vec![]; //Vec::<String>::with_capacity(results.len()); let mut visits = vec![]; //Vec::<u32>::with_capacity(results.len()); let mut sessions = vec![]; //Vec::<u32>::with_capacity(results.len()); loop { match results.next().await { Some(Ok(row)) => { dates.push(row.get(0)); visits.push(row.get(1)); sessions.push(row.get(2)); } None => break, Some(Err(e)) => { error!("Error querying visits/sessions: {}", e); return Err(e).context("query visits/session counts"); } } } Ok((dates, visits, sessions)) } }