openai-telegram-bot/main.py

249 wiersze
10 KiB
Python

import os
import re
import openai
import logging
import asyncio
import math
from dotenv import load_dotenv
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram.constants import ChatAction
from functools import wraps
from telegram.error import BadRequest, RetryAfter, TimedOut
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
if os.environ.get("OPENAI_API_KEY") is None:
print("OpenAI_API_KEY is not set in.env file or OPENAI_API_KEY environment variable is not set")
exit(1)
openai.api_key = os.environ.get("OPENAI_API_KEY")
users = {
"userid": {
"context": [],
"usage": {
"chatgpt": 0,
"whisper": 0,
"dalle": 0,
}
},
}
ALLOWED_USERS=[]
SYSTEM_PROMPT=os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE=os.environ.get("CHATGPT_TEMPERATURE")
MODEL=os.environ.get("OPENAI_MODEL")
MAX_USER_CONTEXT=int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
def restricted(func):
@wraps(func)
async def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
if str(user_id) not in ALLOWED_USERS:
if "*" != ALLOWED_USERS[0]:
print(f"Unauthorized access denied for {user_id}.")
return
else:
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
return await func(update, context, *args, **kwargs)
return wrapped
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
users[f"{update.effective_chat.id}"]["usage"]['dalle'] += 1
await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=ChatAction.TYPING)
response = openai.Image.create(
prompt=update.message.text,
n=1,
size="1024x1024"
)
try:
image_url = response['data'][0]['url']
await context.bot.send_message(chat_id=update.effective_chat.id, text=image_url)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Initialize variables
chat_id = update.effective_chat.id
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
users[f"{chat_id}"]["usage"]['whisper'] = 0
transcript = {'text': ''}
# Check if the attachment is a voice message
if update.message.voice:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.voice.duration
file_id = update.message.voice.file_id
file_format = "ogg"
# Check if the attachment is a video
elif update.message.video:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.video.duration
file_id = update.message.video.file_id
file_format = "mp4"
# Check if the attachment is an audio file
elif update.message.audio:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.audio.duration
file_id = update.message.audio.file_id
file_format = "mp3"
else:
await context.bot.send_message(chat_id=chat_id, text="Can't handle such file. Reason: unknown.")
return
# Download the file and convert it if necessary
file = await context.bot.get_file(file_id)
await file.download_to_drive(f"{user_id}.{file_format}")
if file_format == "ogg":
ogg_audio = AudioSegment.from_file(f"{user_id}.ogg", format="ogg")
ogg_audio.export(f"{user_id}.mp3", format="mp3")
os.remove(f"{user_id}.ogg")
file_format = "mp3"
# Transcribe the audio
with open(f"{user_id}.{file_format}", "rb") as audio_file:
try:
transcript = openai.Audio.transcribe("whisper-1", audio_file)
except:
await context.bot.send_message(chat_id=chat_id, text="Transcript failed.")
os.remove(f"{user_id}.{file_format}")
return
os.remove(f"{user_id}.{file_format}")
# Send the transcript
if transcript['text'] == "":
transcript['text'] = "[Silence]"
# Check if the transcript length is longer than 4095 characters
if len(transcript['text']) > 4095:
# Split the transcript into multiple messages without breaking words in half
max_length = 4096
words = transcript['text'].split()
current_message = ""
for word in words:
if len(current_message) + len(word) + 1 > max_length:
await context.bot.send_message(chat_id=chat_id, text=current_message)
current_message = ""
current_message += f"{word} "
if current_message:
await context.bot.send_message(chat_id=chat_id, text=current_message)
else:
await context.bot.send_message(chat_id=chat_id, text=transcript['text'])
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
await context.bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
# Initialize user if not present
if chat_id not in users:
users[chat_id] = {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}}
# Check if replying and add context
if hasattr(update.message.reply_to_message, "text"):
user_prompt = f"In reply to: '{update.message.reply_to_message.text}' \n---\n {update.message.text}"
else:
user_prompt = update.message.text
# Update context
user_context = users[chat_id]["context"]
user_context.append({"role": "user", "content": user_prompt})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Interact with ChatGPT API and stream the response
try:
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": SYSTEM_PROMPT}] + user_context,
temperature=float(TEMPERATURE)
)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="There was a problem with OpenAI, so I can't answer you.")
# Initialize variables for streaming
assistant_message = ""
if 'choices' in response:
assistant_message = response['choices'][0]['message']['content']
await context.bot.send_message(chat_id=update.effective_chat.id, text=assistant_message)
else:
await context.bot.send_message(chat_id=update.effective_chat.id, text="There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!")
# Update context
user_context.append({"role": "assistant", "content": assistant_message})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Update usage
users[chat_id]["usage"]['chatgpt'] += int(response['usage']['total_tokens'])
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"]["context"] = []
print(f"Cleared context for {update.effective_chat.id}")
await update.message.reply_text(f'Your message context history was cleared.')
@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info=users[f"{update.effective_chat.id}"]["usage"]
total_spent=0.0
total_spent+=(user_info['chatgpt']/750)*0.002
total_spent+=float(user_info['dalle'])*0.02
total_spent+=(user_info['whisper']/60.0)*0.006
info_message=f"""User: {update.effective_user.name}\n- Used ~{user_info["chatgpt"]} tokens with ChatGPT.\n- Generated {user_info["dalle"]} images with DALL-E.\n- Transcribed {round(float(user_info["whisper"])/60.0, 2)}min with Whisper.\n\nTotal spent: ${str(total_spent)}"""
await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_message="""Here's what you can do:\n\n- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n- Send an audio to transcribe to text with Whisper.\n\n- /usage To get your usage statistics.\n - /clear To clear you chatgpt message context (start a new chat)."""
await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
if __name__ == '__main__':
try:
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
except:
ALLOWED_USERS=ALLOWED_USERS
print(f"Allowed users: {ALLOWED_USERS}")
print(f"System prompt: {SYSTEM_PROMPT}")
application = ApplicationBuilder().token(os.environ.get("BOT_TOKEN")).build()
start_handler = CommandHandler('start', start)
application.add_handler(start_handler),
clear_handler = CommandHandler('clear', clear)
application.add_handler(clear_handler),
info_handler = CommandHandler('usage', usage)
application.add_handler(info_handler),
help_handler = CommandHandler('help', _help)
application.add_handler(help_handler),
imagine_handler = CommandHandler('imagine', imagine)
application.add_handler(imagine_handler),
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))
application.run_polling()