import datetime import math import os import mimetypes from io import FileIO, SEEK_SET from typing import Union, TYPE_CHECKING import click from hachoir.metadata.metadata import RootMetadata from hachoir.metadata.video import MP4Metadata from telethon.tl.types import DocumentAttributeVideo, DocumentAttributeFilename from telegram_upload.caption_formatter import CaptionFormatter, FilePath from telegram_upload.exceptions import TelegramInvalidFile, ThumbError from telegram_upload.utils import scantree, truncate from telegram_upload.video import get_video_thumb, video_metadata mimetypes.init() if TYPE_CHECKING: from telegram_upload.client import TelegramManagerClient def is_valid_file(file, error_logger=None): error_message = None if not os.path.lexists(file): error_message = 'File "{}" does not exist.'.format(file) elif not os.path.getsize(file): error_message = 'File "{}" is empty.'.format(file) if error_message and error_logger is not None: error_logger(error_message) return error_message is None def get_file_mime(file): return (mimetypes.guess_type(file)[0] or ('')).split('/')[0] def metadata_has(metadata: RootMetadata, key: str): try: return metadata.has(key) except ValueError: return False def get_file_attributes(file): attrs = [] mime = get_file_mime(file) if mime == 'video': metadata = video_metadata(file) video_meta = metadata meta_groups = None if hasattr(metadata, '_MultipleMetadata__groups'): # Is mkv meta_groups = metadata._MultipleMetadata__groups if metadata is not None and not metadata.has('width') and meta_groups: video_meta = meta_groups[next(filter(lambda x: x.startswith('video'), meta_groups._key_list))] if metadata is not None: supports_streaming = isinstance(video_meta, MP4Metadata) attrs.append(DocumentAttributeVideo( (0, metadata.get('duration').seconds)[metadata_has(metadata, 'duration')], (0, video_meta.get('width'))[metadata_has(video_meta, 'width')], (0, video_meta.get('height'))[metadata_has(video_meta, 'height')], False, supports_streaming, )) return attrs def get_file_thumb(file): if get_file_mime(file) == 'video': return get_video_thumb(file) class UploadFilesBase: def __init__(self, client: 'TelegramManagerClient', files, thumbnail: Union[str, bool, None] = None, force_file: bool = False, caption: Union[str, None] = None): self._iterator = None self.client = client self.files = files self.thumbnail = thumbnail self.force_file = force_file self.caption = caption def get_iterator(self): raise NotImplementedError def __iter__(self): self._iterator = self.get_iterator() return self def __next__(self): if self._iterator is None: self._iterator = self.get_iterator() return next(self._iterator) class RecursiveFiles(UploadFilesBase): def get_iterator(self): for file in self.files: if os.path.isdir(file): yield from map(lambda file: file.path, filter(lambda x: not x.is_dir(), scantree(file, True))) else: yield file class NoDirectoriesFiles(UploadFilesBase): def get_iterator(self): for file in self.files: if os.path.isdir(file): raise TelegramInvalidFile('"{}" is a directory.'.format(file)) else: yield file class LargeFilesBase(UploadFilesBase): def get_iterator(self): for file in self.files: if os.path.getsize(file) > self.client.max_file_size: yield from self.process_large_file(file) else: yield self.process_normal_file(file) def process_normal_file(self, file: str) -> 'File': return File(self.client, file, force_file=self.force_file, thumbnail=self.thumbnail, caption=self.caption) def process_large_file(self, file): raise NotImplementedError class NoLargeFiles(LargeFilesBase): def process_large_file(self, file): raise TelegramInvalidFile('"{}" file is too large for Telegram.'.format(file)) class File(FileIO): force_file = False def __init__(self, client: 'TelegramManagerClient', path: str, force_file: Union[bool, None] = None, thumbnail: Union[str, bool, None] = None, caption: Union[str, None] = None): super().__init__(path) self.client = client self.path = path self.force_file = self.force_file if force_file is None else force_file self._thumbnail = thumbnail self._caption = caption @property def file_name(self): return os.path.basename(self.path) @property def file_size(self): return os.path.getsize(self.path) @property def short_name(self): return '.'.join(self.file_name.split('.')[:-1]) @property def is_custom_thumbnail(self): return self._thumbnail is not False and self._thumbnail is not None @property def file_caption(self) -> str: """Get file caption. If caption parameter is not set, return file name. If caption is set, format it with CaptionFormatter. Anyways, truncate caption to max_caption_length. """ if self._caption is not None: formatter = CaptionFormatter() caption = formatter.format(self._caption, file=FilePath(self.path), now=datetime.datetime.now()) else: caption = self.short_name return truncate(caption, self.client.max_caption_length) def get_thumbnail(self): thumb = None if self._thumbnail is None and not self.force_file: try: thumb = get_file_thumb(self.path) except ThumbError as e: click.echo('{}'.format(e), err=True) elif self.is_custom_thumbnail: if not isinstance(self._thumbnail, str): raise TypeError('Invalid type for thumbnail: {}'.format(type(self._thumbnail))) elif not os.path.lexists(self._thumbnail): raise TelegramInvalidFile('{} thumbnail file does not exists.'.format(self._thumbnail)) thumb = self._thumbnail return thumb @property def file_attributes(self): if self.force_file: return [DocumentAttributeFilename(self.file_name)] else: return get_file_attributes(self.path) class SplitFile(File, FileIO): force_file = True def __init__(self, client: 'TelegramManagerClient', file: Union[str, bytes, int], max_read_size: int, name: str): super().__init__(client, file) self.max_read_size = max_read_size self.remaining_size = max_read_size self._name = name def read(self, size: int = -1) -> bytes: if size == -1: size = self.remaining_size if not self.remaining_size: return b'' size = min(self.remaining_size, size) self.remaining_size -= size return super().read(size) def readall(self) -> bytes: return self.read() @property def file_name(self): return self._name @property def file_size(self): return self.max_read_size def seek(self, offset: int, whence: int = SEEK_SET, split_seek: bool = False) -> int: if not split_seek: self.remaining_size += self.tell() - offset return super().seek(offset, whence) @property def short_name(self): return self.file_name.split('/')[-1] class SplitFiles(LargeFilesBase): def process_large_file(self, file): file_name = os.path.basename(file) total_size = os.path.getsize(file) parts = math.ceil(total_size / self.client.max_file_size) zfill = int(math.log10(10)) + 1 for part in range(parts): size = total_size - (part * self.client.max_file_size) if part >= parts - 1 else self.client.max_file_size splitted_file = SplitFile(self.client, file, size, '{}.{}'.format(file_name, str(part).zfill(zfill))) splitted_file.seek(self.client.max_file_size * part, split_seek=True) yield splitted_file