2023-03-06 15:20:37 +00:00
import os
import re
import openai
import logging
2023-03-07 20:06:14 +00:00
import math
2023-03-06 15:20:37 +00:00
from pydub import AudioSegment
from telegram import Update
from functools import wraps
from telegram . ext import ApplicationBuilder , CommandHandler , ContextTypes , MessageHandler , filters
# Process-wide logging: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Logger for this module.
logger = logging.getLogger(__name__)

# Pass the API key from the environment to the openai module.
openai.api_key = os.getenv("OPENAI_API_KEY")
# In-memory state, one entry per chat, keyed by the chat id rendered as a
# string. Each entry holds the rolling ChatGPT message history ("context") and
# the usage counters reported by /usage.
# NOTE(review): the "userid" entry looks like a placeholder documenting the
# expected shape rather than a real chat; confirm before removing it.
users = {
    "userid": {
        "context": [],
        "usage": {
            "chatgpt": 0,
            "whisper": 0,
            "dalle": 0,
        },
    },
}


def _env_int(name, default):
    """Return environment variable *name* parsed as an int.

    Falls back to *default* when the variable is unset, blank, or not a valid
    integer, so a missing setting no longer crashes the module at import time.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logging.getLogger(__name__).warning(
            "Ignoring non-integer value %r for %s; using %r instead.",
            raw, name, default,
        )
        return default


# Telegram user ids (as strings) allowed to use the bot; filled in at startup.
ALLOWED_USERS = []
# System message placed in front of every ChatGPT conversation (may be None).
SYSTEM_PROMPT = os.getenv("CHATGPT_SYSTEM_PROMPT")
# Cap on the number of messages remembered per chat; defaults to 10 when the
# environment does not provide a usable value.
MAX_USER_CONTEXT = _env_int("CHATGPT_MAX_USER_CONTEXT", 10)
def restricted(func):
    """Decorator that only lets allow-listed Telegram users reach *func*.

    A caller is authorised when their user id (as a string) appears in
    ``ALLOWED_USERS`` or when the first entry of ``ALLOWED_USERS`` is ``"*"``.
    An empty allow-list denies everyone instead of raising ``IndexError``.

    Every authorised call also makes sure ``users`` holds a record for the
    current chat, so handlers can update usage counters without first checking
    that the record exists.

    Returns:
        An async wrapper with the same call signature as *func*. It returns
        ``None`` without calling *func* when access is denied.
    """
    @wraps(func)
    async def wrapped(update, context, *args, **kwargs):
        user_id = update.effective_user.id
        # The wildcard is honoured only in first position, as before; the
        # emptiness check keeps an unconfigured allow-list from crashing.
        allow_everyone = bool(ALLOWED_USERS) and ALLOWED_USERS[0] == "*"
        if not allow_everyone and str(user_id) not in ALLOWED_USERS:
            print(f"Unauthorized access denied for {user_id}.")
            return None

        # Seed the per-chat record on every authorised path (wildcard included).
        users.setdefault(
            f"{update.effective_chat.id}",
            {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
        )
        return await func(update, context, *args, **kwargs)
    return wrapped
@restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: register the chat if it is new and send a greeting."""
    chat_id = update.effective_chat.id
    # Create the per-chat record only when none exists yet.
    users.setdefault(
        f"{chat_id}",
        {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
    )
    await context.bot.send_message(
        chat_id=chat_id,
        text="I'm a bot, please talk to me!",
    )
@restricted
async def imagine(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /imagine: generate one 1024x1024 DALL-E image and send its URL.

    The prompt is the text following the command. When no arguments were
    parsed, the raw message text is used, as before. Any failure while calling
    the API or reading its response is logged and reported to the user. The
    image counter is only incremented once a URL has been obtained.
    """
    chat_id = update.effective_chat.id
    # context.args holds the words after the command, so the "/imagine" prefix
    # is not sent to the model as part of the prompt.
    prompt = " ".join(context.args) if context.args else update.message.text

    try:
        # The request itself is inside the try so API rejections are reported.
        response = openai.Image.create(
            prompt=prompt,
            n=1,
            size="1024x1024",
        )
        image_url = response['data'][0]['url']
    except Exception:
        logger.exception("Image generation failed for chat %s", chat_id)
        await context.bot.send_message(
            chat_id=chat_id,
            text="Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.",
        )
        return

    # Count the image only after it was actually generated; tolerate a chat
    # that has no record yet.
    record = users.setdefault(
        f"{chat_id}",
        {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
    )
    record["usage"]['dalle'] += 1
    await context.bot.send_message(chat_id=chat_id, text=image_url)
def _transcribe_media(path):
    """Return the Whisper transcript of the media file at *path*.

    The file is opened in binary mode and closed as soon as the request
    finishes. An empty transcript is reported as "[Silence]".
    """
    with open(path, "rb") as media_file:
        transcript = openai.Audio.transcribe("whisper-1", media_file)
    return transcript['text'] or "[Silence]"


@restricted
async def attachment(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Transcribe a voice or video attachment with Whisper and reply with the text.

    Voice notes are downloaded as .ogg and converted to .mp3 before upload.
    Videos are uploaded as the downloaded .mp4. Any other attachment is
    answered with a "can't handle" message. Temporary files are removed
    whether or not transcription succeeds. The Whisper usage counter (in
    seconds of media) is only increased after a successful transcription.
    """
    chat_id = update.effective_chat.id
    message = update.message

    # Voice takes precedence over video, matching the original branch order.
    media = message.voice or message.video
    if not media:
        await context.bot.send_message(
            chat_id=chat_id,
            text="Can't handle such file. Reason: unkown.",
        )
        return

    base_name = f"{update.effective_user.id}"
    source_path = f"{base_name}.ogg" if message.voice else f"{base_name}.mp4"
    upload_path = source_path
    temp_paths = [source_path]

    try:
        telegram_file = await context.bot.get_file(media.file_id)
        await telegram_file.download_to_drive(source_path)

        if message.voice:
            # Re-encode the downloaded .ogg voice note to .mp3 before upload.
            upload_path = f"{base_name}.mp3"
            temp_paths.append(upload_path)
            AudioSegment.from_file(source_path, format="ogg").export(upload_path, format="mp3")

        try:
            text = _transcribe_media(upload_path)
        except Exception:
            logger.exception("Transcription failed for chat %s", chat_id)
            await context.bot.send_message(chat_id=chat_id, text="Transcript failed.")
            # Stop here: there is no transcript to post.
            return
    finally:
        # Remove whatever temporary files were created, on every exit path.
        for path in temp_paths:
            if os.path.exists(path):
                os.remove(path)

    record = users.setdefault(
        f"{chat_id}",
        {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
    )
    record["usage"]['whisper'] += media.duration
    await context.bot.send_message(chat_id=chat_id, text=text)
@restricted
async def chat(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward a text message to ChatGPT and send back its answer.

    The chat's stored history is sent along with the new prompt, preceded by
    the system prompt when one is configured. History holds at most
    ``MAX_USER_CONTEXT`` messages (at least one), oldest dropped first. It is
    only updated after the API call succeeds, so a failed request leaves no
    orphaned prompt behind. On failure the error is logged and the user is
    told to retry.
    """
    chat_id = update.effective_chat.id
    record = users.setdefault(
        f"{chat_id}",
        {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
    )
    history = record["context"]

    # If replying to a message that has text, quote it as extra context.
    # Replies to non-text messages are ignored instead of quoting 'None'.
    replied_to = update.message.reply_to_message
    if replied_to is not None and replied_to.text:
        user_prompt = f"In reply to: '{replied_to.text}'\n---\n{update.message.text}"
    else:
        user_prompt = update.message.text

    # Keep at least the current prompt even if the cap is configured as 0.
    limit = max(MAX_USER_CONTEXT, 1)
    window = (history + [{"role": "user", "content": f"{user_prompt}"}])[-limit:]

    # Skip the system message when no system prompt is configured.
    system_messages = [{"role": "system", "content": SYSTEM_PROMPT}] if SYSTEM_PROMPT else []

    # Interact with the ChatGPT API.
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=system_messages + window,
        )
        answer = f"{response['choices'][0]['message']['content']}"
    except Exception:
        logger.exception("ChatGPT request failed for chat %s", chat_id)
        await context.bot.send_message(
            chat_id=chat_id,
            text="Error contacting ChatGPT. Please try again later.",
        )
        return

    # Save context: commit the prompt and the answer, then trim to the cap.
    history[:] = window
    history.append({"role": "assistant", "content": answer})
    del history[:-limit]

    # Update usage: the counter grows by the character length of the stored
    # history's string form, as before.
    record["usage"]['chatgpt'] += len(str(history))

    # Send reply.
    await context.bot.send_message(chat_id=chat_id, text=answer)
@restricted
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /clear: empty the chat's stored ChatGPT history and confirm it."""
    record = users.get(f"{update.effective_chat.id}")
    if record is not None:
        # Replace the history list with a fresh empty one.
        record["context"] = []
    print(f"Cleared context for {update.effective_chat.id}")
    await update.message.reply_text('Your message context history was cleared.')
@restricted
async def usage(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /usage: report the chat's usage counters and an estimated cost.

    A chat without a stored record is reported with zeroed counters instead of
    raising ``KeyError``. The total is rounded to four decimals so binary
    floating-point noise does not leak into the message.
    """
    record = users.setdefault(
        f"{update.effective_chat.id}",
        {"context": [], "usage": {"chatgpt": 0, "whisper": 0, "dalle": 0}},
    )
    user_info = record["usage"]

    # NOTE(review): the rates below are kept exactly as found; they look like
    # USD per 750 counted characters (ChatGPT), per image (DALL-E) and per
    # minute of audio (Whisper). Confirm against current OpenAI pricing.
    chatgpt_cost = (user_info['chatgpt'] / 750) * 0.002
    dalle_cost = float(user_info['dalle']) * 0.02
    # The whisper counter is in seconds; the rate is applied per minute.
    whisper_cost = (user_info['whisper'] / 60.0) * 0.006
    total_spent = chatgpt_cost + dalle_cost + whisper_cost

    whisper_minutes = round(float(user_info["whisper"]) / 60.0, 2)
    info_message = (
        f"User: {update.effective_user.name}\n"
        f"- Used {user_info['chatgpt']} characters with ChatGPT.\n"
        f"- Generated {user_info['dalle']} images with DALL-E.\n"
        f"- Transcribed {whisper_minutes} min with Whisper.\n\n"
        f"Total spent: ${total_spent:.4f}"
    )
    await context.bot.send_message(chat_id=update.effective_chat.id, text=info_message)
@restricted
async def _help(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: send the list of things the bot can do."""
    # Adjacent literals are joined at compile time into one message.
    help_message = (
        "Here's what you can do:\n\n"
        "- /imagine <prompt> to generate an image with DALL-E\n"
        "- Send a message to chat with ChatGPT\n"
        "- Send an audio to transcribe to text with Whisper.\n\n"
        "- /usage To get your usage statistics.\n"
        "- /clear To clear you chatgpt message context (start a new chat)."
    )
    chat_id = update.effective_chat.id
    await context.bot.send_message(chat_id=chat_id, text=help_message)
if __name__ == '__main__':
    # BOT_ALLOWED_USERS is a comma-separated list of Telegram user ids, or "*"
    # as the first entry to allow everyone. When it is unset the allow-list
    # keeps its module default. Whitespace around entries is stripped so
    # "1, 2" matches user 2.
    allowed_raw = os.getenv("BOT_ALLOWED_USERS")
    if allowed_raw is not None:
        ALLOWED_USERS = [entry.strip() for entry in allowed_raw.split(",")]

    print(f"Allowed users: {ALLOWED_USERS}")
    print(f"System prompt: {SYSTEM_PROMPT}")

    # Fail early with a readable message instead of an opaque builder error.
    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise SystemExit("BOT_TOKEN environment variable is not set.")
    application = ApplicationBuilder().token(bot_token).build()

    # Slash commands, registered in the original order.
    for command_name, callback in (
        ('start', start),
        ('clear', clear),
        ('usage', usage),
        ('help', _help),
        ('imagine', imagine),
    ):
        application.add_handler(CommandHandler(command_name, callback))

    # Plain text goes to ChatGPT; any attachment goes to the transcription handler.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, chat))
    application.add_handler(MessageHandler(filters.ATTACHMENT & ~filters.COMMAND, attachment))

    application.run_polling()