openai-telegram-bot/main.py

389 wiersze
16 KiB
Python

import os
import openai
import logging
import database
from dotenv import load_dotenv
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram.constants import ChatAction
from functools import wraps
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, CallbackQueryHandler
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
# Lookup table from a language code to a model identifier string.
# NOTE(review): the "tts_models/..." paths suggest text-to-speech models;
# nothing in the visible part of this file reads this table - confirm usage.
language_models = dict.fromkeys(
    ("en", "fr", "pt", "pt-br"),
    "tts_models/multilingual/multi-dataset/your_tts",
)
language_models["es"] = "tts_models/es/css10/vits"
# Configure root logging at INFO level; each record is rendered as
# "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Load environment variables; load_dotenv() is called first so that settings
# can also come from a .env file.
load_dotenv()

# These settings are parsed unconditionally below, so a missing one would
# otherwise abort start-up with an opaque AttributeError/TypeError. Report all
# missing names at once instead.
_REQUIRED_ENV_VARS = (
    "OPENAI_API_KEY",
    "BOT_ALLOWED_USERS",
    "WHISPER_TO_CHAT",
    "CHATGPT_MAX_USER_CONTEXT",
)
_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if os.environ.get(name) is None]
if _missing_env_vars:
    print(
        f"Missing required setting(s): {', '.join(_missing_env_vars)}. "
        "Set them in the .env file or as environment variables."
    )
    raise SystemExit(1)

# Comma-separated user ids; a first entry of "*" is treated as "everyone" by
# the `restricted` decorator.
ALLOWED_USERS = os.environ.get("BOT_ALLOWED_USERS").split(",")
SYSTEM_PROMPT = os.environ.get("CHATGPT_SYSTEM_PROMPT")
# Kept as the raw string (or None); converted with float() when a new user
# record is created.
TEMPERATURE = os.environ.get("CHATGPT_TEMPERATURE")
MODEL = os.environ.get("OPENAI_MODEL")
# Must be an integer literal such as "0" or "1".
WHISPER_TO_CHAT = bool(int(os.environ.get("WHISPER_TO_CHAT")))
MAX_USER_CONTEXT = int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
openai.api_key = os.environ.get("OPENAI_API_KEY")
async def getUserData(chat_id):
    """Return the stored record for ``chat_id``, creating a default one if needed.

    When the database has no (truthy) record for ``chat_id``, a record with an
    empty context, zeroed usage counters and the option defaults taken from
    the module-level settings is added, and then read back from the database.
    """
    existing = database.get_user(chat_id)
    if existing:
        return existing

    # First contact: seed the record from the module-level defaults.
    default_options = {
        "whisper_to_chat": WHISPER_TO_CHAT,
        "assistant_voice_chat": False,
        "temperature": float(TEMPERATURE),
        "max-context": MAX_USER_CONTEXT
    }
    fresh_record = {
        "context": [],
        "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0},
        "options": default_options,
    }
    database.add_user(chat_id, fresh_record)

    # Return what the database now holds rather than the local dict.
    return database.get_user(chat_id)
def restricted(func):
    """Decorator that lets only allowed users reach the wrapped handler.

    A user is allowed when their id (as a string) is listed in
    ``ALLOWED_USERS`` or when the first entry of ``ALLOWED_USERS`` is ``"*"``.
    Denied updates are logged and dropped (the wrapper returns ``None``).
    For every allowed update the chat's database record is created if it is
    missing, so handlers can rely on it being present.
    """
    @wraps(func)
    async def wrapped(update, context, *args, **kwargs):
        # Some update kinds carry no user; treat them as an unknown user.
        user = update.effective_user
        user_id = str(user.id) if user is not None else None

        # Only the first entry is checked for the wildcard; the emptiness
        # check avoids an IndexError on an empty list.
        allow_everyone = bool(ALLOWED_USERS) and ALLOWED_USERS[0] == "*"
        if not allow_everyone and user_id not in ALLOWED_USERS:
            logger.warning("Unauthorized access denied for %s.", user_id)
            return None

        # Make sure a record exists for every authorized caller, including
        # those admitted through the wildcard.
        if update.effective_chat is not None:
            await getUserData(update.effective_chat.id)
        return await func(update, context, *args, **kwargs)
    return wrapped
async def messageGPT(text: str, chat_id: str, user_name="User"):
    """Send ``text`` to the chat model for ``chat_id`` and return the reply.

    The user's stored context is extended with the new message and with the
    assistant's reply, trimmed to the user's ``max-context`` option, and saved
    together with the updated token usage.

    Args:
        text: The user's message.
        chat_id: Key of the user's record in the database.
        user_name: Name inserted into the system prompt.

    Returns:
        The assistant's reply. If the API call raises, a fixed error message
        is returned and nothing is saved.
    """
    user_data = await getUserData(chat_id)
    history = user_data['context']
    max_context = user_data["options"]["max-context"]

    def trim_history():
        # Remove as many of the oldest entries as needed. Removing just one
        # would never shrink the history after the limit has been lowered.
        overflow = len(history) - max_context
        if overflow > 0:
            del history[:overflow]

    history.append({"role": "user", "content": text})
    trim_history()

    system_message = {"role": "system", "content": f"You are chatting with {user_name}. {SYSTEM_PROMPT}"}
    try:
        # Awaitable variant of the request, so the event loop is not blocked
        # while waiting for the API.
        response = await openai.ChatCompletion.acreate(
            model=MODEL,
            messages=[system_message] + history,
            temperature=user_data["options"]["temperature"],
        )
    except Exception:
        logger.exception("ChatCompletion request failed for chat %s", chat_id)
        return "There was a problem with OpenAI, so I can't answer you."

    if 'choices' in response:
        assistant_message = response['choices'][0]['message']['content']
    else:
        assistant_message = "There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!"

    history.append({"role": "assistant", "content": assistant_message})
    trim_history()

    # A response without usage information must not abort the update.
    token_usage = response.get('usage') or {}
    user_data["usage"]['chatgpt'] += int(token_usage.get('total_tokens', 0))

    database.update_user(chat_id, user_data)
    return assistant_message
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: make sure the chat has a record, then send a greeting."""
    chat_id = update.effective_chat.id
    await getUserData(chat_id)
    greeting = "Hello, how can I assist you today?"
    await context.bot.send_message(chat_id=chat_id, text=greeting)
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /imagine <prompt>: generate one 1024x1024 image and send its URL.

    The image counter in the user's usage record is incremented only after a
    successful generation. Any failure of the API call is logged and reported
    to the chat.
    """
    chat_id = update.effective_chat.id

    # The message text starts with the command itself; only what follows it
    # is the prompt.
    command_and_prompt = (update.effective_message.text or "").split(maxsplit=1)
    prompt = command_and_prompt[1].strip() if len(command_and_prompt) > 1 else ""
    if not prompt:
        await context.bot.send_message(chat_id=chat_id, text="Please provide a prompt, e.g. /imagine a lighthouse at sunset")
        return

    await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
    try:
        # The request itself is what can raise, so it belongs inside the try.
        # The awaitable variant keeps the event loop responsive.
        response = await openai.Image.acreate(
            prompt=prompt,
            n=1,
            size="1024x1024"
        )
        image_url = response['data'][0]['url']
    except Exception:
        logger.exception("Image generation failed for chat %s", chat_id)
        await context.bot.send_message(chat_id=chat_id, text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
        return

    user_data = await getUserData(chat_id)
    user_data["usage"]['dalle'] += 1
    database.update_user(chat_id, user_data)
    await context.bot.send_message(chat_id=chat_id, text=image_url)
def _split_message(text, limit=4096):
    """Split ``text`` into pieces of at most ``limit`` characters, preferring whitespace breaks."""
    pieces = []
    remaining = text
    while len(remaining) > limit:
        # Look one character past the limit so a break exactly at the limit
        # is found too.
        window = remaining[:limit + 1]
        cut = max(window.rfind(" "), window.rfind("\n"))
        if cut <= 0:
            # No usable whitespace: fall back to a hard cut.
            cut = limit
        piece = remaining[:cut].rstrip()
        if piece:
            pieces.append(piece)
        remaining = remaining[cut:].lstrip()
    if remaining:
        pieces.append(remaining)
    return pieces


@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Transcribe a voice, video or audio attachment and send the text back.

    For voice messages, when the user's ``whisper_to_chat`` option is on, the
    transcript is additionally sent to ``messageGPT`` and the answer is
    appended to the reply. The attachment's duration is added to the user's
    Whisper usage once transcription succeeded. Temporary files are removed
    whether or not processing succeeds.
    """
    chat_id = update.effective_chat.id
    message = update.effective_message
    await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)

    # Pick the media object and the file extension used for the download.
    is_voice = False
    if message.voice:
        media, file_format, is_voice = message.voice, "ogg", True
    elif message.video:
        media, file_format = message.video, "mp4"
    elif message.audio:
        media, file_format = message.audio, "mp3"
    else:
        await context.bot.send_message(chat_id=chat_id, text="Can't handle such file. Reason: unknown.")
        return

    user_id = update.effective_user.id
    download_path = f"{user_id}.{file_format}"
    temp_paths = [download_path]
    try:
        file = await context.bot.get_file(media.file_id)
        await file.download_to_drive(download_path)

        audio_path = download_path
        if file_format == "ogg":
            # Voice messages are converted to mp3 before transcription.
            audio_path = f"{user_id}.mp3"
            temp_paths.append(audio_path)
            AudioSegment.from_file(download_path, format="ogg").export(audio_path, format="mp3")

        with open(audio_path, "rb") as audio_file:
            # Awaitable variant, so the event loop is not blocked.
            transcript = await openai.Audio.atranscribe("whisper-1", audio_file)
    except Exception:
        logger.exception("Transcription failed for chat %s", chat_id)
        await context.bot.send_message(chat_id=chat_id, text="Transcript failed.")
        return
    finally:
        # Runs on success, on failure and on the early return above.
        for path in temp_paths:
            if os.path.exists(path):
                os.remove(path)

    # Save the Whisper usage BEFORE calling messageGPT: messageGPT loads and
    # saves the record itself, and writing an older copy afterwards would
    # overwrite the context and token usage it stored.
    user_data = await getUserData(chat_id)
    user_data["usage"]['whisper'] += media.duration
    database.update_user(str(chat_id), user_data)

    text = transcript['text'] or "[Silence]"
    if is_voice and user_data["options"]["whisper_to_chat"]:
        chatGPT_response = await messageGPT(text, str(chat_id), update.effective_user.name)
        text = "> " + text + "\n\n" + chatGPT_response

    # Long texts are sent as several messages of at most 4096 characters.
    for piece in _split_message(text):
        await context.bot.send_message(chat_id=chat_id, text=piece)
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward a text message to ``messageGPT`` and send the answer to the chat.

    When the message is a reply to another message that has text, that text
    is quoted in front of the prompt.
    """
    message = update.effective_message
    chat_id = str(update.effective_chat.id)
    await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)

    # A replied-to message always has a ``text`` attribute, but it is None for
    # messages without text, so the value itself is checked.
    replied_to = message.reply_to_message
    quoted_text = replied_to.text if replied_to is not None else None
    if quoted_text:
        user_prompt = f"In reply to: '{quoted_text}' \n---\n{message.text}"
    else:
        user_prompt = message.text

    assistant_message = await messageGPT(user_prompt, chat_id, update.effective_user.name)

    # Send in slices of at most 4096 characters so long answers are not
    # rejected as too long.
    max_length = 4096
    for offset in range(0, len(assistant_message), max_length):
        await context.bot.send_message(
            chat_id=update.effective_chat.id,
            text=assistant_message[offset:offset + max_length],
        )
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /clear: empty the chat's stored message context and confirm it."""
    chat = update.effective_chat
    record = await getUserData(chat.id)
    if not record:
        return

    # Replace the history with an empty list and persist the record.
    record["context"] = []
    database.update_user(str(chat.id), record)
    print(f"Cleared context for {update.effective_user.name}")
    await update.message.reply_text('Your message context history was cleared.')
def _estimate_cost(counters):
    """Return the estimated spend for a usage mapping, rounded to 4 decimals."""
    # Hard-coded rates: 0.002 per 750 "chatgpt" units, 0.02 per "dalle" unit,
    # 0.006 per 60 "whisper" units.
    chatgpt_cost = (counters['chatgpt'] / 750) * 0.002
    dalle_cost = float(counters['dalle']) * 0.02
    whisper_cost = (counters['whisper'] / 60.0) * 0.006
    return round(chatgpt_cost + dalle_cost + whisper_cost, 4)


@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /usage: report the caller's usage and the total across all users."""
    chat_id = str(update.effective_chat.id)
    # getUserData creates the record when it is missing, so a first-time user
    # gets a zeroed report instead of an error.
    user_data = await getUserData(chat_id)
    user_usage = user_data["usage"]
    total_usage = database.get_total_usage()

    user_spent = _estimate_cost(user_usage)
    total_spent = _estimate_cost(total_usage)
    # Avoid dividing by zero when nothing has been spent yet.
    user_percentage = (user_spent / total_spent) * 100 if total_spent > 0 else 0

    info_message = "\n".join([
        f"User: {update.effective_user.name}",
        f"- Used ~{user_usage['chatgpt']} tokens with ChatGPT.",
        f"- Generated {user_usage['dalle']} images with DALL-E.",
        f"- Transcribed {round(float(user_usage['whisper']) / 60.0, 2)}min with Whisper.",
        f"Total spent: ${user_spent} ({user_percentage:.2f}% of total)",
        "Total usage:",
        f"- ChatGPT tokens: {total_usage['chatgpt']}",
        f"- DALL-E images: {total_usage['dalle']}",
        f"- Whisper transcription: {round(float(total_usage['whisper']) / 60.0, 2)}min",
        f"Total spent: ${total_spent}",
    ])
    await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: send a short overview of what the bot can do."""
    # Same text and spacing as before, written with explicit newlines; only
    # the typo in the /clear line ("you" -> "your") is corrected.
    help_message = (
        "Here's what you can do:\n\n\n"
        "- /imagine <prompt> to generate an image with DALL-E\n"
        "- Send a message to chat with ChatGPT\n\n"
        "- Send an audio to transcribe to text with Whisper.\n\n\n"
        "- /settings To change your settings.\n\n"
        "- /usage To get your usage statistics.\n\n"
        "- /clear To clear your chatgpt message context (start a new chat)."
    )
    await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
def generate_settings_markup(chat_id: str) -> InlineKeyboardMarkup:
    """Build the inline keyboard with the settings buttons.

    Each button's callback data has the form ``setting_<action>_<chat_id>``.
    Buttons come in pairs, one pair per keyboard row.
    """
    # One tuple per row; each entry is (button label, action part of the
    # callback data).
    layout = (
        (("Increase Temperature", "increase_temperature"),
         ("Decrease Temperature", "decrease_temperature")),
        (("Enable Whisper to Chat", "enable_whisper"),
         ("Disable Whisper to Chat", "disable_whisper")),
        (("Enable assistant voice", "enable_voice"),
         ("Disable assistant voice", "disable_voice")),
        (("Increase Context", "increase_context"),
         ("Decrease Context", "decrease_context")),
    )
    rows = []
    for row_spec in layout:
        buttons = [
            InlineKeyboardButton(label, callback_data=f"setting_{action}_{chat_id}")
            for label, action in row_spec
        ]
        rows.append(buttons)
    return InlineKeyboardMarkup(rows)
@restricted
async def settings(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /settings: send the settings keyboard to the chat that asked for it."""
    target_chat = update.effective_chat.id
    await context.bot.send_message(
        chat_id=target_chat,
        text="Settings:",
        reply_markup=generate_settings_markup(target_chat),
    )
@restricted
async def settings_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Apply the setting chosen on the settings keyboard and report the result.

    The callback data has the form ``setting_<action>_<chat_id>``. The changed
    options are saved to the database, the keyboard message is deleted and a
    summary of the current settings is sent to the chat.
    """
    query = update.callback_query
    # Acknowledge the button press so the client stops showing its progress
    # indicator.
    await query.answer()

    chat_id = update.effective_chat.id
    user_data = await getUserData(chat_id)
    options = user_data["options"]

    # Everything before the last "_" is the action; data without "_" yields an
    # empty action that matches no branch.
    action, _, _ = query.data.rpartition("_")

    # Temperature: steps of 0.1 within [0, 1]; rounding removes float drift
    # such as 0.7999999999999999.
    if action == "setting_increase_temperature":
        options["temperature"] = round(min(options["temperature"] + 0.1, 1), 2)
    elif action == "setting_decrease_temperature":
        options["temperature"] = round(max(options["temperature"] - 0.1, 0), 2)
    # Whisper to GPT
    elif action == "setting_enable_whisper":
        logger.info("enabling whisper for %s", chat_id)
        options["whisper_to_chat"] = True
    elif action == "setting_disable_whisper":
        logger.info("disabling whisper for %s", chat_id)
        options["whisper_to_chat"] = False
    # TTS
    elif action == "setting_enable_voice":
        logger.info("enabling voice for %s", chat_id)
        options["assistant_voice_chat"] = True
    elif action == "setting_disable_voice":
        logger.info("disabling voice for %s", chat_id)
        options["assistant_voice_chat"] = False
    # Context: between 1 and the configured maximum.
    elif action == "setting_increase_context":
        options["max-context"] = min(options["max-context"] + 1, MAX_USER_CONTEXT)
    elif action == "setting_decrease_context":
        options["max-context"] = max(options["max-context"] - 1, 1)

    # Persist the change; without this the new options are lost.
    database.update_user(str(chat_id), user_data)

    # Remove the keyboard message, then show the resulting settings.
    await context.bot.delete_message(chat_id=query.message.chat_id, message_id=query.message.message_id)
    settings_message = f"""Updated settings:\n\nTemperature: {options['temperature']}\nWhisper to Chat: {options['whisper_to_chat']}\nAssistant voice: {options['assistant_voice_chat']}\nContext Length: {options["max-context"]}"""
    await context.bot.send_message(chat_id=chat_id, text=settings_message)
if __name__ == '__main__':
    # Script entry point: prepare the database, build the bot application,
    # register all handlers and start polling for updates.
    database.init_database()

    # ALLOWED_USERS was already parsed at import time; it is only shown here.
    print(f"Allowed users: {ALLOWED_USERS}")
    print(f"System prompt: {SYSTEM_PROMPT}")

    bot_token = os.environ.get("BOT_TOKEN")
    if not bot_token:
        print("BOT_TOKEN is not set in the .env file or as an environment variable")
        raise SystemExit(1)
    application = ApplicationBuilder().token(bot_token).build()

    # Slash commands and the coroutine that handles each of them.
    command_callbacks = {
        'start': start,
        'clear': clear,
        'usage': usage,
        'help': _help,
        'imagine': imagine,
        'settings': settings,
    }
    for command_name, callback in command_callbacks.items():
        application.add_handler(CommandHandler(command_name, callback))

    # Plain text goes to the chat handler, attachments to the transcriber.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
    application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))

    # Button presses on the settings keyboard.
    application.add_handler(CallbackQueryHandler(settings_callback))

    application.run_polling()