kopia lustrzana https://github.com/J-Rios/TLG_JoinCaptchaBot
Some code refactor on member join
rodzic
379448cc99
commit
71c1b1a18e
|
@ -10,9 +10,9 @@ Author:
|
||||||
Creation date:
|
Creation date:
|
||||||
09/09/2018
|
09/09/2018
|
||||||
Last modified date:
|
Last modified date:
|
||||||
30/06/2022
|
14/10/2022
|
||||||
Version:
|
Version:
|
||||||
1.26.4
|
1.27.0
|
||||||
'''
|
'''
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
@ -290,7 +290,7 @@ CONST = {
|
||||||
"DEV_DONATION_ADDR": "https://www.buymeacoffee.com/joincaptchabot",
|
"DEV_DONATION_ADDR": "https://www.buymeacoffee.com/joincaptchabot",
|
||||||
|
|
||||||
# Bot version
|
# Bot version
|
||||||
"VERSION": "1.26.4 (30/06/2022)"
|
"VERSION": "1.27.0 (14/10/2022)"
|
||||||
}
|
}
|
||||||
|
|
||||||
# Supported languages list
|
# Supported languages list
|
||||||
|
|
|
@ -12,14 +12,15 @@ Author:
|
||||||
Creation date:
|
Creation date:
|
||||||
09/09/2018
|
09/09/2018
|
||||||
Last modified date:
|
Last modified date:
|
||||||
30/06/2022
|
14/10/2022
|
||||||
Version:
|
Version:
|
||||||
1.26.4
|
1.27.0
|
||||||
'''
|
'''
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
### Imported modules
|
### Imported modules
|
||||||
|
|
||||||
|
from operator import is_
|
||||||
from platform import system as os_system
|
from platform import system as os_system
|
||||||
|
|
||||||
from signal import signal, SIGTERM, SIGINT
|
from signal import signal, SIGTERM, SIGINT
|
||||||
|
@ -71,8 +72,9 @@ from tlgbotutils import (
|
||||||
tlg_answer_callback_query, tlg_delete_msg, tlg_edit_msg_media,
|
tlg_answer_callback_query, tlg_delete_msg, tlg_edit_msg_media,
|
||||||
tlg_ban_user, tlg_kick_user, tlg_user_is_admin, tlg_leave_chat,
|
tlg_ban_user, tlg_kick_user, tlg_user_is_admin, tlg_leave_chat,
|
||||||
tlg_restrict_user, tlg_is_valid_user_id_or_alias, tlg_is_valid_group,
|
tlg_restrict_user, tlg_is_valid_user_id_or_alias, tlg_is_valid_group,
|
||||||
tlg_alias_in_string, tlg_extract_members_status_change,
|
tlg_alias_in_string, tlg_extract_members_status_change, tlg_get_msg,
|
||||||
tlg_is_a_channel_msg_on_discussion_group
|
tlg_is_a_channel_msg_on_discussion_group, tlg_get_user_name,
|
||||||
|
tlg_has_new_member_join_group
|
||||||
)
|
)
|
||||||
|
|
||||||
from constants import (
|
from constants import (
|
||||||
|
@ -606,6 +608,46 @@ def is_captcha_num_solve(captcha_mode, msg_text, solve_num):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def should_manage_captcha(update, bot):
|
||||||
|
'''Check if the Bot should manage a Captcha process to this Group and
|
||||||
|
Member. It checks if the group is allowed to use the Bot, checks if the
|
||||||
|
member is not an Administrator neither a member added by an Admin, or an
|
||||||
|
added Bot, and checks if the Member is not in any of the allowed users
|
||||||
|
lists.'''
|
||||||
|
chat = update.chat_member.chat
|
||||||
|
join_user = update.chat_member.new_chat_member.user
|
||||||
|
member_added_by = update.chat_member.from_user
|
||||||
|
# Check if Group is not allowed to be used by the Bot
|
||||||
|
if not allowed_in_this_group(bot, chat, member_added_by):
|
||||||
|
tlg_leave_chat(bot, chat.id)
|
||||||
|
return False
|
||||||
|
# Ignore Admins
|
||||||
|
if tlg_user_is_admin(bot, join_user.id, chat.id):
|
||||||
|
printts("[{}] User is an admin.".format(chat.id))
|
||||||
|
printts("Skipping the captcha process.")
|
||||||
|
return False
|
||||||
|
# Ignore Members added by an Admin
|
||||||
|
if tlg_user_is_admin(bot, member_added_by.id, chat.id):
|
||||||
|
printts("[{}] User has been added by an admin.".format(chat.id))
|
||||||
|
printts("Skipping the captcha process.")
|
||||||
|
return False
|
||||||
|
# Ignore if the member that has been join the group is a Bot
|
||||||
|
if join_user.is_bot:
|
||||||
|
printts("[{}] User is a Bot.".format(chat.id))
|
||||||
|
printts("Skipping the captcha process.")
|
||||||
|
return False
|
||||||
|
# Ignore if the member that has joined is in chat ignore list
|
||||||
|
if is_user_in_ignored_list(chat.id, join_user):
|
||||||
|
printts("[{}] User is in ignore list.".format(chat.id))
|
||||||
|
printts("Skipping the captcha process.")
|
||||||
|
return False
|
||||||
|
if is_user_in_allowed_list(join_user):
|
||||||
|
printts("[{}] User is in global allowed list.".format(chat.id))
|
||||||
|
printts("Skipping the captcha process.")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
### Received Telegram not-command messages handlers
|
### Received Telegram not-command messages handlers
|
||||||
|
|
||||||
|
@ -692,35 +734,16 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
update won't be received.'''
|
update won't be received.'''
|
||||||
global new_users
|
global new_users
|
||||||
bot = context.bot
|
bot = context.bot
|
||||||
# Check members changes
|
# Ignore if it is not a new member join
|
||||||
result = tlg_extract_members_status_change(update.chat_member)
|
if not tlg_has_new_member_join_group(update.chat_member):
|
||||||
if result is None:
|
|
||||||
return
|
|
||||||
was_member, is_member = result
|
|
||||||
# Check if it is a new member join
|
|
||||||
if was_member:
|
|
||||||
return
|
|
||||||
if not is_member:
|
|
||||||
return
|
return
|
||||||
# Get Chat data
|
# Get Chat data
|
||||||
chat = update.chat_member.chat
|
chat = update.chat_member.chat
|
||||||
member_added_by = update.chat_member.from_user
|
|
||||||
join_user = update.chat_member.new_chat_member.user
|
join_user = update.chat_member.new_chat_member.user
|
||||||
chat_id = chat.id
|
chat_id = chat.id
|
||||||
# Get User ID
|
# Get User ID and Name
|
||||||
join_user_id = join_user.id
|
join_user_id = join_user.id
|
||||||
# Get user name
|
join_user_name = tlg_get_user_name(join_user, 35)
|
||||||
if join_user.name is not None:
|
|
||||||
join_user_name = join_user.name
|
|
||||||
else:
|
|
||||||
join_user_name = join_user.full_name
|
|
||||||
# If the user name is too long, truncate it to 35 characters
|
|
||||||
if len(join_user_name) > 35:
|
|
||||||
join_user_name = join_user_name[0:35]
|
|
||||||
# Add an unicode Left to Right Mark (LRM) to user name (names fix
|
|
||||||
# for arabic, hebrew, etc.)
|
|
||||||
user_name_lrm = add_lrm(join_user_name)
|
|
||||||
printts(" ")
|
|
||||||
printts("[{}] New join detected: {} ({})".format(chat_id,
|
printts("[{}] New join detected: {} ({})".format(chat_id,
|
||||||
join_user_name, join_user_id))
|
join_user_name, join_user_id))
|
||||||
# Get and update chat data
|
# Get and update chat data
|
||||||
|
@ -734,34 +757,8 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
if chat_link:
|
if chat_link:
|
||||||
chat_link = "@{}".format(chat_link)
|
chat_link = "@{}".format(chat_link)
|
||||||
save_config_property(chat_id, "Link", chat_link)
|
save_config_property(chat_id, "Link", chat_link)
|
||||||
# Check if Group is not allowed to be used by the Bot
|
# Check if the Bot should manage a Captcha process to this Group and Member
|
||||||
if not allowed_in_this_group(bot, chat, member_added_by):
|
if not should_manage_captcha(update, bot):
|
||||||
tlg_leave_chat(bot, chat.id)
|
|
||||||
return
|
|
||||||
# Ignore Admins
|
|
||||||
if tlg_user_is_admin(bot, join_user_id, chat_id):
|
|
||||||
printts("[{}] User is an administrator.".format(chat_id))
|
|
||||||
printts("Skipping the captcha process.")
|
|
||||||
return
|
|
||||||
# Ignore Members added by an Admin
|
|
||||||
if tlg_user_is_admin(bot, member_added_by.id, chat_id):
|
|
||||||
printts("[{}] User has been added by an administrator.".format(
|
|
||||||
chat_id))
|
|
||||||
printts("Skipping the captcha process.")
|
|
||||||
return
|
|
||||||
# Ignore if the member that has been join the group is a Bot
|
|
||||||
if join_user.is_bot:
|
|
||||||
printts("[{}] User is a Bot.".format(chat_id))
|
|
||||||
printts("Skipping the captcha process.")
|
|
||||||
return
|
|
||||||
# Ignore if the member that has joined is in chat ignore list
|
|
||||||
if is_user_in_ignored_list(chat_id, join_user):
|
|
||||||
printts("[{}] User is in ignore list.".format(chat_id))
|
|
||||||
printts("Skipping the captcha process.")
|
|
||||||
return
|
|
||||||
if is_user_in_allowed_list(join_user):
|
|
||||||
printts("[{}] User is in global allowed list.".format(chat_id))
|
|
||||||
printts("Skipping the captcha process.")
|
|
||||||
return
|
return
|
||||||
# Check and remove previous join messages of that user (if any)
|
# Check and remove previous join messages of that user (if any)
|
||||||
if chat_id in new_users:
|
if chat_id in new_users:
|
||||||
|
@ -788,7 +785,7 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
captcha_num = ""
|
captcha_num = ""
|
||||||
if captcha_mode == "random":
|
if captcha_mode == "random":
|
||||||
captcha_mode = choice(["nums", "math", "poll"])
|
captcha_mode = choice(["nums", "math", "poll"])
|
||||||
# If Captcha Mode Poll is not configured use othe mode
|
# If Captcha Mode Poll is not configured use another mode
|
||||||
if captcha_mode == "poll":
|
if captcha_mode == "poll":
|
||||||
poll_question = get_chat_config(chat_id, "Poll_Q")
|
poll_question = get_chat_config(chat_id, "Poll_Q")
|
||||||
poll_options = get_chat_config(chat_id, "Poll_A")
|
poll_options = get_chat_config(chat_id, "Poll_A")
|
||||||
|
@ -799,7 +796,7 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
captcha_mode = choice(["nums", "math"])
|
captcha_mode = choice(["nums", "math"])
|
||||||
if captcha_mode == "button":
|
if captcha_mode == "button":
|
||||||
# Send a button-only challenge
|
# Send a button-only challenge
|
||||||
challenge_text = TEXT[lang]["NEW_USER_BUTTON_MODE"].format(user_name_lrm,
|
challenge_text = TEXT[lang]["NEW_USER_BUTTON_MODE"].format(join_user_name,
|
||||||
chat_title, timeout_str)
|
chat_title, timeout_str)
|
||||||
# Prepare inline keyboard button to let user pass
|
# Prepare inline keyboard button to let user pass
|
||||||
keyboard = [[InlineKeyboardButton(TEXT[lang]["PASS_BTN_TEXT"],
|
keyboard = [[InlineKeyboardButton(TEXT[lang]["PASS_BTN_TEXT"],
|
||||||
|
@ -822,7 +819,7 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
# Remove empty strings from options list
|
# Remove empty strings from options list
|
||||||
poll_options = list(filter(None, poll_options))
|
poll_options = list(filter(None, poll_options))
|
||||||
# Send request to solve the poll text message
|
# Send request to solve the poll text message
|
||||||
poll_request_msg_text = TEXT[lang]["POLL_NEW_USER"].format(user_name_lrm,
|
poll_request_msg_text = TEXT[lang]["POLL_NEW_USER"].format(join_user_name,
|
||||||
chat_title, timeout_str)
|
chat_title, timeout_str)
|
||||||
sent_result = tlg_send_selfdestruct_msg(bot, chat_id, poll_request_msg_text)
|
sent_result = tlg_send_selfdestruct_msg(bot, chat_id, poll_request_msg_text)
|
||||||
solve_poll_request_msg_id = None
|
solve_poll_request_msg_id = None
|
||||||
|
@ -860,14 +857,14 @@ def chat_member_status_change(update: Update, context: CallbackContext):
|
||||||
captcha["equation_result"]))
|
captcha["equation_result"]))
|
||||||
# Note: Img caption must be <= 1024 chars
|
# Note: Img caption must be <= 1024 chars
|
||||||
img_caption = TEXT[lang]["NEW_USER_MATH_CAPTION"].format( \
|
img_caption = TEXT[lang]["NEW_USER_MATH_CAPTION"].format( \
|
||||||
user_name_lrm, chat_title, timeout_str)
|
join_user_name, chat_title, timeout_str)
|
||||||
else:
|
else:
|
||||||
captcha_num = captcha["characters"]
|
captcha_num = captcha["characters"]
|
||||||
printts("[{}] Sending captcha message to {}: {}...".format( \
|
printts("[{}] Sending captcha message to {}: {}...".format( \
|
||||||
chat_id, join_user_name, captcha_num))
|
chat_id, join_user_name, captcha_num))
|
||||||
# Note: Img caption must be <= 1024 chars
|
# Note: Img caption must be <= 1024 chars
|
||||||
img_caption = TEXT[lang]["NEW_USER_IMG_CAPTION"].format( \
|
img_caption = TEXT[lang]["NEW_USER_IMG_CAPTION"].format( \
|
||||||
user_name_lrm, chat_title, timeout_str)
|
join_user_name, chat_title, timeout_str)
|
||||||
# Prepare inline keyboard button to let user request another captcha
|
# Prepare inline keyboard button to let user request another captcha
|
||||||
keyboard = [[InlineKeyboardButton(TEXT[lang]["OTHER_CAPTCHA_BTN_TEXT"],
|
keyboard = [[InlineKeyboardButton(TEXT[lang]["OTHER_CAPTCHA_BTN_TEXT"],
|
||||||
callback_data="image_captcha {}".format(join_user_id))]]
|
callback_data="image_captcha {}".format(join_user_id))]]
|
||||||
|
@ -929,11 +926,13 @@ def msg_user_joined_group(update: Update, context: CallbackContext):
|
||||||
'''New member join the group event handler'''
|
'''New member join the group event handler'''
|
||||||
global new_users
|
global new_users
|
||||||
# Get message data
|
# Get message data
|
||||||
update_msg = getattr(update, "message", None)
|
chat_id = None
|
||||||
if update_msg is None:
|
update_msg = tlg_get_msg(update)
|
||||||
return
|
if update_msg is not None:
|
||||||
chat_id = getattr(update_msg, "chat_id", None)
|
chat_id = getattr(update_msg, "chat_id", None)
|
||||||
if chat_id is None:
|
if (update_msg is None) or (chat_id is None):
|
||||||
|
print("Warning: Received an unexpected update.")
|
||||||
|
print(update)
|
||||||
return
|
return
|
||||||
msg_id = getattr(update_msg, "message_id", None)
|
msg_id = getattr(update_msg, "message_id", None)
|
||||||
if msg_id is None:
|
if msg_id is None:
|
||||||
|
@ -958,26 +957,15 @@ def msg_notext(update: Update, context: CallbackContext):
|
||||||
'''All non-text messages handler.'''
|
'''All non-text messages handler.'''
|
||||||
bot = context.bot
|
bot = context.bot
|
||||||
# Get message data
|
# Get message data
|
||||||
update_msg = getattr(update, "message", None)
|
chat = None
|
||||||
if update_msg is None:
|
chat_id = None
|
||||||
update_msg = getattr(update, "edited_message", None)
|
update_msg = None
|
||||||
if update_msg is None:
|
update_msg = tlg_get_msg(update)
|
||||||
update_msg = getattr(update, "channel_post", None)
|
if update_msg is not None:
|
||||||
if update_msg is not None:
|
chat = getattr(update_msg, "chat", None)
|
||||||
chat = getattr(update_msg, "chat", None)
|
chat_id = getattr(update_msg, "chat_id", None)
|
||||||
if chat is not None:
|
if (update_msg is None) or (chat is None) or (chat_id is None):
|
||||||
return
|
print("Warning: Received an unexpected update.")
|
||||||
print("Warning: Received an unexpected no-text update.")
|
|
||||||
print(update)
|
|
||||||
return
|
|
||||||
chat_id = getattr(update_msg, "chat_id", None)
|
|
||||||
if chat_id is None:
|
|
||||||
print("Warning: Received an unexpected no-text update without chat id.")
|
|
||||||
print(update)
|
|
||||||
return
|
|
||||||
chat = getattr(update_msg, "chat", None)
|
|
||||||
if chat is None:
|
|
||||||
print("Warning: Received an unexpected no-text update without chat.")
|
|
||||||
print(update)
|
print(update)
|
||||||
return
|
return
|
||||||
# Ignore if message comes from a private chat
|
# Ignore if message comes from a private chat
|
||||||
|
@ -1018,26 +1006,15 @@ def msg_nocmd(update: Update, context: CallbackContext):
|
||||||
global new_users
|
global new_users
|
||||||
bot = context.bot
|
bot = context.bot
|
||||||
# Get message data
|
# Get message data
|
||||||
update_msg = getattr(update, "message", None)
|
chat = None
|
||||||
if update_msg is None:
|
chat_id = None
|
||||||
update_msg = getattr(update, "edited_message", None)
|
update_msg = None
|
||||||
if update_msg is None:
|
update_msg = tlg_get_msg(update)
|
||||||
update_msg = getattr(update, "channel_post", None)
|
if update_msg is not None:
|
||||||
if update_msg is not None:
|
chat = getattr(update_msg, "chat", None)
|
||||||
chat = getattr(update_msg, "chat", None)
|
chat_id = getattr(update_msg, "chat_id", None)
|
||||||
if chat is not None:
|
if (update_msg is None) or (chat is None) or (chat_id is None):
|
||||||
return
|
print("Warning: Received an unexpected update.")
|
||||||
print("Warning: Received an unexpected no-command update.")
|
|
||||||
print(update)
|
|
||||||
return
|
|
||||||
chat_id = getattr(update_msg, "chat_id", None)
|
|
||||||
if chat_id is None:
|
|
||||||
print("Warning: Received an unexpected no-command update without chat id.")
|
|
||||||
print(update)
|
|
||||||
return
|
|
||||||
chat = getattr(update_msg, "chat", None)
|
|
||||||
if chat is None:
|
|
||||||
print("Warning: Received an unexpected no-command update without chat.")
|
|
||||||
print(update)
|
print(update)
|
||||||
return
|
return
|
||||||
# Ignore if message comes from a private chat
|
# Ignore if message comes from a private chat
|
||||||
|
|
|
@ -10,9 +10,9 @@ Author:
|
||||||
Creation date:
|
Creation date:
|
||||||
02/11/2020
|
02/11/2020
|
||||||
Last modified date:
|
Last modified date:
|
||||||
13/06/2022
|
14/10/2022
|
||||||
Version:
|
Version:
|
||||||
1.1.4
|
1.1.5
|
||||||
'''
|
'''
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
@ -27,7 +27,7 @@ from telegram import (
|
||||||
|
|
||||||
from telegram.utils.helpers import DEFAULT_NONE
|
from telegram.utils.helpers import DEFAULT_NONE
|
||||||
|
|
||||||
from commons import printts
|
from commons import printts, add_lrm
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
### Specific Telegram constants
|
### Specific Telegram constants
|
||||||
|
@ -391,3 +391,50 @@ def tlg_extract_members_status_change(
|
||||||
ChatMember.ADMINISTRATOR,
|
ChatMember.ADMINISTRATOR,
|
||||||
] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
|
] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
|
||||||
return was_member, is_member
|
return was_member, is_member
|
||||||
|
|
||||||
|
|
||||||
|
def tlg_get_user_name(user, truncate_name_len=0):
|
||||||
|
'''Get and return a Telegram member username. It allows to truncate the
|
||||||
|
name if argument for that ar provided. It applies a LRM mark to ensure and
|
||||||
|
fix any possible representation error due Right-To-Left language texts."'''
|
||||||
|
user_name = ""
|
||||||
|
if user is None:
|
||||||
|
return "None"
|
||||||
|
if user.name is not None:
|
||||||
|
user_name = user.name
|
||||||
|
else:
|
||||||
|
user_name = user.full_name
|
||||||
|
# If the user name is too long, truncate it to specified num of characters
|
||||||
|
if truncate_name_len > 0:
|
||||||
|
if len(user_name) > truncate_name_len:
|
||||||
|
user_name = user_name[0:truncate_name_len]
|
||||||
|
# Add an unicode Left to Right Mark (LRM) to user name (names fix for
|
||||||
|
# arabic, hebrew, etc.)
|
||||||
|
user_name = add_lrm(user_name)
|
||||||
|
return user_name
|
||||||
|
|
||||||
|
|
||||||
|
def tlg_has_new_member_join_group(chat_member):
|
||||||
|
'''Check chat members status changes and detect if the provided member has
|
||||||
|
join the current group.'''
|
||||||
|
# Check members changes
|
||||||
|
result = tlg_extract_members_status_change(chat_member)
|
||||||
|
if result is None:
|
||||||
|
return False
|
||||||
|
was_member, is_member = result
|
||||||
|
# Check if it is a new member join
|
||||||
|
if was_member:
|
||||||
|
return False
|
||||||
|
if not is_member:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def tlg_get_msg(update):
|
||||||
|
'''Get Telegram message data from the Update element.'''
|
||||||
|
msg = getattr(update, "message", None)
|
||||||
|
if msg is None:
|
||||||
|
msg = getattr(update, "edited_message", None)
|
||||||
|
if msg is None:
|
||||||
|
msg = getattr(update, "channel_post", None)
|
||||||
|
return msg
|
||||||
|
|
Ładowanie…
Reference in New Issue