# Reconstructed from git patch 57955d1480d9eee10e60750911e2cf95b69bc6d9
# ("Initial commit", Michael DM Dryden, 2020-05-28).
# --- file: db.py ---
import logging

from sqlalchemy.engine import Engine


logger = logging.getLogger(__name__)


def init_dbs(engine: Engine):
    """Create the bot's tables and indexes if they do not already exist.

    Tables:
      messages_utc -- one row per logged Telegram message.
      user_events  -- join/leave events, keyed by the triggering message_id.
      user_names   -- append-only history of usernames and display names.

    Safe to call on every startup: all DDL uses "if not exists".

    :param engine: SQLAlchemy engine connected to the target PostgreSQL db.
    """
    sql = """
    create table if not exists messages_utc
    (
        message_id bigint,
        date timestamptz,
        from_user bigint,
        forward_from_message_id bigint,
        forward_from bigint,
        forward_from_chat bigint,
        caption text,
        text text,
        sticker_set_name text,
        new_chat_title text,
        reply_to_message bigint,
        file_id text,
        type text
    );

    create index if not exists messages_utc_date_index
        on messages_utc (date);

    create index if not exists messages_utc_from_user_index
        on messages_utc (from_user);

    create index if not exists messages_utc_type_index
        on messages_utc (type);

    create table if not exists user_events
    (
        message_id bigint,
        user_id bigint,
        date timestamp with time zone,
        event text
    );

    create index if not exists ix_user_events_message_id
        on user_events (message_id);

    create table if not exists user_names
    (
        user_id bigint,
        date timestamptz,
        username text,
        display_name text
    );

    create index if not exists user_names_user_id_date_index
        on user_names (user_id, date);
    """
    # Fix: added the display_name column above -- stats.StatsRunner.get_db_users
    # SELECTs it and update_user_ids INSERTs it, but the original DDL omitted it.

    with engine.connect() as con:
        con.execute(sql)
# --- file: json_dump_parser.py ---
# Converts a Telegram desktop-client JSON export into the same row dicts
# that the live logger writes to messages_utc / user_events.
import json
import typing

import pandas as pd

# Telegram-export media_type values -> the bot's message "type" labels.
media_dict = {'sticker': 'sticker',
              'animation': 'animation',
              'video_file': 'video',
              'voice_message': 'voice',
              'audio_file': 'audio',
              'video_message': 'video_note'}

# Closed vocabularies for the user_events.event and messages_utc.type columns.
user_event_cat = pd.Categorical(['left', 'joined'])
message_type_cat = pd.Categorical(['migrate_from_group', 'text', 'pinned_message', 'photo', 'sticker',
                                   'new_chat_members', 'left_chat_member', 'animation', 'video',
                                   'location', 'new_chat_title', 'voice', 'audio',
                                   'new_chat_photo', 'video_note', 'poll'])


def text_list_parser(text: typing.Union[str, typing.Sequence]) -> str:
    """Flatten an export 'text' field into plain text.

    The export stores message text either as a plain string or as a list
    mixing strings with entity dicts like {'type': ..., 'text': ...}.
    """
    if isinstance(text, str):
        return text
    out = ""
    for block in text:
        try:
            out += block['text']  # formatted entity dict
        except TypeError:
            out += block  # plain string fragment
    return out


def convert_messages(df: pd.DataFrame) -> typing.Tuple[typing.List[dict], typing.List[dict]]:
    """Convert an export DataFrame into (message rows, user-event rows).

    :param df: DataFrame built from the export's chat['messages'] list.
    :return: (messages_out, users_out) -- lists of dicts shaped for the
             messages_utc and user_events tables respectively.
    """
    messages_out = []
    users_out = []
    for message in df.itertuples():
        message_dict = {'message_id': message.id,
                        'date': message.date,
                        'from_user': None,
                        'forward_from_message_id': None,
                        'forward_from': None,
                        'forward_from_chat': None,
                        'caption': "",
                        'text': "",
                        'sticker_set_name': "",
                        'new_chat_title': "",
                        'reply_to_message': None,
                        'file_id': None,
                        'type': None,
                        }
        user_event_dict = {}
        if message.type == 'message':
            if pd.notnull(message.from_id):
                message_dict['from_user'] = message.from_id

            if pd.notnull(message.forwarded_from):
                try:
                    # Export stores a user id or a chat title; only ids are kept.
                    message_dict['forward_from'] = int(message.forwarded_from)
                except ValueError:
                    pass

            if pd.notnull(message.reply_to_message_id):
                message_dict['reply_to_message'] = message.reply_to_message_id

            if pd.notnull(message.photo):
                message_dict['type'] = 'photo'
                if message.text != "":
                    message_dict['caption'] = text_list_parser(message.text)
            elif pd.notnull(message.media_type):
                if message.text != "":
                    message_dict['caption'] = text_list_parser(message.text)
                message_dict['type'] = media_dict[message.media_type]
                # Webp stickers have no usable file reference in the export.
                if message.media_type == 'sticker' and '.webp' not in message.file:
                    message_dict['file_id'] = message.file
            elif message.text != "":
                message_dict['type'] = 'text'
                message_dict['text'] = text_list_parser(message.text)
            elif pd.notnull(message.poll):
                message_dict['type'] = 'poll'
            elif pd.notnull(message.location_information):
                message_dict['type'] = 'location'

        elif message.type == 'service':
            if pd.notnull(message.actor_id):
                message_dict['from_user'] = message.actor_id

            if message.action == 'edit_group_title':
                message_dict['type'] = 'new_chat_title'
                message_dict['new_chat_title'] = message.title
            elif message.action == 'pin_message':
                message_dict['type'] = 'pinned_message'
            elif message.action == 'edit_group_photo':
                message_dict['type'] = 'new_chat_photo'
            elif message.action == 'invite_members' or message.action == 'join_group_by_link':
                message_dict['type'] = 'new_chat_members'
                try:
                    for i in message.members:
                        # Fix: event is 'joined' (was 'join') to match
                        # user_event_cat and the live parser in parse.py.
                        users_out.append({'message_id': message.id,
                                          'user_id': i,
                                          'date': message.date,
                                          'event': 'joined'})
                except TypeError:
                    # join_group_by_link rows have no members list (NaN):
                    # the actor themselves joined.
                    user_event_dict = {'message_id': message.id,
                                       'user_id': message.actor_id,
                                       'date': message.date,
                                       'event': 'joined'}
            elif message.action == 'remove_members':
                message_dict['type'] = 'left_chat_member'
                for i in message.members:
                    users_out.append({'message_id': message.id,
                                      'user_id': i,
                                      'date': message.date,
                                      'event': 'left'})
            else:
                message_dict['type'] = message.action
        messages_out.append(message_dict)
        if user_event_dict != {}:
            users_out.append(user_event_dict)
    return messages_out, users_out


def parse_json(path: str, chat_idx: int = 1) -> typing.Tuple[typing.List[dict], typing.List[dict]]:
    """Load a Telegram JSON export and convert one chat's messages.

    Fix: the original built the DataFrame and returned None; now the
    converted rows are returned. The chat index is parameterized (the
    original hard-coded index 1, kept as the default).

    :param path: Path to the exported result.json file.
    :param chat_idx: Index of the chat in the export's chat list.
    :return: (messages, user_events) as produced by convert_messages.
    """
    with open(path, encoding='utf-8') as f:
        js = json.load(f)
    chat = js['chats']['list'][chat_idx]['messages']
    df = pd.DataFrame(chat)
    return convert_messages(df)
# --- file: log_storage.py ---
import datetime
import logging
import json
import os

from sqlalchemy import MetaData, Table, Column, create_engine, BigInteger, TIMESTAMP, Text

from parse import MessageDict
from db import init_dbs

logger = logging.getLogger(__name__)
metadata = MetaData()

# Table objects mirror the DDL in db.init_dbs.
messages = Table('messages_utc', metadata,
                 Column('message_id', BigInteger),
                 Column('date', TIMESTAMP),
                 Column('from_user', BigInteger),
                 Column('forward_from_message_id', BigInteger),
                 Column('forward_from', BigInteger),
                 Column('forward_from_chat', BigInteger),
                 Column('caption', Text),
                 Column('text', Text),
                 Column('sticker_set_name', Text),
                 Column('new_chat_title', Text),
                 Column('reply_to_message', BigInteger),
                 Column('file_id', Text),
                 Column('type', Text))
user_events = Table('user_events', metadata,
                    Column('message_id', BigInteger),
                    Column('user_id', BigInteger),
                    Column('date', TIMESTAMP),
                    Column('event', Text))


def date_converter(o):
    """json.dumps default= hook: stringify datetimes, leave anything else
    to json's normal TypeError."""
    if isinstance(o, datetime.datetime):
        return o.__str__()


class JSONStore(object):
    """Append-only newline-delimited JSON backup of every logged record."""

    def __init__(self, path: str):
        self.store = path  # directory that holds one <name>.json per table

    def append_data(self, name: str, data: MessageDict):
        """Append one record as a single JSON line to <store>/<name>.json."""
        with open(os.path.join(self.store, f"{name}.json"), 'a') as f:
            f.write(json.dumps(data, default=date_converter) + "\n")


class PostgresStore(object):
    """Primary store: writes records into the PostgreSQL tables."""

    def __init__(self, connection_url: str):
        self.engine = create_engine(connection_url, echo=False)
        init_dbs(self.engine)  # idempotent DDL

    def append_data(self, name: str, data: MessageDict):
        """Insert one record into the named table ('messages' or 'user_events')."""
        data['date'] = str(data['date'])  # NOTE: mutates the caller's dict
        if name == 'messages':
            with self.engine.connect() as con:
                _ = con.execute(messages.insert(), data)
        elif name == 'user_events':
            with self.engine.connect() as con:
                _ = con.execute(user_events.insert(), data)
        else:
            logger.warning("Tried to append to invalid table %s", name)

    def update_data(self, name: str, data: MessageDict):
        """Update the row(s) with data['message_id'] (used for message edits)."""
        data['date'] = str(data['date'])  # NOTE: mutates the caller's dict
        if name == 'messages':
            update_statement = messages.update()\
                .where(messages.c.message_id == data['message_id'])
            with self.engine.connect() as con:
                _ = con.execute(update_statement, data)
        elif name == 'user_events':
            update_statement = user_events.update()\
                .where(user_events.c.message_id == data['message_id'])
            with self.engine.connect() as con:
                _ = con.execute(update_statement, data)
        else:
            logger.warning("Tried to update to invalid table %s", name)


# --- file: main.py ---
import logging
import json
import argparse
import shlex
import warnings
import os

import telegram
from telegram.error import BadRequest
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
from telegram.ext.dispatcher import run_async
from telegram.update import Update
import appdirs

from parse import parse_message
from log_storage import JSONStore, PostgresStore
from stats import StatsRunner, get_parser, HelpException


warnings.filterwarnings("ignore")

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)
logger = logging.getLogger(__name__)

stats = None  # set in __main__ once the Postgres store exists

try:
    with open("./sticker-keys.json", 'r') as f:
        stickers = json.load(f)
except FileNotFoundError:
    stickers = {}
sticker_idx = None
sticker_id = None


def log_message(update: Update, context: CallbackContext):
    """Persist every chat update: edits update in place, new messages append."""
    if update.edited_message:
        edited_message, user = parse_message(update.effective_message)
        bak_store.append_data('edited-messages', edited_message)
        store.update_data('messages', edited_message)
        return

    try:
        logger.info(update.effective_message.message_id)
    except AttributeError:
        logger.warning("No effective_message attribute")
    message, user = parse_message(update.effective_message)

    if message:
        bak_store.append_data('messages', message)
        store.append_data('messages', message)
    if len(user) > 0:
        for i in user:
            if i:
                bak_store.append_data('user_events', i)
                store.append_data('user_events', i)


@run_async
def update_usernames(context: CallbackContext):  # context.job.context contains the chat_id
    """Refresh stored usernames/display names for everyone who has posted."""
    user_ids = stats.get_message_user_ids()
    db_users = stats.get_db_users()
    tg_users = {user_id: None for user_id in user_ids}
    to_update = {}
    for u_id in tg_users:
        try:
            user = context.bot.get_chat_member(chat_id=context.job.context, user_id=u_id).user
            tg_users[u_id] = user.name, user.full_name
            # Fix: .get() instead of db_users[u_id] -- a message author with no
            # user_names row yet would raise an uncaught KeyError.
            db_entry = db_users.get(u_id)
            if tg_users[u_id] != db_entry:
                if db_entry is not None and tg_users[u_id][1] == db_entry[1]:
                    # Username-only change: flag with display_name=None so we
                    # don't insert a new history row.
                    to_update[u_id] = tg_users[u_id][0], None
                else:
                    to_update[u_id] = tg_users[u_id]
        except BadRequest:  # Handle users no longer in chat (or maybe haven't interacted with the bot??)
            pass
    stats.update_user_ids(to_update)
    if stats.users_lock.acquire(timeout=10):
        stats.users = stats.get_db_users()
        stats.users_lock.release()
    else:
        logger.warning("Couldn't acquire username lock.")
        return
    logger.info("Usernames updated")


def send_code_block(context: CallbackContext, update: Update, text: str):
    """Send text as a Markdown code block, preferring a DM to the user and
    falling back to the originating chat if the DM fails."""
    try:
        context.bot.send_message(chat_id=update.effective_user.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)
    except BadRequest:
        context.bot.send_message(chat_id=update.effective_chat.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)


@run_async
def print_stats(update: Update, context: CallbackContext):
    """Handle /stats: parse sub-command args and reply with text and/or a plot."""
    if update.effective_user.id not in stats.users:
        return  # only known chat members may query

    stats_parser = get_parser(stats)
    image = None

    try:
        ns = stats_parser.parse_args(shlex.split(" ".join(context.args)))
    except HelpException as e:
        send_code_block(context, update, e.msg)
        return
    except argparse.ArgumentError as e:
        text = str(e)
    else:
        args = vars(ns)
        func = args.pop('func')

        try:
            if args['me']:
                # Translate the -me flag into the (id, name) tuple stats expects.
                args['user'] = update.effective_user.id, update.effective_user.name
            del args['me']  # stats methods take 'user', never 'me'
        except KeyError:
            pass

        try:
            text, image = func(**args)
        except HelpException as e:
            send_code_block(context, update, e.msg)
            return

    if text:
        context.bot.send_message(chat_id=update.effective_chat.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)
    if image:
        context.bot.send_photo(chat_id=update.effective_chat.id, photo=image)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('token', type=str, help="Telegram bot token")
    parser.add_argument('chat_id', type=int, help="Telegram chat id to monitor.")
    parser.add_argument('postgres_url', type=str, help="Sqlalchemy-compatible postgresql url.")
    parser.add_argument('--json-path', type=str,
                        help="Either full path to backup storage folder or prefix (will be stored in app data dir.",
                        default="chat")
    args = parser.parse_args()

    updater = Updater(token=args.token, use_context=True)
    dispatcher = updater.dispatcher

    path = args.json_path
    if not os.path.split(path)[0]:  # Empty string for left part of path
        path = os.path.join(appdirs.user_data_dir('telegram-stats-bot'), path)

    os.makedirs(path, exist_ok=True)
    bak_store = JSONStore(path)
    store = PostgresStore(args.postgres_url)
    stats = StatsRunner(store.engine)

    stats_handler = CommandHandler('stats', print_stats, filters=~Filters.update.edited_message)
    dispatcher.add_handler(stats_handler)

    log_handler = MessageHandler(Filters.chat(chat_id=args.chat_id), log_message)
    dispatcher.add_handler(log_handler)

    job_queue = updater.job_queue
    update_users_job = job_queue.run_repeating(update_usernames, interval=3600, first=0, context=args.chat_id)

    updater.start_polling()
# --- file: parse.py ---
from typing import TypedDict, Tuple, Union, List
from datetime import datetime

import telegram


class MessageDict(TypedDict):
    """Row shape for the messages_utc table."""
    message_id: int
    date: Union[str, datetime]
    from_user: int
    forward_from_message_id: Union[int, None]
    forward_from: Union[int, None]
    forward_from_chat: Union[int, None]
    caption: Union[str, None]
    text: Union[str, None]
    sticker_set_name: Union[str, None]
    new_chat_title: Union[str, None]
    reply_to_message: Union[int, None]
    file_id: Union[str, None]
    type: str


def parse_message(message: telegram.message.Message) -> Tuple[dict, List[dict]]:
    """Flatten a live telegram Message into a MessageDict row plus any
    user join/leave events it implies.

    :param message: The update's effective message.
    :return: (message row, list of user-event dicts; [{}] when none).
    """
    message_dict: MessageDict = {'message_id': message.message_id,
                                 'date': message.date,
                                 'from_user': None,
                                 'forward_from_message_id': message.forward_from_message_id,
                                 'forward_from': None,
                                 'forward_from_chat': None,
                                 'caption': message.caption,
                                 'text': message.text,
                                 'sticker_set_name': None,
                                 'new_chat_title': message.new_chat_title,
                                 'reply_to_message': None,
                                 'file_id': None,
                                 'type': None,
                                 }
    user_event_dict = [{}]

    if message.from_user:
        message_dict['from_user'] = message.from_user.id

    if message.forward_from:
        try:
            message_dict['forward_from'] = message.forward_from.id
        except AttributeError:
            pass
        try:
            message_dict['forward_from_chat'] = message.forward_from_chat.id
        except AttributeError:
            pass

    if message.reply_to_message:
        message_dict['reply_to_message'] = message.reply_to_message.message_id

    # Classify the message; order matters (first matching attribute wins).
    if message.text:
        message_dict['type'] = 'text'
    elif message.animation:
        message_dict['type'] = 'animation'
        message_dict['file_id'] = message.animation.file_id
    elif message.audio:
        message_dict['type'] = 'audio'
        message_dict['file_id'] = message.audio.file_id
    elif message.document:
        message_dict['type'] = 'document'
        message_dict['file_id'] = message.document.file_id
    elif message.game:
        message_dict['type'] = 'game'
    elif message.photo:
        message_dict['type'] = 'photo'
    elif message.sticker:
        message_dict['type'] = 'sticker'
        message_dict['file_id'] = message.sticker.file_id
        # Fix: original read `message_dict['sticker_set_name']: ...` -- a
        # no-op annotation, so the set name was never actually stored.
        message_dict['sticker_set_name'] = message.sticker.set_name
    elif message.video:
        message_dict['type'] = 'video'
    elif message.video_note:
        message_dict['type'] = 'video_note'
    elif message.voice:
        message_dict['type'] = 'voice'
    elif message.location:
        message_dict['type'] = 'location'
    elif message.poll:
        message_dict['type'] = 'poll'
    elif message.new_chat_title:
        message_dict['type'] = 'new_chat_title'
    elif message.new_chat_photo:
        message_dict['type'] = 'new_chat_photo'
    elif message.pinned_message:
        message_dict['type'] = 'pinned_message'
    elif message.new_chat_members:
        message_dict['type'] = 'new_chat_members'
        user_event_dict = [{'message_id': message.message_id,
                            'user_id': u_id,
                            'date': message.date,
                            'event': 'joined'}
                           for u_id in [member.id for member in message.new_chat_members]]
    elif message.left_chat_member:
        message_dict['type'] = 'left_chat_member'
        user_event_dict = [{'message_id': message.message_id,
                            'user_id': message.left_chat_member.id,
                            'date': message.date,
                            'event': 'left'}]

    return message_dict, user_event_dict


# --- file: requirements.txt ---
# python-telegram-bot
# SQLAlchemy~=1.3.17
# appdirs~=1.4.4
# pandas~=1.0.3
# seaborn~=0.10.1
# matplotlib~=3.2.1

# --- file: stats.py (module header) ---
import logging
from typing import Dict, List, Tuple, Text, NoReturn
from threading import Lock
from io import BytesIO
import argparse
import inspect
import re

import pandas as pd
import seaborn as sns
from matplotlib.figure import Figure
from sqlalchemy.engine import Engine

sns.set_context('paper')
sns.set_style('whitegrid')
sns.set_palette("Set2")

logger = logging.getLogger()
class HelpException(Exception):
    """Raised to surface argparse help/usage text back to the Telegram user."""

    def __init__(self, msg: str = None):
        self.msg = msg


class InternalParser(argparse.ArgumentParser):
    """ArgumentParser that raises instead of printing to stdout or exiting,
    since it runs inside a bot handler rather than a terminal."""

    def error(self, message: Text) -> NoReturn:
        try:
            raise  # Reraises mostly ArgumentError for bad arg
        except RuntimeError:
            # No active exception: wrap the message for the bot to send.
            raise HelpException(message)

    def print_help(self, file=None) -> None:
        raise HelpException(self.format_help())

    def exit(self, status=None, message=None):
        pass  # never sys.exit() inside the bot process


class StatsRunner(object):
    """Runs the /stats sub-commands against the message database."""

    # Sub-command name -> method that implements it.
    allowed_methods = {'counts': "get_chat_counts",
                       'hours': "get_counts_by_hour",
                       'days': "get_counts_by_day",
                       'history': "get_message_history"}

    def __init__(self, engine: Engine, tz: str = 'America/Toronto'):
        self.engine = engine
        self.tz = tz  # display timezone; timestamps are stored in UTC

        self.users: Dict[int, Tuple[str, str]] = self.get_db_users()
        self.users_lock = Lock()  # guards refreshes of self.users

    def get_message_user_ids(self) -> List[int]:
        """Return every distinct user id that has posted a message."""
        with self.engine.connect() as con:
            result = con.execute("SELECT DISTINCT from_user FROM messages_utc;")
            return [user for user, in result.fetchall()]

    def get_db_users(self) -> Dict[int, Tuple[str, str]]:
        """Return {user_id: (username, display_name)} using each user's
        most recent user_names row."""
        query = """
        select user_id, username, display_name from (
            select
                *,
                row_number() over(partition by user_id order by date desc) as rn
            from
                user_names
        ) t
        where t.rn = 1
        """

        with self.engine.connect() as con:
            result = con.execute(query)
            result = result.fetchall()

        return {user_id: (username, name) for user_id, username, name in result}

    def update_user_ids(self, user_dict: Dict[int, Tuple[str, str]]):
        """Persist username changes.

        A display_name of None signals a username-only change: update the
        existing row in place. Otherwise insert a new history row.

        :param user_dict: {user_id: (username, display_name or None)}
        """
        # Local import: the module header only pulls in Engine.
        from sqlalchemy.sql import text

        # Fix: bound parameters instead of f-string interpolation -- names
        # containing quotes previously broke the SQL (injection risk).
        update_query = text("""
            UPDATE user_names
            SET username = :username
            WHERE user_id = :uid AND username IS DISTINCT FROM :username;
        """)
        insert_query = text("""
            INSERT INTO user_names(user_id, date, username, display_name)
            VALUES (:uid, current_timestamp, :username, :display_name);
        """)

        for uid in user_dict:
            username, display_name = user_dict[uid]
            with self.engine.connect() as con:
                con.execute(update_query, uid=uid, username=username)
                if display_name:
                    con.execute(insert_query, uid=uid, username=username,
                                display_name=display_name)

    def get_chat_counts(self, n: int = None, start: str = None, end: str = None) -> Tuple[str, None]:
        """
        Get top chat users
        :param n: Number of users to show
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :return: (table text, None)
        """
        # Fix: was initialized to None, which interpolated the literal text
        # "None" into the SQL when no start/end was given.
        date_query = ""

        if n is not None:
            if n <= 0:
                raise HelpException(f'n must be greater than 0, got: {n}')
        else:
            n = 20

        if start:
            start_dt = pd.to_datetime(start)  # parse first; only the Timestamp reaches the SQL
            date_query = f"WHERE date >= '{start_dt}'"

        if end:
            end_dt = pd.to_datetime(end)
            if date_query:
                date_query += f" AND date < '{end_dt}'"
            else:
                date_query = f"WHERE date < '{end_dt}'"

        query = f"""
        SELECT "from_user", COUNT(*)
        FROM "messages_utc"
        {date_query}
        GROUP BY "from_user"
        ORDER BY "count" DESC;
        """
        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con, index_col='from_user')

        user_df = pd.Series(self.users, name="user")
        user_df = user_df.apply(lambda x: x[0])  # Take only @usernames
        df = df.join(user_df)
        df['Percent'] = df['count'] / df['count'].sum() * 100
        df = df[['user', 'count', 'Percent']]
        df.columns = ['User', 'Total Messages', 'Percent']
        # Strip non-ASCII so the fixed-width table renders cleanly in Telegram.
        df['User'] = df['User'].str.replace(r'[^\x00-\x7F]', "", regex=True)

        return df.iloc[:n].to_string(index=False, header=True, float_format=lambda x: f"{x:.1f}"), None

    def get_counts_by_hour(self, user: Tuple[int, str] = None, start: str = None, end: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Get plot of messages for hours of the day
        :param user: (user_id, username) to restrict the plot to one user
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        """
        query_conditions = []

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        query = f"""
        SELECT date_trunc('hour', date) as day, count(*) as messages
        FROM messages_utc
        {query_where}
        GROUP BY day
        ORDER BY day
        """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)

        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        df = df.set_index('day')
        df = df.asfreq('h', fill_value=0)  # Insert 0s for periods with no messages
        df['hour'] = df.index.hour

        if user:
            # Aggregate over 1 week periods
            df = df.groupby('hour').resample('7D').sum().drop(columns='hour')
            df['hour'] = df.index.get_level_values('hour')

        fig = Figure(constrained_layout=True)
        subplot = fig.subplots()

        sns.stripplot(x='hour', y='messages', data=df, jitter=.4, size=2, ax=subplot, alpha=.5, zorder=0)
        sns.boxplot(x='hour', y='messages', data=df, whis=1, showfliers=False, whiskerprops={"zorder": 10},
                    boxprops={"zorder": 10},
                    ax=subplot, zorder=10)
        # Clip the y-axis at the 99.9th percentile so outliers don't flatten the plot.
        subplot.set_ylim(bottom=0, top=df['messages'].quantile(0.999, interpolation='higher'))

        subplot.axvspan(11.5, 23.5, zorder=0, color=(0, 0, 0, 0.05))  # shade the PM hours
        subplot.set_xlim(-1, 24)

        if user:
            subplot.set_ylabel('Messages per Week')
            subplot.set_title(f"Messages by Hour for {user[1]}")
        else:
            subplot.set_ylabel('Messages per Day')
            subplot.set_title("Messages by Hour")

        sns.despine(fig=fig)

        bio = BytesIO()
        bio.name = 'plot.png'
        fig.savefig(bio)
        bio.seek(0)

        return None, bio

    def get_counts_by_day(self, user: Tuple[int, str] = None, start: str = None, end: str = None, plot: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Get plot of messages for days of the week
        :param user: (user_id, username) to restrict the plot to one user
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param plot: Plot type, either 'box' or 'violin' (default violin)
        """
        query_conditions = []

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        query = f"""
        SELECT date_trunc('day', date)
        as day, count(*) as messages
        FROM messages_utc
        {query_where}
        GROUP BY day
        ORDER BY day
        """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)

        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        df = df.set_index('day')
        df = df.asfreq('d', fill_value=0)  # Fill periods with no messages
        df['dow'] = df.index.weekday
        df['day_name'] = df.index.day_name()
        df = df.sort_values('dow')  # Make sure start is Monday

        fig = Figure(constrained_layout=True)
        subplot = fig.subplots()
        if plot == 'box':
            sns.boxplot(x='day_name', y='messages', data=df, whis=1, showfliers=False, ax=subplot)
        elif plot == 'violin' or plot is None:
            sns.violinplot(x='day_name', y='messages', data=df, cut=0, inner="box", scale='width', ax=subplot)
        else:
            raise HelpException("plot must be either box or violin")
        subplot.axvspan(4.5, 6.5, zorder=0, color=(0, .8, 0, 0.1))  # shade the weekend
        subplot.set_xlabel('')
        subplot.set_ylabel('Messages per Day')
        if user:
            subplot.set_title(f"Messages by Day of Week for {user[1]}")
        else:
            subplot.set_title("Messages by Day of Week")

        sns.despine(fig=fig)

        bio = BytesIO()
        bio.name = 'plot.png'
        subplot.get_figure().savefig(bio)
        bio.seek(0)

        return None, bio

    def get_message_history(self, user: Tuple[int, str] = None, averages: int = None, start: str = None, end: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Make a plot of message history over time
        :param user: (user_id, username) to restrict the plot to one user
        :param averages: Moving average width (in days)
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        """
        query_conditions = []
        if averages is None:
            averages = 30

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        query = f"""
        SELECT date_trunc('day', date)
        as day, count(*) as messages
        FROM messages_utc
        {query_where}
        GROUP BY day
        ORDER BY day
        """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)
        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        if averages:
            df['msg_rolling'] = df['messages'].rolling(averages, center=True).mean()

        fig = Figure()  # TODO: One day pandas will let you use constrained_layout=True here...
        subplot = fig.subplots()
        df.plot(x='day', y='messages', alpha=0.5, legend=False, ax=subplot)
        if averages:
            df.plot(x='day', y='msg_rolling', legend=False, ax=subplot)
        subplot.set_ylabel("Messages")
        subplot.set_xlabel("Date")
        if user:
            subplot.set_title(f"Message History for {user[1]}")
        else:
            subplot.set_title("Message History")
        sns.despine(fig=fig)
        fig.tight_layout()

        bio = BytesIO()
        bio.name = 'plot.png'
        fig.savefig(bio)
        bio.seek(0)

        return None, bio


def get_parser(runner: StatsRunner) -> InternalParser:
    """Build the /stats argument parser: one sub-command per allowed method,
    with per-argument help scraped from the methods' :param: docstrings.

    :param runner: StatsRunner whose bound methods become sub-command handlers.
    """
    parser = InternalParser(prog="/stats")
    parser.set_defaults(func=runner.get_chat_counts)
    subparsers = parser.add_subparsers(title="Statistics:")

    for name, func in runner.allowed_methods.items():
        try:
            doc = inspect.getdoc(getattr(runner, func)).splitlines()
        except AttributeError:
            doc = None
        # Fix: guard doc[0] -- an undocumented method previously crashed here.
        subparser = subparsers.add_parser(name, help=doc[0] if doc else None)
        subparser.set_defaults(func=getattr(runner, func))
        f_args = inspect.signature(getattr(runner, func)).parameters

        for _, arg in f_args.items():
            arg: inspect.Parameter
            if arg.name == 'self':
                continue
            if arg.name == 'user':
                # -me resolves to the caller; -user is a hidden raw-id option.
                group = subparser.add_mutually_exclusive_group()
                group.add_argument('-me', action='store_true', help='calculate stats for yourself')
                group.add_argument('-user', type=int, help=argparse.SUPPRESS)
            else:
                if doc:
                    arg_doc = None
                    for line in doc:
                        match = re.match(rf"^:param {arg.name}: (.*)", line)
                        if match:
                            arg_doc = match.group(1)
                    subparser.add_argument(f"-{arg.name}", type=arg.annotation, help=arg_doc)

    return parser