use crate::db::{DBType, PoolType};

use anyhow::{Context, Error};
use log::error;
use time::OffsetDateTime;

use rocket::futures::{future::ready, StreamExt};
use rocket::http::hyper::uri::Uri;
use rocket::serde::Serialize;
use rocket_db_pools::sqlx::{Row, Sqlite};
use rocket_db_pools::Database;
use sqlx::prelude::FromRow;

use std::collections::HashMap;

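// Rocket database wrapper around the "sqlite_logs" connection pool.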
#[derive(Database)]
#[database("sqlite_logs")]
pub struct LogsDB(pub PoolType);

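// Borrowed connection from the logs pool; all log reads and writes go through this.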
pub struct LogsDBSession<'p, DB: sqlx::Database>(pub &'p mut sqlx::pool::PoolConnection<DB>);

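// Common filters for the reporting queries: optional domain, time window, and
// whether sessions flagged as bots are included.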
pub struct LogsQueryContext {
    pub domain: Option<String>,
    pub from: OffsetDateTime,
    pub to: OffsetDateTime,
    pub include_bots: bool,
}

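// One row of the recent-sessions report. `start` and `duration` are unix
// seconds; `alltags` is a comma-separated list of `key=value` request tags.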
#[derive(sqlx::FromRow, Serialize)]
#[serde(crate = "rocket::serde")]
pub struct RecentSessionsRow {
    pub start: i64,
    pub duration: i64,
    pub count: i64,
    pub refer: Option<String>,
    pub origin_country: Option<String>,
    pub origin_city: Option<String>,
    pub ua: String,
    pub alltags: String,
}

impl<'p> LogsDBSession<'p, Sqlite> {
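    // Inserts one row into RequestLog for this request and returns the new row id.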
    pub async fn log_request<
        S1: AsRef<str>,
        S2: AsRef<str>,
        S3: AsRef<str>,
        S4: AsRef<str>,
        S5: AsRef<str>,
        S6: AsRef<str>,
    >(
        &mut self,
        session: Option<u32>,
        ip: S1,
        domain: S2,
        path: S3,
        status: u32,
        page: Option<S4>,
        refer: Option<S5>,
        ua: S6,
        ntags: u32,
    ) -> Result<u32, Error> {
        let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id");
        let q = q
            .bind(session)
            .bind(ip.as_ref())
            .bind(domain.as_ref())
            .bind(path.as_ref())
            .bind(status)
            .bind(page.map(|s| s.as_ref().to_string()))
            .bind(refer.map(|s| s.as_ref().to_string()))
            .bind(ua.as_ref())
            .bind(ntags);
        let row: u32 = q.fetch_one(&mut *self.0).await?.get(0);
        Ok(row)
    }

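    // Stores `key=value` tags for a request in RequestTags and returns how many
    // tags were written. Tags without '=' are stored with an empty value.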
    pub async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>(
        &mut self,
        requestid: u32,
        tags: I,
    ) -> Result<usize, Error> {
        let mut ntags = 0;
        for tag in tags {
            let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), ""));
            sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)")
                .bind(requestid)
                .bind(k)
                .bind(v)
                .execute(&mut *self.0)
                .await
                .map_err(|e| {
                    error!("Couldn't insert tag {}={}: {}", k, v, e);
                    e
                })?;
            ntags += 1;
        }

        Ok(ntags)
    }

    // Creates a new session for `domain`. `orig` is an optional (country, city)
    // pair; `is_bot` marks the session as coming from a bot. Returns the new
    // session id.
    pub async fn start_session(
        &mut self,
        domain: &str,
        orig: Option<(String, String)>,
        is_bot: bool,
    ) -> Result<u32, Error> {
        let (country, city) = orig.unwrap_or((Default::default(), Default::default()));
        Ok(sqlx::query(
            r#"
INSERT INTO Sessions (start, last, domain, origin_country, origin_city, is_bot)
VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?, ?, ?, ?)
RETURNING id"#,
        )
        .bind(domain)
        .bind(country)
        .bind(city)
        .bind(is_bot)
        .fetch_one(&mut *self.0)
        .await?
        .get(0))
    }

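    // Bumps the session's `last` timestamp to the current time.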
    pub async fn update_session_time(&mut self, id: u32) -> Result<(), Error> {
        sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?")
            .bind(id)
            .execute(&mut *self.0)
            .await?;
        Ok(())
    }

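    // Returns per-day totals over the query window: the dates, the number of
    // requests per day, and the number of sessions started per day.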
    pub async fn query_visits_sessions_counts(
        &mut self,
        ctx: &LogsQueryContext,
    ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> {
        let domain = ctx.domain.as_ref().map(String::as_str).unwrap_or("%");
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        let mut results = sqlx::query(
            r#"
SELECT DATE(atime, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount
FROM requestlog
JOIN (
    SELECT DATE(start, 'unixepoch') AS sessdate, COUNT(*) AS sesscount
    FROM sessions WHERE sessions.domain LIKE ? AND sessions.is_bot <= ? GROUP BY sessdate)
AS sc ON (rqdate = sessdate)
JOIN sessions ON (sessions.id = requestlog.session)
WHERE atime > ? AND atime < ? AND requestlog.domain LIKE ? AND sessions.is_bot <= ?
GROUP BY rqdate
ORDER BY rqdate ASC;"#,
        )
        .bind(domain)
        .bind(include_bots)
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(domain)
        .bind(include_bots)
        .fetch(&mut *self.0);

        // Result table: date / visit count / session count
        let mut dates: Vec<String> = vec![];
        let mut visits: Vec<u32> = vec![];
        let mut sessions: Vec<u32> = vec![];

        loop {
            match results.next().await {
                Some(Ok(row)) => {
                    dates.push(row.get(0));
                    visits.push(row.get(1));
                    sessions.push(row.get(2));
                }
                None => break,
                Some(Err(e)) => {
                    error!("Error querying visits/sessions: {}", e);
                    return Err(e).context("query visits/session counts");
                }
            }
        }

        Ok((dates, visits, sessions))
    }

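    // Returns session counts grouped by origin country, most frequent first.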
    pub async fn query_top_countries(
        &mut self,
        ctx: &LogsQueryContext,
    ) -> Result<Vec<(String, i64)>, Error> {
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        let result = sqlx::query(
            r#"
SELECT origin_country, COUNT(*) AS count
FROM sessions
WHERE start > ? AND start < ? AND domain LIKE ? AND is_bot <= ?
GROUP BY origin_country
ORDER BY count DESC;"#,
        )
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%"))
        .bind(include_bots)
        .fetch_all(&mut *self.0)
        .await?;

        Ok(result
            .into_iter()
            .map(|row| (row.get(0), row.get(1)))
            .collect())
    }

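    // Returns the `n` most requested paths and their request counts.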
    pub async fn query_top_paths(
        &mut self,
        ctx: &LogsQueryContext,
        n: i64,
    ) -> Result<Vec<(String, i64)>, Error> {
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        let result = sqlx::query(
            r#"
SELECT path, COUNT(*) AS pathcount
FROM RequestLog
JOIN Sessions ON (Sessions.id = Requestlog.session)
WHERE atime > ? AND atime < ? AND RequestLog.domain LIKE ? AND Sessions.is_bot <= ?
GROUP BY path
ORDER BY pathcount DESC
LIMIT ?;"#,
        )
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%"))
        .bind(include_bots)
        .bind(n)
        .fetch(&mut *self.0);

        Ok(result
            .map(|r| r.map(|e| (e.get(0), e.get(1))))
            .take_while(|e| {
                if e.is_ok() {
                    ready(true)
                } else {
                    error!("SQL error: query_top_paths: {}", e.as_ref().unwrap_err());
                    ready(false)
                }
            })
            .map(|e| e.unwrap())
            .collect()
            .await)
    }

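    // Returns the average number of requests per session for each day in the
    // query window.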
    pub async fn query_requests_per_session(
        &mut self,
        ctx: &LogsQueryContext,
    ) -> Result<Vec<(String, f32)>, Error> {
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        let result = sqlx::query(
            r#"
SELECT d, CAST(nreq AS REAL)/nses
FROM
    (SELECT DATE(atime, 'unixepoch') AS d,
            COUNT(*) AS nreq
    FROM RequestLog
    JOIN Sessions ON (Sessions.id = RequestLog.session)
    WHERE atime > ? AND atime < ? AND RequestLog.domain LIKE ? AND is_bot <= ?
    GROUP BY d)
JOIN
    (SELECT DATE(start, 'unixepoch') AS d2,
            COUNT(*) as nses
     FROM Sessions
     WHERE start > ? AND start < ? AND Sessions.domain LIKE ? AND is_bot <= ?
     GROUP BY d2)
ON (d = d2)
ORDER BY d ASC"#,
        )
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%"))
        .bind(include_bots)
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%"))
        .bind(include_bots)
        .fetch(&mut *self.0);

        Ok(result
            .map(|r| r.map(|e| (e.get(0), e.get(1))))
            .take_while(|e| {
                if e.is_ok() {
                    ready(true)
                } else {
                    error!(
                        "SQL error: query_requests_per_session: {}",
                        e.as_ref().unwrap_err()
                    );
                    ready(false)
                }
            })
            .map(|e| e.unwrap())
            .collect()
            .await)
    }

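    // Counts referrers that do not contain the requested domain, aggregated by
    // the referrer's host and sorted by count descending.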
    pub async fn query_top_refer_domains(
        &mut self,
        ctx: &LogsQueryContext,
    ) -> Result<Vec<(String, i64)>, Error> {
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        let result = sqlx::query(
            r#"
SELECT refer, count(*)
FROM RequestLog
JOIN Sessions ON (RequestLog.session = Sessions.id)
WHERE instr(RequestLog.refer, RequestLog.domain) = 0 AND
    RequestLog.atime > ? AND RequestLog.atime < ? AND
    Sessions.is_bot <= ? AND
    RequestLog.domain LIKE ?
GROUP BY refer;
"#,
        )
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(include_bots)
        .bind(ctx.domain.as_ref().map(String::as_str).unwrap_or("%"))
        .fetch(&mut *self.0);

        let mut by_origin = HashMap::<String, i64>::new();

        result
            .take_while(|row| {
                if let Err(ref e) = row {
                    error!("query_top_refer_domains: Unable to fetch row: {}", e);
                    ready(false)
                } else {
                    ready(true)
                }
            })
            .filter(|row| ready(row.is_ok()))
            .map(|row| {
                let row = row.unwrap();
                (row.get(0), row.get(1))
            })
            .map(|(refer, count): (String, i64)| {
                if let Ok(u) = refer.parse::<Uri>() {
                    by_origin
                        .entry(u.host().map(str::to_string).unwrap_or(String::new()))
                        .and_modify(|c| *c += count)
                        .or_insert(count);
                } else if refer.is_empty() {
                    by_origin
                        .entry(String::new())
                        .and_modify(|c| *c += count)
                        .or_insert(count);
                }
            })
            .collect::<()>()
            .await;

        let mut by_origin_vec = by_origin.into_iter().collect::<Vec<(String, i64)>>();
        by_origin_vec.sort_by_key(|(_, v)| -*v);
        Ok(by_origin_vec)
    }

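    // Returns up to `n` recent sessions with request count, referrer, origin,
    // user agent, and the accumulated request tags.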
    pub async fn query_recent_sessions(
        &mut self,
        ctx: &LogsQueryContext,
        n: i64,
    ) -> Result<Vec<RecentSessionsRow>, Error> {
        let include_bots = if ctx.include_bots { 1 } else { 0 };
        // Check later if this query possibly has bad scaling behavior due to no restrictions on
        // requesttags query.
        let result = sqlx::query(
            r#"
SELECT Sessions.start AS start,
    Sessions.last-Sessions.start AS duration,
    COUNT(RequestLog.id) AS count,
    RequestLog.Refer AS refer,
    Sessions.origin_country AS origin_country,
    Sessions.origin_city AS origin_city,
    RequestLog.ua AS ua,
    GROUP_CONCAT(tagstring, ",") AS alltags
FROM RequestLog
JOIN Sessions ON (RequestLog.session = Sessions.id)
LEFT JOIN
	(SELECT
		requestid,
		GROUP_CONCAT(IIF(value, PRINTF("%s=%s", IIF(key, key, ""), value), key)) AS tagstring
	FROM RequestTags GROUP BY requestid) AS AccumTags
	ON (RequestLog.id = AccumTags.requestid)
WHERE atime >= ? AND atime <= ? AND Sessions.is_bot <= ?
GROUP BY Sessions.id
ORDER BY Sessions.start DESC, RequestLog.atime DESC
LIMIT ?;"#,
        )
        .bind(ctx.from.unix_timestamp())
        .bind(ctx.to.unix_timestamp())
        .bind(include_bots)
        .bind(n)
        .fetch(&mut *self.0)
        .take_while(|r| {
            if let Err(e) = r {
                error!("Error fetching in query_last_visits: {}", e);
            }
            ready(r.is_ok())
        })
        .map(Result::unwrap)
        .map(|r| RecentSessionsRow::from_row(&r))
        .take_while(|r| {
            if let Err(e) = r {
                error!("Error parsing row in query_last_visits: {}", e);
            }
            ready(r.is_ok())
        })
        .map(Result::unwrap)
        .collect::<Vec<RecentSessionsRow>>()
        .await;
        Ok(result)
    }
}