view src/logsdb.rs @ 44:e7e91f288868

Add origin country chart
author Lewin Bormann <lbo@spheniscida.de>
date Tue, 19 Jul 2022 20:18:25 -0700
parents 17a1cf5af6f7
children 90a4e4589d5e
line wrap: on
line source

use crate::db::{DBType, PoolType};

use anyhow::{Context, Error};
use log::{debug, error, info, warn, Level};
use time::{Duration, OffsetDateTime};

use rocket::futures::{future::ready, StreamExt};
use rocket_db_pools::sqlx::{Executor, Row, Sqlite, SqlitePool};
use rocket_db_pools::{Connection, Database, Pool};
use sqlx::prelude::FromRow;

/// Rocket-managed connection pool for the logs database.
///
/// The pool is registered under the name `sqlite_logs`.
/// NOTE(review): presumably this matches a `databases.sqlite_logs` entry in
/// the Rocket configuration — confirm against the deployment config.
#[derive(Database)]
#[database("sqlite_logs")]
pub struct LogsDB(pub PoolType);

/// Wrapper around one pooled connection, borrowed mutably for the lifetime `'p`.
///
/// Every statement issued through this wrapper runs on that single connection.
pub struct LogsDBSession<'p, DB>(pub &'p mut sqlx::pool::PoolConnection<DB>)
where
    DB: sqlx::Database;

impl<'p> LogsDBSession<'p, Sqlite> {
    pub async fn log_request<
        S1: AsRef<str>,
        S2: AsRef<str>,
        S3: AsRef<str>,
        S4: AsRef<str>,
        S5: AsRef<str>,
        S6: AsRef<str>,
    >(
        &mut self,
        session: Option<u32>,
        ip: S1,
        domain: S2,
        path: S3,
        status: u32,
        page: Option<S4>,
        refer: Option<S5>,
        ua: S6,
        ntags: u32,
    ) -> Result<u32, Error> {
        let q = sqlx::query::<DBType>("INSERT INTO RequestLog (session, ip, atime, domain, path, status, pagename, refer, ua, ntags) VALUES (?, ?, strftime('%s', 'now'), ?, ?, ?, ?, ?, ?, ?) RETURNING id");
        let q = q
            .bind(session)
            .bind(ip.as_ref())
            .bind(domain.as_ref())
            .bind(path.as_ref())
            .bind(status)
            .bind(page.map(|s| s.as_ref().to_string()))
            .bind(refer.map(|s| s.as_ref().to_string()))
            .bind(ua.as_ref())
            .bind(ntags);
        let row: u32 = q.fetch_one(&mut *self.0).await?.get(0);
        Ok(row)
    }

    pub async fn log_tags<S: AsRef<str>, I: Iterator<Item = S>>(
        &mut self,
        requestid: u32,
        tags: I,
    ) -> Result<usize, Error> {
        let mut ntags = 0;
        for tag in tags {
            let (k, v) = tag.as_ref().split_once("=").unwrap_or((tag.as_ref(), ""));
            sqlx::query("INSERT INTO RequestTags (requestid, key, value) VALUES (?, ?, ?)")
                .bind(requestid)
                .bind(k)
                .bind(v)
                .execute(&mut *self.0)
                .await
                .map_err(|e| error!("Couldn't insert tag {}={}: {}", k, v, e))
                .unwrap();
            ntags += 1;
        }

        Ok(ntags)
    }

    // Takes domain and (country, city) tuple.
    pub async fn start_session(
        &mut self,
        domain: &str,
        orig: Option<(String, String)>,
        is_bot: bool,
    ) -> Result<u32, Error> {
        let (country, city) = orig.unwrap_or((Default::default(), Default::default()));
        Ok(sqlx::query(
            r#"
INSERT INTO Sessions (start, last, domain, origin_country, origin_city, is_bot)
VALUES (strftime('%s', 'now'), strftime('%s', 'now'), ?, ?, ?, ?)
RETURNING id"#,
        )
        .bind(domain)
        .bind(country)
        .bind(city)
        .bind(is_bot)
        .fetch_one(&mut *self.0)
        .await?
        .get(0))
    }

    pub async fn update_session_time(&mut self, id: u32) -> Result<(), Error> {
        sqlx::query("UPDATE Sessions SET last = strftime('%s', 'now') WHERE id = ?")
            .bind(id)
            .execute(&mut *self.0)
            .await?;
        Ok(())
    }

    pub async fn query_visits_sessions_counts<S: AsRef<str>>(
        &mut self,
        from: OffsetDateTime,
        to: OffsetDateTime,
        tz_offset: Option<i64>,
        domainpattern: Option<S>,
    ) -> Result<(Vec<String>, Vec<u32>, Vec<u32>), Error> {
        let domain = domainpattern.as_ref().map(AsRef::as_ref).unwrap_or("%");
        let tz_offset = tz_offset.unwrap_or(0);
        let mut results = sqlx::query(
            r#"
SELECT DATE(atime + ?, 'unixepoch') AS rqdate, COUNT(requestlog.id) AS rqcount, sesscount
FROM requestlog
JOIN (
    SELECT DATE(start + ?, 'unixepoch') AS sessdate, COUNT(*) AS sesscount
    FROM sessions WHERE sessions.domain LIKE ? GROUP BY sessdate)
AS sc ON (rqdate = sessdate)
WHERE atime+? > ? AND atime+? < ? AND requestlog.domain LIKE ?
GROUP BY rqdate
ORDER BY rqdate ASC;"#,
        )
        .bind(tz_offset)
        .bind(tz_offset)
        .bind(domain)
        .bind(tz_offset)
        .bind(from.unix_timestamp())
        .bind(tz_offset)
        .bind(to.unix_timestamp())
        .bind(domain)
        .fetch(&mut *self.0);

        // Result table: date / visit count / session count
        let mut dates = vec![]; //Vec::<String>::with_capacity(results.len());
        let mut visits = vec![]; //Vec::<u32>::with_capacity(results.len());
        let mut sessions = vec![]; //Vec::<u32>::with_capacity(results.len());

        loop {
            match results.next().await {
                Some(Ok(row)) => {
                    dates.push(row.get(0));
                    visits.push(row.get(1));
                    sessions.push(row.get(2));
                }
                None => break,
                Some(Err(e)) => {
                    error!("Error querying visits/sessions: {}", e);
                    return Err(e).context("query visits/session counts");
                }
            }
        }

        Ok((dates, visits, sessions))
    }

    pub async fn query_top_countries(
        &mut self,
        from: OffsetDateTime,
        to: OffsetDateTime,
        tz_offset: Option<i64>,
        domain: Option<&str>,
    ) -> Result<Vec<(String, i64)>, Error> {
        let tz_offset = tz_offset.unwrap_or(0);
        let result = sqlx::query(
            r#"
SELECT origin_country, COUNT(*) AS count
FROM sessions
WHERE start+? > ? AND start+? < ? AND domain LIKE ?
GROUP BY origin_country
ORDER BY count DESC;"#,
        )
        .bind(tz_offset)
        .bind(from.unix_timestamp())
        .bind(tz_offset)
        .bind(to.unix_timestamp())
        .bind(domain.unwrap_or("%"))
        .fetch_all(&mut *self.0).await?;

        Ok(result.into_iter().map(|row| (row.get(0), row.get(1))).collect())
    }

    pub async fn query_top_paths(
        &mut self,
        from: OffsetDateTime,
        to: OffsetDateTime,
        tz_offset: Option<i64>,
        domainpattern: Option<&str>,
        n: i64,
    ) -> Result<Vec<(String, i64)>, Error> {
        let tz_offset = tz_offset.unwrap_or(0);
        let result = sqlx::query(
            r#"
SELECT path, COUNT(*) AS pathcount
FROM RequestLog
WHERE atime+? > ? AND atime+? < ? AND domain LIKE ?
GROUP BY path
ORDER BY pathcount DESC
LIMIT ?;"#,
        )
        .bind(tz_offset)
        .bind(from.unix_timestamp())
        .bind(tz_offset)
        .bind(to.unix_timestamp())
        .bind(domainpattern.unwrap_or("%"))
        .bind(n)
        .fetch(&mut *self.0);

        Ok(result
            .map(|r| r.map(|e| (e.get(0), e.get(1))))
            .take_while(|e| {
                if e.is_ok() {
                    ready(true)
                } else {
                    error!("SQL error: query_top_paths: {}", e.as_ref().unwrap_err());
                    ready(false)
                }
            })
            .map(|e| e.unwrap())
            .collect()
            .await)
    }

    pub async fn query_requests_per_session(
        &mut self,
        from: OffsetDateTime,
        to: OffsetDateTime,
        tz_offset: Option<i64>,
        domainpattern: Option<&str>,
    ) -> Result<Vec<(String, f32)>, Error> {
        let tz_offset = tz_offset.unwrap_or(0);
        let result = sqlx::query(
            r#"
SELECT d, CAST(nreq AS REAL)/nses
FROM
    (SELECT DATE(atime, 'unixepoch') AS d,
            COUNT(*) AS nreq
    FROM RequestLog
    WHERE atime+? > ? AND atime+? < ? AND domain LIKE ?
    GROUP BY d)
JOIN
    (SELECT DATE(start, 'unixepoch') AS d2,
            COUNT(*) as nses
     FROM Sessions
     WHERE start+? > ? AND start+? < ? AND domain LIKE ?
     GROUP BY d2)
ON (d = d2)
ORDER BY d ASC"#,
        )
        .bind(tz_offset)
        .bind(from.unix_timestamp())
        .bind(tz_offset)
        .bind(to.unix_timestamp())
        .bind(domainpattern.unwrap_or("%"))
        .bind(tz_offset)
        .bind(from.unix_timestamp())
        .bind(tz_offset)
        .bind(to.unix_timestamp())
        .bind(domainpattern.unwrap_or("%"))
        .fetch(&mut *self.0);

        Ok(result
            .map(|r| r.map(|e| (e.get(0), e.get(1))))
            .take_while(|e| {
                if e.is_ok() {
                    ready(true)
                } else {
                    error!(
                        "SQL error: query_requests_per_session: {}",
                        e.as_ref().unwrap_err()
                    );
                    ready(false)
                }
            })
            .map(|e| e.unwrap())
            .collect()
            .await)
    }
}