Compare commits

...

6 commits

9 changed files with 323 additions and 132 deletions

9
.gitignore vendored
View file

@@ -1,4 +1,11 @@
venv/
.env
*.db
__pycache__/
__pycache__/
db_data
*.ogg
piper/*.so*
piper/piper
piper/espeak*
piper/voices
MODEL_CARD

Dockerfile
View file

@@ -1,8 +1,16 @@
FROM python:3.10-slim
RUN apt update && apt install -y ffmpeg libespeak1
# Set the voice language
ARG VOICE_LANGUAGE=en
RUN apt update && apt install -y ffmpeg wget libespeak1
WORKDIR /app
COPY ./entrypoint.sh /app
RUN chmod +x /app/entrypoint.sh
COPY ./piper /app/piper
COPY ./main.py /app
COPY ./database.py /app
COPY ./requirements.txt /app
@ -11,4 +19,4 @@ RUN mkdir db_data
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
CMD [ "python3", "/app/main.py" ]
ENTRYPOINT [ "/app/entrypoint.sh" ]

README.md
View file

@@ -10,7 +10,7 @@ A Telegram bot to interact with the OpenAI API. You can:
- Voice chat with ChatGPT:
- Send voice message.
- Receive voice messages.
- Use GoogleTTS or 100% local Espeak (more robotic).
- Use 100% local Text-To-Speech with Language Recognition to give ChatGPT a voice in many languages!
Other features include:
@@ -53,7 +53,7 @@ Self-hosting this chatbot is pretty easy. You just need to follow these steps:
- Set your ALLOWED_USERS (comma separated user ids). Set it to `*` to allow all users.
- Set the SYSTEM_PROMPT for ChatGPT. This is always instructed to ChatGPT as the system.
- Optional: Edit the MAX_CONTEXT. This variable sets the number of messages that will be sent to ChatGPT API as context for the conversation.
- WHISPER_TO_CHAT allows you to choose whether Whisper transcripts should be sent to ChatGPT or not.
- WHISPER_TO_GPT allows you to choose whether Whisper transcripts should be sent to ChatGPT or not.
- You can also configure this using `/settings` in chat.
- ENABLE_GOOGLE_TTS: if enabled, the TTS service will be provided by GoogleTTS, producing more natural voices. If disabled, it falls back to local voice generation using Espeak.
- VOICE_LANGUAGE country code for the default voice accent.

7
entrypoint.sh 100644
View file

@@ -0,0 +1,7 @@
#!/bin/bash
echo "Installing piper for text to voice conversion..."
bash /app/piper/get-piper.sh
echo "Bot starting..."
python3 -u /app/main.py

View file

@@ -6,10 +6,13 @@ CHATGPT_MAX_USER_CONTEXT=5
CHATGPT_TEMPERATURE=1.0
# Use Whisper transcript from voice message with ChatGPT
WHISPER_TO_CHAT=1
# Use Google TTS for text to speech
ENABLE_GOOGLE_TTS=0
VOICE_LANGUAGE=en # en, es, fr, de, it, pt, ru, ja, ko
WHISPER_TO_GPT=1
# TTS Options
ENABLE_TTS=1
# If ENABLE_TTS=1, you can set the following options
VOICE_LANGUAGE_LIST=en,es,fr,it,pt,ca
DEFAULT_VOICE_LANGUAGE=en
BOT_TOKEN=your-telegram-bot-token
BOT_ALLOWED_USERS=XXXX,YYYY # Comma-separated list of Telegram user IDs
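main.py (below) consumes these values by casting the 0/1 flags through int() and bool() and splitting the comma-separated lists. A minimal sketch of that pattern, assuming python-dotenv has loaded the file (the fallback defaults here are the sketch's own, not the repo's):

import os
from dotenv import load_dotenv

load_dotenv()  # pulls the key=value pairs above into os.environ
ENABLE_TTS = bool(int(os.environ.get("ENABLE_TTS", "0")))  # "1" -> True, "0" -> False
VOICE_LANGUAGES = os.environ.get("VOICE_LANGUAGE_LIST", "en").split(",")
ALLOWED_USERS = os.environ.get("BOT_ALLOWED_USERS", "").split(",")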

316
main.py
View file

@@ -1,32 +1,33 @@
import asyncio
import logging
import os
import tempfile
from functools import wraps
from io import BytesIO
import subprocess
import openai
import pyttsx3
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup, ParseMode
from aiogram.utils import executor
from dotenv import load_dotenv
from gtts import gTTS
from pydub import AudioSegment
from langdetect import detect
import database
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO
format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
if os.environ.get("OPENAI_API_KEY") is None:
print("OpenAI_API_KEY is not set in.env file or OPENAI_API_KEY environment variable is not set")
print(
"OPENAI_API_KEY is not set in the .env file or as an environment variable"
)
exit(1)
BOT_TOKEN = os.getenv("BOT_TOKEN")
@@ -39,12 +40,17 @@ ALLOWED_USERS = os.environ.get("BOT_ALLOWED_USERS").split(",")
SYSTEM_PROMPT = os.environ.get("CHATGPT_SYSTEM_PROMPT")
TEMPERATURE = os.environ.get("CHATGPT_TEMPERATURE")
MODEL = os.environ.get("OPENAI_MODEL")
WHISPER_TO_CHAT = bool(int(os.environ.get("WHISPER_TO_CHAT")))
ENABLE_GOOGLE_TTS = bool(int(os.environ.get("ENABLE_GOOGLE_TTS")))
VOICE_LANGUAGE = os.environ.get("VOICE_LANGUAGE")
WHISPER_TO_GPT = bool(int(os.environ.get("WHISPER_TO_GPT")))
# TTS Settings
ENABLE_TTS = bool(int(os.environ.get("ENABLE_TTS")))
DEFAULT_VOICE_LANGUAGE = os.environ.get("DEFAULT_VOICE_LANGUAGE")
VOICE_LANGUAGE_LIST = os.environ.get("VOICE_LANGUAGE_LIST")
MAX_USER_CONTEXT = int(os.environ.get("CHATGPT_MAX_USER_CONTEXT"))
openai.api_key = os.environ.get("OPENAI_API_KEY")
async def getUserData(chat_id):
user_data = database.get_user(chat_id)
if not user_data:
@@ -52,75 +58,92 @@ async def getUserData(chat_id):
"context": [],
"usage": {"chatgpt": 0, "whisper": 0, "dalle": 0},
"options": {
"whisper_to_chat": WHISPER_TO_CHAT,
"whisper_to_chat": WHISPER_TO_GPT,
"assistant_voice_chat": False,
"temperature": float(TEMPERATURE),
"max-context": MAX_USER_CONTEXT
}
"max-context": MAX_USER_CONTEXT,
},
}
database.add_user(chat_id, user_data)
user_data = database.get_user(chat_id)
return user_data
def generate_settings_markup(chat_id: str) -> InlineKeyboardMarkup:
keyboard = [
[
InlineKeyboardButton("Increase Temperature", callback_data=f"setting_inc_temp_{chat_id}"),
InlineKeyboardButton("Decrease Temperature", callback_data=f"setting_dec_temp_{chat_id}")
InlineKeyboardButton(
"Increase Temperature", callback_data=f"setting_inc_temp_{chat_id}"
),
InlineKeyboardButton(
"Decrease Temperature", callback_data=f"setting_dec_temp_{chat_id}"
),
],
[
InlineKeyboardButton("Enable Whisper", callback_data=f"setting_en_whisper_{chat_id}"),
InlineKeyboardButton("Disable Whisper", callback_data=f"setting_dis_whisper_{chat_id}")
InlineKeyboardButton(
"Enable Whisper", callback_data=f"setting_en_whisper_{chat_id}"
),
InlineKeyboardButton(
"Disable Whisper", callback_data=f"setting_dis_whisper_{chat_id}"
),
],
[
InlineKeyboardButton("Enable assistant voice", callback_data=f"setting_en_voice_{chat_id}"),
InlineKeyboardButton("Disable assistant voice", callback_data=f"setting_dis_voice_{chat_id}")
InlineKeyboardButton(
"Enable assistant voice", callback_data=f"setting_en_voice_{chat_id}"
),
InlineKeyboardButton(
"Disable assistant voice", callback_data=f"setting_dis_voice_{chat_id}"
),
],
[
InlineKeyboardButton("Increase Context", callback_data=f"setting_inc_context_{chat_id}"),
InlineKeyboardButton("Decrease Context", callback_data=f"setting_dec_context_{chat_id}")
]
InlineKeyboardButton(
"Increase Context", callback_data=f"setting_inc_context_{chat_id}"
),
InlineKeyboardButton(
"Decrease Context", callback_data=f"setting_dec_context_{chat_id}"
),
],
]
return InlineKeyboardMarkup(inline_keyboard=keyboard)
def change_voice(engine, gender='male'):
for voice in engine.getProperty('voices'):
if VOICE_LANGUAGE in voice.languages[0].decode('utf-8') and gender == voice.gender:
engine.setProperty('voice', voice.id)
return True
async def text_to_voice(text: str) -> BytesIO:
with tempfile.NamedTemporaryFile(mode='wb', suffix='.ogg', delete=False) as ogg_file:
temp_filename = ogg_file.name
voice_done = False
async def text_to_voice(text: str, language: str = None) -> BytesIO:
binary_path = "./piper/piper"
if language is None:
language = detect(text[0:100])
model_path = f"./piper/voices/{language}.onnx"
# Generate a unique temporary filename with '.ogg' extension
with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as tmp:
tmp_filename = tmp.name
# Run the binary with the escaped text as input and the temp file as output
with open(tmp_filename, "wb") as tmp_file:
process = subprocess.Popen(
[binary_path, "--model", model_path, "--output_file", "-"],
stdin=subprocess.PIPE,
stdout=tmp_file,
stderr=subprocess.PIPE,
text=True,
encoding="utf8",
)
# If Google TTS is enabled, try to use it first
if ENABLE_GOOGLE_TTS:
try:
tts = gTTS(text, lang=VOICE_LANGUAGE)
tts.save(temp_filename)
voice_done = True
except Exception as e:
print("Google TTS failed, falling back to pyttsx3: --> ", e)
# If Google TTS is disabled or failed, use pyttsx3
if not voice_done:
engine = pyttsx3.init()
change_voice(engine)
engine.setProperty('rate', 160)
engine.save_to_file(text, temp_filename)
engine.runAndWait()
engine.stop()
# Add a small delay before reading the file
await asyncio.sleep(1)
# Remove all newlines from the text so that the text is read as a single sentence
text = text.replace("\n", ". ")
process.communicate(input=text)
with open(temp_filename, "rb") as audio_file:
voice_data = BytesIO(audio_file.read())
# Open the file in binary mode and read its content into BytesIO object
with open(tmp_filename, "rb") as file:
bytes_io = BytesIO(file.read())
# Delete the temporary file
os.remove(tmp_filename)
# Return the BytesIO object
return bytes_io
os.remove(temp_filename)
voice_data.seek(0)
return voice_data
def restricted(func):
@wraps(func)
@@ -133,44 +156,61 @@ def restricted(func):
else:
_ = await getUserData(user_id)
return await func(message, *args, **kwargs)
return wrapped
async def messageGPT(text: str, chat_id: str, user_name="User", user_data={}):
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
user_data['context'].append({"role": "user", "content": text})
if len(user_data['context']) > user_data["options"]["max-context"]:
user_data['context'].pop(0)
user_data["context"].append({"role": "user", "content": text})
if len(user_data["context"]) > user_data["options"]["max-context"]:
user_data["context"].pop(0)
try:
response = openai.ChatCompletion.create(
model=MODEL,
messages=[{"role": "system", "content": f"You are chatting with {user_name}. {SYSTEM_PROMPT}"}] + user_data['context'],
messages=[
{
"role": "system",
"content": f"You are chatting with {user_name}. {SYSTEM_PROMPT}",
}
]
+ user_data["context"],
temperature=user_data["options"]["temperature"],
)
except Exception as e:
print(e)
return f"There was a problem with OpenAI, so I can't answer you: \n\n{e}"
assistant_message = response.get('choices', [{}])[0].get('message', {"content": None}).get("content", "There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!")
assistant_message = (
response.get("choices", [{}])[0]
.get("message", {"content": None})
.get(
"content",
"There was a problem with OpenAI. Maybe your prompt is forbidden? They like to censor a lot!",
)
)
user_data['context'].append({"role": "assistant", "content": assistant_message})
if len(user_data['context']) > user_data["options"]["max-context"]:
user_data['context'].pop(0)
user_data["context"].append({"role": "assistant", "content": assistant_message})
if len(user_data["context"]) > user_data["options"]["max-context"]:
user_data["context"].pop(0)
user_data["usage"]['chatgpt'] += int(response.get('usage', {"total_tokens": 0})["total_tokens"])
user_data["usage"]["chatgpt"] += int(
response.get("usage", {"total_tokens": 0})["total_tokens"]
)
database.update_user(chat_id, user_data)
return assistant_message, user_data
@dp.message_handler(commands=['start'])
@dp.message_handler(commands=["start"])
@restricted
async def start(message: types.Message):
_ = await getUserData(message.chat.id)
await message.reply("Hello, how can I assist you today?")
@dp.message_handler(commands=['clear'], content_types=['text'])
@dp.message_handler(commands=["clear"], content_types=["text"])
@restricted
async def clear(message: types.Message) -> None:
chat_id = str(message.chat.id)
@@ -180,8 +220,9 @@ async def clear(message: types.Message) -> None:
database.update_user(chat_id, user_data)
print(f"Cleared context for {message.from_user.full_name}")
await message.reply("Your message context history was cleared.")
@dp.message_handler(commands=['usage'])
@dp.message_handler(commands=["usage"])
@restricted
async def usage(message: types.Message) -> None:
chat_id = str(message.chat.id)
@@ -189,8 +230,22 @@ async def usage(message: types.Message) -> None:
user_usage = user_data["usage"]
total_usage = database.get_total_usage()
user_spent = round((((user_usage['chatgpt'] / 750) * 0.002) + (float(user_usage['dalle']) * 0.02) + ((user_usage['whisper'] / 60.0) * 0.006)), 4)
total_spent = round((((total_usage['chatgpt'] / 750) * 0.002) + (float(total_usage['dalle']) * 0.02) + ((total_usage['whisper'] / 60.0) * 0.006)), 4)
user_spent = round(
(
((user_usage["chatgpt"] / 750) * 0.002)
+ (float(user_usage["dalle"]) * 0.02)
+ ((user_usage["whisper"] / 60.0) * 0.006)
),
4,
)
total_spent = round(
(
((total_usage["chatgpt"] / 750) * 0.002)
+ (float(total_usage["dalle"]) * 0.02)
+ ((total_usage["whisper"] / 60.0) * 0.006)
),
4,
)
user_percentage = (user_spent / total_spent) * 100 if total_spent > 0 else 0
@@ -210,49 +265,52 @@ Total spent: ${total_spent}"""
await message.reply(info_message)
@dp.message_handler(lambda message: message.chat.type == types.ChatType.PRIVATE, content_types=['text'], regexp='^/imagine')
@dp.message_handler(
lambda message: message.chat.type == types.ChatType.PRIVATE,
content_types=["text"],
regexp="^/imagine",
)
@restricted
async def imagine(message: types.Message):
await bot.send_chat_action(message.chat.id, action=types.ChatActions.TYPING)
user_data = await getUserData(message.chat.id)
user_data["usage"]['dalle'] += 1
user_data["usage"]["dalle"] += 1
database.update_user(message.chat.id, user_data)
response = openai.Image.create(
prompt=message.text,
n=1,
size="1024x1024"
)
response = openai.Image.create(prompt=message.text, n=1, size="1024x1024")
try:
image_url = response['data'][0]['url']
image_url = response["data"][0]["url"]
await message.reply(image_url)
except Exception as e:
print(e)
await message.reply("Error generating. Your prompt may contain text that is not allowed by OpenAI safety system.")
@dp.message_handler(content_types=['photo', 'video', 'audio', 'voice'])
await message.reply(
"Error generating. Your prompt may contain text that is not allowed by OpenAI's safety system."
)
@dp.message_handler(content_types=["photo", "video", "audio", "voice"])
@restricted
async def attachment(message: types.Message):
chat_id = message.chat.id
user_data = await getUserData(chat_id)
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
transcript = {'text': ''}
transcript = {"text": ""}
audioMessage = False
if message.voice:
user_data["usage"]['whisper'] += message.voice.duration
user_data["usage"]["whisper"] += message.voice.duration
file_id = message.voice.file_id
file_format = "ogg"
audioMessage = True
elif message.video:
user_data["usage"]['whisper'] += message.video.duration
user_data["usage"]["whisper"] += message.video.duration
file_id = message.video.file_id
file_format = "mp4"
elif message.audio:
user_data["usage"]['whisper'] += message.audio.duration
user_data["usage"]["whisper"] += message.audio.duration
file_id = message.audio.file_id
file_format = "mp3"
else:
@@ -262,7 +320,7 @@ async def attachment(message: types.Message):
file = await bot.get_file(file_id)
user_id = message.chat.id
await file.download(f"{user_id}.{file_format}")
if file_format == "ogg":
ogg_audio = AudioSegment.from_file(f"{user_id}.ogg", format="ogg")
ogg_audio.export(f"{user_id}.mp3", format="mp3")
@@ -281,28 +339,32 @@ async def attachment(message: types.Message):
os.remove(f"{user_id}.{file_format}")
if transcript['text'] == "":
transcript['text'] = "[Silence]"
if transcript["text"] == "":
transcript["text"] = "[Silence]"
chatGPT_response = False
if audioMessage and user_data["options"]["whisper_to_chat"]:
chatGPT_response, user_data = await messageGPT(transcript['text'], str(chat_id), message.from_user.full_name, user_data)
transcript['text'] = "> " + transcript['text'] + "\n\n" + chatGPT_response
await message.reply(transcript['text'])
chatGPT_response, user_data = await messageGPT(
transcript["text"], str(chat_id), message.from_user.full_name, user_data
)
transcript["text"] = "> " + transcript["text"] + "\n\n" + chatGPT_response
await message.reply(transcript["text"])
if user_data["options"]["assistant_voice_chat"] and chatGPT_response:
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
voice_data = await text_to_voice(chatGPT_response)
await message.reply_voice(voice_data)
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
voice_data = await text_to_voice(chatGPT_response)
await message.reply_voice(voice_data)
database.update_user(str(chat_id), user_data)
@restricted
@dp.message_handler(commands=['settings'])
@dp.message_handler(commands=["settings"])
async def settings(message: types.Message):
chat_id = str(message.chat.id)
settings_markup = generate_settings_markup(chat_id)
await message.reply(text='Settings:', reply_markup=settings_markup)
await message.reply(text="Settings:", reply_markup=settings_markup)
async def settings_callback(callback_query: types.CallbackQuery):
user_data = await getUserData(callback_query.message.chat.id)
@@ -310,7 +372,7 @@ async def settings_callback(callback_query: types.CallbackQuery):
options = user_data["options"]
if action.startswith("setting_inc_temp"):
options["temperature"] = min(options["temperature"] + 0.1, 1)
options["temperature"] = min(options["temperature"] + 0.1, 1)
elif action.startswith("setting_dec_temp"):
options["temperature"] = max(options["temperature"] - 0.1, 0)
@@ -325,35 +387,45 @@ async def settings_callback(callback_query: types.CallbackQuery):
options["assistant_voice_chat"] = False
elif action.startswith("setting_inc_context"):
options["max-context"] = min(options["max-context"] + 1, MAX_USER_CONTEXT)
options["max-context"] = min(options["max-context"] + 1, MAX_USER_CONTEXT)
elif action.startswith("setting_dec_context"):
options["max-context"] = max(options["max-context"] - 1, 1)
settings_markup = generate_settings_markup(chat_id)
await callback_query.message.edit_text(text='Choose a setting option:', reply_markup=settings_markup)
await callback_query.message.edit_text(
text="Choose a setting option:", reply_markup=settings_markup
)
database.update_user(chat_id, user_data)
settings_txt = f"Updated settings:\n\nTemperature: {options['temperature']}\nWhisper to Chat: {options['whisper_to_chat']}\nAssistant voice: {options['assistant_voice_chat']}\nContext Length: {options['max-context']}"
await callback_query.answer()
await callback_query.message.reply(text=settings_txt)
@dp.message_handler(lambda message: message.chat.type == types.ChatType.PRIVATE and not message.text.startswith("/"), content_types=['text'])
@dp.message_handler(
lambda message: message.chat.type == types.ChatType.PRIVATE
and not message.text.startswith("/"),
content_types=["text"],
)
async def chat(message: types.Message):
chat_id = str(message.chat.id)
user_data = await getUserData(chat_id)
user_prompt = message.text
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
assistant_message, user_data = await messageGPT(user_prompt, chat_id, message.from_user.full_name, user_data)
assistant_message, user_data = await messageGPT(
user_prompt, chat_id, message.from_user.full_name, user_data
)
await message.reply(assistant_message, parse_mode=ParseMode.MARKDOWN)
if user_data["options"]["assistant_voice_chat"]:
await bot.send_chat_action(chat_id, action=types.ChatActions.TYPING)
voice_data = await text_to_voice(assistant_message)
await message.reply_voice(voice_data)
if __name__ == '__main__':
if __name__ == "__main__":
database.init_database()
try:
@@ -361,13 +433,15 @@ if __name__ == '__main__':
except Exception as e:
print(e)
ALLOWED_USERS = ALLOWED_USERS
print(f"Allowed users: {ALLOWED_USERS}")
print(f"System prompt: {SYSTEM_PROMPT}")
print(f"Google TTS: {ENABLE_GOOGLE_TTS}")
print(f"TTS: {ENABLE_TTS}")
# Register message handler and callback query handler for settings
dp.register_message_handler(settings, commands=['settings'])
dp.register_callback_query_handler(settings_callback, lambda c: c.data.startswith('setting_'))
executor.start_polling(dp, skip_updates=True)
dp.register_message_handler(settings, commands=["settings"])
dp.register_callback_query_handler(
settings_callback, lambda c: c.data.startswith("setting_")
)
executor.start_polling(dp, skip_updates=True, fast=True)
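The new text_to_voice() above pipes the reply through the bundled piper binary, choosing the voice model from the language detected in the first 100 characters. A minimal standalone sketch of the same pipeline, assuming ./piper/piper and the ./piper/voices/<lang>.onnx models fetched by piper/get-piper.sh are in place:

import subprocess
from langdetect import detect

def speak(text: str) -> bytes:
    # Pick the voice model matching the detected language, as text_to_voice() does
    lang = detect(text[:100])  # e.g. "en", "es"
    model = f"./piper/voices/{lang}.onnx"
    # piper reads sentences on stdin; "--output_file -" streams the audio to stdout
    result = subprocess.run(
        ["./piper/piper", "--model", model, "--output_file", "-"],
        input=text.replace("\n", ". ").encode("utf-8"),  # speak as one flowing passage
        stdout=subprocess.PIPE,
        check=True,
    )
    return result.stdout  # raw audio bytes, ready to wrap in a BytesIO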

73
piper/get-piper.sh 100755
View file

@@ -0,0 +1,73 @@
#!/bin/bash
source .env
if [ "$ENABLE_TTS" = 1 ]; then
echo "Installing piper for text to voice conversion..."
echo "Downloading piper v0.0.2.."
wget -q https://github.com/rhasspy/piper/releases/download/v0.0.2/piper_amd64.tar.gz && \
echo "Extracting piper" && \
tar -xf piper_amd64.tar.gz && \
rm piper_amd64.tar.gz && \
chmod -R 777 ./piper/ && \
mkdir piper/voices
# Download voices for all or selected languages
if [[ "$VOICE_LANGUAGE_LIST" == "*" ]]; then
langs=( "en" "es" "fr" "it" "pt" "ca" "de" "nl" "no" )
else
IFS=',' read -r -a langs <<< "$VOICE_LANGUAGE_LIST"
fi
echo "Downloading tts voices from VOICE_LANGUAGE_LIST..."
echo "This can take a while..."
for lang in "${langs[@]}"; do
case $lang in
"en" )
voice_file="voice-en-us-ryan-high.tar.gz"
;;
"es" )
voice_file="voice-es-mls_9972-low.tar.gz"
;;
"fr" )
voice_file="voice-fr-siwis-medium.tar.gz"
;;
"it" )
voice_file="voice-it-riccardo_fasol-x-low.tar.gz"
;;
"pt" )
voice_file="voice-pt-br-edresson-low.tar.gz"
;;
"ca" )
voice_file="voice-ca-upc_ona-x-low.tar.gz"
;;
"de" )
voice_file="voice-de-thorsten-low.tar.gz"
;;
"nl" )
voice_file="voice-nl-rdh-medium.tar.gz"
;;
"no" )
voice_file="voice-no-talesyntese-medium.tar.gz"
;;
* )
echo "Ignoring unrecognized language code: $lang"
continue
;;
esac
echo "Downloading $lang voice..."
wget -q https://github.com/rhasspy/piper/releases/download/v0.0.2/$voice_file && \
tar -xf $voice_file && \
rm $voice_file && \
mv $lang-*.onnx piper/voices/$lang.onnx && \
mv $lang-*.onnx.json piper/voices/$lang.onnx.json
echo "Done"
done
echo "Done. Piper installed!"
else
echo "TTS Disabled. No work to do..."
fi
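The script above hard-codes which released voice archive belongs to each language code. A hedged Python sketch of the same mapping, used here as a hypothetical startup check (check_voices is illustrative, not part of the repo) that reports languages from VOICE_LANGUAGE_LIST whose model never got downloaded:

import os

# Mirrors the language -> voice archive mapping in piper/get-piper.sh
PIPER_VOICES = {
    "en": "voice-en-us-ryan-high",
    "es": "voice-es-mls_9972-low",
    "fr": "voice-fr-siwis-medium",
    "it": "voice-it-riccardo_fasol-x-low",
    "pt": "voice-pt-br-edresson-low",
    "ca": "voice-ca-upc_ona-x-low",
    "de": "voice-de-thorsten-low",
    "nl": "voice-nl-rdh-medium",
    "no": "voice-no-talesyntese-medium",
}

def check_voices(lang_list: str, voices_dir: str = "./piper/voices") -> list:
    """Return the language codes whose <lang>.onnx model is missing."""
    langs = list(PIPER_VOICES) if lang_list == "*" else lang_list.split(",")
    return [
        lang for lang in langs
        if not os.path.isfile(os.path.join(voices_dir, f"{lang}.onnx"))
    ]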

requirements.txt
View file

@@ -1,6 +1,5 @@
aiogram==2.25.1
gTTS==2.3.1
langdetect==1.0.9
openai==0.27.2
pydub==0.25.1
python-dotenv==1.0.0
pyttsx3==2.90

20
utils.py 100644
View file

@@ -0,0 +1,20 @@
import subprocess
import tempfile

def text_to_speech(text: str) -> str:
    binary_path = "./piper"
    model_path = "blizzard_lessac-medium.onnx"
    # Generate a unique temporary filename
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_filename = tmp.name
    # Feed the text to piper over stdin; passing argv as a list avoids the
    # quoting and injection problems of interpolating text into a shell string
    subprocess.run(
        [binary_path, "--model", model_path, "--output_file", tmp_filename],
        input=text,
        text=True,
        check=True,
    )
    # Return the temporary filename
    return tmp_filename
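A quick usage sketch for the helper above (the caller owns the returned temp file and should delete it when done):

import os
from utils import text_to_speech

audio_path = text_to_speech("Hello from piper!")
print(f"Audio written to {audio_path}")
os.remove(audio_path)  # text_to_speech leaves cleanup to the caller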