commit fee546d5e1a7f4e04e0328269f04145ab6dab4d3 Author: Jagrit Thapar Date: Fri Aug 16 19:34:11 2024 +0530 push diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..0a1fce2 --- /dev/null +++ b/__init__.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- + +"""Top-level package for telegram-upload.""" + +__author__ = """Nekmo""" +__email__ = 'contacto@nekmo.com' +__version__ = '0.7.1' diff --git a/_compat.py b/_compat.py new file mode 100644 index 0000000..4dfc76d --- /dev/null +++ b/_compat.py @@ -0,0 +1,67 @@ +import collections +import sys + +import typing + +try: + from os import scandir +except ImportError: + from scandir import scandir + + +# https://pypi.org/project/asyncio_utils/ + +async def anext(iterator: typing.AsyncIterator[typing.Any], *args, **kwargs + ) -> typing.Any: + """Mimics the builtin ``next`` for an ``AsyncIterator``. + + :param iterator: An ``AsyncIterator`` to get the next value from. + :param default: Can be supplied as second arg or as a kwarg. If a value is + supplied in either of those positions then a + ``StopAsyncIteration`` will not be raised and the + ``default`` will be returned. + + :raises TypeError: If the input is not a :class:`collections.AsyncIterator` + + + Example:: + + >>> async def main(): + myrange = await arange(1, 5) + for n in range(1, 5): + print(n, n == await anext(myrange)) + try: + n = await anext(myrange) + print("This should not be shown") + except StopAsyncIteration: + print('Sorry no more values!') + + >>> loop.run_until_complete(main()) + 1 True + 2 True + 3 True + 4 True + Sorry no more values! + + + """ + if not isinstance(iterator, collections.AsyncIterator): + raise TypeError(f'Not an AsyncIterator: {iterator}') + + use_default = False + default = None + + if len(args) > 0: + default = args[0] + use_default = True + else: + if 'default' in kwargs: + default = kwargs['default'] + use_default = True + + try: + return await iterator.__anext__() + except StopAsyncIteration: + if use_default: + return default + raise StopAsyncIteration diff --git a/caption_formatter.py b/caption_formatter.py new file mode 100644 index 0000000..b7d4f22 --- /dev/null +++ b/caption_formatter.py @@ -0,0 +1,358 @@ +import _string +import datetime +import hashlib +import mimetypes +import os +import sys +import zlib +from pathlib import Path, PosixPath, WindowsPath +from string import Formatter +from typing import Any, Sequence, Mapping, Tuple, Optional + +import click + +from telegram_upload.video import video_metadata + +try: + from typing import LiteralString +except ImportError: + LiteralString = str + + +if sys.version_info < (3, 8): + cached_property = property +else: + from functools import cached_property + + +CHUNK_SIZE = 4096 +VALID_TYPES: Tuple[Any, ...] = (str, int, float, complex, bool, datetime.datetime, datetime.date, datetime.time) +AUTHORIZED_METHODS = (Path.home,) +AUTHORIZED_STRING_METHODS = ("title", "capitalize", "lower", "upper", "swapcase", "strip", "lstrip", "rstrip") +AUTHORIZED_DT_METHODS = ( + "astimezone", "ctime", "date", "dst", "isoformat", "isoweekday", "now", "time", + "timestamp", "today", "toordinal", "tzname", "utcnow", "utcoffset", "weekday" +) + + +class Duration: + def __init__(self, seconds: int): + self.seconds = seconds + + @property + def as_minutes(self) -> int: + return self.seconds // 60 + + @property + def as_hours(self) -> int: + return self.as_minutes // 60 + + @property + def as_days(self) -> int: + return self.as_hours // 24 + + @property + def for_humans(self) -> str: + words = ["year", "day", "hour", "minute", "second"] + + if not self.seconds: + return "now" + else: + m, s = divmod(self.seconds, 60) + h, m = divmod(m, 60) + d, h = divmod(h, 24) + y, d = divmod(d, 365) + + time = [y, d, h, m, s] + + duration = [] + + for x, i in enumerate(time): + if i == 1: + duration.append(f"{i} {words[x]}") + elif i > 1: + duration.append(f"{i} {words[x]}s") + + if len(duration) == 1: + return duration[0] + elif len(duration) == 2: + return f"{duration[0]} and {duration[1]}" + else: + return ", ".join(duration[:-1]) + " and " + duration[-1] + + def __int__(self) -> int: + return self.seconds + + def __str__(self) -> str: + return str(self.seconds) + + +class FileSize: + def __init__(self, size: int): + self.size = size + + @property + def as_kilobytes(self) -> int: + return self.size // 1000 + + @property + def as_megabytes(self) -> int: + return self.as_kilobytes // 1000 + + @property + def as_gigabytes(self) -> int: + return self.as_megabytes // 1000 + + @property + def as_kibibytes(self) -> int: + return self.size // 1024 + + @property + def as_mebibytes(self) -> int: + return self.as_kibibytes // 1024 + + @property + def as_gibibytes(self) -> int: + return self.as_mebibytes // 1024 + + @property + def for_humans(self, suffix="B") -> str: + num = self.size + for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"): + if abs(num) < 1024.0: + return f"{num:3.1f} {unit}{suffix}" + num /= 1024.0 + return f"{num:.1f} Yi{suffix}" + + def __int__(self) -> int: + return self.size + + def __str__(self) -> str: + return str(self.size) + + +class FileMedia: + def __init__(self, path: str): + self.path = path + self.metadata = video_metadata(path) + + @cached_property + def video_metadata(self) -> Any: + metadata = self.metadata + meta_groups = None + if hasattr(metadata, '_MultipleMetadata__groups'): + # Is mkv + meta_groups = metadata._MultipleMetadata__groups + if metadata is not None and not metadata.has('width') and meta_groups: + return meta_groups[next(filter(lambda x: x.startswith('video'), meta_groups._key_list))] + return metadata + + @property + def duration(self) -> Optional[Duration]: + if self.metadata and self.metadata.has('duration'): + return Duration(self.metadata.get('duration').seconds) + + def _get_video_metadata(self, key: str) -> Optional[Any]: + if self.video_metadata and self.video_metadata.has(key): + return self.video_metadata.get(key) + + def _get_metadata(self, key: str) -> Optional[Any]: + if self.metadata and self.metadata.has(key): + return self.metadata.get(key) + + @property + def width(self) -> Optional[int]: + return self._get_video_metadata('width') + + @property + def height(self) -> Optional[int]: + return self._get_video_metadata('height') + + @property + def title(self) -> Optional[str]: + return self._get_metadata('title') + + @property + def artist(self) -> Optional[str]: + return self._get_metadata('artist') + + @property + def album(self) -> Optional[str]: + return self._get_metadata('album') + + @property + def producer(self) -> Optional[str]: + return self._get_metadata('producer') + + +class FileMixin: + + def _calculate_hash(self, hash_calculator: Any) -> str: + with open(str(self), "rb") as f: + # Read and update hash string value in blocks + for byte_block in iter(lambda: f.read(CHUNK_SIZE), b""): + hash_calculator.update(byte_block) + return hash_calculator.hexdigest() + + @property + def md5(self) -> str: + return self._calculate_hash(hashlib.md5()) + + @property + def sha1(self) -> str: + return self._calculate_hash(hashlib.sha1()) + + @property + def sha224(self) -> str: + return self._calculate_hash(hashlib.sha224()) + + @property + def sha256(self) -> str: + return self._calculate_hash(hashlib.sha256()) + + @property + def sha384(self) -> str: + return self._calculate_hash(hashlib.sha384()) + + @property + def sha512(self) -> str: + return self._calculate_hash(hashlib.sha512()) + + @property + def sha3_224(self) -> str: + return self._calculate_hash(hashlib.sha3_224()) + + @property + def sha3_256(self) -> str: + return self._calculate_hash(hashlib.sha3_256()) + + @property + def sha3_384(self) -> str: + return self._calculate_hash(hashlib.sha3_384()) + + @property + def sha3_512(self) -> str: + return self._calculate_hash(hashlib.sha3_512()) + + @property + def crc32(self) -> str: + with open(str(self), "rb") as f: + calculated_hash = 0 + # Read and update hash string value in blocks + for byte_block in iter(lambda: f.read(CHUNK_SIZE), b""): + calculated_hash = zlib.crc32(byte_block, calculated_hash) + return "%08X" % (calculated_hash & 0xFFFFFFFF) + + @property + def adler32(self) -> str: + with open(str(self), "rb") as f: + calculated_hash = 1 + # Read and update hash string value in blocks + for byte_block in iter(lambda: f.read(CHUNK_SIZE), b""): + calculated_hash = zlib.adler32(byte_block, calculated_hash) + if calculated_hash < 0: + calculated_hash += 2 ** 32 + return hex(calculated_hash)[2:10].zfill(8) + + @cached_property + def _file_stat(self) -> os.stat_result: + return os.stat(str(self)) + + @cached_property + def ctime(self) -> datetime.datetime: + return datetime.datetime.fromtimestamp(self._file_stat.st_ctime) + + @cached_property + def mtime(self) -> datetime.datetime: + return datetime.datetime.fromtimestamp(self._file_stat.st_mtime) + + @cached_property + def atime(self) -> datetime.datetime: + return datetime.datetime.fromtimestamp(self._file_stat.st_atime) + + @cached_property + def size(self) -> FileSize: + return FileSize(self._file_stat.st_size) + + @cached_property + def media(self) -> FileMedia: + return FileMedia(str(self)) + + @cached_property + def mimetype(self) -> Optional[str]: + mimetypes.init() + return mimetypes.guess_type(str(self))[0] + + @cached_property + def suffixes(self) -> str: + return "".join(super().suffixes) + + @property + def absolute(self) -> "FilePath": + return super().absolute() + + @property + def relative(self) -> "FilePath": + return self.relative_to(Path.cwd()) + + +class FilePath(FileMixin, Path): + def __new__(cls, *args, **kwargs): + if cls is FilePath: + cls = WindowsFilePath if os.name == 'nt' else PosixFilePath + self = cls._from_parts(args) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + return self + + +class WindowsFilePath(FileMixin, WindowsPath): + pass + + +class PosixFilePath(FileMixin, PosixPath): + pass + + +class CaptionFormatter(Formatter): + + def get_field(self, field_name: str, args: Sequence[Any], kwargs: Mapping[str, Any]) -> Any: + try: + if "._" in field_name: + raise TypeError(f'Access to private property in {field_name}') + obj, first = super().get_field(field_name, args, kwargs) + has_func = hasattr(obj, "__func__") + has_self = hasattr(obj, "__self__") + if (has_func and obj.__func__ in AUTHORIZED_METHODS) or \ + (has_self and isinstance(obj.__self__, str) and obj.__name__ in AUTHORIZED_STRING_METHODS) or \ + (has_self and isinstance(obj.__self__, datetime.datetime) + and obj.__name__ in AUTHORIZED_DT_METHODS): + obj = obj() + if not isinstance(obj, VALID_TYPES + (WindowsFilePath, PosixFilePath, FilePath, FileSize, Duration)): + raise TypeError(f'Invalid type for {field_name}: {type(obj)}') + return obj, first + except Exception: + first, rest = _string.formatter_field_name_split(field_name) + return '{' + field_name + '}', first + + def format(self, __format_string: LiteralString, *args: LiteralString, **kwargs: LiteralString) -> LiteralString: + try: + return super().format(__format_string, *args, **kwargs) + except ValueError: + return __format_string + + +@click.command() +@click.argument('file', type=click.Path(exists=True)) +@click.argument('caption_format', type=str) +def test_caption_format(file: str, caption_format: str) -> None: + """Test the caption format on a given file""" + file_path = FilePath(file) + formatter = CaptionFormatter() + print(formatter.format(caption_format, file=file_path, now=datetime.datetime.now())) + + +if __name__ == '__main__': + # Testing mode + test_caption_format() diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..8a5c19f --- /dev/null +++ b/cli.py @@ -0,0 +1,147 @@ +import asyncio +from typing import Sequence, Tuple, List, TypeVar + +import click +from prompt_toolkit.filters import Condition +from prompt_toolkit.formatted_text import AnyFormattedText +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.layout import FormattedTextControl, Window, ConditionalMargin, ScrollbarMargin +from prompt_toolkit.widgets import CheckboxList, RadioList +from prompt_toolkit.widgets.base import E, _DialogList + +from telegram_upload.utils import aislice + +_T = TypeVar("_T") + +PAGE_SIZE = 10 + + +async def async_handler(handler, event): + if handler: + await handler(event) + + # Tell the application to redraw. We need to do this, + # because the below event handler won't be able to + # wait for the task to finish. + event.app.invalidate() + + +class IterableDialogList(_DialogList): + many = False + + def __init__(self, values: Sequence[Tuple[_T, AnyFormattedText]]) -> None: + pass + + async def _init(self, values: Sequence[Tuple[_T, AnyFormattedText]]) -> None: + started_values = await aislice(values, PAGE_SIZE) + + # started_values = await aislice(values, PAGE_SIZE) + if not started_values: + raise IndexError('Values is empty.') + self.values = started_values + # current_values will be used in multiple_selection, + # current_value will be used otherwise. + self.current_values: List[_T] = [] + self.current_value: _T = started_values[0][0] + self._selected_index = 0 + + # Key bindings. + kb = KeyBindings() + + @kb.add("up") + def _up(event: E) -> None: + self._selected_index = max(0, self._selected_index - 1) + + @kb.add("down") + def _down(event: E) -> None: + async def handler(event): + if self._selected_index + 1 >= len(self.values): + self.values.extend(await aislice(values, PAGE_SIZE)) + self._selected_index = min(len(self.values) - 1, self._selected_index + 1) + asyncio.get_event_loop().create_task(async_handler(handler, event)) + + @kb.add("pageup") + def _pageup(event: E) -> None: + w = event.app.layout.current_window + if w.render_info: + self._selected_index = max( + 0, self._selected_index - len(w.render_info.displayed_lines) + ) + + @kb.add("pagedown") + def _pagedown(event: E) -> None: + async def handler(event): + w = event.app.layout.current_window + if self._selected_index + len(w.render_info.displayed_lines) >= len(self.values): + self.values.extend(await aislice(values, PAGE_SIZE)) + if w.render_info: + self._selected_index = min( + len(self.values) - 1, + self._selected_index + len(w.render_info.displayed_lines), + ) + asyncio.get_event_loop().create_task(async_handler(handler, event)) + + @kb.add("enter") + def _enter(event: E) -> None: + if self.many: + event.app.exit(result=self.current_values) + else: + event.app.exit(result=self.current_value) + + @kb.add(" ") + def _enter(event: E) -> None: + self._handle_enter() + + # Control and window. + self.control = FormattedTextControl( + self._get_text_fragments, key_bindings=kb, focusable=True + ) + + self.window = Window( + content=self.control, + style=self.container_style, + right_margins=[ + ConditionalMargin( + margin=ScrollbarMargin(display_arrows=True), + filter=Condition(lambda: self.show_scrollbar), + ), + ], + dont_extend_height=True, + ) + + + +class IterableCheckboxList(IterableDialogList, CheckboxList): + many = True + + +class IterableRadioList(IterableDialogList, RadioList): + pass + + +async def show_cli_widget(widget): + from prompt_toolkit import Application + from prompt_toolkit.layout import Layout + app = Application(full_screen=False, layout=Layout(widget), mouse_support=True) + return await app.run_async() + + +async def show_checkboxlist(iterator, not_items_error='No items were found. Exiting...'): + # iterator = map(lambda x: (x, f'{x.text} by {x.chat.first_name}'), iterator) + try: + checkbox_list = IterableCheckboxList(iterator) + await checkbox_list._init(iterator) + except IndexError: + click.echo(not_items_error, err=True) + return [] + return await show_cli_widget(checkbox_list) + + +async def show_radiolist(iterator, not_items_error='No items were found. Exiting...'): + try: + radio_list = IterableRadioList(iterator) + await radio_list._init(iterator) + except IndexError: + click.echo(not_items_error, err=True) + return None + return await show_cli_widget(radio_list) diff --git a/client/__init__.py b/client/__init__.py new file mode 100644 index 0000000..59ff5a4 --- /dev/null +++ b/client/__init__.py @@ -0,0 +1,4 @@ +from telegram_upload.client.telegram_manager_client import TelegramManagerClient, get_message_file_attribute + + +__all__ = ["TelegramManagerClient", "get_message_file_attribute"] diff --git a/client/progress_bar.py b/client/progress_bar.py new file mode 100644 index 0000000..ed6ae51 --- /dev/null +++ b/client/progress_bar.py @@ -0,0 +1,16 @@ +from ctypes import c_int64 + +import click + + +def get_progress_bar(action, file, length): + bar = click.progressbar(label='{} "{}"'.format(action, file), length=length) + last_current = c_int64(0) + + def progress(current, total): + if current < last_current.value: + return + bar.pos = 0 + bar.update(current) + last_current.value = current + return progress, bar diff --git a/client/telegram_download_client.py b/client/telegram_download_client.py new file mode 100644 index 0000000..4015584 --- /dev/null +++ b/client/telegram_download_client.py @@ -0,0 +1,133 @@ +import asyncio +import inspect +import io +import pathlib +import sys +from typing import Iterable + +import typing + +from more_itertools import grouper +from telethon import TelegramClient, utils, helpers +from telethon.client.downloads import MIN_CHUNK_SIZE +from telethon.crypto import AES + +from telegram_upload.client.progress_bar import get_progress_bar +from telegram_upload.download_files import DownloadFile +from telegram_upload.exceptions import TelegramUploadNoSpaceError +from telegram_upload.utils import free_disk_usage, sizeof_fmt, get_environment_integer + + +if sys.version_info < (3, 10): + from telegram_upload._compat import anext + + +PARALLEL_DOWNLOAD_BLOCKS = get_environment_integer('TELEGRAM_UPLOAD_PARALLEL_DOWNLOAD_BLOCKS', 10) + + +class TelegramDownloadClient(TelegramClient): + def find_files(self, entity): + for message in self.iter_messages(entity): + if message.document: + yield message + else: + break + + async def iter_files(self, entity): + async for message in self.iter_messages(entity=entity): + if message.document: + yield message + + def download_files(self, entity, download_files: Iterable[DownloadFile], delete_on_success: bool = False): + for download_file in download_files: + if download_file.size > free_disk_usage(): + raise TelegramUploadNoSpaceError( + 'There is no disk space to download "{}". Space required: {}'.format( + download_file.file_name, sizeof_fmt(download_file.size - free_disk_usage()) + ) + ) + progress, bar = get_progress_bar('Downloading', download_file.file_name, download_file.size) + file_name = download_file.file_name + try: + file_name = self.download_media(download_file.message, progress_callback=progress) + download_file.set_download_file_name(file_name) + finally: + bar.label = f'Downloaded "{file_name}"' + bar.update(1, 1) + bar.render_finish() + if delete_on_success: + self.delete_messages(entity, [download_file.message]) + + async def _download_file( + self: 'TelegramClient', + input_location: 'hints.FileLike', + file: 'hints.OutFileLike' = None, + *, + part_size_kb: float = None, + file_size: int = None, + progress_callback: 'hints.ProgressCallback' = None, + dc_id: int = None, + key: bytes = None, + iv: bytes = None, + msg_data: tuple = None) -> typing.Optional[bytes]: + if not part_size_kb: + if not file_size: + part_size_kb = 64 # Reasonable default + else: + part_size_kb = utils.get_appropriated_part_size(file_size) + + part_size = int(part_size_kb * 1024) + if part_size % MIN_CHUNK_SIZE != 0: + raise ValueError( + 'The part size must be evenly divisible by 4096.') + + if isinstance(file, pathlib.Path): + file = str(file.absolute()) + + in_memory = file is None or file is bytes + if in_memory: + f = io.BytesIO() + elif isinstance(file, str): + # Ensure that we'll be able to download the media + helpers.ensure_parent_dir_exists(file) + f = open(file, 'wb') + else: + f = file + + try: + # The speed of this code can be improved. 10 requests are made in parallel, but it waits for all 10 to + # finish before launching another 10. + for tasks in grouper(self._iter_download_chunk_tasks(input_location, part_size, dc_id, msg_data, file_size), + PARALLEL_DOWNLOAD_BLOCKS): + tasks = list(filter(bool, tasks)) + await asyncio.wait(tasks) + chunk = b''.join(filter(bool, [task.result() for task in tasks])) + if not chunk: + break + if iv and key: + chunk = AES.decrypt_ige(chunk, key, iv) + r = f.write(chunk) + if inspect.isawaitable(r): + await r + + if progress_callback: + r = progress_callback(f.tell(), file_size) + if inspect.isawaitable(r): + await r + + # Not all IO objects have flush (see #1227) + if callable(getattr(f, 'flush', None)): + f.flush() + + if in_memory: + return f.getvalue() + finally: + if isinstance(file, str) or in_memory: + f.close() + + def _iter_download_chunk_tasks(self, input_location, part_size, dc_id, msg_data, file_size): + for i in range(0, file_size, part_size): + yield self.loop.create_task( + anext(self._iter_download(input_location, offset=i, request_size=part_size, dc_id=dc_id, + msg_data=msg_data)) + ) diff --git a/client/telegram_manager_client.py b/client/telegram_manager_client.py new file mode 100644 index 0000000..77ce25d --- /dev/null +++ b/client/telegram_manager_client.py @@ -0,0 +1,129 @@ +import getpass +import json +import os +import re +import sys +from distutils.version import StrictVersion +from typing import Union +from urllib.parse import urlparse + +import click +from telethon.errors import ApiIdInvalidError +from telethon.network import ConnectionTcpMTProxyRandomizedIntermediate +from telethon.tl.types import DocumentAttributeFilename, User, InputPeerUser +from telethon.version import __version__ as telethon_version + +from telegram_upload.client.telegram_download_client import TelegramDownloadClient +from telegram_upload.client.telegram_upload_client import TelegramUploadClient +from telegram_upload.config import SESSION_FILE +from telegram_upload.exceptions import TelegramProxyError, InvalidApiFileError + +if StrictVersion(telethon_version) >= StrictVersion('1.0'): + import telethon.sync # noqa + + +if sys.version_info < (3, 8): + cached_property = property +else: + from functools import cached_property + + +BOT_USER_MAX_FILE_SIZE = 52428800 # 50MB +USER_MAX_FILE_SIZE = 2097152000 # 2GB +PREMIUM_USER_MAX_FILE_SIZE = 4194304000 # 4GB +USER_MAX_CAPTION_LENGTH = 1024 +PREMIUM_USER_MAX_CAPTION_LENGTH = 2048 +PROXY_ENVIRONMENT_VARIABLE_NAMES = [ + 'TELEGRAM_UPLOAD_PROXY', + 'HTTPS_PROXY', + 'HTTP_PROXY', +] + + +def get_message_file_attribute(message): + return next(filter(lambda x: isinstance(x, DocumentAttributeFilename), + message.document.attributes), None) + + +def phone_match(value): + match = re.match(r'\+?[0-9.()\[\] \-]+', value) + if match is None: + raise ValueError('{} is not a valid phone'.format(value)) + return value + + +def get_proxy_environment_variable(): + for env_name in PROXY_ENVIRONMENT_VARIABLE_NAMES: + if env_name in os.environ: + return os.environ[env_name] + + +def parse_proxy_string(proxy: Union[str, None]): + if not proxy: + return None + proxy_parsed = urlparse(proxy) + if not proxy_parsed.scheme or not proxy_parsed.hostname or not proxy_parsed.port: + raise TelegramProxyError('Malformed proxy address: {}'.format(proxy)) + if proxy_parsed.scheme == 'mtproxy': + return ('mtproxy', proxy_parsed.hostname, proxy_parsed.port, proxy_parsed.username) + try: + import socks + except ImportError: + raise TelegramProxyError('pysocks module is required for use HTTP/socks proxies. ' + 'Install it using: pip install pysocks') + proxy_type = { + 'http': socks.HTTP, + 'socks4': socks.SOCKS4, + 'socks5': socks.SOCKS5, + }.get(proxy_parsed.scheme) + if proxy_type is None: + raise TelegramProxyError('Unsupported proxy type: {}'.format(proxy_parsed.scheme)) + return (proxy_type, proxy_parsed.hostname, proxy_parsed.port, True, + proxy_parsed.username, proxy_parsed.password) + + +class TelegramManagerClient(TelegramUploadClient, TelegramDownloadClient): + def __init__(self, config_file, proxy=None, **kwargs): + with open(config_file) as f: + config = json.load(f) + self.config_file = config_file + proxy = proxy if proxy is not None else get_proxy_environment_variable() + proxy = parse_proxy_string(proxy) + if proxy and proxy[0] == 'mtproxy': + proxy = proxy[1:] + kwargs['connection'] = ConnectionTcpMTProxyRandomizedIntermediate + super().__init__(config.get('session', SESSION_FILE), config['api_id'], config['api_hash'], + proxy=proxy, **kwargs) + + def start( + self, + phone=lambda: click.prompt('Please enter your phone', type=phone_match), + password=lambda: getpass.getpass('Please enter your password: '), + *, + bot_token=None, force_sms=False, code_callback=None, + first_name='New User', last_name='', max_attempts=3): + try: + return super().start(phone=phone, password=password, bot_token=bot_token, force_sms=force_sms, + first_name=first_name, last_name=last_name, max_attempts=max_attempts) + except ApiIdInvalidError: + raise InvalidApiFileError(self.config_file) + + @cached_property + def me(self) -> Union[User, InputPeerUser]: + return self.get_me() + + @property + def max_file_size(self): + if hasattr(self.me, 'premium') and self.me.premium: + return PREMIUM_USER_MAX_FILE_SIZE + elif self.me.bot: + return BOT_USER_MAX_FILE_SIZE + else: + return USER_MAX_FILE_SIZE + + @property + def max_caption_length(self): + if hasattr(self.me, 'premium') and self.me.premium: + return PREMIUM_USER_MAX_CAPTION_LENGTH + else: + return USER_MAX_CAPTION_LENGTH diff --git a/client/telegram_upload_client.py b/client/telegram_upload_client.py new file mode 100644 index 0000000..9823263 --- /dev/null +++ b/client/telegram_upload_client.py @@ -0,0 +1,406 @@ +import asyncio +import hashlib +import os +import time +from typing import Iterable, Optional + +import click +from telethon import TelegramClient, utils, helpers, custom +from telethon.crypto import AES +from telethon.errors import RPCError, FloodWaitError, InvalidBufferError, FloodError +from telethon.tl import types, functions, TLRequest +from telethon.utils import pack_bot_file_id + +from telegram_upload.client.progress_bar import get_progress_bar +from telegram_upload.exceptions import TelegramUploadDataLoss, MissingFileError +from telegram_upload.upload_files import File +from telegram_upload.utils import grouper, async_to_sync, get_environment_integer + +PARALLEL_UPLOAD_BLOCKS = get_environment_integer('TELEGRAM_UPLOAD_PARALLEL_UPLOAD_BLOCKS', 8) +ALBUM_FILES = 10 +RETRIES = 10 +MAX_RECONNECT_RETRIES = get_environment_integer('TELEGRAM_UPLOAD_MAX_RECONNECT_RETRIES', 10) +RECONNECT_TIMEOUT = get_environment_integer('TELEGRAM_UPLOAD_RECONNECT_TIMEOUT', 10) +MIN_RECONNECT_WAIT = get_environment_integer('TELEGRAM_UPLOAD_MIN_RECONNECT_WAIT', 10) + + +class TelegramUploadClient(TelegramClient): + parallel_upload_blocks = PARALLEL_UPLOAD_BLOCKS + + def __init__(self, *args, **kwargs): + self.reconnecting_lock = asyncio.Lock() + self.upload_semaphore = asyncio.Semaphore(self.parallel_upload_blocks) + super().__init__(*args, **kwargs) + + def forward_to(self, message, destinations): + for destination in destinations: + self.forward_messages(destination, [message]) + + async def _send_album_media(self, entity, media): + entity = await self.get_input_entity(entity) + request = functions.messages.SendMultiMediaRequest( + entity, multi_media=media, silent=None, schedule_date=None, clear_draft=None + ) + result = await self(request) + + random_ids = [m.random_id for m in media] + return self._get_response_message(random_ids, result, entity) + + def send_files_as_album(self, entity, files, delete_on_success=False, print_file_id=False, + forward=()): + for files_group in grouper(ALBUM_FILES, files): + media = self.send_files(entity, files_group, delete_on_success, print_file_id, forward, send_as_media=True) + async_to_sync(self._send_album_media(entity, media)) + + def _send_file_message(self, entity, file, thumb, progress): + message = self.send_file(entity, file, thumb=thumb, + file_size=file.file_size if isinstance(file, File) else None, + caption=file.file_caption, force_document=file.force_file, + progress_callback=progress, attributes=file.file_attributes) + if hasattr(message.media, 'document') and file.file_size != message.media.document.size: + raise TelegramUploadDataLoss( + 'Remote document size: {} bytes (local file size: {} bytes)'.format( + message.media.document.size, file.file_size)) + return message + + async def _send_media(self, entity, file: File, progress): + entity = await self.get_input_entity(entity) + supports_streaming = False # TODO + fh, fm, _ = await self._file_to_media( + file, supports_streaming=file, progress_callback=progress) + if isinstance(fm, types.InputMediaUploadedPhoto): + r = await self(functions.messages.UploadMediaRequest( + entity, media=fm + )) + + fm = utils.get_input_media(r.photo) + elif isinstance(fm, types.InputMediaUploadedDocument): + r = await self(functions.messages.UploadMediaRequest( + entity, media=fm + )) + + fm = utils.get_input_media( + r.document, supports_streaming=supports_streaming) + + return types.InputSingleMedia( + fm, + message=file.short_name, + entities=None, + # random_id is autogenerated + ) + + def send_one_file(self, entity, file: File, send_as_media: bool = False, thumb: Optional[str] = None, + retries=RETRIES): + message = None + progress, bar = get_progress_bar('Uploading', file.file_name, file.file_size) + + try: + try: + # TODO: remove distinction? + if send_as_media: + message = async_to_sync(self._send_media(entity, file, progress)) + else: + message = self._send_file_message(entity, file, thumb, progress) + finally: + bar.render_finish() + except (FloodWaitError,FloodError) as e: + click.echo(f'{e}. Waiting for {e.seconds} seconds.', err=True) + time.sleep(e.seconds) + message = self.send_one_file(entity, file, send_as_media, thumb, retries) + except RPCError as e: + if retries > 0: + click.echo(f'The file "{file.file_name}" could not be uploaded: {e}. Retrying...', err=True) + message = self.send_one_file(entity, file, send_as_media, thumb, retries - 1) + else: + click.echo(f'The file "{file.file_name}" could not be uploaded: {e}. It will not be retried.', err=True) + return message + + def send_files(self, entity, files: Iterable[File], delete_on_success=False, print_file_id=False, + forward=(), send_as_media: bool = False): + has_files = False + messages = [] + for file in files: + has_files = True + thumb = file.get_thumbnail() + try: + message = self.send_one_file(entity, file, send_as_media, thumb=thumb) + finally: + if thumb and not file.is_custom_thumbnail and os.path.lexists(thumb): + os.remove(thumb) + if message is None: + click.echo('Failed to upload file "{}"'.format(file.file_name), err=True) + if message and print_file_id: + click.echo('Uploaded successfully "{}" (file_id {})'.format(file.file_name, + pack_bot_file_id(message.media))) + if message and delete_on_success: + click.echo('Deleting "{}"'.format(file)) + os.remove(file.path) + if message: + self.forward_to(message, forward) + messages.append(message) + if not has_files: + raise MissingFileError('Files do not exist.') + return messages + + async def upload_file( + self: 'TelegramClient', + file: 'hints.FileLike', + *, + part_size_kb: float = None, + file_size: int = None, + file_name: str = None, + use_cache: type = None, + key: bytes = None, + iv: bytes = None, + progress_callback: 'hints.ProgressCallback' = None) -> 'types.TypeInputFile': + """ + Uploads a file to Telegram's servers, without sending it. + + .. note:: + + Generally, you want to use `send_file` instead. + + This method returns a handle (an instance of :tl:`InputFile` or + :tl:`InputFileBig`, as required) which can be later used before + it expires (they are usable during less than a day). + + Uploading a file will simply return a "handle" to the file stored + remotely in the Telegram servers, which can be later used on. This + will **not** upload the file to your own chat or any chat at all. + + Arguments + file (`str` | `bytes` | `file`): + The path of the file, byte array, or stream that will be sent. + Note that if a byte array or a stream is given, a filename + or its type won't be inferred, and it will be sent as an + "unnamed application/octet-stream". + + part_size_kb (`int`, optional): + Chunk size when uploading files. The larger, the less + requests will be made (up to 512KB maximum). + + file_size (`int`, optional): + The size of the file to be uploaded, which will be determined + automatically if not specified. + + If the file size can't be determined beforehand, the entire + file will be read in-memory to find out how large it is. + + file_name (`str`, optional): + The file name which will be used on the resulting InputFile. + If not specified, the name will be taken from the ``file`` + and if this is not a `str`, it will be ``"unnamed"``. + + use_cache (`type`, optional): + This parameter currently does nothing, but is kept for + backward-compatibility (and it may get its use back in + the future). + + key ('bytes', optional): + In case of an encrypted upload (secret chats) a key is supplied + + iv ('bytes', optional): + In case of an encrypted upload (secret chats) an iv is supplied + + progress_callback (`callable`, optional): + A callback function accepting two parameters: + ``(sent bytes, total)``. + + When sending an album, the callback will receive a number + between 0 and the amount of files as the "sent" parameter, + and the amount of files as the "total". Note that the first + parameter will be a floating point number to indicate progress + within a file (e.g. ``2.5`` means it has sent 50% of the third + file, because it's between 2 and 3). + + Returns + :tl:`InputFileBig` if the file size is larger than 10MB, + `InputSizedFile ` + (subclass of :tl:`InputFile`) otherwise. + + Example + .. code-block:: python + + # Photos as photo and document + file = await client.upload_file('photo.jpg') + await client.send_file(chat, file) # sends as photo + await client.send_file(chat, file, force_document=True) # sends as document + + file.name = 'not a photo.jpg' + await client.send_file(chat, file, force_document=True) # document, new name + + # As song or as voice note + file = await client.upload_file('song.ogg') + await client.send_file(chat, file) # sends as song + await client.send_file(chat, file, voice_note=True) # sends as voice note + """ + if isinstance(file, (types.InputFile, types.InputFileBig)): + return file # Already uploaded + + async with helpers._FileStream(file, file_size=file_size) as stream: + # Opening the stream will determine the correct file size + file_size = stream.file_size + + if not part_size_kb: + part_size_kb = utils.get_appropriated_part_size(file_size) + + if part_size_kb > 512: + raise ValueError('The part size must be less or equal to 512KB') + + part_size = int(part_size_kb * 1024) + if part_size % 1024 != 0: + raise ValueError( + 'The part size must be evenly divisible by 1024') + + # Set a default file name if None was specified + file_id = helpers.generate_random_long() + if not file_name: + file_name = stream.name or str(file_id) + + # If the file name lacks extension, add it if possible. + # Else Telegram complains with `PHOTO_EXT_INVALID_ERROR` + # even if the uploaded image is indeed a photo. + if not os.path.splitext(file_name)[-1]: + file_name += utils._get_extension(stream) + + # Determine whether the file is too big (over 10MB) or not + # Telegram does make a distinction between smaller or larger files + is_big = file_size > 10 * 1024 * 1024 + hash_md5 = hashlib.md5() + + part_count = (file_size + part_size - 1) // part_size + self._log[__name__].info('Uploading file of %d bytes in %d chunks of %d', + file_size, part_count, part_size) + + pos = 0 + for part_index in range(part_count): + # Read the file by in chunks of size part_size + part = await helpers._maybe_await(stream.read(part_size)) + + if not isinstance(part, bytes): + raise TypeError( + 'file descriptor returned {}, not bytes (you must ' + 'open the file in bytes mode)'.format(type(part))) + + # `file_size` could be wrong in which case `part` may not be + # `part_size` before reaching the end. + if len(part) != part_size and part_index < part_count - 1: + raise ValueError( + 'read less than {} before reaching the end; either ' + '`file_size` or `read` are wrong'.format(part_size)) + + pos += len(part) + + # Encryption part if needed + if key and iv: + part = AES.encrypt_ige(part, key, iv) + + if not is_big: + # Bit odd that MD5 is only needed for small files and not + # big ones with more chance for corruption, but that's + # what Telegram wants. + hash_md5.update(part) + + # The SavePartRequest is different depending on whether + # the file is too large or not (over or less than 10MB) + if is_big: + request = functions.upload.SaveBigFilePartRequest( + file_id, part_index, part_count, part) + else: + request = functions.upload.SaveFilePartRequest( + file_id, part_index, part) + await self.upload_semaphore.acquire() + self.loop.create_task( + self._send_file_part(request, part_index, part_count, pos, file_size, progress_callback), + name=f"telegram-upload-file-{part_index}" + ) + # Wait for all tasks to finish + await asyncio.wait([ + task for task in asyncio.all_tasks() if task.get_name().startswith(f"telegram-upload-file-") + ]) + if is_big: + return types.InputFileBig(file_id, part_count, file_name) + else: + return custom.InputSizedFile( + file_id, part_count, file_name, md5=hash_md5, size=file_size + ) + + # endregion + + async def _send_file_part(self, request: TLRequest, part_index: int, part_count: int, pos: int, file_size: int, + progress_callback: Optional['hints.ProgressCallback'] = None, retry: int = 0) -> None: + """ + Submit the file request part to Telegram. This method waits for the request to be executed, logs the upload, + and releases the semaphore to allow further uploading. + + :param request: SaveBigFilePartRequest or SaveFilePartRequest. This request will be awaited. + :param part_index: Part index as integer. Used in logging. + :param part_count: Total parts count as integer. Used in logging. + :param pos: Number of part as integer. Used for progress bar. + :param file_size: Total file size. Used for progress bar. + :param progress_callback: Callback to use after submit the request. Optional. + :return: None + """ + result = None + try: + result = await self(request) + except InvalidBufferError as e: + if e.code == 429: + # Too many connections + click.echo(f'Too many connections to Telegram servers.', err=True) + else: + raise + except ConnectionError: + # Retry to send the file part + click.echo(f'Detected connection error. Retrying...', err=True) + except FloodError as e: + print(e) + else: + self.upload_semaphore.release() + if result is None and retry < MAX_RECONNECT_RETRIES: + # An error occurred, retry + await asyncio.sleep(max(MIN_RECONNECT_WAIT, retry * MIN_RECONNECT_WAIT)) + await self.reconnect() + await self._send_file_part( + request, part_index, part_count, pos, file_size, progress_callback, retry + 1 + ) + elif result: + self._log[__name__].debug('Uploaded %d/%d', + part_index + 1, part_count) + if progress_callback: + await helpers._maybe_await(progress_callback(pos, file_size)) + else: + raise RuntimeError( + 'Failed to upload file part {}.'.format(part_index)) + + def decrease_upload_semaphore(self): + """ + Decreases the upload semaphore by one. This method is used to reduce the number of parallel uploads. + :return: + """ + if self.parallel_upload_blocks > 1: + self.parallel_upload_blocks -= 1 + self.loop.create_task(self.upload_semaphore.acquire()) + + async def reconnect(self): + """ + Reconnects to Telegram servers. + + :return: None + """ + await self.reconnecting_lock.acquire() + if self.is_connected(): + # Reconnected in another task + self.reconnecting_lock.release() + return + self.decrease_upload_semaphore() + try: + click.echo(f'Reconnecting to Telegram servers...') + await asyncio.wait_for(self.connect(), RECONNECT_TIMEOUT) + click.echo(f'Reconnected to Telegram servers.') + except InvalidBufferError: + click.echo(f'InvalidBufferError connecting to Telegram servers.', err=True) + except asyncio.TimeoutError: + click.echo(f'Timeout connecting to Telegram servers.', err=True) + finally: + self.reconnecting_lock.release() diff --git a/config.py b/config.py new file mode 100644 index 0000000..bcf951c --- /dev/null +++ b/config.py @@ -0,0 +1,24 @@ +import json +import os + +import click + +CONFIG_DIRECTORY = os.environ.get('TELEGRAM_UPLOAD_CONFIG_DIRECTORY', '~/.config') +CONFIG_FILE = os.path.expanduser('{}/telegram-upload.json'.format(CONFIG_DIRECTORY)) +SESSION_FILE = os.path.expanduser('{}/telegram-upload'.format(CONFIG_DIRECTORY)) + + +def prompt_config(config_file): + os.makedirs(os.path.dirname(config_file), exist_ok=True) + click.echo('Go to https://my.telegram.org and create a App in API development tools') + api_id = click.prompt('Please Enter api_id', type=int) + api_hash = click.prompt('Now enter api_hash') + with open(config_file, 'w') as f: + json.dump({'api_id': api_id, 'api_hash': api_hash}, f) + return config_file + + +def default_config(): + if os.path.lexists(CONFIG_FILE): + return CONFIG_FILE + return prompt_config(CONFIG_FILE) diff --git a/download_files.py b/download_files.py new file mode 100644 index 0000000..35fdb54 --- /dev/null +++ b/download_files.py @@ -0,0 +1,210 @@ +import os +import sys +from typing import Iterable, Iterator, Optional, BinaryIO + +from telethon.tl.types import Message, DocumentAttributeFilename + + +if sys.version_info < (3, 8): + cached_property = property +else: + from functools import cached_property + + +CHUNK_FILE_SIZE = 1024 * 1024 + + +def pipe_file(read_file_name: str, write_file: BinaryIO): + """Read a file by its file name and write in another file already open.""" + with open(read_file_name, "rb") as read_file: + while True: + data = read_file.read(CHUNK_FILE_SIZE) + if data: + write_file.write(data) + else: + break + + +class JoinStrategyBase: + """Base class to inherit join strategies. The strategies depend on the file type. + For example, zip files and rar files do not merge in the same way. + """ + def __init__(self): + self.download_files = [] + + def is_part(self, download_file: 'DownloadFile') -> bool: + """Returns if the download file is part of this bundle.""" + raise NotImplementedError + + def add_download_file(self, download_file: 'DownloadFile') -> None: + """Add a download file to this bundle.""" + if download_file in self.download_files: + return + self.download_files.append(download_file) + + @classmethod + def is_applicable(cls, download_file: 'DownloadFile') -> bool: + """Returns if this strategy is applicable to the download file.""" + raise NotImplementedError + + def join_download_files(self): + """Join the downloaded files in the bundle.""" + raise NotImplementedError + + +class UnionJoinStrategy(JoinStrategyBase): + """Join separate files without any application. These files have extension + 01, 02, 03... + """ + base_name: Optional[str] = None + + @staticmethod + def get_base_name(download_file: 'DownloadFile'): + """Returns the file name without extension.""" + return download_file.file_name.rsplit(".", 1)[0] + + def add_download_file(self, download_file: 'DownloadFile') -> None: + """Add a download file to this bundle.""" + if self.base_name is None: + self.base_name = self.get_base_name(download_file) + super().add_download_file(download_file) + + def is_part(self, download_file: 'DownloadFile') -> bool: + """Returns if the download file is part of this bundle.""" + return self.base_name == self.get_base_name(download_file) + + @classmethod + def is_applicable(cls, download_file: 'DownloadFile') -> bool: + """Returns if this strategy is applicable to the download file.""" + return download_file.file_name_extension.isdigit() + + def join_download_files(self): + """Join the downloaded files in the bundle.""" + download_files = self.download_files + sorted_files = sorted(download_files, key=lambda x: x.file_name_extension) + sorted_files = [file for file in sorted_files if os.path.lexists(file.downloaded_file_name or "")] + if not sorted_files or len(sorted_files) - 1 != int(sorted_files[-1].file_name_extension): + # There are parts of the file missing. Stopping... + return + with open(self.get_base_name(sorted_files[0]), "wb") as new_file: + for download_file in sorted_files: + pipe_file(download_file.downloaded_file_name, new_file) + for download_file in sorted_files: + os.remove(download_file.downloaded_file_name) + + +JOIN_STRATEGIES = [ + UnionJoinStrategy, +] + + +def get_join_strategy(download_file: 'DownloadFile') -> Optional[JoinStrategyBase]: + """Get join strategy for the download file. An instance is returned if a strategy + is available. Otherwise, None is returned. + """ + for strategy_cls in JOIN_STRATEGIES: + if strategy_cls.is_applicable(download_file): + strategy = strategy_cls() + strategy.add_download_file(download_file) + return strategy + + +class DownloadFile: + """File to download. This includes the Telethon message with the file.""" + downloaded_file_name: Optional[str] = None + + def __init__(self, message: Message): + """Creates the download file instance from the message.""" + self.message = message + + def set_download_file_name(self, file_name): + """After download the file, set the final download file name.""" + self.downloaded_file_name = file_name + + @cached_property + def filename_attr(self) -> Optional[DocumentAttributeFilename]: + """Get the document attribute file name attribute in the document.""" + return next(filter(lambda x: isinstance(x, DocumentAttributeFilename), + self.document.attributes), None) + + @cached_property + def file_name(self) -> str: + """Get the file name.""" + return self.filename_attr.file_name if self.filename_attr else 'Unknown' + + @property + def file_name_extension(self) -> str: + """Get the file name extension.""" + parts = self.file_name.rsplit(".", 1) + return parts[-1] if len(parts) >= 2 else "" + + @property + def document(self): + """Get the message document.""" + return self.message.document + + @property + def size(self) -> int: + """Get the file size.""" + return self.document.size + + def __eq__(self, other: 'DownloadFile'): + """Compare download files by their file name.""" + return self.file_name == other.file_name + + +class DownloadSplitFilesBase: + """Iterate over complete and split files. Base class to inherit.""" + def __init__(self, messages: Iterable[Message]): + self.messages = messages + + def get_iterator(self) -> Iterator[DownloadFile]: + """Get an iterator with the download files.""" + raise NotImplementedError + + def __iter__(self) -> 'DownloadSplitFilesBase': + """Set the iterator from the get_iterator method.""" + self._iterator = self.get_iterator() + return self + + def __next__(self) -> 'DownloadFile': + """Get the next download file in the iterator.""" + if self._iterator is None: + self._iterator = self.get_iterator() + return next(self._iterator) + + +class KeepDownloadSplitFiles(DownloadSplitFilesBase): + """Download split files without join it.""" + def get_iterator(self) -> Iterator[DownloadFile]: + """Get an iterator with the download files.""" + return map(lambda message: DownloadFile(message), self.messages) + + +class JoinDownloadSplitFiles(DownloadSplitFilesBase): + """Download split files and join it.""" + def get_iterator(self) -> Iterator[DownloadFile]: + """Get an iterator with the download files. This method applies the join strategy and + joins the files after download it. + """ + current_join_strategy: Optional[JoinStrategyBase] = None + for message in self.messages: + download_file = DownloadFile(message) + yield download_file + if current_join_strategy and current_join_strategy.is_part(download_file): + # There is a bundle in process and the download file is part of it. Add the download + # file to the bundle. + current_join_strategy.add_download_file(download_file) + elif current_join_strategy and not current_join_strategy.is_part(download_file): + # There is a bundle in process and the download file is not part of it. Join the files + # in the bundle and finish it. + current_join_strategy.join_download_files() + current_join_strategy = None + if current_join_strategy is None: + # There is no bundle in process. Get the current bundle if the file has a strategy + # available. + current_join_strategy = get_join_strategy(download_file) + else: + # After finish all the files, join the latest bundle. + if current_join_strategy: + current_join_strategy.join_download_files() diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..e9734cd --- /dev/null +++ b/exceptions.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- + +"""Exceptions for telegram-upload.""" +import sys + +import click + +from telegram_upload.config import prompt_config + + +class ThumbError(Exception): + pass + + +class ThumbVideoError(ThumbError): + pass + + +class TelegramUploadError(Exception): + body = '' + error_code = 1 + + def __init__(self, extra_body=''): + self.extra_body = extra_body + + def __str__(self): + msg = self.__class__.__name__ + if self.body: + msg += ': {}'.format(self.body) + if self.extra_body: + msg += ('. {}' if self.body else ': {}').format(self.extra_body) + return msg + + +class MissingFileError(TelegramUploadError): + pass + + +class InvalidApiFileError(TelegramUploadError): + def __init__(self, config_file, extra_body=''): + self.config_file = config_file + super().__init__(extra_body) + + +class TelegramInvalidFile(TelegramUploadError): + error_code = 3 + + +class TelegramUploadNoSpaceError(TelegramUploadError): + error_code = 28 + + +class TelegramUploadDataLoss(TelegramUploadError): + error_code = 29 + + +class TelegramProxyError(TelegramUploadError): + error_code = 30 + + +class TelegramEnvironmentError(TelegramUploadError): + error_code = 31 + + +def catch(fn): + def wrap(*args, **kwargs): + try: + return fn(*args, **kwargs) + except InvalidApiFileError as e: + click.echo('The api_id/api_hash combination is invalid. Re-enter both values.') + prompt_config(e.config_file) + return catch(fn)(*args, **kwargs) + except TelegramUploadError as e: + sys.stderr.write('[Error] telegram-upload Exception:\n{}\n'.format(e)) + exit(e.error_code) + return wrap diff --git a/management.py b/management.py new file mode 100644 index 0000000..fa397aa --- /dev/null +++ b/management.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- + +"""Console script for telegram-upload.""" +import os + +import click +from telethon.tl.types import User + +from telegram_upload.cli import show_checkboxlist, show_radiolist +from telegram_upload.client import TelegramManagerClient, get_message_file_attribute +from telegram_upload.config import default_config, CONFIG_FILE +from telegram_upload.download_files import KeepDownloadSplitFiles, JoinDownloadSplitFiles +from telegram_upload.exceptions import catch +from telegram_upload.upload_files import NoDirectoriesFiles, RecursiveFiles, NoLargeFiles, SplitFiles, is_valid_file +from telegram_upload.utils import async_to_sync, amap, sync_to_async_iterator + + +try: + from natsort import natsorted +except ImportError: + natsorted = None + + +DIRECTORY_MODES = { + 'fail': NoDirectoriesFiles, + 'recursive': RecursiveFiles, +} +LARGE_FILE_MODES = { + 'fail': NoLargeFiles, + 'split': SplitFiles, +} +DOWNLOAD_SPLIT_FILE_MODES = { + 'keep': KeepDownloadSplitFiles, + 'join': JoinDownloadSplitFiles, +} + + +def get_file_display_name(message): + display_name_parts = [] + is_document = message.document + if is_document and message.document.mime_type: + display_name_parts.append(message.document.mime_type.split('/')[0]) + if is_document and get_message_file_attribute(message): + display_name_parts.append(get_message_file_attribute(message).file_name) + if message.text: + display_name_parts.append(f'[{message.text}]' if display_name_parts else message.text) + from_user = message.sender and isinstance(message.sender, User) + if from_user: + display_name_parts.append('by') + if from_user and message.sender.first_name: + display_name_parts.append(message.sender.first_name) + if from_user and message.sender.last_name: + display_name_parts.append(message.sender.last_name) + if from_user and message.sender.username: + display_name_parts.append(f'@{message.sender.username}') + display_name_parts.append(f'{message.date}') + return ' '.join(display_name_parts) + + +async def interactive_select_files(client, entity: str): + iterator = client.iter_files(entity) + iterator = amap(lambda x: (x, get_file_display_name(x)), iterator,) + return await show_checkboxlist(iterator) + + +async def interactive_select_local_files(): + iterator = filter(lambda x: os.path.isfile(x) and os.path.lexists(x), os.listdir('.')) + iterator = sync_to_async_iterator(map(lambda x: (x, x), iterator)) + return await show_checkboxlist(iterator, 'Not files were found in the current directory ' + '(subdirectories are not supported). Exiting...') + + +async def interactive_select_dialog(client): + iterator = client.iter_dialogs() + iterator = amap(lambda x: (x, x.name), iterator,) + value = await show_radiolist(iterator, 'Not dialogs were found in your Telegram session. ' + 'Have you started any conversations?') + return value.id if value else None + + +class MutuallyExclusiveOption(click.Option): + def __init__(self, *args, **kwargs): + self.mutually_exclusive = set(kwargs.pop('mutually_exclusive', [])) + help = kwargs.get('help', '') + if self.mutually_exclusive: + kwargs['help'] = help + ( + ' NOTE: This argument is mutually exclusive with' + ' arguments: [{}].'.format(self.mutually_exclusive_text) + ) + super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) + + def handle_parse_result(self, ctx, opts, args): + if self.mutually_exclusive.intersection(opts) and self.name in opts: + raise click.UsageError( + "Illegal usage: `{}` is mutually exclusive with " + "arguments `{}`.".format( + self.name, + self.mutually_exclusive_text + ) + ) + + return super(MutuallyExclusiveOption, self).handle_parse_result( + ctx, + opts, + args + ) + + @property + def mutually_exclusive_text(self): + return ', '.join([x.replace('_', '-') for x in self.mutually_exclusive]) + + +@click.command() +@click.argument('files', nargs=-1) +@click.option('--to', default=None, help='Phone number, username, invite link or "me" (saved messages). ' + 'By default "me".') +@click.option('--config', default=None, help='Configuration file to use. By default "{}".'.format(CONFIG_FILE)) +@click.option('-d', '--delete-on-success', is_flag=True, help='Delete local file after successful upload.') +@click.option('--print-file-id', is_flag=True, help='Print the id of the uploaded file after the upload.') +@click.option('--force-file', is_flag=True, help='Force send as a file. The filename will be preserved ' + 'but the preview will not be available.') +@click.option('-f', '--forward', multiple=True, help='Forward the file to a chat (alias or id) or user (username, ' + 'mobile or id). This option can be used multiple times.') +@click.option('--directories', default='fail', type=click.Choice(list(DIRECTORY_MODES.keys())), + help='Defines how to process directories. By default directories are not accepted and will raise an ' + 'error.') +@click.option('--large-files', default='fail', type=click.Choice(list(LARGE_FILE_MODES.keys())), + help='Defines how to process large files unsupported for Telegram. By default large files are not ' + 'accepted and will raise an error.') +@click.option('--caption', type=str, help='Change file description. By default the file name.') +@click.option('--no-thumbnail', is_flag=True, cls=MutuallyExclusiveOption, mutually_exclusive=["thumbnail_file"], + help='Disable thumbnail generation. For some known file formats, Telegram may still generate a ' + 'thumbnail or show a preview.') +@click.option('--thumbnail-file', default=None, cls=MutuallyExclusiveOption, mutually_exclusive=["no_thumbnail"], + help='Path to the preview file to use for the uploaded file.') +@click.option('-p', '--proxy', default=None, + help='Use an http proxy, socks4, socks5 or mtproxy. For example socks5://user:pass@1.2.3.4:8080 ' + 'for socks5 and mtproxy://secret@1.2.3.4:443 for mtproxy.') +@click.option('-a', '--album', is_flag=True, + help='Send video or photos as an album.') +@click.option('-i', '--interactive', is_flag=True, + help='Use interactive mode.') +@click.option('--sort', is_flag=True, + help='Sort files by name before upload it. Install the natsort Python package for natural sorting.') +def upload(files, to, config, delete_on_success, print_file_id, force_file, forward, directories, large_files, caption, + no_thumbnail, thumbnail_file, proxy, album, interactive, sort): + """Upload one or more files to Telegram using your personal account. + The maximum file size is 2 GiB for free users and 4 GiB for premium accounts. + By default, they will be saved in your saved messages. + """ + client = TelegramManagerClient(config or default_config(), proxy=proxy) + client.start() + if interactive and not files: + click.echo('Select the local files to upload:') + click.echo('[SPACE] Select file [ENTER] Next step') + files = async_to_sync(interactive_select_local_files()) + if interactive and not files: + # No files selected. Exiting. + return + if interactive and to is None: + click.echo('Select the recipient dialog of the files:') + click.echo('[SPACE] Select dialog [ENTER] Next step') + to = async_to_sync(interactive_select_dialog(client)) + elif to is None: + to = 'me' + files = filter(lambda file: is_valid_file(file, lambda message: click.echo(message, err=True)), files) + files = DIRECTORY_MODES[directories](client, files) + if directories == 'fail': + # Validate now + files = list(files) + if no_thumbnail: + thumbnail = False + elif thumbnail_file: + thumbnail = thumbnail_file + else: + thumbnail = None + files_cls = LARGE_FILE_MODES[large_files] + files = files_cls(client, files, caption=caption, thumbnail=thumbnail, force_file=force_file) + if large_files == 'fail': + # Validate now + files = list(files) + if isinstance(to, str) and to.lstrip("-+").isdigit(): + to = int(to) + if sort and natsorted: + files = natsorted(files, key=lambda x: x.name) + elif sort: + files = sorted(files, key=lambda x: x.name) + if album: + client.send_files_as_album(to, files, delete_on_success, print_file_id, forward) + else: + client.send_files(to, files, delete_on_success, print_file_id, forward) + + +@click.command() +@click.option('--from', '-f', 'from_', default='', + help='Phone number, username, chat id or "me" (saved messages). By default "me".') +@click.option('--config', default=None, help='Configuration file to use. By default "{}".'.format(CONFIG_FILE)) +@click.option('-d', '--delete-on-success', is_flag=True, + help='Delete telegram message after successful download. Useful for creating a download queue.') +@click.option('-p', '--proxy', default=None, + help='Use an http proxy, socks4, socks5 or mtproxy. For example socks5://user:pass@1.2.3.4:8080 ' + 'for socks5 and mtproxy://secret@1.2.3.4:443 for mtproxy.') +@click.option('-m', '--split-files', default='keep', type=click.Choice(list(DOWNLOAD_SPLIT_FILE_MODES.keys())), + help='Defines how to download large files split in Telegram. By default the files are not merged.') +@click.option('-i', '--interactive', is_flag=True, + help='Use interactive mode.') +def download(from_, config, delete_on_success, proxy, split_files, interactive): + """Download all the latest messages that are files in a chat, by default download + from "saved messages". It is recommended to forward the files to download to + "saved messages" and use parameter ``--delete-on-success``. Forwarded messages will + be removed from the chat after downloading, such as a download queue. + """ + client = TelegramManagerClient(config or default_config(), proxy=proxy) + client.start() + if not interactive and not from_: + from_ = 'me' + elif isinstance(from_, str) and from_.lstrip("-+").isdigit(): + from_ = int(from_) + elif interactive and not from_: + click.echo('Select the dialog of the files to download:') + click.echo('[SPACE] Select dialog [ENTER] Next step') + from_ = async_to_sync(interactive_select_dialog(client)) + if interactive: + click.echo('Select all files to download:') + click.echo('[SPACE] Select files [ENTER] Download selected files') + messages = async_to_sync(interactive_select_files(client, from_)) + else: + messages = client.find_files(from_) + messages_cls = DOWNLOAD_SPLIT_FILE_MODES[split_files] + download_files = messages_cls(reversed(list(messages))) + client.download_files(from_, download_files, delete_on_success) + + +upload_cli = catch(upload) +download_cli = catch(download) + + +if __name__ == '__main__': + import sys + import re + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + commands = {'upload': upload_cli, 'download': download_cli} + if len(sys.argv) < 2: + sys.stderr.write('A command is required. Available commands: {}\n'.format( + ', '.join(commands) + )) + sys.exit(1) + if sys.argv[1] not in commands: + sys.stderr.write('{} is an invalid command. Valid commands: {}\n'.format( + sys.argv[1], ', '.join(commands) + )) + sys.exit(1) + fn = commands[sys.argv[1]] + sys.argv = [sys.argv[0]] + sys.argv[2:] + sys.exit(fn()) diff --git a/upload_files.py b/upload_files.py new file mode 100644 index 0000000..42798bc --- /dev/null +++ b/upload_files.py @@ -0,0 +1,254 @@ +import datetime +import math +import os + + +import mimetypes +from io import FileIO, SEEK_SET +from typing import Union, TYPE_CHECKING + +import click +from hachoir.metadata.metadata import RootMetadata +from hachoir.metadata.video import MP4Metadata +from telethon.tl.types import DocumentAttributeVideo, DocumentAttributeFilename + +from telegram_upload.caption_formatter import CaptionFormatter, FilePath +from telegram_upload.exceptions import TelegramInvalidFile, ThumbError +from telegram_upload.utils import scantree, truncate +from telegram_upload.video import get_video_thumb, video_metadata + +mimetypes.init() + + +if TYPE_CHECKING: + from telegram_upload.client import TelegramManagerClient + + +def is_valid_file(file, error_logger=None): + error_message = None + if not os.path.lexists(file): + error_message = 'File "{}" does not exist.'.format(file) + elif not os.path.getsize(file): + error_message = 'File "{}" is empty.'.format(file) + if error_message and error_logger is not None: + error_logger(error_message) + return error_message is None + + +def get_file_mime(file): + return (mimetypes.guess_type(file)[0] or ('')).split('/')[0] + + +def metadata_has(metadata: RootMetadata, key: str): + try: + return metadata.has(key) + except ValueError: + return False + + +def get_file_attributes(file): + attrs = [] + mime = get_file_mime(file) + if mime == 'video': + metadata = video_metadata(file) + video_meta = metadata + meta_groups = None + if hasattr(metadata, '_MultipleMetadata__groups'): + # Is mkv + meta_groups = metadata._MultipleMetadata__groups + if metadata is not None and not metadata.has('width') and meta_groups: + video_meta = meta_groups[next(filter(lambda x: x.startswith('video'), meta_groups._key_list))] + if metadata is not None: + supports_streaming = isinstance(video_meta, MP4Metadata) + attrs.append(DocumentAttributeVideo( + (0, metadata.get('duration').seconds)[metadata_has(metadata, 'duration')], + (0, video_meta.get('width'))[metadata_has(video_meta, 'width')], + (0, video_meta.get('height'))[metadata_has(video_meta, 'height')], + False, + supports_streaming, + )) + return attrs + + +def get_file_thumb(file): + if get_file_mime(file) == 'video': + return get_video_thumb(file) + + +class UploadFilesBase: + def __init__(self, client: 'TelegramManagerClient', files, thumbnail: Union[str, bool, None] = None, + force_file: bool = False, caption: Union[str, None] = None): + self._iterator = None + self.client = client + self.files = files + self.thumbnail = thumbnail + self.force_file = force_file + self.caption = caption + + def get_iterator(self): + raise NotImplementedError + + def __iter__(self): + self._iterator = self.get_iterator() + return self + + def __next__(self): + if self._iterator is None: + self._iterator = self.get_iterator() + return next(self._iterator) + + +class RecursiveFiles(UploadFilesBase): + + def get_iterator(self): + for file in self.files: + if os.path.isdir(file): + yield from map(lambda file: file.path, + filter(lambda x: not x.is_dir(), scantree(file, True))) + else: + yield file + + +class NoDirectoriesFiles(UploadFilesBase): + def get_iterator(self): + for file in self.files: + if os.path.isdir(file): + raise TelegramInvalidFile('"{}" is a directory.'.format(file)) + else: + yield file + + +class LargeFilesBase(UploadFilesBase): + def get_iterator(self): + for file in self.files: + if os.path.getsize(file) > self.client.max_file_size: + yield from self.process_large_file(file) + else: + yield self.process_normal_file(file) + + def process_normal_file(self, file: str) -> 'File': + return File(self.client, file, force_file=self.force_file, thumbnail=self.thumbnail, caption=self.caption) + + def process_large_file(self, file): + raise NotImplementedError + + +class NoLargeFiles(LargeFilesBase): + def process_large_file(self, file): + raise TelegramInvalidFile('"{}" file is too large for Telegram.'.format(file)) + + +class File(FileIO): + force_file = False + + def __init__(self, client: 'TelegramManagerClient', path: str, force_file: Union[bool, None] = None, + thumbnail: Union[str, bool, None] = None, caption: Union[str, None] = None): + super().__init__(path) + self.client = client + self.path = path + self.force_file = self.force_file if force_file is None else force_file + self._thumbnail = thumbnail + self._caption = caption + + @property + def file_name(self): + return os.path.basename(self.path) + + @property + def file_size(self): + return os.path.getsize(self.path) + + @property + def short_name(self): + return '.'.join(self.file_name.split('.')[:-1]) + + @property + def is_custom_thumbnail(self): + return self._thumbnail is not False and self._thumbnail is not None + + @property + def file_caption(self) -> str: + """Get file caption. If caption parameter is not set, return file name. + If caption is set, format it with CaptionFormatter. + Anyways, truncate caption to max_caption_length. + """ + if self._caption is not None: + formatter = CaptionFormatter() + caption = formatter.format(self._caption, file=FilePath(self.path), now=datetime.datetime.now()) + else: + caption = self.short_name + return truncate(caption, self.client.max_caption_length) + + def get_thumbnail(self): + thumb = None + if self._thumbnail is None and not self.force_file: + try: + thumb = get_file_thumb(self.path) + except ThumbError as e: + click.echo('{}'.format(e), err=True) + elif self.is_custom_thumbnail: + if not isinstance(self._thumbnail, str): + raise TypeError('Invalid type for thumbnail: {}'.format(type(self._thumbnail))) + elif not os.path.lexists(self._thumbnail): + raise TelegramInvalidFile('{} thumbnail file does not exists.'.format(self._thumbnail)) + thumb = self._thumbnail + return thumb + + @property + def file_attributes(self): + if self.force_file: + return [DocumentAttributeFilename(self.file_name)] + else: + return get_file_attributes(self.path) + + +class SplitFile(File, FileIO): + force_file = True + + def __init__(self, client: 'TelegramManagerClient', file: Union[str, bytes, int], max_read_size: int, name: str): + super().__init__(client, file) + self.max_read_size = max_read_size + self.remaining_size = max_read_size + self._name = name + + def read(self, size: int = -1) -> bytes: + if size == -1: + size = self.remaining_size + if not self.remaining_size: + return b'' + size = min(self.remaining_size, size) + self.remaining_size -= size + return super().read(size) + + def readall(self) -> bytes: + return self.read() + + @property + def file_name(self): + return self._name + + @property + def file_size(self): + return self.max_read_size + + def seek(self, offset: int, whence: int = SEEK_SET, split_seek: bool = False) -> int: + if not split_seek: + self.remaining_size += self.tell() - offset + return super().seek(offset, whence) + + @property + def short_name(self): + return self.file_name.split('/')[-1] + + +class SplitFiles(LargeFilesBase): + def process_large_file(self, file): + file_name = os.path.basename(file) + total_size = os.path.getsize(file) + parts = math.ceil(total_size / self.client.max_file_size) + zfill = int(math.log10(10)) + 1 + for part in range(parts): + size = total_size - (part * self.client.max_file_size) if part >= parts - 1 else self.client.max_file_size + splitted_file = SplitFile(self.client, file, size, '{}.{}'.format(file_name, str(part).zfill(zfill))) + splitted_file.seek(self.client.max_file_size * part, split_seek=True) + yield splitted_file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..5ea33d7 --- /dev/null +++ b/utils.py @@ -0,0 +1,77 @@ +import asyncio +import itertools +import os +import shutil +from telegram_upload._compat import scandir +from telegram_upload.exceptions import TelegramEnvironmentError + + +def free_disk_usage(directory='.'): + return shutil.disk_usage(directory)[2] + + +def truncate(text, max_length): + return (text[:max_length - 3] + '...') if len(text) > max_length else text + + +def grouper(n, iterable): + it = iter(iterable) + while True: + chunk = tuple(itertools.islice(it, n)) + if not chunk: + return + yield chunk + + +def sizeof_fmt(num, suffix='B'): + for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) + + +def scantree(path, follow_symlinks=False): + """Recursively yield DirEntry objects for given directory.""" + for entry in scandir(path): + if entry.is_dir(follow_symlinks=follow_symlinks): + yield from scantree(entry.path, follow_symlinks) # see below for Python 2.x + else: + yield entry + + +def async_to_sync(coro): + loop = asyncio.get_event_loop() + if loop.is_running(): + return coro + else: + return loop.run_until_complete(coro) + + +async def aislice(iterator, limit): + items = [] + i = 0 + async for value in iterator: + if i > limit: + break + i += 1 + items.append(value) + return items + + +async def amap(fn, iterator): + async for value in iterator: + yield fn(value) + + +async def sync_to_async_iterator(iterator): + for value in iterator: + yield value + + +def get_environment_integer(environment_name: str, default_value: int): + """Get an integer from an environment variable.""" + value = os.environ.get(environment_name, default_value) + if isinstance(value, int) or value.isdigit(): + return int(value) + raise TelegramEnvironmentError(f"Environment variable {environment_name} must be an integer") diff --git a/video.py b/video.py new file mode 100644 index 0000000..65b6c18 --- /dev/null +++ b/video.py @@ -0,0 +1,69 @@ +import platform +import re +import subprocess +import tempfile +import os + +from hachoir.metadata import extractMetadata +from hachoir.parser import createParser +from hachoir.core import config as hachoir_config + +from telegram_upload.exceptions import ThumbVideoError + + +hachoir_config.quiet = True + + +def video_metadata(file): + return extractMetadata(createParser(file)) + + +def call_ffmpeg(args): + try: + return subprocess.Popen([get_ffmpeg_command()] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + raise ThumbVideoError('ffmpeg command is not available. Thumbnails for videos are not available!') + + +def get_ffmpeg_command(): + return os.environ.get('FFMPEG_COMMAND', + 'ffmpeg.exe' if platform.system() == 'Windows' else 'ffmpeg') + + +def get_video_size(file): + p = call_ffmpeg([ + '-i', file, + ]) + stdout, stderr = p.communicate() + video_lines = re.findall(': Video: ([^\n]+)', stderr.decode('utf-8', errors='ignore')) + if not video_lines: + return + matchs = re.findall("(\d{2,6})x(\d{2,6})", video_lines[0]) + if matchs: + return [int(x) for x in matchs[0]] + + +def get_video_thumb(file, output=None, size=200): + output = output or tempfile.NamedTemporaryFile(suffix='.jpg').name + metadata = video_metadata(file) + if metadata is None: + return + duration = metadata.get('duration').seconds if metadata.has('duration') else 0 + ratio = get_video_size(file) + if ratio is None: + raise ThumbVideoError('Video ratio is not available.') + if ratio[0] / ratio[1] > 1: + width, height = size, -1 + else: + width, height = -1, size + p = call_ffmpeg([ + '-ss', str(int(duration / 2)), + '-i', file, + '-filter:v', + 'scale={}:{}'.format(width, height), + '-vframes:v', '1', + output, + ]) + p.communicate() + if not p.returncode and os.path.lexists(file): + return output