Some improvements and new features

main
pluja 2023-04-12 12:42:28 +02:00
rodzic be0b0d9c4a
commit 798b9c23b8
4 zmienionych plików z 140 dodań i 47 usunięć

3
.gitignore vendored
Wyświetl plik

@ -1,2 +1,3 @@
docker-compose.yml
venv/
venv/
.env

Wyświetl plik

@ -10,6 +10,7 @@ A telegram bot to interact with OpenAI API. You can:
Other features include:
- Talk to ChatGPT with audio transcriptions (whisper).
- Clear ChatGPT context history (to save tokens).
- Reply to any message to use it as context for ChatGPT.
- Per-user context and usage metrics and spent $.
@ -48,6 +49,7 @@ Self hosting this chatbot is pretty easy. You just need to follow this steps:
3. Set your ALLOWED_USERS (comma separated user ids). Set it to `*` to allow all users.
4. Set the SYSTEM_PROMPT for ChatGPT. This is always instructed to ChatGPT as the system.
5. Optional: Edit the MAX_CONTEXT. This variable sets the number of messages that will be sent to ChatGPT API as context for the conversation.
6. WHISPER_TO_CHAT allows you to choose wether Whisper transcripts should be instructed to ChatGPT or not.
4. Build and start the bot: `docker compose up --build -d`.
5. Enjoy!

Wyświetl plik

@ -8,5 +8,6 @@ services:
- CHATGPT_SYSTEM_PROMPT=You are a helpful assistant.
- CHATGPT_TEMPERATURE=1.0
- CHATGPT_MAX_USER_CONTEXT=5
- WHISPER_TO_CHAT=1
- BOT_TOKEN=XXX
- BOT_ALLOWED_USERS=USER_ID_1,USER_ID_2

181
main.py
Wyświetl plik

@ -11,7 +11,8 @@ from functools import wraps
from telegram.constants import ChatAction
from functools import wraps
from telegram.error import BadRequest, RetryAfter, TimedOut
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, CallbackQueryHandler
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@ -24,7 +25,14 @@ load_dotenv()
if os.environ.get("OPENAI_API_KEY") is None:
print("OpenAI_API_KEY is not set in.env file or OPENAI_API_KEY environment variable is not set")
exit(1)
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
SYSTEM_PROMPT=os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE=os.environ.get("CHATGPT_TEMPERATURE")
MODEL=os.environ.get("OPENAI_MODEL")
WHISPER_TO_CHAT=bool(int(os.environ.get("WHISPER_TO_CHAT")))
MAX_USER_CONTEXT=int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
openai.api_key = os.environ.get("OPENAI_API_KEY")
users = {
"userid": {
@ -33,14 +41,14 @@ users = {
"chatgpt": 0,
"whisper": 0,
"dalle": 0,
},
"options": {
"whisper-to-chat": WHISPER_TO_CHAT,
"temperature": 0.9,
"max-context": 5
}
},
}
ALLOWED_USERS=[]
SYSTEM_PROMPT=os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE=os.environ.get("CHATGPT_TEMPERATURE")
MODEL=os.environ.get("OPENAI_MODEL")
MAX_USER_CONTEXT=int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
def restricted(func):
@wraps(func)
@ -52,14 +60,53 @@ def restricted(func):
return
else:
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}, "options": {"whisper-to-chat": WHISPER_TO_CHAT, "temperature": 0.9, "max-context": 5}}
return await func(update, context, *args, **kwargs)
return wrapped
async def messageGPT(text: str, chat_id: str):
# Initialize user if not present
if chat_id not in users:
users[chat_id] = {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}}
# Update context
user_context = users[chat_id]["context"]
user_context.append({"role": "user", "content": text})
if len(user_context) > users[chat_id]["options"]["max-context"]:
user_context.pop(0)
# Interact with ChatGPT API and stream the response
response = None
try:
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": SYSTEM_PROMPT}] + user_context,
temperature=float(TEMPERATURE)
)
except:
return "There was a problem with OpenAI, so I can't answer you."
# Initialize variables for streaming
assistant_message = ""
if 'choices' in response:
assistant_message = response['choices'][0]['message']['content']
else:
assistant_message = "There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!"
# Update context
user_context.append({"role": "assistant", "content": assistant_message})
if len(user_context) > users[chat_id]["options"]["max-context"]:
user_context.pop(0)
# Update usage
users[chat_id]["usage"]['chatgpt'] += int(response['usage']['total_tokens'])
return assistant_message
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not f"{update.effective_chat.id}" in users:
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}}
users[f"{update.effective_chat.id}"] = {"context": [], "usage": {"chatgpt": 0,"whisper": 0,"dalle": 0,}, "options": {"whisper-to-chat": WHISPER_TO_CHAT, "temperature": 0.9, "max-context": 5}}
await context.bot.send_message(chat_id=update.effective_chat.id, text="I'm a bot, please talk to me!")
@restricted
@ -86,11 +133,13 @@ async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
users[f"{chat_id}"]["usage"]['whisper'] = 0
transcript = {'text': ''}
audioMessage = False
# Check if the attachment is a voice message
if update.message.voice:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.voice.duration
file_id = update.message.voice.file_id
file_format = "ogg"
audioMessage = True
# Check if the attachment is a video
elif update.message.video:
users[f"{chat_id}"]["usage"]['whisper'] += update.message.video.duration
@ -107,6 +156,7 @@ async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Download the file and convert it if necessary
file = await context.bot.get_file(file_id)
user_id = update.effective_user.id
await file.download_to_drive(f"{user_id}.{file_format}")
if file_format == "ogg":
@ -129,6 +179,10 @@ async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
# Send the transcript
if transcript['text'] == "":
transcript['text'] = "[Silence]"
if audioMessage and users[f"{chat_id}"]["options"]["whisper-to-chat"]:
chatGPT_response = await messageGPT(transcript['text'], str(chat_id))
transcript['text'] = "> " + transcript['text'] + "\n\n " + chatGPT_response
# Check if the transcript length is longer than 4095 characters
if len(transcript['text']) > 4095:
@ -164,37 +218,9 @@ async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
else:
user_prompt = update.message.text
# Update context
user_context = users[chat_id]["context"]
user_context.append({"role": "user", "content": user_prompt})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Interact with ChatGPT API and stream the response
try:
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": SYSTEM_PROMPT}] + user_context,
temperature=float(TEMPERATURE)
)
except:
await context.bot.send_message(chat_id=update.effective_chat.id, text="There was a problem with OpenAI, so I can't answer you.")
# Initialize variables for streaming
assistant_message = ""
if 'choices' in response:
assistant_message = response['choices'][0]['message']['content']
await context.bot.send_message(chat_id=update.effective_chat.id, text=assistant_message)
else:
await context.bot.send_message(chat_id=update.effective_chat.id, text="There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!")
# Update context
user_context.append({"role": "assistant", "content": assistant_message})
if len(user_context) > MAX_USER_CONTEXT:
user_context.pop(0)
# Update usage
users[chat_id]["usage"]['chatgpt'] += int(response['usage']['total_tokens'])
# Use messageGPT function to get the response
assistant_message = await messageGPT(user_prompt, chat_id)
await context.bot.send_message(chat_id=update.effective_chat.id, text=assistant_message)
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@ -215,9 +241,66 @@ async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
help_message="""Here's what you can do:\n\n- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n- Send an audio to transcribe to text with Whisper.\n\n- /usage To get your usage statistics.\n - /clear To clear you chatgpt message context (start a new chat)."""
help_message="""Here's what you can do:\n\n
- /imagine <prompt> to generate an image with DALL-E\n- Send a message to chat with ChatGPT\n
- Send an audio to transcribe to text with Whisper.\n\n
- /settings To change your settings.\n
- /usage To get your usage statistics.\n
- /clear To clear you chatgpt message context (start a new chat)."""
await context.bot.send_message(chat_id=update.effective_chat.id, text=help_message)
# Function to generate the settings buttons
def generate_settings_markup(chat_id: str) -> InlineKeyboardMarkup:
keyboard = [
[
InlineKeyboardButton("Increase Temperature", callback_data=f"setting_increase_temperature_{chat_id}"),
InlineKeyboardButton("Decrease Temperature", callback_data=f"setting_decrease_temperature_{chat_id}")
],
[
InlineKeyboardButton("Enable Whisper to Chat", callback_data=f"setting_enable_whisper_{chat_id}"),
InlineKeyboardButton("Disable Whisper to Chat", callback_data=f"setting_disable_whisper_{chat_id}")
],
[
InlineKeyboardButton("Increase Context", callback_data=f"setting_increase_context_{chat_id}"),
InlineKeyboardButton("Decrease Context", callback_data=f"setting_decrease_context_{chat_id}")
]
]
return InlineKeyboardMarkup(keyboard)
@restricted
async def settings(update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
settings_markup = generate_settings_markup(chat_id)
await context.bot.send_message(chat_id=chat_id, text="Settings:", reply_markup=settings_markup)
async def settings_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
action, chat_id = query.data.rsplit("_", 1)
if action.startswith("setting_increase_temperature"):
users[chat_id]["options"]["temperature"] = min(users[chat_id]["options"]["temperature"] + 0.1, 1)
elif action.startswith("setting_decrease_temperature"):
users[chat_id]["options"]["temperature"] = max(users[chat_id]["options"]["temperature"] - 0.1, 0)
elif action.startswith("setting_enable_whisper"):
print(f"enabling whisper for {chat_id}")
users[chat_id]["options"]["whisper-to-chat"] = True
elif action.startswith("setting_disable_whisper"):
print(f"disabling whisper for {chat_id}")
users[chat_id]["options"]["whisper-to-chat"] = False
elif action.startswith("setting_increase_context"):
users[chat_id]["options"]["max-context"] = min(users[chat_id]["options"]["max-context"] + 1, MAX_USER_CONTEXT)
elif action.startswith("setting_decrease_context"):
users[chat_id]["options"]["max-context"] = max(users[chat_id]["options"]["max-context"] - 1, 1)
settings_markup = generate_settings_markup(chat_id)
await query.edit_message_text(text="Choose a setting option:", reply_markup=settings_markup)
# Remove the settings message
await context.bot.delete_message(chat_id=query.message.chat_id, message_id=query.message.message_id)
# Send a message displaying the updated settings
settings_message = f"""Updated settings:\n\nTemperature: {users[chat_id]['options']['temperature']}\nWhisper to Chat: {users[chat_id]['options']['whisper-to-chat']}\nContext Length: {users[chat_id]["options"]["max-context"]}"""
await context.bot.send_message(chat_id=chat_id, text=settings_message)
if __name__ == '__main__':
try:
ALLOWED_USERS=os.environ.get("BOT_ALLOWED_USERS").split(",")
@ -229,21 +312,27 @@ if __name__ == '__main__':
application = ApplicationBuilder().token(os.environ.get("BOT_TOKEN")).build()
start_handler = CommandHandler('start', start)
application.add_handler(start_handler),
application.add_handler(start_handler)
clear_handler = CommandHandler('clear', clear)
application.add_handler(clear_handler),
application.add_handler(clear_handler)
info_handler = CommandHandler('usage', usage)
application.add_handler(info_handler),
application.add_handler(info_handler)
help_handler = CommandHandler('help', _help)
application.add_handler(help_handler),
application.add_handler(help_handler)
imagine_handler = CommandHandler('imagine', imagine)
application.add_handler(imagine_handler),
application.add_handler(imagine_handler)
settings_handler = CommandHandler('settings', settings)
application.add_handler(settings_handler)
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))
settings_callback_handler = CallbackQueryHandler(settings_callback)
application.add_handler(settings_callback_handler)
application.run_polling()