255 lines
8.3 KiB
Python
255 lines
8.3 KiB
Python
import datetime
|
|
import math
|
|
import os
|
|
|
|
|
|
import mimetypes
|
|
from io import FileIO, SEEK_SET
|
|
from typing import Union, TYPE_CHECKING
|
|
|
|
import click
|
|
from hachoir.metadata.metadata import RootMetadata
|
|
from hachoir.metadata.video import MP4Metadata
|
|
from telethon.tl.types import DocumentAttributeVideo, DocumentAttributeFilename
|
|
|
|
from telegram_upload.caption_formatter import CaptionFormatter, FilePath
|
|
from telegram_upload.exceptions import TelegramInvalidFile, ThumbError
|
|
from telegram_upload.utils import scantree, truncate
|
|
from telegram_upload.video import get_video_thumb, video_metadata
|
|
|
|
# Initialise the mimetypes module once, at import time.
mimetypes.init()


if TYPE_CHECKING:
    # Imported for static type checking only; the annotations in this module
    # refer to the class through the string 'TelegramManagerClient'.
    from telegram_upload.client import TelegramManagerClient
|
|
|
|
|
|
def is_valid_file(file, error_logger=None):
    """Check that ``file`` exists and is not empty.

    :param file: path of the file to check.
    :param error_logger: optional callable that receives a human-readable
        message when the file is rejected.
    :return: ``True`` when the file exists and has a non-zero size,
        ``False`` otherwise.
    """
    error_message = None
    if not os.path.lexists(file):
        error_message = 'File "{}" does not exist.'.format(file)
    else:
        try:
            size = os.path.getsize(file)
        except OSError:
            # lexists() is also true for a dangling symlink, whose target
            # cannot be stat'ed: report it as missing instead of crashing.
            error_message = 'File "{}" does not exist.'.format(file)
        else:
            if not size:
                error_message = 'File "{}" is empty.'.format(file)
    if error_message and error_logger is not None:
        error_logger(error_message)
    return error_message is None
|
|
|
|
|
|
def get_file_mime(file):
    """Return the top-level MIME type guessed from the name of ``file``.

    Only the part before the slash is returned (``'video'`` for
    ``video/mp4``). An empty string is returned when no type can be guessed.
    """
    guessed_type = mimetypes.guess_type(file)[0]
    if not guessed_type:
        return ''
    main_type, _, _ = guessed_type.partition('/')
    return main_type
|
|
|
|
|
|
def metadata_has(metadata: RootMetadata, key: str):
    """Tell whether ``metadata`` holds a value for ``key``.

    A ``ValueError`` raised by ``metadata.has`` is treated as "not present",
    so the function returns ``False`` instead of propagating it.
    """
    try:
        present = metadata.has(key)
    except ValueError:
        return False
    return present
|
|
|
|
|
|
def get_file_attributes(file):
    """Build the list of Telegram document attributes for ``file``.

    Only files whose guessed MIME type is ``video`` get attributes: a single
    ``DocumentAttributeVideo`` built from the metadata returned by
    ``video_metadata``. Duration, width and height default to ``0`` when the
    metadata does not provide them.

    :param file: path of the file to inspect.
    :return: a list with one ``DocumentAttributeVideo``, or an empty list for
        non-video files and for videos whose metadata could not be read.
    """
    attrs = []
    if get_file_mime(file) != 'video':
        return attrs
    metadata = video_metadata(file)
    if metadata is None:
        return attrs

    video_meta = metadata
    # Multi-stream containers (the original comment mentions mkv) keep the
    # per-stream metadata in the name-mangled private "__groups" attribute.
    meta_groups = getattr(metadata, '_MultipleMetadata__groups', None)
    if meta_groups and not metadata_has(metadata, 'width'):
        # Use the first group whose key starts with "video"; keep the root
        # metadata when there is none instead of leaking StopIteration.
        video_key = next(
            (key for key in meta_groups._key_list if key.startswith('video')),
            None,
        )
        if video_key is not None:
            video_meta = meta_groups[video_key]

    supports_streaming = isinstance(video_meta, MP4Metadata)
    # Conditional expressions (not tuple indexing) so that .get() is only
    # called for values that are actually present.
    duration = metadata.get('duration').seconds if metadata_has(metadata, 'duration') else 0
    width = video_meta.get('width') if metadata_has(video_meta, 'width') else 0
    height = video_meta.get('height') if metadata_has(video_meta, 'height') else 0
    attrs.append(DocumentAttributeVideo(
        duration,
        width,
        height,
        False,
        supports_streaming,
    ))
    return attrs
|
|
|
|
|
|
def get_file_thumb(file):
    """Return the result of ``get_video_thumb`` for video files.

    Files whose guessed MIME type is not ``video`` yield ``None``.
    """
    if get_file_mime(file) != 'video':
        return None
    return get_video_thumb(file)
|
|
|
|
|
|
class UploadFilesBase:
    """Base class for the iterable wrappers around the files to upload.

    Subclasses implement :meth:`get_iterator`; the instance itself then acts
    as an iterator over whatever that method produces.
    """

    def __init__(self, client: 'TelegramManagerClient', files, thumbnail: Union[str, bool, None] = None,
                 force_file: bool = False, caption: Union[str, None] = None):
        # Upload options, stored as given.
        self.client = client
        self.files = files
        self.thumbnail = thumbnail
        self.force_file = force_file
        self.caption = caption
        # Created lazily by __iter__ / __next__.
        self._iterator = None

    def get_iterator(self):
        """Return the underlying iterator. Must be provided by subclasses."""
        raise NotImplementedError

    def __iter__(self):
        # Every iter() call starts over with a fresh underlying iterator.
        self._iterator = self.get_iterator()
        return self

    def __next__(self):
        # Allow next() on an instance that was never passed to iter().
        iterator = self._iterator
        if iterator is None:
            iterator = self._iterator = self.get_iterator()
        return next(iterator)
|
|
|
|
|
|
class RecursiveFiles(UploadFilesBase):
    """Yield the given paths, replacing each directory by the files in it."""

    def get_iterator(self):
        """Yield plain paths as-is and the non-directory entries of folders.

        Directories are walked with ``scantree(path, True)``; only the
        ``path`` of entries that are not directories is yielded.
        """
        for path in self.files:
            if not os.path.isdir(path):
                yield path
                continue
            for entry in scantree(path, True):
                if not entry.is_dir():
                    yield entry.path
|
|
|
|
|
|
class NoDirectoriesFiles(UploadFilesBase):
    """Yield the given paths, refusing any path that is a directory."""

    def get_iterator(self):
        """Yield every path in order.

        :raises TelegramInvalidFile: when a directory is reached.
        """
        for path in self.files:
            if os.path.isdir(path):
                raise TelegramInvalidFile('"{}" is a directory.'.format(path))
            yield path
|
|
|
|
|
|
class LargeFilesBase(UploadFilesBase):
    """Turn paths into ``File`` objects, delegating oversized files.

    A file is oversized when it is bigger than ``client.max_file_size``.
    Subclasses decide what happens to it in :meth:`process_large_file`.
    """

    def get_iterator(self):
        """Yield one item per regular file and whatever
        :meth:`process_large_file` produces for each oversized one."""
        for path in self.files:
            fits = os.path.getsize(path) <= self.client.max_file_size
            if fits:
                yield self.process_normal_file(path)
            else:
                yield from self.process_large_file(path)

    def process_normal_file(self, file: str) -> 'File':
        """Wrap ``file`` in a ``File`` carrying this object's upload options."""
        return File(
            self.client,
            file,
            force_file=self.force_file,
            thumbnail=self.thumbnail,
            caption=self.caption,
        )

    def process_large_file(self, file):
        """Return an iterable of items for an oversized file (subclass hook)."""
        raise NotImplementedError
|
|
|
|
|
|
class NoLargeFiles(LargeFilesBase):
    """Variant of ``LargeFilesBase`` that rejects oversized files."""

    def process_large_file(self, file):
        """Always fail: this policy does not accept oversized files.

        :raises TelegramInvalidFile: unconditionally.
        """
        message = '"{}" file is too large for Telegram.'.format(file)
        raise TelegramInvalidFile(message)
|
|
|
|
|
|
class File(FileIO):
    """A file opened for upload, together with its upload options.

    The object is a ``FileIO`` opened on ``path`` and additionally exposes
    the name, size, caption, thumbnail and document attributes for the file.
    """

    # Class-level default used when ``force_file`` is not passed to __init__.
    force_file = False

    def __init__(self, client: 'TelegramManagerClient', path: str, force_file: Union[bool, None] = None,
                 thumbnail: Union[str, bool, None] = None, caption: Union[str, None] = None):
        """Open ``path`` and store the upload options.

        :param client: client whose ``max_caption_length`` limits the caption.
        :param path: path of the file to open.
        :param force_file: ``None`` keeps the class default.
        :param thumbnail: path of a custom thumbnail, ``False`` for no
            thumbnail, ``None`` to generate one when possible.
        :param caption: caption template, or ``None`` to use the file name.
        """
        super().__init__(path)
        self.client = client
        self.path = path
        self.force_file = self.force_file if force_file is None else force_file
        self._thumbnail = thumbnail
        self._caption = caption

    @property
    def file_name(self):
        """Last component of the file path."""
        return os.path.basename(self.path)

    @property
    def file_size(self):
        """Size of the file on disk, in bytes."""
        return os.path.getsize(self.path)

    @property
    def short_name(self):
        """File name without its last extension.

        ``os.path.splitext`` is used so that a name without any extension
        (``README``) or a dotfile (``.bashrc``) is kept whole instead of
        collapsing to an empty string.
        """
        return os.path.splitext(self.file_name)[0]

    @property
    def is_custom_thumbnail(self):
        """``True`` when a thumbnail value other than ``None``/``False`` was given."""
        return self._thumbnail is not False and self._thumbnail is not None

    @property
    def file_caption(self) -> str:
        """Caption for the file.

        Without a caption parameter the short file name is used; otherwise
        the parameter is formatted with ``CaptionFormatter``. In both cases
        the result is truncated to the client's ``max_caption_length``.
        """
        if self._caption is not None:
            formatter = CaptionFormatter()
            caption = formatter.format(self._caption, file=FilePath(self.path), now=datetime.datetime.now())
        else:
            caption = self.short_name
        return truncate(caption, self.client.max_caption_length)

    def get_thumbnail(self):
        """Return the thumbnail to send with the file, or ``None``.

        With no thumbnail option and ``force_file`` unset, a thumbnail is
        generated through ``get_file_thumb``; a ``ThumbError`` is printed to
        stderr and the file is sent without thumbnail. A custom thumbnail
        must be the path of an existing file.

        :raises TypeError: the custom thumbnail is not a string.
        :raises TelegramInvalidFile: the custom thumbnail does not exist.
        """
        thumb = None
        if self._thumbnail is None and not self.force_file:
            try:
                thumb = get_file_thumb(self.path)
            except ThumbError as e:
                click.echo('{}'.format(e), err=True)
        elif self.is_custom_thumbnail:
            if not isinstance(self._thumbnail, str):
                raise TypeError('Invalid type for thumbnail: {}'.format(type(self._thumbnail)))
            elif not os.path.lexists(self._thumbnail):
                raise TelegramInvalidFile('{} thumbnail file does not exists.'.format(self._thumbnail))
            thumb = self._thumbnail
        return thumb

    @property
    def file_attributes(self):
        """Document attributes for the file.

        With ``force_file`` set, only the file name attribute is returned;
        otherwise the attributes come from ``get_file_attributes``.
        """
        if self.force_file:
            return [DocumentAttributeFilename(self.file_name)]
        else:
            return get_file_attributes(self.path)
|
|
|
|
|
|
class SplitFile(File, FileIO):
    """One part of a file that is uploaded in several pieces.

    The object reads at most ``max_read_size`` bytes from the underlying
    file. ``remaining_size`` is the number of bytes that may still be read
    before the part is exhausted.
    """

    # Parts are always sent as plain files.
    force_file = True

    def __init__(self, client: 'TelegramManagerClient', file: Union[str, bytes, int], max_read_size: int, name: str):
        """Open ``file`` as a part.

        :param client: client passed on to ``File``.
        :param file: the file to open, passed on to ``File``.
        :param max_read_size: number of bytes that belong to this part.
        :param name: name reported for this part by ``file_name``.
        """
        super().__init__(client, file)
        self.max_read_size = max_read_size
        self.remaining_size = max_read_size
        self._name = name

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes without going past the end of the part.

        ``None`` or a negative ``size`` means "everything left in the part".
        """
        if size is None or size < 0:
            size = self.remaining_size
        size = min(self.remaining_size, size)
        if size <= 0:
            # Part exhausted (or zero bytes requested). A negative size must
            # never reach FileIO.read, where it would mean "read to EOF".
            return b''
        data = super().read(size)
        if data:
            # Charge the bytes actually delivered, so that a short read does
            # not shrink the part.
            self.remaining_size -= len(data)
        return data

    def readall(self) -> bytes:
        """Read everything that is left in the part."""
        return self.read()

    @property
    def file_name(self):
        """Name given to this part."""
        return self._name

    @property
    def file_size(self):
        """Size of the part (not of the whole underlying file)."""
        return self.max_read_size

    def seek(self, offset: int, whence: int = SEEK_SET, split_seek: bool = False) -> int:
        """Move the file position and keep ``remaining_size`` in sync.

        :param offset: offset, interpreted according to ``whence``.
        :param whence: same meaning as in ``io.IOBase.seek``.
        :param split_seek: ``True`` when positioning the part at its start;
            the move is then not charged against ``remaining_size``.
        :return: the new absolute position.
        """
        previous = self.tell()
        position = super().seek(offset, whence)
        if not split_seek:
            # Use the real distance travelled so that SEEK_CUR and SEEK_END
            # are accounted for as correctly as SEEK_SET.
            self.remaining_size += previous - position
        return position

    @property
    def short_name(self):
        """Part name stripped of any leading directories."""
        return self.file_name.split('/')[-1]
|
|
|
|
|
|
class SplitFiles(LargeFilesBase):
    """Variant of ``LargeFilesBase`` that splits oversized files into parts."""

    def process_large_file(self, file):
        """Yield one ``SplitFile`` per chunk of ``client.max_file_size`` bytes.

        Parts are named ``<file name>.<index>``, with a zero-padded index
        starting at 0. Each part is positioned at its own offset in the file.

        :param file: path of the oversized file.
        """
        file_name = os.path.basename(file)
        total_size = os.path.getsize(file)
        max_size = self.client.max_file_size
        # Integer ceiling division: exact for any size, unlike float division.
        parts = (total_size + max_size - 1) // max_size
        # Keep the historical two-digit suffix, and widen it when there are
        # more than 100 parts so that all suffixes have the same width.
        zfill = max(2, len(str(parts - 1)))
        for part in range(parts):
            offset = part * max_size
            # Every part is full-sized except possibly the last one.
            size = min(max_size, total_size - offset)
            splitted_file = SplitFile(self.client, file, size, '{}.{}'.format(file_name, str(part).zfill(zfill)))
            # split_seek=True: this move must not eat into the part's budget.
            splitted_file.seek(offset, split_seek=True)
            yield splitted_file
|