Mercurial > lbo > hg > analyrics
view src/logsdb.rs @ 76:a58e1922e173
Fix some redirects and date calculations
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Fri, 09 Sep 2022 07:08:20 +0200 |
parents | d8ab669f71e5 |
children | fd0237049be0 |
line wrap: on
line source
use crate::db::{DBType, PoolType}; use anyhow::{Context, Error}; use log::error; use time::OffsetDateTime; use rocket::futures::{future::ready, StreamExt}; use rocket::http::hyper::uri::Uri; use rocket::serde::Serialize; use rocket_db_pools::sqlx::{Row, Sqlite}; use rocket_db_pools::Database; use sqlx::prelude::FromRow; use std::collections::HashMap; #[derive(Database)] #[database("sqlite_logs")] pub struct LogsDB(pub PoolType); pub struct LogsDBSession<'p, DB: sqlx::Database>(pub &'p mut sqlx::pool::PoolConnection<DB>); pub struct LogsQueryContext { pub domain: Option<String>, pub from: OffsetDateTime, pub to: OffsetDateTime, pub tz_offset: i64, pub include_bots: bool, } #[derive(sqlx::FromRow, Serialize)] #[serde(crate = "rocket::serde")] pub struct RecentSessionsRow { pub start: i64, pub duration: i64, pub count: i64, pub refer: Option<String>, pub origin_country: Option<String>, pub origin_city: Option<String>, pub ua: String, pub alltags: String, } impl<'p> LogsDBSession<'p, Sqlite> { pub async fn log_request< S1: AsRef<str>, S2: AsRef<str>, S3: AsRef<str>, S4: AsRef<str>, S5: AsRef<str>, S6: AsRef<str>, >( &mut self, session: Option<u32>, ip: S1, domain: S2, path: S3, status: u32, page: Option<S4>, refer: Option<S5>, ua: S6, ntags: u32, ) -> Result<u32, Error> { let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id"); let q = q .bind(session) .bind(ip.as_ref()) .bind(domain.as_ref()) .bind(path.as_ref()) .bind(status) .bind(page.map(|s| s.as_ref().to_string())) .bind(refer.map(|s| s.as_ref().to_string())) .bind(ua.as_ref()) .bind(ntags); let row: u32 = q.fetch_one(&mut *self.0).await?.get(0); Ok(row) } pub async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>( &mut self, requestid: u32, tags: I, ) -> Result<usize, Error> { let mut ntags = 0; for tag in tags { let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), "")); sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)") .bind(requestid) .bind(k) .bind(v) .execute(&mut *self.0) .await .map_err(|e| error!("Couldn't insert tag {}={}: {}", k, v, e)) .unwrap(); ntags += 1; } Ok(ntags) } // Takes domain and (country, city) tuple. pub async fn start_session( &mut self, domain: &str, orig: Option<(String, String)>, is_bot: bool, ) -> Result<u32, Error> { let (country, city) = orig.unwrap_or((Default::default(), Default::default())); Ok(sqlx::query( r#" INSERT INTO Sessions (start, last, domain, origin_country, origin_city, is_bot) VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?, ?, ?, ?) RETURNING id"#, ) .bind(domain) .bind(country) .bind(city) .bind(is_bot) .fetch_one(&mut *self.0) .await? .get(0)) } pub async fn update_session_time(&mut self, id: u32) -> Result<(), Error> { sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?") .bind(id) .execute(&mut *self.0) .await?; Ok(()) } pub async fn query_visits_sessions_counts( &mut self, ctx: &LogsQueryContext, ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> { let domain = ctx.domain.as_ref().map(String::as_str).unwrap_or("%"); let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; let mut results = sqlx::query( r#" SELECT DATE(atime + ?, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount FROM requestlog JOIN ( SELECT DATE(start + ?, 'unixepoch') AS sessdate, COUNT(*) AS sesscount FROM sessions WHERE sessions.domain LIKE ? AND sessions.is_bot <= ? GROUP BY sessdate) AS sc ON (rqdate = sessdate) JOIN sessions ON (sessions.id = requestlog.session) WHERE atime > ? AND atime < ? AND requestlog.domain LIKE ? AND sessions.is_bot <= ? GROUP BY rqdate ORDER BY rqdate ASC;"#, ) .bind(tz_offset) .bind(tz_offset) .bind(domain) .bind(include_bots) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(domain) .bind(include_bots) .fetch(&mut *self.0); // Result table: date / visit count / session count let mut dates = vec![]; //Vec::<String>::with_capacity(results.len()); let mut visits = vec![]; //Vec::<u32>::with_capacity(results.len()); let mut sessions = vec![]; //Vec::<u32>::with_capacity(results.len()); loop { match results.next().await { Some(Ok(row)) => { dates.push(row.get(0)); visits.push(row.get(1)); sessions.push(row.get(2)); } None => break, Some(Err(e)) => { error!("Error querying visits/sessions: {}", e); return Err(e).context("query visits/session counts"); } } } Ok((dates, visits, sessions)) } pub async fn query_top_countries( &mut self, ctx: &LogsQueryContext, ) -> Result<Vec<(String, i64)>, Error> { let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; let result = sqlx::query( r#" SELECT origin_country, COUNT(*) AS count FROM sessions WHERE start+? > ? AND start+? < ? AND domain LIKE ? AND is_bot <= ? GROUP BY origin_country ORDER BY count DESC;"#, ) .bind(tz_offset) .bind(ctx.from.unix_timestamp()) .bind(tz_offset) .bind(ctx.to.unix_timestamp()) .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%")) .bind(include_bots) .fetch_all(&mut *self.0) .await?; Ok(result .into_iter() .map(|row| (row.get(0), row.get(1))) .collect()) } pub async fn query_top_paths( &mut self, ctx: &LogsQueryContext, n: i64, ) -> Result<Vec<(String, i64)>, Error> { let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; let result = sqlx::query( r#" SELECT path, COUNT(*) AS pathcount FROM RequestLog JOIN Sessions ON (Sessions.id = Requestlog.session) WHERE atime > ? AND atime < ? AND RequestLog.domain LIKE ? AND Sessions.is_bot <= ? GROUP BY path ORDER BY pathcount DESC LIMIT ?;"#, ) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%")) .bind(include_bots) .bind(n) .fetch(&mut *self.0); Ok(result .map(|r| r.map(|e| (e.get(0), e.get(1)))) .take_while(|e| { if e.is_ok() { ready(true) } else { error!("SQL error: query_top_paths: {}", e.as_ref().unwrap_err()); ready(false) } }) .map(|e| e.unwrap()) .collect() .await) } pub async fn query_requests_per_session( &mut self, ctx: &LogsQueryContext, ) -> Result<Vec<(String, f32)>, Error> { let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; let result = sqlx::query( r#" SELECT d, CAST(nreq AS REAL)/nses FROM (SELECT DATE(atime+?, 'unixepoch') AS d, COUNT(*) AS nreq FROM RequestLog JOIN Sessions ON (Sessions.id = RequestLog.session) WHERE atime > ? AND atime < ? AND RequestLog.domain LIKE ? AND is_bot <= ? GROUP BY d) JOIN (SELECT DATE(start+?, 'unixepoch') AS d2, COUNT(*) as nses FROM Sessions WHERE start > ? AND start < ? AND Sessions.domain LIKE ? AND is_bot <= ? GROUP BY d2) ON (d = d2) ORDER BY d ASC"#, ) .bind(tz_offset) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%")) .bind(include_bots) .bind(tz_offset) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%")) .bind(include_bots) .fetch(&mut *self.0); Ok(result .map(|r| r.map(|e| (e.get(0), e.get(1)))) .take_while(|e| { if e.is_ok() { ready(true) } else { error!( "SQL error: query_requests_per_session: {}", e.as_ref().unwrap_err() ); ready(false) } }) .map(|e| e.unwrap()) .collect() .await) } pub async fn query_top_refer_domains( &mut self, ctx: &LogsQueryContext, ) -> Result<Vec<(String, i64)>, Error> { let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; let result = sqlx::query( r#" SELECT refer, count(*) FROM RequestLog JOIN Sessions ON (RequestLog.session = Sessions.id) WHERE instr(RequestLog.refer, RequestLog.domain) = 0 AND RequestLog.atime > ? AND RequestLog.atime < ? AND Sessions.is_bot <= ? AND RequestLog.domain LIKE ? GROUP BY refer; "#, ) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(include_bots) .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%")) .fetch(&mut *self.0); let mut by_origin = HashMap::<String, i64>::new(); result .take_while(|row| { if let Err(ref e) = row { error!("query_top_refer_domains: Unable to fetch row: {}", e); ready(false) } else { ready(true) } }) .filter(|row| ready(row.is_ok())) .map(|row| { let row = row.unwrap(); (row.get(0), row.get(1)) }) .map(|(refer, count): (String, i64)| { if let Ok(u) = refer.parse::<Uri>() { by_origin .entry(u.host().map(str::to_string).unwrap_or(String::new())) .and_modify(|c| *c += count) .or_insert(count); } else if refer.is_empty() { by_origin .entry(String::new()) .and_modify(|c| *c += count) .or_insert(count); } }) .collect::<()>() .await; let mut by_origin_vec = by_origin.into_iter().collect::<Vec<(String, i64)>>(); by_origin_vec.sort_by_key(|(_, v)| -*v); Ok(by_origin_vec) } pub async fn query_recent_sessions( &mut self, ctx: &LogsQueryContext, n: i64, ) -> Result<Vec<RecentSessionsRow>, Error> { let tz_offset = ctx.tz_offset; let include_bots = if ctx.include_bots { 1 } else { 0 }; // Check later if this query possibly has bad scaling behavior due to no restrictions on // requesttags query. let result = sqlx::query( r#" SELECT Sessions.start AS start, Sessions.last-Sessions.start AS duration, COUNT(RequestLog.id) AS count, RequestLog.Refer AS refer, Sessions.origin_country AS origin_country, Sessions.origin_city AS origin_city, RequestLog.ua AS ua, GROUP_CONCAT(tagstring, ",") AS alltags FROM RequestLog JOIN Sessions ON (RequestLog.session = Sessions.id) LEFT JOIN (SELECT requestid, GROUP_CONCAT(IIF(value, PRINTF("%s=%s", IIF(key, key, ""), value), key)) AS tagstring FROM RequestTags GROUP BY requestid) AS AccumTags ON (RequestLog.id = AccumTags.requestid) WHERE atime >= ? AND atime <= ? AND Sessions.is_bot <= ? GROUP BY Sessions.id ORDER BY Sessions.start DESC, RequestLog.atime DESC LIMIT ?;"#, ) .bind(ctx.from.unix_timestamp()) .bind(ctx.to.unix_timestamp()) .bind(include_bots) .bind(n) .fetch(&mut *self.0) .take_while(|r| { if let Err(e) = r { error!("Error fetching in query_last_visits: {}", e); } ready(r.is_ok()) }) .map(Result::unwrap) .map(|r| RecentSessionsRow::from_row(&r)) .take_while(|r| { if let Err(e) = r { error!("Error parsing row in query_last_visits: {}", e); } ready(r.is_ok()) }) .map(Result::unwrap) .collect::<Vec<RecentSessionsRow>>() .await; Ok(result) } }