initial commit ✨
This commit is contained in:
commit
4f4993da8d
|
@ -0,0 +1,163 @@
|
|||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/#use-with-ide
|
||||
.pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
# Telethon
|
||||
*.session
|
|
@ -0,0 +1,25 @@
|
|||
import os
|
||||
from telethon import TelegramClient
|
||||
from telethon.sessions import SQLiteSession
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from cyber_fenneko.bot import Bot
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Remember to use your own values from my.telegram.org!
|
||||
api_id = os.environ.get('APP_ID')
|
||||
api_hash = os.environ.get('APP_HASH')
|
||||
phone_number = os.environ.get('TG_PHONE')
|
||||
|
||||
session = SQLiteSession('fenneko-' + phone_number)
|
||||
client = TelegramClient(session, api_id, api_hash)
|
||||
|
||||
bot = Bot(client)
|
||||
|
||||
# Import all commands from the commands directory
|
||||
# We just need the files to be imported, nothing else
|
||||
# This is a bit hacky, but it works
|
||||
for filename in os.listdir('cyber_fenneko/commands'):
|
||||
if filename.endswith('.py'):
|
||||
__import__('cyber_fenneko.commands.' + filename[:-3])
|
|
@ -0,0 +1,12 @@
|
|||
from telethon.events import NewMessage
|
||||
from . import client, bot, phone_number
|
||||
|
||||
async def new_message_handler(event: NewMessage):
|
||||
print(event)
|
||||
|
||||
client.add_event_handler(bot.on_new_message, NewMessage)
|
||||
# client.add_event_handler(new_message_handler, NewMessage)
|
||||
|
||||
with client:
|
||||
client.start(phone_number)
|
||||
client.run_until_disconnected()
|
|
@ -0,0 +1,189 @@
|
|||
from typing import Dict, List, Union
|
||||
|
||||
import logging
|
||||
from telethon import TelegramClient
|
||||
from telethon.tl.custom.message import Message
|
||||
from cyber_fenneko.internal.constants import GLOBAL_ARGS
|
||||
from cyber_fenneko.internal.command import Command
|
||||
from cyber_fenneko.internal.command_context import CommandContext
|
||||
from cyber_fenneko.internal.arg import Arg, ArgType
|
||||
|
||||
class Bot:
|
||||
def __init__(self, client: TelegramClient):
|
||||
self.commands = {}
|
||||
self.command_aliases = {}
|
||||
self.client = client
|
||||
self.logger = logging.getLogger('cyber_fenneko.bot')
|
||||
self.default_prefix = '.'
|
||||
|
||||
def set_default_prefix(self, prefix: str):
|
||||
self.default_prefix = prefix
|
||||
|
||||
def command(self, cmd: str, **kwargs: dict):
|
||||
def decorator(func):
|
||||
# Set some defaults
|
||||
kwargs.setdefault('prefix', self.default_prefix)
|
||||
|
||||
command = Command(func, cmd, **kwargs)
|
||||
command.args = GLOBAL_ARGS + command.args
|
||||
|
||||
# Check if a command with the same name already exists
|
||||
if command.command in self.commands:
|
||||
raise ValueError(f'Command {command.command} already exists')
|
||||
|
||||
# Add the command to the bot
|
||||
self.commands[cmd] = command
|
||||
for alias in command.aliases:
|
||||
# Check if an alias with the same name already exists
|
||||
# If so we'll just log a warning and ignore it
|
||||
if alias in self.command_aliases:
|
||||
self.logger.warning(f'Alias {alias} already exists')
|
||||
continue
|
||||
self.command_aliases[alias] = command.command
|
||||
|
||||
return func # we still want the function to be normally accessible
|
||||
return decorator
|
||||
|
||||
async def on_new_message(self, event: Message):
|
||||
message_text = event.message.message.strip()
|
||||
if not message_text:
|
||||
return
|
||||
|
||||
command_text = message_text.split()[0]
|
||||
prefix, command_text = command_text[0], command_text[1:]
|
||||
|
||||
# check if the command text is registered in commands
|
||||
if command_text in self.commands or command_text in self.command_aliases:
|
||||
cmd: Command = self.commands.get(command_text) or self.commands.get(self.command_aliases[command_text])
|
||||
if cmd.prefix != prefix:
|
||||
return
|
||||
|
||||
if event.out and not cmd.outgoing:
|
||||
return
|
||||
elif not event.out and not cmd.incoming:
|
||||
return
|
||||
|
||||
text = message_text[len(command_text) + 1:].strip()
|
||||
try:
|
||||
args, text = self.parse_args(text, cmd.args)
|
||||
except ValueError as e:
|
||||
await event.respond(f'Error: {e}')
|
||||
return
|
||||
|
||||
reply_message = await event.message.get_reply_message()
|
||||
|
||||
# create the command context
|
||||
ctx = CommandContext(
|
||||
client=self.client,
|
||||
bot=self,
|
||||
raw_text=message_text,
|
||||
text=text,
|
||||
event=event,
|
||||
message=event.message,
|
||||
reply_to=reply_message,
|
||||
args=args
|
||||
)
|
||||
|
||||
# call the command function
|
||||
await cmd.func(self, ctx)
|
||||
|
||||
# if the command is marked as 'delete', delete the message that triggered it
|
||||
if ctx.args['delete']:
|
||||
await event.message.delete()
|
||||
|
||||
def parse_args(self, text: str, cmd_args: List[Arg]) -> (Dict[str, Arg], str):
|
||||
"""
|
||||
Take an incoming string and parse args from it.
|
||||
Args are defined one of three ways:
|
||||
- name:value
|
||||
- .name - shorthand for name:true
|
||||
- !name - shorthand for name:false
|
||||
Args are also only valid at the beginning of a message,
|
||||
so as soon as we encounter a non-arg we stop parsing.
|
||||
"""
|
||||
args: Dict[str, Arg] = { arg.name: arg for arg in cmd_args }
|
||||
aliases: Dict[str, str] = { alias: arg.name for arg in cmd_args for alias in arg.aliases }
|
||||
|
||||
get_arg = lambda name: args.get(name) or args.get(aliases.get(name))
|
||||
|
||||
parsed_args: Dict[str, Union[str, int, bool]] = {}
|
||||
tokens = text.split()
|
||||
while tokens:
|
||||
token = tokens.pop(0)
|
||||
|
||||
if token.startswith('.'):
|
||||
# shorthand for name:true
|
||||
arg = get_arg(token[1:])
|
||||
if arg:
|
||||
if args[arg.name].type == ArgType.bool:
|
||||
parsed_args[arg.name] = True
|
||||
else:
|
||||
raise ValueError(f'Arg "{arg.name}" is not a boolean, but a boolean was provided')
|
||||
else:
|
||||
self.logger.warning(f'Unknown arg "{arg.name}"')
|
||||
continue
|
||||
elif token.startswith('!'):
|
||||
# shorthand for name:false
|
||||
arg = get_arg(token[1:])
|
||||
if arg:
|
||||
if arg.type == ArgType.bool:
|
||||
parsed_args[arg.name] = False
|
||||
else:
|
||||
raise ValueError(f'Arg "{arg.name}" is not a boolean, but a boolean was provided')
|
||||
else:
|
||||
self.logger.warning(f'Unknown arg "{arg.name}"')
|
||||
continue
|
||||
elif ':' in token:
|
||||
# name:value
|
||||
# value might be a quoted string, so we need to handle that
|
||||
arg_name, arg_value = token.split(':', 1)
|
||||
arg = get_arg(arg_name)
|
||||
if arg:
|
||||
if arg.type == ArgType.str:
|
||||
# we need to check if the value starts with a quote,
|
||||
# if so we need to loop through tokens until we find the end quote
|
||||
if arg_value.startswith('"'):
|
||||
while not arg_value.endswith('"') and tokens:
|
||||
arg_value += ' ' + tokens.pop(0)
|
||||
if not arg_value.endswith('"'):
|
||||
raise ValueError(f'Unterminated string for arg "{arg.name}"')
|
||||
arg_value = arg_value[1:-1]
|
||||
parsed_args[arg.name] = arg_value
|
||||
elif arg.type == ArgType.bool:
|
||||
# if the arg is a boolean, we need to check if the value is true or false
|
||||
if arg_value.lower() == 'true':
|
||||
parsed_args[arg.name] = True
|
||||
elif arg_value.lower() == 'false':
|
||||
parsed_args[arg.name] = False
|
||||
else:
|
||||
raise ValueError(f'Invalid boolean value {arg_value} for arg "{arg.name}"')
|
||||
elif arg.type == ArgType.int:
|
||||
# if the arg is an int, we need to check if the value is an int
|
||||
try:
|
||||
parsed_args[arg.name] = int(arg_value)
|
||||
except ValueError:
|
||||
raise ValueError(f'Invalid int value {arg_value} for arg "{arg.name}"')
|
||||
else:
|
||||
raise ValueError(f'Invalid arg type {arg.type} for arg "{arg.name}"')
|
||||
else:
|
||||
self.logger.warning(f'Unknown arg "{arg.name}"')
|
||||
continue
|
||||
|
||||
# If we get here, we've encountered a non-arg token
|
||||
# so we stop parsing
|
||||
break
|
||||
|
||||
# Check if any required args are missing
|
||||
missing_args = [arg.name for arg in cmd_args if arg.required and arg.name not in parsed_args]
|
||||
if missing_args:
|
||||
raise ValueError(f'Missing required args: {", ".join(missing_args)}')
|
||||
|
||||
# remove the parsed args from the text
|
||||
text = ' '.join(tokens[len(parsed_args):])
|
||||
|
||||
# add the default values for any args that weren't specified
|
||||
for arg in args.values():
|
||||
if arg.name not in parsed_args:
|
||||
parsed_args[arg.name] = arg.default
|
||||
|
||||
return parsed_args, text
|
|
@ -0,0 +1,22 @@
|
|||
from .. import bot
|
||||
from ..internal.command_context import CommandContext
|
||||
|
||||
@bot.command(
|
||||
'help',
|
||||
description='Get help for commands',
|
||||
)
|
||||
async def help(bot, ctx: CommandContext):
|
||||
cmd = ctx.text
|
||||
if cmd:
|
||||
if cmd in bot.commands:
|
||||
await ctx.respond(bot.commands[cmd].long_help(), parse_mode='Markdown')
|
||||
else:
|
||||
await ctx.respond(f'Command `{cmd}` not found')
|
||||
else:
|
||||
commands = []
|
||||
for command in bot.commands.values():
|
||||
if command.hidden:
|
||||
continue
|
||||
commands.append(command.help())
|
||||
commands.sort()
|
||||
await ctx.respond('\n'.join(commands), parse_mode='Markdown')
|
|
@ -0,0 +1,129 @@
|
|||
import time
|
||||
import httpx
|
||||
from io import BytesIO
|
||||
from urllib.parse import urlencode
|
||||
from collections import OrderedDict
|
||||
from telethon.tl.types import TypeMessageEntity, MessageEntityPre
|
||||
from .. import bot
|
||||
from ..internal.command_context import CommandContext
|
||||
from ..internal.arg import Arg, ArgType
|
||||
|
||||
ENDPOINT_URL = 'https://inkify.0x45.st'
|
||||
|
||||
@bot.command(
|
||||
'highlight',
|
||||
aliases=['hl'],
|
||||
args=[
|
||||
Arg('language', type=ArgType.str, aliases=['lang'], description='The language to use for syntax highlighting'),
|
||||
Arg('theme', type=ArgType.str, description='The theme to use for syntax highlighting'),
|
||||
Arg('font', type=ArgType.str, description='The font to use'),
|
||||
Arg('shadow_color', type=ArgType.str, description='The color of the shadow'),
|
||||
Arg('background', type=ArgType.str, description='The background color'),
|
||||
Arg('tab_width', type=ArgType.int, description='The tab width'),
|
||||
Arg('line_pad', type=ArgType.int, description='The line padding'),
|
||||
Arg('line_offset', type=ArgType.int, description='The line offset'),
|
||||
Arg('window_title', type=ArgType.str, description='The window title'),
|
||||
Arg('no_line_number', type=ArgType.bool, description='Whether to hide the line numbers'),
|
||||
Arg('no_round_corner', type=ArgType.bool, description='Whether to round the corners'),
|
||||
Arg('no_window_controls', type=ArgType.bool, description='Whether to hide the window controls'),
|
||||
Arg('shadow_blur_radius', type=ArgType.int, description='The shadow blur radius'),
|
||||
Arg('shadow_offset_x', type=ArgType.int, description='The shadow offset x'),
|
||||
Arg('shadow_offset_y', type=ArgType.int, description='The shadow offset y'),
|
||||
Arg('pad_horiz', type=ArgType.int, description='The horizontal padding'),
|
||||
Arg('pad_vert', type=ArgType.int, description='The vertical padding'),
|
||||
Arg('highlight_lines', type=ArgType.str, description='The lines to highlight'),
|
||||
Arg('background_image', type=ArgType.str, description='The background image for the padding area as a URL'),
|
||||
|
||||
Arg('themes', type=ArgType.bool, description='List available themes'),
|
||||
Arg('fonts', type=ArgType.bool, description='List available fonts'),
|
||||
Arg('languages', type=ArgType.bool, description='List available languages'),
|
||||
],
|
||||
description='Highlight a code block in the replied to message',
|
||||
)
|
||||
async def highlight(bot, ctx: CommandContext):
|
||||
# Check if the user wants to list available themes
|
||||
if ctx.args['themes']:
|
||||
themes = httpx.get(f'{ENDPOINT_URL}/themes').json()
|
||||
themes_list = ', '.join(themes)
|
||||
message = f'Available themes ({len(themes)}):\n{themes_list}'
|
||||
await ctx.event.respond(message)
|
||||
return
|
||||
|
||||
# Check if the user wants to list available fonts
|
||||
if ctx.args['fonts']:
|
||||
fonts = httpx.get(f'{ENDPOINT_URL}/fonts').json()
|
||||
fonts_list = ', '.join(fonts)
|
||||
message = f'Available fonts ({len(fonts)}):\n{fonts_list}'
|
||||
await ctx.event.respond(message)
|
||||
return
|
||||
|
||||
# Check if the user wants to list available languages
|
||||
if ctx.args['languages']:
|
||||
languages = httpx.get(f'{ENDPOINT_URL}/languages').json()
|
||||
languages_list = ', '.join(languages)
|
||||
message = f'Available languages ({len(languages)}):\n{languages_list}'
|
||||
await ctx.event.respond(message)
|
||||
return
|
||||
|
||||
# Get the message that triggered this command
|
||||
message = ctx.reply_to
|
||||
if not message or not message.message:
|
||||
await ctx.event.respond('Reply to a message to highlight it')
|
||||
return
|
||||
|
||||
# Check if there is a code block in the message
|
||||
if not message.entities:
|
||||
await ctx.event.respond('No code block found')
|
||||
return
|
||||
|
||||
# Filter through entities in the message to find the code block
|
||||
code_blocks = []
|
||||
for entity in message.entities:
|
||||
if isinstance(entity, MessageEntityPre):
|
||||
code_blocks.append(message.message[entity.offset:entity.offset + entity.length])
|
||||
break
|
||||
else:
|
||||
code_blocks.append(ctx.text)
|
||||
|
||||
for code_block in code_blocks:
|
||||
# Inkify returns an image, we just need to build the URL
|
||||
query = OrderedDict(
|
||||
code=code_block,
|
||||
language=ctx.args['language'],
|
||||
theme=ctx.args['theme'],
|
||||
font=ctx.args['font'],
|
||||
shadow_color=ctx.args['shadow_color'],
|
||||
background=ctx.args['background'],
|
||||
tab_width=ctx.args['tab_width'],
|
||||
line_pad=ctx.args['line_pad'],
|
||||
line_offset=ctx.args['line_offset'],
|
||||
window_title=ctx.args['window_title'],
|
||||
no_line_number=ctx.args['no_line_number'],
|
||||
no_round_corner=ctx.args['no_round_corner'],
|
||||
no_window_controls=ctx.args['no_window_controls'],
|
||||
shadow_blur_radius=ctx.args['shadow_blur_radius'],
|
||||
shadow_offset_x=ctx.args['shadow_offset_x'],
|
||||
shadow_offset_y=ctx.args['shadow_offset_y'],
|
||||
pad_horiz=ctx.args['pad_horiz'],
|
||||
pad_vert=ctx.args['pad_vert'],
|
||||
highlight_lines=ctx.args['highlight_lines'],
|
||||
background_image=ctx.args['background_image'],
|
||||
)
|
||||
|
||||
# Remove any arguments that are None, and transform booleans into lowercase strings
|
||||
query = {k: v for k, v in query.items() if v is not None}
|
||||
query = {k: str(v).lower() if isinstance(v, bool) else v for k, v in query.items()}
|
||||
|
||||
# Get the image from the URL
|
||||
url = f'{ENDPOINT_URL}/generate?{urlencode(query)}'
|
||||
print(url)
|
||||
res = httpx.get(url)
|
||||
|
||||
if res.status_code != 200:
|
||||
await ctx.event.respond(f'Error: {res.status_code}')
|
||||
return
|
||||
|
||||
img = BytesIO(res.content)
|
||||
|
||||
# Send the image
|
||||
await ctx.event.respond(file=img)
|
|
@ -0,0 +1,62 @@
|
|||
import time
|
||||
import httpx
|
||||
from telethon.tl.types import MessageEntityPre
|
||||
from telethon.tl.custom.message import Message
|
||||
from .. import bot
|
||||
from ..internal.command_context import CommandContext
|
||||
from ..internal.arg import Arg, ArgType
|
||||
|
||||
ENDPOINT = 'https://0x45.st/api/pastes'
|
||||
|
||||
@bot.command(
|
||||
'paste',
|
||||
description='Send the contents of the replied to message to a pastebin',
|
||||
args=[
|
||||
Arg('language', type=ArgType.str, aliases=['lang'], default='txt', description='The language to use for syntax highlighting'),
|
||||
]
|
||||
)
|
||||
async def paste(bot, ctx: CommandContext):
|
||||
# Get the message to paste
|
||||
message: Message = ctx.reply_to
|
||||
if message is None:
|
||||
await ctx.event.respond('You must reply to a message to use this command')
|
||||
return
|
||||
|
||||
# Get the message text
|
||||
text = message.text
|
||||
if text is None:
|
||||
await ctx.event.respond('You must reply to a text message to use this command')
|
||||
return
|
||||
|
||||
# Get the language
|
||||
language = ctx.args['language']
|
||||
|
||||
contents = ''
|
||||
|
||||
# Check if the message has entities; any pre entities will be used as the contents
|
||||
if message.entities is not None:
|
||||
for entity in message.entities:
|
||||
if isinstance(entity, MessageEntityPre):
|
||||
contents += text[entity.offset:entity.offset + entity.length]
|
||||
contents += '\n\n'
|
||||
break
|
||||
|
||||
if not contents:
|
||||
contents = text
|
||||
|
||||
data = {
|
||||
'language': language,
|
||||
'contents': contents,
|
||||
}
|
||||
|
||||
response = httpx.post(ENDPOINT, json=data)
|
||||
if response.status_code > 299:
|
||||
await ctx.event.respond(f'Failed to create paste: {response.text}')
|
||||
return
|
||||
|
||||
# Get the paste URL
|
||||
json = response.json()
|
||||
url = json['url']
|
||||
|
||||
# Respond with the paste URL
|
||||
await ctx.event.respond(f'Created paste: {url}', link_preview=False)
|
|
@ -0,0 +1,15 @@
|
|||
import time
|
||||
from .. import bot
|
||||
from ..internal.command_context import CommandContext
|
||||
from ..internal.arg import Arg, ArgType
|
||||
|
||||
@bot.command(
|
||||
'ping',
|
||||
description='Ping the bot and calculate the latency',
|
||||
)
|
||||
async def ping(bot, ctx: CommandContext):
|
||||
start = time.time()
|
||||
msg = await ctx.event.respond('Pong!')
|
||||
end = time.time()
|
||||
ms = round((end - start) * 1000, 3)
|
||||
await msg.edit(f'Pong! ({ms}ms)')
|
|
@ -0,0 +1,36 @@
|
|||
from enum import Enum
|
||||
from typing import List
|
||||
|
||||
|
||||
ArgType = Enum('ArgType', ['str', 'int', 'bool', 'peer_id'])
|
||||
|
||||
class Arg:
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
*,
|
||||
# The type of this argument
|
||||
type: ArgType = ArgType.str,
|
||||
|
||||
# Aliases for this argument
|
||||
aliases: List[str] = [],
|
||||
|
||||
# Is this argument required?
|
||||
required: bool = False,
|
||||
|
||||
# The default value for this argument
|
||||
default: any = None,
|
||||
|
||||
# The description for this argument, displayed in the help command
|
||||
description: str = None,
|
||||
|
||||
# Special designation for global args
|
||||
global_arg: bool = False,
|
||||
):
|
||||
self.name = name
|
||||
self.type = type
|
||||
self.aliases = aliases
|
||||
self.required = required
|
||||
self.default = default
|
||||
self.description = description
|
||||
self.global_arg = global_arg
|
|
@ -0,0 +1,98 @@
|
|||
import itertools
|
||||
from typing import List
|
||||
|
||||
from .arg import Arg
|
||||
from ..bot import GLOBAL_ARGS
|
||||
|
||||
class Command:
|
||||
def __init__(
|
||||
self,
|
||||
func: callable,
|
||||
|
||||
# The command string
|
||||
command: str,
|
||||
*,
|
||||
# The prefix that should be used instead of the default prefix
|
||||
prefix: str = None,
|
||||
|
||||
# Should this command be activatable by others?
|
||||
incoming: bool = False,
|
||||
|
||||
# Should this command be activatable by the bot itself?
|
||||
outgoing: bool = True,
|
||||
|
||||
# Aliases for this command
|
||||
aliases: List[str] = [],
|
||||
|
||||
# Output from this command will be captured and logged to the log channel
|
||||
silent: bool = False,
|
||||
|
||||
# Should this command be hidden from the help command?
|
||||
hidden: bool = False,
|
||||
|
||||
# Usage string for this command, displayed in the help command
|
||||
usage: str = None,
|
||||
|
||||
# Description string for this command, displayed in the help command
|
||||
description: str = None,
|
||||
|
||||
# Args that this command accepts
|
||||
args: List[Arg] = [],
|
||||
):
|
||||
self.func = func
|
||||
self.command = command
|
||||
self.prefix = prefix
|
||||
self.incoming = incoming
|
||||
self.outgoing = outgoing
|
||||
self.aliases = aliases
|
||||
self.silent = silent
|
||||
self.hidden = hidden
|
||||
self.usage = usage
|
||||
self.description = description
|
||||
self.args = args
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.func(*args, **kwargs)
|
||||
|
||||
def help(self):
|
||||
"""Returns the basic help string, used in the command list."""
|
||||
return f'**{self.command}** - {self.description}'
|
||||
|
||||
def long_help(self):
|
||||
"""Returns a longer help string, used when getting help on a specific command."""
|
||||
help_string = f'**Command: {self.command}**\n\n'
|
||||
if self.description:
|
||||
help_string += f'{self.description}\n\n'
|
||||
if self.usage:
|
||||
help_string += f'**Usage:** `{self.usage}`\n\n'
|
||||
if self.aliases:
|
||||
help_string += f'**Aliases:** `{", ".join(self.aliases)}`\n\n'
|
||||
if self.args:
|
||||
# Sort args by name
|
||||
args = sorted(self.args, key=lambda arg: arg.name)
|
||||
|
||||
# And then separate them into global args and command args
|
||||
global_args: List[Arg] = []
|
||||
command_args: List[Arg] = []
|
||||
for arg in args:
|
||||
if arg.global_arg:
|
||||
global_args.append(arg)
|
||||
else:
|
||||
command_args.append(arg)
|
||||
|
||||
help_string += '**Args:** (global, then local)\n'
|
||||
for arg in global_args:
|
||||
help_string += f'`{arg.name}'
|
||||
if arg.aliases:
|
||||
help_string += f' ({", ".join(arg.aliases)})'
|
||||
help_string += f': {arg.type.name}`\n {arg.description}\n'
|
||||
help_string += '\n'
|
||||
|
||||
for arg in args:
|
||||
help_string += f'`{arg.name}'
|
||||
if arg.aliases:
|
||||
help_string += f' ({", ".join(arg.aliases)})'
|
||||
help_string += f': {arg.type.name}`\n {arg.description}\n'
|
||||
help_string += '\n'
|
||||
return help_string
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
from typing import Dict
|
||||
from telethon import TelegramClient
|
||||
from telethon.events import NewMessage
|
||||
from telethon.tl.custom.message import Message
|
||||
|
||||
from .. import bot
|
||||
from .arg import Arg
|
||||
|
||||
|
||||
class CommandContext:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
# The client that received the command
|
||||
client: TelegramClient,
|
||||
|
||||
# The instance of the Bot class that received the command
|
||||
bot: bot,
|
||||
|
||||
# The raw command text, including the command itself
|
||||
raw_text: str,
|
||||
|
||||
# The command text, excluding the command itself
|
||||
text: str,
|
||||
|
||||
# The event that triggered this command
|
||||
event: NewMessage,
|
||||
|
||||
# The message from the event
|
||||
message: Message,
|
||||
|
||||
# The message that was replied to
|
||||
reply_to: Message,
|
||||
|
||||
# The arguments that were passed to the command
|
||||
args: Dict[str, Arg]
|
||||
):
|
||||
self.client = client
|
||||
self.bot = bot
|
||||
self.raw_text = raw_text
|
||||
self.text = text
|
||||
self.event = event
|
||||
self.message = message
|
||||
self.reply_to = reply_to
|
||||
self.args = args
|
||||
|
||||
def __repr__(self):
|
||||
return f'<CommandContext raw_text="{self.raw_text}" text="{self.text}" args={self.args}>'
|
||||
|
||||
async def respond(self, *args, **kwargs):
|
||||
"""Respond to the message that triggered this command."""
|
||||
await self.message.respond(*args, **kwargs)
|
||||
|
||||
async def reply(self, *args, **kwargs):
|
||||
"""Reply to the message that triggered this command."""
|
||||
await self.message.reply(*args, **kwargs)
|
|
@ -0,0 +1,6 @@
|
|||
from .arg import Arg, ArgType
|
||||
|
||||
# Args that exist on every command
|
||||
GLOBAL_ARGS = [
|
||||
Arg("delete", type=ArgType.bool, aliases=['d', 'del'], default=False, description="Delete the message that triggered this command", global_arg=True)
|
||||
]
|
|
@ -0,0 +1,180 @@
|
|||
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "anyio"
|
||||
version = "4.0.0"
|
||||
description = "High level compatibility layer for multiple asynchronous event loop implementations"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "anyio-4.0.0-py3-none-any.whl", hash = "sha256:cfdb2b588b9fc25ede96d8db56ed50848b0b649dca3dd1df0b11f683bb9e0b5f"},
|
||||
{file = "anyio-4.0.0.tar.gz", hash = "sha256:f7ed51751b2c2add651e5747c891b47e26d2a21be5d32d9311dfe9692f3e5d7a"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
idna = ">=2.8"
|
||||
sniffio = ">=1.1"
|
||||
|
||||
[package.extras]
|
||||
doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)"]
|
||||
test = ["anyio[trio]", "coverage[toml] (>=7)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
|
||||
trio = ["trio (>=0.22)"]
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2023.7.22"
|
||||
description = "Python package for providing Mozilla's CA Bundle."
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
files = [
|
||||
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
|
||||
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h11"
|
||||
version = "0.14.0"
|
||||
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
|
||||
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "httpcore"
|
||||
version = "0.18.0"
|
||||
description = "A minimal low-level HTTP client."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "httpcore-0.18.0-py3-none-any.whl", hash = "sha256:adc5398ee0a476567bf87467063ee63584a8bce86078bf748e48754f60202ced"},
|
||||
{file = "httpcore-0.18.0.tar.gz", hash = "sha256:13b5e5cd1dca1a6636a6aaea212b19f4f85cd88c366a2b82304181b769aab3c9"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=3.0,<5.0"
|
||||
certifi = "*"
|
||||
h11 = ">=0.13,<0.15"
|
||||
sniffio = "==1.*"
|
||||
|
||||
[package.extras]
|
||||
http2 = ["h2 (>=3,<5)"]
|
||||
socks = ["socksio (==1.*)"]
|
||||
|
||||
[[package]]
|
||||
name = "httpx"
|
||||
version = "0.25.0"
|
||||
description = "The next generation HTTP client."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "httpx-0.25.0-py3-none-any.whl", hash = "sha256:181ea7f8ba3a82578be86ef4171554dd45fec26a02556a744db029a0a27b7100"},
|
||||
{file = "httpx-0.25.0.tar.gz", hash = "sha256:47ecda285389cb32bb2691cc6e069e3ab0205956f681c5b2ad2325719751d875"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
certifi = "*"
|
||||
httpcore = ">=0.18.0,<0.19.0"
|
||||
idna = "*"
|
||||
sniffio = "*"
|
||||
|
||||
[package.extras]
|
||||
brotli = ["brotli", "brotlicffi"]
|
||||
cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"]
|
||||
http2 = ["h2 (>=3,<5)"]
|
||||
socks = ["socksio (==1.*)"]
|
||||
|
||||
[[package]]
|
||||
name = "idna"
|
||||
version = "3.4"
|
||||
description = "Internationalized Domain Names in Applications (IDNA)"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
files = [
|
||||
{file = "idna-3.4-py3-none-any.whl", hash = "sha256:90b77e79eaa3eba6de819a0c442c0b4ceefc341a7a2ab77d7562bf49f425c5c2"},
|
||||
{file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyaes"
|
||||
version = "1.6.1"
|
||||
description = "Pure-Python Implementation of the AES block-cipher and common modes of operation"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "pyaes-1.6.1.tar.gz", hash = "sha256:02c1b1405c38d3c370b085fb952dd8bea3fadcee6411ad99f312cc129c536d8f"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyasn1"
|
||||
version = "0.5.0"
|
||||
description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)"
|
||||
optional = false
|
||||
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
|
||||
files = [
|
||||
{file = "pyasn1-0.5.0-py2.py3-none-any.whl", hash = "sha256:87a2121042a1ac9358cabcaf1d07680ff97ee6404333bacca15f76aa8ad01a57"},
|
||||
{file = "pyasn1-0.5.0.tar.gz", hash = "sha256:97b7290ca68e62a832558ec3976f15cbf911bf5d7c7039d8b861c2a0ece69fde"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.0.0"
|
||||
description = "Read key-value pairs from a .env file and set them as environment variables"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "python-dotenv-1.0.0.tar.gz", hash = "sha256:a8df96034aae6d2d50a4ebe8216326c61c3eb64836776504fcca410e5937a3ba"},
|
||||
{file = "python_dotenv-1.0.0-py3-none-any.whl", hash = "sha256:f5971a9226b701070a4bf2c38c89e5a3f0d64de8debda981d1db98583009122a"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
cli = ["click (>=5.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "rsa"
|
||||
version = "4.9"
|
||||
description = "Pure-Python RSA implementation"
|
||||
optional = false
|
||||
python-versions = ">=3.6,<4"
|
||||
files = [
|
||||
{file = "rsa-4.9-py3-none-any.whl", hash = "sha256:90260d9058e514786967344d0ef75fa8727eed8a7d2e43ce9f4bcf1b536174f7"},
|
||||
{file = "rsa-4.9.tar.gz", hash = "sha256:e38464a49c6c85d7f1351b0126661487a7e0a14a50f1675ec50eb34d4f20ef21"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pyasn1 = ">=0.1.3"
|
||||
|
||||
[[package]]
|
||||
name = "sniffio"
|
||||
version = "1.3.0"
|
||||
description = "Sniff out which async library your code is running under"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "sniffio-1.3.0-py3-none-any.whl", hash = "sha256:eecefdce1e5bbfb7ad2eeaabf7c1eeb404d7757c379bd1f7e5cce9d8bf425384"},
|
||||
{file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "telethon"
|
||||
version = "1.30.3"
|
||||
description = "Full-featured Telegram client library for Python 3"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
files = [
|
||||
{file = "Telethon-1.30.3.tar.gz", hash = "sha256:313e40fa06667b19ced13b379d9988167a8319bc0eb90bf39347cff46919a351"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pyaes = "*"
|
||||
rsa = "*"
|
||||
|
||||
[package.extras]
|
||||
cryptg = ["cryptg"]
|
||||
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "9ef099a5cfddbb3a84d27506f528e2edca10562fad5dabeb22bd64d49d9c1b04"
|
|
@ -0,0 +1,20 @@
|
|||
[tool.poetry]
|
||||
name = "cyber-fenneko"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = ["Chris W <cawatson1993@gmail.com>"]
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
fenneko = "cyber_fenneko.__main__:main"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11"
|
||||
telethon = "^1.30.3"
|
||||
python-dotenv = "^1.0.0"
|
||||
httpx = "^0.25.0"
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
Loading…
Reference in New Issue