# telegram-upload/management.py
#
# 256 lines
# 12 KiB
# Python

# -*- coding: utf-8 -*-
"""Console script for telegram-upload."""
import os
import click
from telethon.tl.types import User
from telegram_upload.cli import show_checkboxlist, show_radiolist
from telegram_upload.client import TelegramManagerClient, get_message_file_attribute
from telegram_upload.config import default_config, CONFIG_FILE
from telegram_upload.download_files import KeepDownloadSplitFiles, JoinDownloadSplitFiles
from telegram_upload.exceptions import catch
from telegram_upload.upload_files import NoDirectoriesFiles, RecursiveFiles, NoLargeFiles, SplitFiles, is_valid_file
from telegram_upload.utils import async_to_sync, amap, sync_to_async_iterator
# natsort is an optional dependency: when it cannot be imported the name is
# bound to None so callers can test its truthiness and fall back to another
# sorter.
try:
    from natsort import natsorted
except ImportError:
    natsorted = None
# Lookup tables mapping the value of a command-line choice to the class that
# implements it. The keys double as the accepted values of the matching
# ``click.Choice`` option, so key order is also the order shown in --help.

# --directories
DIRECTORY_MODES = {
    'fail': NoDirectoriesFiles,
    'recursive': RecursiveFiles,
}

# --large-files
LARGE_FILE_MODES = {
    'fail': NoLargeFiles,
    'split': SplitFiles,
}

# --split-files (download command)
DOWNLOAD_SPLIT_FILE_MODES = {
    'keep': KeepDownloadSplitFiles,
    'join': JoinDownloadSplitFiles,
}
def get_file_display_name(message):
    """Return a single-line label describing ``message``.

    The label is made of the following space-separated pieces, each one only
    when available: the major part of the document MIME type, the file name
    reported by ``get_message_file_attribute``, the message text (wrapped in
    square brackets when something precedes it), the word ``by`` followed by
    the sender's first name, last name and ``@username`` (only when the
    sender is a ``User``), and finally the message date, which is always
    present.

    :param message: object exposing ``document``, ``text``, ``sender`` and
        ``date`` attributes.
    :return: the assembled label.
    """
    pieces = []

    document = message.document
    if document:
        if document.mime_type:
            # Keep only what comes before the slash, e.g. "video" in "video/mp4".
            pieces.append(document.mime_type.split('/')[0])
        if get_message_file_attribute(message):
            pieces.append(get_message_file_attribute(message).file_name)

    text = message.text
    if text:
        # Brackets set the text apart from the pieces already collected.
        pieces.append(f'[{text}]' if pieces else text)

    sender = message.sender
    if sender and isinstance(sender, User):
        pieces.append('by')
        pieces.extend(name for name in (sender.first_name, sender.last_name) if name)
        if sender.username:
            pieces.append(f'@{sender.username}')

    pieces.append(f'{message.date}')
    return ' '.join(pieces)
async def interactive_select_files(client, entity: str):
    """Let the user tick files among those found in ``entity``.

    Every item yielded by ``client.iter_files(entity)`` is paired with its
    display name (see ``get_file_display_name``) and the resulting
    ``(item, label)`` pairs are handed to ``show_checkboxlist``.

    :param client: object providing ``iter_files``.
    :param entity: forwarded unchanged to ``client.iter_files``.
    :return: whatever ``show_checkboxlist`` returns.
    """
    def with_label(message):
        return message, get_file_display_name(message)

    choices = amap(with_label, client.iter_files(entity),)
    return await show_checkboxlist(choices)
async def interactive_select_local_files():
    """Let the user tick regular files located in the current directory.

    Directory entries that are not files are left out; there is no descent
    into subdirectories. Each remaining name is offered to
    ``show_checkboxlist`` as a ``(value, label)`` pair where both members are
    the file name.

    :return: whatever ``show_checkboxlist`` returns.
    """
    # The listing is taken immediately; the per-entry checks run lazily while
    # the pairs are being consumed.
    entries = (
        (name, name)
        for name in os.listdir('.')
        if os.path.isfile(name) and os.path.lexists(name)
    )
    # The second argument is presumably the message shown when there is
    # nothing to list -- confirm against show_checkboxlist.
    return await show_checkboxlist(
        sync_to_async_iterator(entries),
        'Not files were found in the current directory '
        '(subdirectories are not supported). Exiting...',
    )
async def interactive_select_dialog(client):
    """Let the user pick one dialog and return its id.

    Dialogs come from ``client.iter_dialogs()`` and are offered to
    ``show_radiolist`` as ``(dialog, dialog.name)`` pairs.

    :param client: object providing ``iter_dialogs``.
    :return: the ``id`` of the chosen dialog, or ``None`` when
        ``show_radiolist`` yields a falsy value.
    """
    dialogs = amap(lambda dialog: (dialog, dialog.name), client.iter_dialogs(),)
    # The second argument is presumably the message shown when there are no
    # dialogs -- confirm against show_radiolist.
    selected = await show_radiolist(
        dialogs,
        'Not dialogs were found in your Telegram session. '
        'Have you started any conversations?',
    )
    if not selected:
        return None
    return selected.id
class MutuallyExclusiveOption(click.Option):
    """Click option that cannot be used together with certain other options.

    The conflicting parameters are given through the extra
    ``mutually_exclusive`` keyword argument as an iterable of parameter names
    (the names used as keys of the parsed options, e.g. ``no_thumbnail``).
    When at least one name is given, a note listing them is appended to the
    option's help text. While parsing, ``click.UsageError`` is raised if this
    option and any of the conflicting ones are both present.
    """

    def __init__(self, *args, **kwargs):
        """Extract ``mutually_exclusive`` and extend ``help`` before delegating.

        :param mutually_exclusive: optional iterable of conflicting parameter
            names; a missing value or ``None`` means "no conflicts".
        All other arguments are forwarded to ``click.Option``.
        """
        # ``or ()`` makes an explicit ``mutually_exclusive=None`` behave like
        # an omitted argument instead of failing inside ``set()``.
        self.mutually_exclusive = set(kwargs.pop('mutually_exclusive', None) or ())
        if self.mutually_exclusive:
            # ``help`` can be absent or explicitly None; both mean "no text",
            # and concatenating None with a string would raise TypeError.
            base_help = kwargs.get('help') or ''
            kwargs['help'] = base_help + (
                ' NOTE: This argument is mutually exclusive with'
                ' arguments: [{}].'.format(self.mutually_exclusive_text)
            )
        super().__init__(*args, **kwargs)

    def handle_parse_result(self, ctx, opts, args):
        """Reject the command line when a conflicting option is also present.

        :raises click.UsageError: if this option's name and at least one of
            the conflicting names are both keys of ``opts``.
        :return: the result of ``click.Option.handle_parse_result``.
        """
        if self.mutually_exclusive.intersection(opts) and self.name in opts:
            raise click.UsageError(
                "Illegal usage: `{}` is mutually exclusive with "
                "arguments `{}`.".format(
                    self.name,
                    self.mutually_exclusive_text
                )
            )
        return super().handle_parse_result(ctx, opts, args)

    @property
    def mutually_exclusive_text(self):
        """Comma-separated conflicting names, with ``_`` shown as ``-``.

        The names are sorted because set iteration order is not stable across
        interpreter runs, which would otherwise make the help and error
        messages vary when several names are configured.
        """
        return ', '.join(sorted(name.replace('_', '-') for name in self.mutually_exclusive))
@click.command()
@click.argument('files', nargs=-1)
@click.option(
    '--to', default=None,
    help='Phone number, username, invite link or "me" (saved messages). '
         'By default "me".',
)
@click.option(
    '--config', default=None,
    help='Configuration file to use. By default "{}".'.format(CONFIG_FILE),
)
@click.option(
    '-d', '--delete-on-success', is_flag=True,
    help='Delete local file after successful upload.',
)
@click.option(
    '--print-file-id', is_flag=True,
    help='Print the id of the uploaded file after the upload.',
)
@click.option(
    '--force-file', is_flag=True,
    help='Force send as a file. The filename will be preserved '
         'but the preview will not be available.',
)
@click.option(
    '-f', '--forward', multiple=True,
    help='Forward the file to a chat (alias or id) or user (username, '
         'mobile or id). This option can be used multiple times.',
)
@click.option(
    '--directories', default='fail', type=click.Choice(list(DIRECTORY_MODES)),
    help='Defines how to process directories. By default directories are not accepted and will raise an '
         'error.',
)
@click.option(
    '--large-files', default='fail', type=click.Choice(list(LARGE_FILE_MODES)),
    help='Defines how to process large files unsupported for Telegram. By default large files are not '
         'accepted and will raise an error.',
)
@click.option(
    '--caption', type=str,
    help='Change file description. By default the file name.',
)
@click.option(
    '--no-thumbnail', is_flag=True, cls=MutuallyExclusiveOption, mutually_exclusive=["thumbnail_file"],
    help='Disable thumbnail generation. For some known file formats, Telegram may still generate a '
         'thumbnail or show a preview.',
)
@click.option(
    '--thumbnail-file', default=None, cls=MutuallyExclusiveOption, mutually_exclusive=["no_thumbnail"],
    help='Path to the preview file to use for the uploaded file.',
)
@click.option(
    '-p', '--proxy', default=None,
    help='Use an http proxy, socks4, socks5 or mtproxy. For example socks5://user:pass@1.2.3.4:8080 '
         'for socks5 and mtproxy://secret@1.2.3.4:443 for mtproxy.',
)
@click.option(
    '-a', '--album', is_flag=True,
    help='Send video or photos as an album.',
)
@click.option(
    '-i', '--interactive', is_flag=True,
    help='Use interactive mode.',
)
@click.option(
    '--sort', is_flag=True,
    help='Sort files by name before upload it. Install the natsort Python package for natural sorting.',
)
def upload(files, to, config, delete_on_success, print_file_id, force_file, forward, directories, large_files, caption,
           no_thumbnail, thumbnail_file, proxy, album, interactive, sort):
    """Upload one or more files to Telegram using your personal account.
    The maximum file size is 2 GiB for free users and 4 GiB for premium accounts.
    By default, they will be saved in your saved messages.
    """
    # NOTE: the docstring above is what click prints for --help, so it is
    # user-facing text; implementation notes live in comments instead.
    #
    # Steps: start the client, resolve the files and the recipient (asking the
    # user in interactive mode), wrap the files with the classes selected by
    # --directories / --large-files, optionally sort, then send.
    client = TelegramManagerClient(config or default_config(), proxy=proxy)
    client.start()

    if interactive:
        if not files:
            click.echo('Select the local files to upload:')
            click.echo('[SPACE] Select file [ENTER] Next step')
            files = async_to_sync(interactive_select_local_files())
        if not files:
            # Still nothing after the prompt: there is nothing to upload.
            return

    if to is None:
        if interactive:
            click.echo('Select the recipient dialog of the files:')
            click.echo('[SPACE] Select dialog [ENTER] Next step')
            to = async_to_sync(interactive_select_dialog(client))
        else:
            to = 'me'

    def report_invalid(message):
        # Problems reported by is_valid_file are written to stderr.
        click.echo(message, err=True)

    files = filter(lambda file: is_valid_file(file, report_invalid), files)
    files = DIRECTORY_MODES[directories](client, files)
    if directories == 'fail':
        # Consume the iterable right away so its checks run now rather than
        # while sending.
        files = list(files)

    # False when thumbnails are disabled, the given path when one was
    # supplied, None otherwise.
    thumbnail = False if no_thumbnail else (thumbnail_file or None)

    wrap_large_files = LARGE_FILE_MODES[large_files]
    files = wrap_large_files(client, files, caption=caption, thumbnail=thumbnail, force_file=force_file)
    if large_files == 'fail':
        # Same as above: run the checks before anything is sent.
        files = list(files)

    # A recipient made only of digits (optionally signed) is passed as an int.
    if isinstance(to, str) and to.lstrip("-+").isdigit():
        to = int(to)

    if sort:
        # Use natsort when it is installed, the built-in sort otherwise.
        sort_files = natsorted or sorted
        files = sort_files(files, key=lambda item: item.name)

    send = client.send_files_as_album if album else client.send_files
    send(to, files, delete_on_success, print_file_id, forward)
@click.command()
@click.option(
    '--from', '-f', 'from_', default='',
    help='Phone number, username, chat id or "me" (saved messages). By default "me".',
)
@click.option(
    '--config', default=None,
    help='Configuration file to use. By default "{}".'.format(CONFIG_FILE),
)
@click.option(
    '-d', '--delete-on-success', is_flag=True,
    help='Delete telegram message after successful download. Useful for creating a download queue.',
)
@click.option(
    '-p', '--proxy', default=None,
    help='Use an http proxy, socks4, socks5 or mtproxy. For example socks5://user:pass@1.2.3.4:8080 '
         'for socks5 and mtproxy://secret@1.2.3.4:443 for mtproxy.',
)
@click.option(
    '-m', '--split-files', default='keep', type=click.Choice(list(DOWNLOAD_SPLIT_FILE_MODES)),
    help='Defines how to download large files split in Telegram. By default the files are not merged.',
)
@click.option(
    '-i', '--interactive', is_flag=True,
    help='Use interactive mode.',
)
def download(from_, config, delete_on_success, proxy, split_files, interactive):
    """Download all the latest messages that are files in a chat, by default download
    from "saved messages". It is recommended to forward the files to download to
    "saved messages" and use parameter ``--delete-on-success``. Forwarded messages will
    be removed from the chat after downloading, such as a download queue.
    """
    # NOTE: the docstring above is what click prints for --help, so it is
    # user-facing text; implementation notes live in comments instead.
    client = TelegramManagerClient(config or default_config(), proxy=proxy)
    client.start()

    if not from_:
        # No source given: ask in interactive mode, otherwise use "me".
        if interactive:
            click.echo('Select the dialog of the files to download:')
            click.echo('[SPACE] Select dialog [ENTER] Next step')
            from_ = async_to_sync(interactive_select_dialog(client))
        else:
            from_ = 'me'
    elif isinstance(from_, str) and from_.lstrip("-+").isdigit():
        # A source made only of digits (optionally signed) is passed as an int.
        from_ = int(from_)

    if interactive:
        click.echo('Select all files to download:')
        click.echo('[SPACE] Select files [ENTER] Download selected files')
        messages = async_to_sync(interactive_select_files(client, from_))
    else:
        messages = client.find_files(from_)

    split_handler = DOWNLOAD_SPLIT_FILE_MODES[split_files]
    # The messages are processed in the reverse of the order they were obtained.
    queue = split_handler(reversed(list(messages)))
    client.download_files(from_, queue, delete_on_success)
# Entry points: both commands wrapped with the project's ``catch`` helper.
upload_cli = catch(upload)
download_cli = catch(download)


if __name__ == '__main__':
    # Script mode: ``management.py <command> [args...]``. The command name is
    # removed from argv before the selected click command parses the rest.
    import re
    import sys

    # Drop a trailing "-script.pyw" or ".exe" from the program name.
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    commands = {'upload': upload_cli, 'download': download_cli}
    available = ', '.join(commands)

    if len(sys.argv) < 2:
        sys.stderr.write('A command is required. Available commands: {}\n'.format(available))
        sys.exit(1)

    command_name = sys.argv[1]
    if command_name not in commands:
        sys.stderr.write('{} is an invalid command. Valid commands: {}\n'.format(command_name, available))
        sys.exit(1)

    run = commands[command_name]
    sys.argv = [sys.argv[0], *sys.argv[2:]]
    sys.exit(run())