| |
| |
| |
| |
| |
| |
|
|
| import re |
| import time |
| from datetime import datetime |
| from os import remove |
|
|
| from git import Repo |
| from telethon import Button |
| from telethon.tl.types import InputWebDocument, Message |
| from telethon.utils import resolve_bot_file_id |
|
|
| from pyUltroid._misc._assistant import callback, in_pattern |
| from pyUltroid.dB._core import HELP, LIST |
| from pyUltroid.fns.helper import gen_chlog, time_formatter, updater |
| from pyUltroid.fns.misc import split_list |
|
|
| from . import ( |
| HNDLR, |
| LOGS, |
| OWNER_NAME, |
| InlinePlugin, |
| asst, |
| get_string, |
| inline_pic, |
| split_list, |
| start_time, |
| udB, |
| ) |
| from ._help import _main_help_menu |
|
|
| |
|
|
# Caption templates and plugin registries, resolved once at import time.
helps = get_string("inline_1")

# Raw value of the "ADDONS" database key; only an explicit ``False`` selects
# the "inline_3" caption below, any other value selects "inline_2".
add_ons = udB.get_key("ADDONS")

zhelps = get_string("inline_3" if add_ons is False else "inline_2")
PLUGINS, ADDONS = HELP.get("Official", []), HELP.get("Addons", [])
upage = 0
| |
|
|
| |
|
|
# Single keyboard row of URL buttons attached to the inline "alive" card.
SUP_BUTTONS = [
    [
        Button.url(label, url=target)
        for label, target in (
            ("• Repo •", "https://github.com/TeamUltroid/Ultroid"),
            ("• Support •", "t.me/UltroidSupportChat"),
        )
    ],
]
|
|
| |
|
|
|
|
@in_pattern(owner=True, func=lambda x: not x.text)
async def inline_alive(o):
    """Answer an inline query that has no text with the Ultroid photo card.

    The card uses the configured inline picture when ``inline_pic()`` returns
    one, otherwise a default image URL, and carries the ``SUP_BUTTONS`` row.
    """
    picture_url = (
        inline_pic() or "https://graph.org/file/74d6259983e0642923fdb.jpg"
    )
    thumbnail = InputWebDocument(
        "https://graph.org/file/acd4f5d61369f74c5e7a7.jpg", 0, "image/jpg", []
    )
    picture = InputWebDocument(picture_url, 0, "image/jpg", [])
    card = await o.builder.article(
        type="photo",
        text="• **Ultroid Userbot •**",
        include_media=True,
        buttons=SUP_BUTTONS,
        title="Ultroid Userbot",
        description="Userbot | Telethon",
        url=picture_url,
        thumb=thumbnail,
        content=picture,
    )
    await o.answer(
        [card],
        private=True,
        cache_time=300,
        switch_pm="👥 ULTROID PORTAL",
        switch_pm_param="start",
    )
|
|
|
|
@in_pattern("ultd", owner=True)
async def inline_handler(event):
    """Answer the ``ultd`` inline query with the main help menu.

    The caption is the "inline_4" string filled with the owner name, the
    number of "Official" and "Addons" entries in ``HELP`` and the total number
    of commands across ``LIST``. The result is a photo when an inline picture
    is configured and a plain article otherwise.
    """
    every_command = [cmd for cmds in LIST.values() for cmd in cmds]
    caption = get_string("inline_4").format(
        OWNER_NAME,
        len(HELP.get("Official", [])),
        len(HELP.get("Addons", [])),
        len(every_command),
    )
    if not inline_pic():
        menu = await event.builder.article(
            title="Ultroid Help Menu", text=caption, buttons=_main_help_menu
        )
    else:
        menu = await event.builder.photo(
            file=inline_pic(),
            link_preview=False,
            text=caption,
            buttons=_main_help_menu,
        )
    await event.answer([menu], private=True, cache_time=300, gallery=True)
|
|
|
|
@in_pattern("pasta", owner=True)
async def _(event):
    """Answer a ``pasta-<key or url>`` inline query with Spacebin link buttons.

    Everything after the first ``-`` is the paste target. A target that does
    not start with ``http`` is treated as a Spacebin document key; otherwise
    it is used as the link itself and ``/raw`` is appended for the raw view.
    A query without a target is answered with no results.
    """
    # Split on the FIRST "-" only: keys and URLs may legitimately contain "-",
    # and a plain split("-")[1] would truncate them (or raise IndexError when
    # the query has no "-" at all).
    _, _, target = event.text.partition("-")
    if not target:
        return await event.answer([])
    if target.startswith("http"):
        link = target
        raw = f"{target}/raw"
    else:
        link = f"https://spaceb.in/{target}"
        raw = f"https://spaceb.in/api/v1/documents/{target}/raw"
    result = await event.builder.article(
        title="Paste",
        text="Pasted to Spacebin 🌌",
        buttons=[
            [
                Button.url("SpaceBin", url=link),
                Button.url("Raw", url=raw),
            ],
        ],
    )
    await event.answer([result])
|
|
|
|
@callback("ownr", owner=True)
async def setting(event):
    """Edit the message into the owner panel (ping, uptime, stats, update).

    The caption is the same "inline_4" summary used by the main help menu;
    the current inline picture, if any, is attached as the file.
    """
    every_command = [cmd for cmds in LIST.values() for cmd in cmds]
    caption = get_string("inline_4").format(
        OWNER_NAME,
        len(HELP.get("Official", [])),
        len(HELP.get("Addons", [])),
        len(every_command),
    )
    keyboard = [
        [
            Button.inline("•Pɪɴɢ•", data="pkng"),
            Button.inline("•Uᴘᴛɪᴍᴇ•", data="upp"),
        ],
        [
            Button.inline("•Stats•", data="alive"),
            Button.inline("•Uᴘᴅᴀᴛᴇ•", data="doupdate"),
        ],
        [Button.inline("« Bᴀᴄᴋ", data="open")],
    ]
    await event.edit(
        caption,
        file=inline_pic(),
        link_preview=False,
        buttons=keyboard,
    )
|
|
|
|
# Caption template for each help category, looked up by category key.
_strings = dict(
    Official=helps,
    Addons=zhelps,
    VCBot=get_string("inline_6"),
)
|
|
|
|
@callback(re.compile("uh_(.*)"), owner=True)
async def help_func(ult):
    """Show one page of the plugin list for a help category.

    Callback data has the form ``uh_<key>_<page>``. The page part may be
    empty (treated as page 0) or written as ``|<page>``, which is the form the
    per-plugin "Back" button produces. When the "VCBot" or "Addons" category
    is not present in ``HELP`` an alert is shown instead.
    """
    payload = ult.data_match.group(1).decode("utf-8")
    # The page part never contains "_", so split from the right: the category
    # key is then free to contain underscores without breaking the unpacking.
    key, count = payload.rsplit("_", 1)
    if key == "VCBot" and HELP.get("VCBot") is None:
        return await ult.answer(get_string("help_12"), alert=True)
    if key == "Addons" and HELP.get("Addons") is None:
        return await ult.answer(get_string("help_13").format(HNDLR), alert=True)
    if "|" in count:
        _, count = count.split("|")
    count = int(count) if count else 0
    # A category missing from HELP counts as empty instead of raising
    # TypeError from len(None).
    loaded = HELP.get(key) or []
    text = _strings.get(key, "").format(OWNER_NAME, len(loaded))
    await ult.edit(text, buttons=page_num(count, key), link_preview=False)
|
|
|
|
@callback(re.compile("uplugin_(.*)"), owner=True)
async def uptd_plugin(event):
    """Show the help text of one plugin, with "send plugin" and "back" buttons.

    Callback data has the form ``uplugin_<key>_<plugin>[|<page>]``. The help
    text comes from ``HELP[key][plugin]``; if that is unavailable, the command
    list in ``LIST[plugin]`` is used; if neither exists a placeholder is shown.
    The optional page index is passed through to the buttons so "Back" can
    return to the page the user came from.
    """
    payload = event.data_match.group(1).decode("utf-8")
    # Only the first "_" separates the category from the plugin name, so a
    # plugin whose name contains "_" no longer breaks the unpacking.
    key, file = payload.split("_", 1)
    index = None
    if "|" in file:
        file, index = file.rsplit("|", 1)

    try:
        help_ = "".join(HELP.get(key, {})[file])
    except (KeyError, TypeError):
        # No detailed help stored for this plugin: fall back to its commands.
        help_ = ""
        if file in LIST:
            help_ = get_string("help_11").format(file)
            help_ += "".join(HNDLR + cmd + "\n" for cmd in LIST[file])
    if not help_:
        help_ = f"{file} has no Detailed Help!"
    help_ += "\n© @TeamUltroid"

    page_suffix = f"|{index}" if index is not None else ""
    buttons = []
    if inline_pic():
        buttons.append(
            [
                Button.inline(
                    "« Sᴇɴᴅ Pʟᴜɢɪɴ »",
                    data=f"sndplug_{key}_{file}{page_suffix}",
                )
            ]
        )
    buttons.append(
        [
            Button.inline("« Bᴀᴄᴋ", data=f"uh_{key}_{page_suffix}"),
        ]
    )
    try:
        await event.edit(help_, buttons=buttons)
    except Exception as er:
        # The full help could not be sent; log it and point the user to the
        # text command instead.
        LOGS.exception(er)
        fallback = f"Do `{HNDLR}help {key}` to get list of commands."
        await event.edit(fallback, buttons=buttons)
|
|
|
|
@callback(data="doupdate", owner=True)
async def _(event):
    """Show the pending changelog with an "Update Now" button.

    Shows an alert when ``updater()`` returns a falsy value, and a hint to use
    the text command when no inline picture is configured. Otherwise the
    changelog against ``upstream/<active branch>`` is shown inline, or
    attached as a text file when it is longer than 1024 characters
    (presumably Telegram's caption limit; confirm).
    """
    if not await updater():
        return await event.answer(get_string("inline_9"), cache_time=0, alert=True)
    if not inline_pic():
        return await event.answer(f"Do '{HNDLR}update' to update..")
    repo = Repo.init()
    changelog, tl_chnglog = await gen_chlog(
        repo, f"HEAD..upstream/{repo.active_branch}"
    )
    changelog_str = changelog + "\n\n" + get_string("inline_8")
    if len(changelog_str) > 1024:
        await event.edit(get_string("upd_4"))
        # Explicit UTF-8: commit messages may contain non-ASCII text, which
        # would fail to encode under a non-UTF-8 default locale.
        with open("ultroid_updates.txt", "w", encoding="utf-8") as file:
            file.write(tl_chnglog)
        try:
            await event.edit(
                get_string("upd_5"),
                file="ultroid_updates.txt",
                buttons=[
                    [Button.inline("• Uᴘᴅᴀᴛᴇ Nᴏᴡ •", data="updatenow")],
                    [Button.inline("« Bᴀᴄᴋ", data="ownr")],
                ],
            )
        finally:
            # Always delete the temporary file, even when the edit fails.
            remove("ultroid_updates.txt")
    else:
        await event.edit(
            changelog_str,
            buttons=[
                [Button.inline("Update Now", data="updatenow")],
                [Button.inline("« Bᴀᴄᴋ", data="ownr")],
            ],
            parse_mode="html",
        )
|
|
|
|
@callback(data="pkng", owner=True)
async def _(event):
    """Show a "ping" alert with the microseconds between two clock reads."""
    # NOTE: the two timestamps are taken back to back with no work between
    # them, so the reported value is only the cost of reading the clock.
    began = datetime.now()
    elapsed = datetime.now() - began
    await event.answer(
        f"🌋Pɪɴɢ = {elapsed.microseconds} microseconds",
        cache_time=0,
        alert=True,
    )
|
|
|
|
@callback(data="upp", owner=True)
async def _(event):
    """Show an alert with the time elapsed since ``start_time``."""
    elapsed_ms = (time.time() - start_time) * 1000
    await event.answer(
        f"🙋Uᴘᴛɪᴍᴇ = {time_formatter(elapsed_ms)}",
        cache_time=0,
        alert=True,
    )
|
|
|
|
@callback(data="inlone", owner=True)
async def _(e):
    """Edit the message into a two-column keyboard of inline-plugin shortcuts.

    Each ``InlinePlugin`` entry becomes a switch-inline button labelled with
    its key and pre-filled with its value; a "Back" row is appended.
    """
    shortcuts = []
    for label in InlinePlugin.keys():
        shortcuts.append(
            Button.switch_inline(label, query=InlinePlugin[label], same_peer=True)
        )
    keyboard = list(split_list(shortcuts, 2))
    keyboard.append([Button.inline("« Bᴀᴄᴋ", data="open")])
    await e.edit(buttons=keyboard, link_preview=False)
|
|
|
|
@callback(data="open", owner=True)
async def opner(event):
    """Edit the message back into the main help menu."""
    every_command = [cmd for cmds in LIST.values() for cmd in cmds]
    caption = get_string("inline_4").format(
        OWNER_NAME,
        len(HELP.get("Official", [])),
        len(HELP.get("Addons", [])),
        len(every_command),
    )
    await event.edit(caption, buttons=_main_help_menu, link_preview=False)
|
|
|
|
@callback(data="close", owner=True)
async def on_plug_in_callback_query_handler(event):
    """Collapse the menu to the "inline_5" text with one reopen button."""
    reopen = Button.inline("Oᴘᴇɴ Aɢᴀɪɴ", data="open")
    await event.edit(get_string("inline_5"), buttons=reopen)
|
|
|
|
def page_num(index, key):
    """Build the inline keyboard for one page of a help category.

    Args:
        index: Requested page number. An out-of-range value falls back to the
            first page; negative values index from the end, as with any list.
        key: Category name used to look up the loaded plugins in ``HELP``.

    Returns:
        A list of button rows: the plugin buttons of the selected page
        followed by a navigation row. When there is at most one page the
        navigation row contains only "Back"; otherwise it also has
        "Previous" and "Next".
    """
    rows = udB.get_key("HELP_ROWS") or 5
    cols = udB.get_key("HELP_COLUMNS") or 2
    emoji = udB.get_key("EMOJI_IN_HELP") or "✘"
    # A missing or None category is treated as having no plugins.
    loaded = HELP.get(key) or []
    plugin_buttons = [
        Button.inline(f"{emoji} {x} {emoji}", data=f"uplugin_{key}_{x}|{index}")
        for x in sorted(loaded)
    ]
    # First group buttons into rows of ``cols``, then rows into pages.
    pages = split_list(split_list(plugin_buttons, cols), rows)
    try:
        page = pages[index]
    except IndexError:
        page = pages[0] if pages else []
        index = 0
    if len(pages) <= 1:
        # Nothing to page through (including an empty category): offering
        # Previous/Next here would only link to pages that do not exist.
        page.append([Button.inline("« Bᴀᴄᴋ »", data="open")])
    else:
        page.append(
            [
                Button.inline(
                    "« Pʀᴇᴠɪᴏᴜs",
                    data=f"uh_{key}_{index-1}",
                ),
                Button.inline("« Bᴀᴄᴋ »", data="open"),
                Button.inline(
                    "Nᴇxᴛ »",
                    data=f"uh_{key}_{index+1}",
                ),
            ]
        )
    return page
|
|
|
|
| |
|
|
|
|
# In-memory store of pending inline messages, keyed by an integer id.
# ``something`` writes entries and the ``stf<id>`` inline handler reads them.
STUFF = dict()
|
|
|
|
@in_pattern("stf(.*)", owner=True)
async def ibuild(e):
    """Answer an ``stf<id>`` inline query with the message stored in ``STUFF``.

    The stored entry holds ``msg``, ``media`` and ``button``. Media whose
    string form ends in an image or video extension is sent as a web photo or
    gif. Other media is resolved as a Telethon object or bot file id and sent
    as a document. If nothing can be resolved, or building the media result
    fails, a plain text article is sent. Queries without a numeric id, or with
    an id that is not in ``STUFF``, are ignored.
    """
    n = e.pattern_match.group(1).strip()
    builder = e.builder
    if not (n and n.isdigit()):
        return
    ok = STUFF.get(int(n))
    if ok is None:
        # Unknown id: nothing was stored under it, so there is nothing to
        # build (previously this raised AttributeError on ``None.get``).
        return
    txt = ok.get("msg")
    pic = ok.get("media")
    btn = ok.get("button")
    if not (pic or txt):
        txt = "Hey!"
    if pic:
        try:
            include_media = True
            mime_type, _pic = None, None
            cont, results = None, None
            try:
                ext = str(pic).split(".")[-1].lower()
            except Exception:
                ext = None
            if ext in ["img", "jpg", "png"]:
                _type = "photo"
                mime_type = "image/jpg"
            elif ext in ["mp4", "mkv", "gif"]:
                mime_type = "video/mp4"
                _type = "gif"
            else:
                try:
                    if "telethon.tl.types" in str(type(pic)):
                        _pic = pic
                    else:
                        _pic = resolve_bot_file_id(pic)
                except Exception:
                    # Best effort: media that cannot be resolved is simply
                    # dropped and the text-only article is used instead.
                    pass
                if _pic:
                    results = [
                        await builder.document(
                            _pic,
                            title="Ultroid Op",
                            text=txt,
                            description="@TeamUltroid",
                            buttons=btn,
                            link_preview=False,
                        )
                    ]
                else:
                    _type = "article"
                    include_media = False
            if not results:
                if include_media:
                    cont = InputWebDocument(pic, 0, mime_type, [])
                results = [
                    await builder.article(
                        title="Ultroid Op",
                        type=_type,
                        text=txt,
                        description="@TeamUltroid",
                        include_media=include_media,
                        buttons=btn,
                        thumb=cont,
                        content=cont,
                        link_preview=False,
                    )
                ]
            return await e.answer(results)
        except Exception as er:
            # Fall through to the plain-text article below.
            LOGS.exception(er)
    result = [
        await builder.article("Ultroid Op", text=txt, link_preview=False, buttons=btn)
    ]
    await e.answer(result)
|
|
|
|
async def something(e, msg, media, button, reply=True, chat=None):
    """Send a message with buttons, via the assistant's inline mode if needed.

    When the client is a bot the message is sent directly as a reply.
    Otherwise the payload is stored in ``STUFF`` under a new id, the
    assistant is queried inline with ``stf<id>`` and the first result is
    clicked into ``chat`` (or the event's chat). Errors on the inline path
    are logged and ``None`` is returned.
    """
    client = e.client
    if client._bot:
        return await e.reply(msg, file=media, buttons=button)

    num = len(STUFF) + 1
    STUFF[num] = {"msg": msg, "media": media, "button": button}
    try:
        res = await client.inline_query(asst.me.username, f"stf{num}")
        # NOTE(review): ``reply_to`` receives a bool here, not a message id —
        # confirm this is what the installed ``click`` implementation expects.
        as_reply = bool(isinstance(e, Message) and reply)
        return await res[0].click(
            chat or e.chat_id,
            reply_to=as_reply,
            hide_via=True,
            silent=True,
        )
    except Exception as er:
        LOGS.exception(er)
|
|