openai-telegram-bot/main.py

265 wiersze
10 KiB
Python

import os
import re
import openai
import logging
import asyncio
import math
from dotenv import load_dotenv
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram.error import BadRequest, RetryAfter
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
if os.environ.get("OPENAI_API_KEY") is None:
print("OpenAI_API_KEY is not set in.env file or OPENAI_API_KEY environment variable is not set")
exit(1)
openai.api_key = os.environ.get("OPENAI_API_KEY")
users = {
"userid": {
"context": [],
"usage": {
"chatgpt": 0,
"whisper": 0,
"dalle": 0,
}
},
}
ALLOWED_USERS=[]
SYSTEM_PROMPT=os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE=os.environ.get("CHATGPT_TEMPERATURE")
MODEL=os.environ.get("OPENAI_MODEL")
MAX_USER_CONTEXT=int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
def restricted(func):
@wraps(func)
async def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
if str(user_id) not in ALLOWED_USERS:
if "*" != ALLOWED_USERS[0]:
print(f"Unauthorized access denied for {user_id}.")
return
else:
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
return await func(update, context, *args, **kwargs)
return wrapped
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
users[f"{update.effective_chat.id}"]["usage"]['dalle'] += 1
response = openai.Image.create(
prompt=update.message.text,
n=1,
size="1024x1024"
)
try:
image_url = response['data'][0]['url']
await context.bot.send_message(chat_id=update.effective_chat.id, text=image_url)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Initialize variables
chat_id = update.effective_chat.id
user_id = update.effective_user.id
users[f"{chat_id}"]["usage"]['whisper'] = 0
transcript = {'text': ''}
# Check if the attachment is a voice message
if update.message.voice:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.voice.duration
file_id = update.message.voice.file_id
file_format = "ogg"
# Check if the attachment is a video
elif update.message.video:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.video.duration
file_id = update.message.video.file_id
file_format = "mp4"
# Check if the attachment is an audio file
elif update.message.audio:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.audio.duration
file_id = update.message.audio.file_id
file_format = "mp3"
else:
await context.bot.send_message(chat_id=chat_id, text="Can't handle such file. Reason: unknown.")
return
# Download the file and convert it if necessary
file = await context.bot.get_file(file_id)
await file.download_to_drive(f"{user_id}.{file_format}")
if file_format == "ogg":
ogg_audio = AudioSegment.from_file(f"{user_id}.ogg", format="ogg")
ogg_audio.export(f"{user_id}.mp3", format="mp3")
os.remove(f"{user_id}.ogg")
file_format = "mp3"
# Transcribe the audio
with open(f"{user_id}.{file_format}", "rb") as audio_file:
try:
transcript = openai.Audio.transcribe("whisper-1", audio_file)
except:
await context.bot.send_message(chat_id=chat_id, text="Transcript failed.")
os.remove(f"{user_id}.{file_format}")
return
os.remove(f"{user_id}.{file_format}")
# Send the transcript
if transcript['text'] == "":
transcript['text'] = "[Silence]"
# Check if the transcript length is longer than 4095 characters
if len(transcript['text']) > 4095:
# Split the transcript into multiple messages without breaking words in half
max_length = 4096
words = transcript['text'].split()
current_message = ""
for word in words:
if len(current_message) + len(word) + 1 > max_length:
await context.bot.send_message(chat_id=chat_id, text=current_message)
current_message = ""
current_message += f"{word} "
if current_message:
await context.bot.send_message(chat_id=chat_id, text=current_message)
else:
await context.bot.send_message(chat_id=chat_id, text=transcript['text'])
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = str(update.effective_chat.id)
# Initialize user if not present
if chat_id not in users:
users[chat_id] = {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}}
# Check if replying and add context
if hasattr(update.message.reply_to_message, "text"):
user_prompt = f"In reply to: '{update.message.reply_to_message.text}' \n---\n {update.message.text}"
else:
user_prompt = update.message.text
# Update context
user_context = users[chat_id]["context"]
user_context.append({"role": "user", "content": user_prompt})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Interact with ChatGPT API and stream the response
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": SYSTEM_PROMPT}] + user_context,
stream=True,
temperature=float(TEMPERATURE)
)
# Initialize variables for streaming
assistant_message = ""
message_sent = False
sent_message = None
# Process response chunks
batch = 3 # Batches of 10 to update
for chunk in response:
batch -= 1
if 'choices' in chunk:
choice = chunk['choices'][0]
if 'delta' in choice and 'content' in choice['delta']:
new_content = choice['delta']['content']
assistant_message += new_content
# Edit the message in real time
if not message_sent:
sent_message = await context.bot.send_message(chat_id=update.effective_chat.id, text=assistant_message)
message_sent = True
else:
if batch == 0:
try:
if new_content.strip() != "":
await context.bot.edit_message_text(chat_id=update.effective_chat.id, message_id=sent_message.message_id, text=assistant_message)
except BadRequest as e:
if "Message is not modified" not in str(e):
raise e
except RetryAfter as e:
await asyncio.sleep(e.retry_after)
batch = 3
# Update context
user_context.append({"role": "assistant", "content": assistant_message})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Update usage
users[chat_id]["usage"]['chatgpt'] += round(len(str(user_context))/3, 0)
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"]["context"] = []
print(f"Cleared context for {update.effective_chat.id}")
await update.message.reply_text(f'Your message context history was cleared.')
@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_info=users[f"{update.effective_chat.id}"]["usage"]
total_spent=0.0
total_spent+=(user_info['chatgpt']/750)*0.002
total_spent+=float(user_info['dalle'])*0.02
total_spent+=(user_info['whisper']/60.0)*0.006
info_message=f"""User: {update.effective_user.name}\n- Used ~{user_info["chatgpt"]} tokens with ChatGPT.\n- Generated {user_info["dalle"]} images with DALL-E.\n- Transcribed {round(float(user_info["whisper"])/60.0, 2)}min with Whisper.\n\nTotal spent: ${str(total_spent)}"""
await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_message="""Here's what you can do:\n\n- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n- Send an audio to transcribe to text with Whisper.\n\n- /usage To get your usage statistics.\n - /clear To clear you chatgpt message context (start a new chat)."""
await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
if __name__ == '__main__':
try:
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
except:
ALLOWED_USERS=ALLOWED_USERS
print(f"Allowed users: {ALLOWED_USERS}")
print(f"System prompt: {SYSTEM_PROMPT}")
application = ApplicationBuilder().token(os.environ.get("BOT_TOKEN")).build()
start_handler = CommandHandler('start', start)
application.add_handler(start_handler),
clear_handler = CommandHandler('clear', clear)
application.add_handler(clear_handler),
info_handler = CommandHandler('usage', usage)
application.add_handler(info_handler),
help_handler = CommandHandler('help', _help)
application.add_handler(help_handler),
imagine_handler = CommandHandler('imagine', imagine)
application.add_handler(imagine_handler),
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))
application.run_polling()