kopia lustrzana https://github.com/mkdryden/telegram-stats-bot
Initial commit
commit
57955d1480
|
@ -0,0 +1,61 @@
|
|||
import logging
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def init_dbs(engine: Engine):
    """Create the bot's tables and indexes if they do not already exist.

    Idempotent: every statement uses IF NOT EXISTS, so this is safe to call
    on every startup (PostgresStore.__init__ does exactly that).

    :param engine: SQLAlchemy engine connected to the target Postgres database
    """
    # NOTE(review): stats.StatsRunner.update_user_ids inserts a display_name
    # column into user_names which is not created here -- verify the live
    # schema actually has that column.
    sql = """
    create table if not exists messages_utc
    (
        message_id bigint,
        date timestamptz,
        from_user bigint,
        forward_from_message_id bigint,
        forward_from bigint,
        forward_from_chat bigint,
        caption text,
        text text,
        sticker_set_name text,
        new_chat_title text,
        reply_to_message bigint,
        file_id text,
        type text
    );

    create index if not exists messages_utc_date_index
        on messages_utc (date);

    create index if not exists messages_utc_from_user_index
        on messages_utc (from_user);

    create index if not exists messages_utc_type_index
        on messages_utc (type);

    create table if not exists user_events
    (
        message_id bigint,
        user_id bigint,
        date timestamp with time zone,
        event text
    );

    create index if not exists ix_user_events_message_id
        on user_events (message_id);

    create table if not exists user_names
    (
        user_id bigint,
        date timestamptz,
        username text
    );

    create index if not exists user_names_user_id_date_index
        on user_names (user_id, date);
    """

    # Multi-statement DDL executed in a single round trip; psycopg2 accepts
    # several semicolon-separated statements in one execute.
    with engine.connect() as con:
        con.execute(sql)
|
|
@ -0,0 +1,125 @@
|
|||
import json
|
||||
import typing
|
||||
|
||||
import pandas as pd
|
||||
|
||||
# Map "media_type" values found in Telegram desktop JSON exports to the
# type names used by the live bot (parse.parse_message), so both ingestion
# paths write the same messages_utc.type values.
media_dict = {'sticker': 'sticker',
              'animation': 'animation',
              'video_file': 'video',
              'voice_message': 'voice',
              'audio_file': 'audio',
              'video_message': 'video_note'}

# Known values for the user_events.event column.
user_event_cat = pd.Categorical(['left', 'joined'])
# Known values for the messages_utc.type column.
message_type_cat = pd.Categorical(['migrate_from_group', 'text', 'pinned_message', 'photo', 'sticker',
                                   'new_chat_members', 'left_chat_member', 'animation', 'video',
                                   'location', 'new_chat_title', 'voice', 'audio',
                                   'new_chat_photo', 'video_note', 'poll'])
|
||||
|
||||
|
||||
def text_list_parser(text: typing.Union[str, typing.Sequence]) -> str:
    """Flatten a Telegram-export "text" field to a plain string.

    The export format stores message text either as a plain string or as a
    list mixing strings with ``{'text': ...}`` entity dicts; both forms are
    reduced to one concatenated string.

    :param text: export "text" value (str, or sequence of str/dict)
    :return: the concatenated plain text
    """
    if isinstance(text, str):
        return text
    result = ""
    for entry in text:
        try:
            result += entry['text']  # entity dict
        except TypeError:
            result += entry  # plain string fragment
    return result
|
||||
|
||||
|
||||
def convert_messages(df: pd.DataFrame) -> typing.Tuple[typing.List[dict], typing.List[dict]]:
    """Convert a DataFrame of Telegram-export messages into DB-shaped dicts.

    Produces rows matching the messages_utc and user_events tables so that
    historical exports can be loaded alongside live bot data.

    :param df: DataFrame built from a desktop-export "messages" list
    :return: (message rows, user-event rows)
    """
    messages_out = []
    users_out = []
    for message in df.itertuples():
        # Defaults mirror parse.MessageDict / the messages_utc columns.
        message_dict = {'message_id': message.id,
                        'date': message.date,
                        'from_user': None,
                        'forward_from_message_id': None,
                        'forward_from': None,
                        'forward_from_chat': None,
                        'caption': "",
                        'text': "",
                        'sticker_set_name': "",
                        'new_chat_title': "",
                        'reply_to_message': None,
                        'file_id': None,
                        'type': None,
                        }
        user_event_dict = {}
        if message.type == 'message':
            if pd.notnull(message.from_id):
                message_dict['from_user'] = message.from_id

            if pd.notnull(message.forwarded_from):
                try:
                    # Exports store forwards from users as numeric ids; forwards
                    # from channels are names and are deliberately dropped here.
                    message_dict['forward_from'] = int(message.forwarded_from)
                except ValueError:
                    pass

            if pd.notnull(message.reply_to_message_id):
                message_dict['reply_to_message'] = message.reply_to_message_id

            # Media classification: photo wins, then other media, then text.
            if pd.notnull(message.photo):
                message_dict['type'] = 'photo'
                if message.text != "":
                    message_dict['caption'] = text_list_parser(message.text)
            elif pd.notnull(message.media_type):
                if message.text != "":
                    message_dict['caption'] = text_list_parser(message.text)
                message_dict['type'] = media_dict[message.media_type]
                # .webp sticker files are local paths, not reusable file ids.
                if message.media_type == 'sticker' and '.webp' not in message.file:
                    message_dict['file_id'] = message.file
            elif message.text != "":
                message_dict['type'] = 'text'
                message_dict['text'] = text_list_parser(message.text)
            elif pd.notnull(message.poll):
                message_dict['type'] = 'poll'
            elif pd.notnull(message.location_information):
                message_dict['type'] = 'location'

        elif message.type == 'service':
            if pd.notnull(message.actor_id):
                message_dict['from_user'] = message.actor_id

            # Map export service actions onto the live bot's type names.
            if message.action == 'edit_group_title':
                message_dict['type'] = 'new_chat_title'
                message_dict['new_chat_title'] = message.title
            elif message.action == 'pin_message':
                message_dict['type'] = 'pinned_message'
            elif message.action == 'edit_group_photo':
                message_dict['type'] = 'new_chat_photo'
            elif message.action == 'invite_members' or message.action == 'join_group_by_link':
                message_dict['type'] = 'new_chat_members'
                try:
                    for i in message.members:
                        users_out.append({'message_id': message.id,
                                          'user_id': i,
                                          'date': message.date,
                                          'event': 'join'})
                except TypeError:
                    # members is NaN for join-by-link: the actor joined themselves.
                    user_event_dict = {'message_id': message.id,
                                       'user_id': message.actor_id,
                                       'date': message.date,
                                       'event': 'join'}
            elif message.action == 'remove_members':
                message_dict['type'] = 'left_chat_member'
                # NOTE(review): unlike the invite branch above, this does not
                # guard against members being NaN -- confirm exports always
                # populate members for remove_members.
                for i in message.members:
                    users_out.append({'message_id': message.id,
                                      'user_id': i,
                                      'date': message.date,
                                      'event': 'left'})
            else:
                # Unknown service actions are stored verbatim.
                message_dict['type'] = message.action
        messages_out.append(message_dict)
        if user_event_dict != {}:
            users_out.append(user_event_dict)
    return messages_out, users_out
|
||||
|
||||
|
||||
def parse_json(path: str, chat_idx: int = 1) -> pd.DataFrame:
    """Load one chat's messages from a Telegram desktop JSON export.

    Bug fix: the built DataFrame was previously discarded (the function had
    no return), making it useless to callers. The chat index is also
    parameterized instead of being hard-coded to 1.

    :param path: path to the export's result.json file
    :param chat_idx: index of the chat in js['chats']['list'] (default 1,
        matching the previous hard-coded behavior)
    :return: DataFrame with one row per exported message
    """
    with open(path, encoding='utf-8') as f:
        js = json.load(f)
    chat = js['chats']['list'][chat_idx]['messages']
    df = pd.DataFrame(chat)
    return df
|
|
@ -0,0 +1,79 @@
|
|||
import datetime
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
|
||||
from sqlalchemy import MetaData, Table, Column, create_engine, BigInteger, TIMESTAMP, Text
|
||||
|
||||
from parse import MessageDict
|
||||
from db import init_dbs
|
||||
|
||||
logger = logging.getLogger(__name__)
# Lightweight table metadata mirroring the DDL in db.init_dbs; used only to
# build insert/update statements (the tables themselves are created by
# init_dbs, not by this metadata).
metadata = MetaData()

messages = Table('messages_utc', metadata,
                 Column('message_id', BigInteger),
                 Column('date', TIMESTAMP),
                 Column('from_user', BigInteger),
                 Column('forward_from_message_id', BigInteger),
                 Column('forward_from', BigInteger),
                 Column('forward_from_chat', BigInteger),
                 Column('caption', Text),
                 Column('text', Text),
                 Column('sticker_set_name', Text),
                 Column('new_chat_title', Text),
                 Column('reply_to_message', BigInteger),
                 Column('file_id', Text),
                 Column('type', Text))
user_events = Table('user_events', metadata,
                    Column('message_id', BigInteger),
                    Column('user_id', BigInteger),
                    Column('date', TIMESTAMP),
                    Column('event', Text))
|
||||
|
||||
|
||||
def date_converter(o):
    """``json.dumps`` default hook: render datetimes as strings.

    Returns None (i.e. "not handled") for any other type, which makes
    json.dumps raise its usual TypeError for unserializable values.
    """
    return str(o) if isinstance(o, datetime.datetime) else None
|
||||
|
||||
|
||||
class JSONStore(object):
    """Append-only JSON-lines backup store.

    Each table name gets its own ``<path>/<name>.json`` file with one JSON
    object per line.
    """

    def __init__(self, path: str):
        """
        :param path: directory in which the per-table .json files are written
        """
        self.store = path

    def append_data(self, name: str, data: 'MessageDict'):
        """Append one record to ``<path>/<name>.json`` as a JSON line.

        Bug fix: the file is now opened with an explicit utf-8 encoding;
        previously the platform default encoding could raise or corrupt
        non-ASCII message text (e.g. on Windows).

        :param name: logical table name ('messages', 'user_events', ...)
        :param data: record to serialize; datetimes are stringified via
            date_converter
        """
        with open(os.path.join(self.store, f"{name}.json"), 'a', encoding='utf-8') as f:
            f.write(json.dumps(data, default=date_converter) + "\n")
|
||||
|
||||
|
||||
class PostgresStore(object):
    """Postgres-backed message store.

    Creates the schema on construction (idempotent) and exposes append/update
    operations for the 'messages' and 'user_events' tables.
    """

    # Logical table name -> SQLAlchemy Table (module-level definitions above).
    # Centralized so append_data/update_data share one dispatch instead of
    # duplicating the if/elif chain.
    _tables = {'messages': messages, 'user_events': user_events}

    def __init__(self, connection_url: str):
        """
        :param connection_url: SQLAlchemy-compatible postgres URL
        """
        self.engine = create_engine(connection_url, echo=False)
        init_dbs(self.engine)

    def append_data(self, name: str, data: MessageDict):
        """Insert one record into the named table.

        :param name: 'messages' or 'user_events'; anything else is logged
            and ignored
        :param data: record to insert; its 'date' value is stringified in place
        """
        data['date'] = str(data['date'])
        table = self._tables.get(name)
        if table is None:
            logger.warning("Tried to append to invalid table %s", name)
            return
        with self.engine.connect() as con:
            _ = con.execute(table.insert(), data)

    def update_data(self, name: str, data: MessageDict):
        """Update the record with data['message_id'] in the named table.

        :param name: 'messages' or 'user_events'; anything else is logged
            and ignored
        :param data: replacement values; its 'date' value is stringified in place
        """
        data['date'] = str(data['date'])
        table = self._tables.get(name)
        if table is None:
            logger.warning("Tried to update to invalid table %s", name)
            return
        update_statement = table.update()\
            .where(table.c.message_id == data['message_id'])
        with self.engine.connect() as con:
            _ = con.execute(update_statement, data)
|
||||
|
|
@ -0,0 +1,174 @@
|
|||
import logging
|
||||
import json
|
||||
import argparse
|
||||
import shlex
|
||||
import warnings
|
||||
import os
|
||||
|
||||
import telegram
|
||||
from telegram.error import BadRequest
|
||||
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
|
||||
from telegram.ext.dispatcher import run_async
|
||||
from telegram.update import Update
|
||||
import appdirs
|
||||
|
||||
from parse import parse_message
|
||||
from log_storage import JSONStore, PostgresStore
|
||||
from stats import StatsRunner, get_parser, HelpException
|
||||
|
||||
|
||||
# Silence library deprecation/runtime warnings globally for the bot process.
warnings.filterwarnings("ignore")

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)
logger = logging.getLogger(__name__)

# Assigned in __main__ to the StatsRunner instance; handler callbacks read
# it as a module global.
stats = None

# Optional sticker-name lookup table; a missing file simply disables it.
try:
    with open("./sticker-keys.json", 'r') as f:
        stickers = json.load(f)
except FileNotFoundError:
    stickers = {}
sticker_idx = None
sticker_id = None
|
||||
|
||||
|
||||
def log_message(update: Update, context: CallbackContext):
    """Dispatcher callback: persist every message from the watched chat.

    Writes each message to both the JSON backup store and Postgres
    (module globals bak_store/store, assigned in __main__). Edits update
    the existing Postgres row and are additionally appended to a separate
    'edited-messages' backup file.
    """
    if update.edited_message:
        edited_message, user = parse_message(update.effective_message)
        bak_store.append_data('edited-messages', edited_message)
        store.update_data('messages', edited_message)
        return

    try:
        logger.info(update.effective_message.message_id)
    except AttributeError:
        # NOTE(review): parse_message is still called below with a missing
        # effective_message -- presumably this never happens in practice;
        # confirm.
        logger.warning("No effective_message attribute")
    message, user = parse_message(update.effective_message)

    if message:
        bak_store.append_data('messages', message)
        store.append_data('messages', message)
    if len(user) > 0:
        for i in user:
            if i:  # parse_message returns [{}] when there is no user event
                bak_store.append_data('user_events', i)
                store.append_data('user_events', i)
|
||||
|
||||
|
||||
@run_async
def update_usernames(context: CallbackContext):  # context.job.context contains the chat_id
    """Hourly job: refresh the username cache from Telegram.

    Fetches the current name/full name of every user who has ever sent a
    message, records changed names via stats.update_user_ids, then reloads
    the in-memory stats.users cache under its lock.

    Bug fix: users who have sent messages but have no user_names row yet
    previously raised an uncaught KeyError (``db_users[u_id]``); they are
    now treated as (None, None) and get a fresh row inserted.
    """
    user_ids = stats.get_message_user_ids()
    db_users = stats.get_db_users()
    tg_users = {user_id: None for user_id in user_ids}
    to_update = {}
    for u_id in tg_users:
        try:
            user = context.bot.get_chat_member(chat_id=context.job.context, user_id=u_id).user
            tg_users[u_id] = user.name, user.full_name
            # .get(): sender may not have a user_names row yet.
            db_entry = db_users.get(u_id, (None, None))
            if tg_users[u_id] != db_entry:
                if tg_users[u_id][1] == db_entry[1]:  # Flag these so we don't insert new row
                    to_update[u_id] = tg_users[u_id][0], None
                else:
                    to_update[u_id] = tg_users[u_id]
        except BadRequest:  # Handle users no longer in chat (or maybe haven't interacted with the bot??)
            pass
    stats.update_user_ids(to_update)
    if stats.users_lock.acquire(timeout=10):
        stats.users = stats.get_db_users()
        stats.users_lock.release()
    else:
        logger.warning("Couldn't acquire username lock.")
        return
    logger.info("Usernames updated")
|
||||
|
||||
|
||||
def _reply_markdown_code(update: Update, context: CallbackContext, text: str):
    """Send *text* as a Markdown code block, preferring a DM to the user.

    Falls back to the originating chat when the bot cannot DM the user
    (BadRequest: the user has never started a private chat with the bot).
    """
    try:
        context.bot.send_message(chat_id=update.effective_user.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)
    except BadRequest:
        context.bot.send_message(chat_id=update.effective_chat.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)


@run_async
def print_stats(update: Update, context: CallbackContext):
    """/stats command handler: parse arguments, run the stat, send results.

    Only users already known to the stats cache may invoke it. Help/usage
    text (HelpException) is delivered via _reply_markdown_code; bad
    arguments are echoed back to the chat.
    """
    if update.effective_user.id not in stats.users:
        return

    stats_parser = get_parser(stats)
    image = None

    try:
        ns = stats_parser.parse_args(shlex.split(" ".join(context.args)))
    except HelpException as e:
        _reply_markdown_code(update, context, e.msg)
        return
    except argparse.ArgumentError as e:
        text = str(e)
    else:
        args = vars(ns)
        func = args.pop('func')

        try:
            if args['me']:
                args['user'] = update.effective_user.id, update.effective_user.name
            # Bug fix: 'me' must be removed unconditionally. Previously the
            # del only ran when -me was passed, so me=False leaked into
            # func(**args) and raised TypeError (unexpected keyword 'me').
            del args['me']
        except KeyError:
            pass

        try:
            text, image = func(**args)
        except HelpException as e:
            _reply_markdown_code(update, context, e.msg)
            return

    if text:
        context.bot.send_message(chat_id=update.effective_chat.id,
                                 text=f"```\n{text}\n```",
                                 parse_mode=telegram.ParseMode.MARKDOWN_V2)
    if image:
        context.bot.send_photo(chat_id=update.effective_chat.id, photo=image)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('token', type=str, help="Telegram bot token")
    parser.add_argument('chat_id', type=int, help="Telegram chat id to monitor.")
    parser.add_argument('postgres_url', type=str, help="Sqlalchemy-compatible postgresql url.")
    parser.add_argument('--json-path', type=str,
                        help="Either full path to backup storage folder or prefix (will be stored in app data dir.",
                        default="chat")
    args = parser.parse_args()

    updater = Updater(token=args.token, use_context=True)
    dispatcher = updater.dispatcher

    # A bare prefix (no directory part) is placed under the per-user app
    # data directory; a full path is used as-is.
    path = args.json_path
    if not os.path.split(path)[0]:  # Empty string for left part of path
        path = os.path.join(appdirs.user_data_dir('telegram-stats-bot'), path)

    os.makedirs(path, exist_ok=True)
    # Module globals read by the handler callbacks above.
    bak_store = JSONStore(path)
    store = PostgresStore(args.postgres_url)
    stats = StatsRunner(store.engine)

    # /stats only for original messages, never edits.
    stats_handler = CommandHandler('stats', print_stats, filters=~Filters.update.edited_message)
    dispatcher.add_handler(stats_handler)

    # Log every message in the monitored chat.
    log_handler = MessageHandler(Filters.chat(chat_id=args.chat_id), log_message)
    dispatcher.add_handler(log_handler)

    # Refresh the username cache hourly, starting immediately.
    job_queue = updater.job_queue
    update_users_job = job_queue.run_repeating(update_usernames, interval=3600, first=0, context=args.chat_id)

    updater.start_polling()
    # NOTE(review): there is no updater.idle() after start_polling() --
    # confirm the main thread is kept alive some other way, otherwise the
    # process may exit immediately.
|
|
@ -0,0 +1,106 @@
|
|||
from typing import TypedDict, Tuple, Union, List
|
||||
from datetime import datetime
|
||||
|
||||
import telegram
|
||||
|
||||
|
||||
class MessageDict(TypedDict):
    """Flat representation of one Telegram message.

    Field names and types mirror the messages_utc table columns created by
    db.init_dbs; produced by parse_message and consumed by the stores.
    """
    message_id: int
    date: Union[str, datetime]
    from_user: int
    forward_from_message_id: Union[int, None]
    forward_from: Union[int, None]
    forward_from_chat: Union[int, None]
    caption: Union[str, None]
    text: Union[str, None]
    sticker_set_name: Union[str, None]
    new_chat_title: Union[str, None]
    reply_to_message: Union[int, None]
    file_id: Union[str, None]
    type: str
|
||||
|
||||
|
||||
def parse_message(message: "telegram.message.Message") -> Tuple[dict, List[dict]]:
    """Convert a live python-telegram-bot Message into DB-shaped dicts.

    :param message: incoming message from the dispatcher
    :return: (message row matching MessageDict, list of user-event rows --
        ``[{}]`` when the message generated no join/leave events)
    """
    message_dict: MessageDict = {'message_id': message.message_id,
                                 'date': message.date,
                                 'from_user': None,
                                 'forward_from_message_id': message.forward_from_message_id,
                                 'forward_from': None,
                                 'forward_from_chat': None,
                                 'caption': message.caption,
                                 'text': message.text,
                                 'sticker_set_name': None,
                                 'new_chat_title': message.new_chat_title,
                                 'reply_to_message': None,
                                 'file_id': None,
                                 'type': None,
                                 }
    user_event_dict = [{}]

    if message.from_user:
        message_dict['from_user'] = message.from_user.id

    if message.forward_from:
        try:
            message_dict['forward_from'] = message.forward_from.id
        except AttributeError:
            pass
        try:
            message_dict['forward_from_chat'] = message.forward_from_chat.id
        except AttributeError:
            pass

    if message.reply_to_message:
        message_dict['reply_to_message'] = message.reply_to_message.message_id

    # Classify the message; the first matching attribute wins.
    if message.text:
        message_dict['type'] = 'text'
    elif message.animation:
        message_dict['type'] = 'animation'
        message_dict['file_id'] = message.animation.file_id
    elif message.audio:
        message_dict['type'] = 'audio'
        message_dict['file_id'] = message.audio.file_id
    elif message.document:
        message_dict['type'] = 'document'
        message_dict['file_id'] = message.document.file_id
    elif message.game:
        message_dict['type'] = 'game'
    elif message.photo:
        message_dict['type'] = 'photo'
    elif message.sticker:
        message_dict['type'] = 'sticker'
        message_dict['file_id'] = message.sticker.file_id
        # Bug fix: this line was written with ":" instead of "=" (a bare
        # annotation), so the sticker set name was silently never stored.
        message_dict['sticker_set_name'] = message.sticker.set_name
    elif message.video:
        message_dict['type'] = 'video'
    elif message.video_note:
        message_dict['type'] = 'video_note'
    elif message.voice:
        message_dict['type'] = 'voice'
    elif message.location:
        message_dict['type'] = 'location'
    elif message.poll:
        message_dict['type'] = 'poll'
    elif message.new_chat_title:
        message_dict['type'] = 'new_chat_title'
    elif message.new_chat_photo:
        message_dict['type'] = 'new_chat_photo'
    elif message.pinned_message:
        message_dict['type'] = 'pinned_message'
    elif message.new_chat_members:
        message_dict['type'] = 'new_chat_members'
        # One 'joined' event per added user.
        user_event_dict = [{'message_id': message.message_id,
                            'user_id': u_id,
                            'date': message.date,
                            'event': 'joined'}
                           for u_id in [member.id for member in message.new_chat_members]]
    elif message.left_chat_member:
        message_dict['type'] = 'left_chat_member'
        user_event_dict = [{'message_id': message.message_id,
                            'user_id': message.left_chat_member.id,
                            'date': message.date,
                            'event': 'left'}]

    return message_dict, user_event_dict
|
|
@ -0,0 +1,6 @@
|
|||
python-telegram-bot
|
||||
SQLAlchemy~=1.3.17
|
||||
appdirs~=1.4.4
|
||||
pandas~=1.0.3
|
||||
seaborn~=0.10.1
|
||||
matplotlib~=3.2.1
|
|
@ -0,0 +1,379 @@
|
|||
import logging
|
||||
from typing import Dict, List, Tuple, Text, NoReturn
|
||||
from threading import Lock
|
||||
from io import BytesIO
|
||||
import argparse
|
||||
import inspect
|
||||
import re
|
||||
|
||||
import pandas as pd
|
||||
import seaborn as sns
|
||||
from matplotlib.figure import Figure
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
# Global seaborn theme applied to every plot this module produces.
sns.set_context('paper')
sns.set_style('whitegrid')
sns.set_palette("Set2")

# NOTE(review): getLogger() with no name returns the root logger -- other
# modules use getLogger(__name__); confirm this is intentional.
logger = logging.getLogger()
|
||||
|
||||
|
||||
class HelpException(Exception):
    """Raised to short-circuit /stats argument parsing and deliver
    help/usage text back to the user.

    The message is exposed as ``.msg`` (read by print_stats).
    """

    def __init__(self, msg: str = None):
        # Bug fix: also pass msg to Exception so str(e) and tracebacks show
        # the text instead of an empty string.
        super().__init__(msg)
        self.msg = msg
|
||||
|
||||
|
||||
class InternalParser(argparse.ArgumentParser):
    """ArgumentParser that raises instead of printing/exiting.

    argparse normally writes to stdout/stderr and calls sys.exit(); inside a
    bot process that would kill the dispatcher, so errors and help text are
    funneled into exceptions that print_stats turns into chat messages.
    """

    def error(self, message: Text) -> NoReturn:
        # argparse usually calls error() while an exception is active; the
        # bare raise re-raises it (typically ArgumentError) for the caller.
        # With no active exception, the bare raise itself raises
        # RuntimeError, which is converted into a HelpException.
        try:
            raise  # Reraises mostly ArgumentError for bad arg
        except RuntimeError:
            raise HelpException(message)

    def print_help(self, file=None) -> None:
        # Deliver help text via the exception channel instead of stdout.
        raise HelpException(self.format_help())

    def exit(self, status=None, message=None):
        # Never allow argparse to sys.exit() the bot process.
        pass
|
||||
|
||||
|
||||
class StatsRunner(object):
    """Runs statistics queries against the message database and renders
    text tables or plot images for the /stats command."""

    # /stats subcommand name -> method implementing it (see get_parser).
    allowed_methods = {'counts': "get_chat_counts",
                       'hours': "get_counts_by_hour",
                       'days': "get_counts_by_day",
                       'history': "get_message_history"}
|
||||
|
||||
    def __init__(self, engine: Engine, tz: str = 'America/Toronto'):
        """
        :param engine: SQLAlchemy engine for the message database
        :param tz: IANA timezone name used to localize dates in plots
        """
        self.engine = engine
        self.tz = tz

        # user_id -> (username, display_name) cache; refreshed by the
        # update_usernames job under users_lock.
        self.users: Dict[int, Tuple[str, str]] = self.get_db_users()
        self.users_lock = Lock()
|
||||
|
||||
def get_message_user_ids(self) -> List[int]:
|
||||
with self.engine.connect() as con:
|
||||
result = con.execute("SELECT DISTINCT from_user FROM messages_utc;")
|
||||
return [user for user, in result.fetchall()]
|
||||
|
||||
    def get_db_users(self) -> Dict[int, Tuple[str, str]]:
        """Fetch the most recent (username, display_name) for every user.

        A window function keeps only the newest user_names row per user_id.

        :return: mapping of user_id -> (username, display_name)
        """
        query = """
        select user_id, username, display_name from (
            select
                *,
                row_number() over(partition by user_id order by date desc) as rn
            from
                user_names
            ) t
        where t.rn = 1
        """

        with self.engine.connect() as con:
            result = con.execute(query)
            result = result.fetchall()

        return {user_id: (username, name) for user_id, username, name in result}
|
||||
|
||||
def update_user_ids(self, user_dict: Dict[int, Tuple[str, str]]):
|
||||
for uid in user_dict:
|
||||
username, display_name = user_dict[uid]
|
||||
query = f"""
|
||||
UPDATE user_names
|
||||
SET username = '{username}'
|
||||
WHERE user_id = {uid} AND username IS DISTINCT FROM '{username}';
|
||||
"""
|
||||
if display_name:
|
||||
query += f"""\n
|
||||
INSERT INTO user_names(user_id, date, username, display_name)
|
||||
VALUES ({uid}, current_timestamp, '{username}', '{display_name}');
|
||||
"""
|
||||
|
||||
with self.engine.connect() as con:
|
||||
con.execute(query)
|
||||
|
||||
def get_chat_counts(self, n: int = None, start: str = None, end: str = None) -> Tuple[str, None]:
|
||||
"""
|
||||
Get top chat users
|
||||
:param n: Number of users to show
|
||||
:param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
|
||||
:param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
|
||||
:return:
|
||||
"""
|
||||
date_query = None
|
||||
|
||||
if n is not None:
|
||||
if n <= 0:
|
||||
raise HelpException(f'n must be greater than 0, got: {n}')
|
||||
else:
|
||||
n = 20
|
||||
|
||||
if start:
|
||||
start_dt = pd.to_datetime(start)
|
||||
date_query = f"WHERE date >= '{start_dt}'"
|
||||
|
||||
if end:
|
||||
end_dt = pd.to_datetime(end)
|
||||
if date_query:
|
||||
date_query += f" AND date < '{end_dt}'"
|
||||
else:
|
||||
date_query = f"WHERE date < '{end_dt}'"
|
||||
|
||||
query = f"""
|
||||
SELECT "from_user", COUNT(*)
|
||||
FROM "messages_utc"
|
||||
{date_query}
|
||||
GROUP BY "from_user"
|
||||
ORDER BY "count" DESC;
|
||||
"""
|
||||
with self.engine.connect() as con:
|
||||
df = pd.read_sql_query(query, con, index_col='from_user')
|
||||
|
||||
user_df = pd.Series(self.users, name="user")
|
||||
user_df = user_df.apply(lambda x: x[0]) # Take only @usernames
|
||||
df = df.join(user_df)
|
||||
df['Percent'] = df['count'] / df['count'].sum() * 100
|
||||
df = df[['user', 'count', 'Percent']]
|
||||
df.columns = ['User', 'Total Messages', 'Percent']
|
||||
df['User'] = df['User'].str.replace(r'[^\x00-\x7F]', "", regex=True)
|
||||
|
||||
return df.iloc[:n].to_string(index=False, header=True, float_format=lambda x: f"{x:.1f}"), None
|
||||
|
||||
    def get_counts_by_hour(self, user: Tuple[int, str] = None, start: str = None, end: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Get plot of messages for hours of the day
        :param user: (user_id, display_name) to restrict to a single sender
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :return: (None, PNG image buffer)
        """
        query_conditions = []

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        # One row per hour bucket with its message count.
        query = f"""
                SELECT date_trunc('hour', date) as day, count(*) as messages
                FROM messages_utc
                {query_where}
                GROUP BY day
                ORDER BY day
                """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)

        # Localize buckets before extracting the hour-of-day.
        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        df = df.set_index('day')
        df = df.asfreq('h', fill_value=0)  # Insert 0s for periods with no messages
        df['hour'] = df.index.hour

        if user:
            # Aggregate over 1 week periods
            df = df.groupby('hour').resample('7D').sum().drop(columns='hour')
            df['hour'] = df.index.get_level_values('hour')

        fig = Figure(constrained_layout=True)
        subplot = fig.subplots()

        # Raw points behind, box summary on top.
        sns.stripplot(x='hour', y='messages', data=df, jitter=.4, size=2, ax=subplot, alpha=.5, zorder=0)
        sns.boxplot(x='hour', y='messages', data=df, whis=1, showfliers=False, whiskerprops={"zorder": 10},
                    boxprops={"zorder": 10},
                    ax=subplot, zorder=10)
        # Clip extreme outliers so the bulk of the data stays readable.
        subplot.set_ylim(bottom=0, top=df['messages'].quantile(0.999, interpolation='higher'))

        # Shade the second half of the day (12:00-24:00) for orientation.
        subplot.axvspan(11.5, 23.5, zorder=0, color=(0, 0, 0, 0.05))
        subplot.set_xlim(-1, 24)

        if user:
            subplot.set_ylabel('Messages per Week')
            subplot.set_title(f"Messages by Hour for {user[1]}")
        else:
            subplot.set_ylabel('Messages per Day')
            subplot.set_title("Messages by Hour")

        sns.despine(fig=fig)

        # Render to an in-memory PNG named so Telegram treats it as a photo.
        bio = BytesIO()
        bio.name = 'plot.png'
        fig.savefig(bio)
        bio.seek(0)

        return None, bio
|
||||
|
||||
    def get_counts_by_day(self, user: Tuple[int, str] = None, start: str = None, end: str = None, plot: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Get plot of messages for days of the week
        :param user: (user_id, display_name) to restrict to a single sender
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param plot: 'box' or 'violin' (default violin)
        :return: (None, PNG image buffer)
        """
        query_conditions = []

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        # One row per calendar day with its message count.
        query = f"""
                SELECT date_trunc('day', date)
                    as day, count(*) as messages
                FROM messages_utc
                {query_where}
                GROUP BY day
                ORDER BY day
                """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)

        # Localize days before deriving the weekday.
        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        df = df.set_index('day')
        df = df.asfreq('d', fill_value=0)  # Fill periods with no messages
        df['dow'] = df.index.weekday
        df['day_name'] = df.index.day_name()
        df = df.sort_values('dow')  # Make sure start is Monday

        fig = Figure(constrained_layout=True)
        subplot = fig.subplots()
        if plot == 'box':
            sns.boxplot(x='day_name', y='messages', data=df, whis=1, showfliers=False, ax=subplot)
        elif plot == 'violin' or plot is None:
            sns.violinplot(x='day_name', y='messages', data=df, cut=0, inner="box", scale='width', ax=subplot)
        else:
            raise HelpException("plot must be either box or violin")
        # Shade the weekend (Sat/Sun) for orientation.
        subplot.axvspan(4.5, 6.5, zorder=0, color=(0, .8, 0, 0.1))
        subplot.set_xlabel('')
        subplot.set_ylabel('Messages per Day')
        if user:
            subplot.set_title(f"Messages by Day of Week for {user[1]}")
        else:
            subplot.set_title("Messages by Day of Week")

        sns.despine(fig=fig)

        # Render to an in-memory PNG named so Telegram treats it as a photo.
        bio = BytesIO()
        bio.name = 'plot.png'
        subplot.get_figure().savefig(bio)
        bio.seek(0)

        return None, bio
|
||||
|
||||
    def get_message_history(self, user: Tuple[int, str] = None, averages: int = None, start: str = None, end: str = None)\
            -> Tuple[None, BytesIO]:
        """
        Make a plot of message history over time
        :param user: (user_id, display_name) to restrict to a single sender
        :param averages: Moving average width (in days)
        :param start: Start timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :param end: End timestamp (e.g. 2019, 2019-01, 2019-01-01, "2019-01-01 14:21")
        :return: (None, PNG image buffer)
        """
        query_conditions = []
        # Default to a 30-day moving average.
        if averages is None:
            averages = 30

        if start:
            start_dt = pd.to_datetime(start)
            query_conditions.append(f"date >= '{start_dt}'")

        if end:
            end_dt = pd.to_datetime(end)
            query_conditions.append(f"date < '{end_dt}'")

        if user:
            query_conditions.append(f"from_user = {user[0]}")

        query_where = ""
        if query_conditions:
            query_where = f"WHERE {' AND '.join(query_conditions)}"

        # One row per calendar day with its message count.
        query = f"""
                SELECT date_trunc('day', date)
                    as day, count(*) as messages
                FROM messages_utc
                {query_where}
                GROUP BY day
                ORDER BY day
                """

        with self.engine.connect() as con:
            df = pd.read_sql_query(query, con)
        df['day'] = pd.to_datetime(df.day)
        df['day'] = df.day.dt.tz_convert(self.tz)
        # averages=0 disables smoothing entirely.
        if averages:
            df['msg_rolling'] = df['messages'].rolling(averages, center=True).mean()

        fig = Figure()  # TODO: One day pandas will let you use constrained_layout=True here...
        subplot = fig.subplots()
        # Raw daily counts faded; rolling mean drawn on top when enabled.
        df.plot(x='day', y='messages', alpha=0.5, legend=False, ax=subplot)
        if averages:
            df.plot(x='day', y='msg_rolling', legend=False, ax=subplot)
        subplot.set_ylabel("Messages")
        subplot.set_xlabel("Date")
        if user:
            subplot.set_title(f"Message History for {user[1]}")
        else:
            subplot.set_title("Message History")
        sns.despine(fig=fig)
        fig.tight_layout()

        # Render to an in-memory PNG named so Telegram treats it as a photo.
        bio = BytesIO()
        bio.name = 'plot.png'
        fig.savefig(bio)
        bio.seek(0)

        return None, bio
|
||||
|
||||
|
||||
def get_parser(runner: StatsRunner) -> InternalParser:
    """Build the /stats argument parser from StatsRunner.allowed_methods.

    Each allowed method becomes a subcommand; its docstring's first line is
    the subcommand help and its ``:param name:`` lines become per-argument
    help. A ``user`` parameter is exposed as a ``-me``/``-user`` mutually
    exclusive group.

    Bug fixes: a method without a docstring previously crashed here
    (``doc[0]`` on None) and, because add_argument sat inside ``if doc:``,
    had none of its arguments registered.

    :param runner: StatsRunner whose methods are exposed as subcommands
    :return: configured InternalParser
    """
    parser = InternalParser(prog="/stats")
    parser.set_defaults(func=runner.get_chat_counts)
    subparsers = parser.add_subparsers(title="Statistics:")

    for name, func in runner.allowed_methods.items():
        try:
            doc = inspect.getdoc(getattr(runner, func)).splitlines()
        except AttributeError:  # method has no docstring
            doc = None
        subparser = subparsers.add_parser(name, help=doc[0] if doc else None)
        subparser.set_defaults(func=getattr(runner, func))
        f_args = inspect.signature(getattr(runner, func)).parameters

        for _, arg in f_args.items():
            arg: inspect.Parameter
            if arg.name == 'self':
                continue
            if arg.name == 'user':
                group = subparser.add_mutually_exclusive_group()
                group.add_argument('-me', action='store_true', help='calculate stats for yourself')
                group.add_argument('-user', type=int, help=argparse.SUPPRESS)
            else:
                # Pull per-argument help from the docstring when available;
                # the argument itself is registered either way.
                arg_doc = None
                if doc:
                    for line in doc:
                        match = re.match(rf"^:param {arg.name}: (.*)", line)
                        if match:
                            arg_doc = match.group(1)
                subparser.add_argument(f"-{arg.name}", type=arg.annotation, help=arg_doc)

    return parser
|
Ładowanie…
Reference in New Issue