255 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			255 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import datetime
 | |
| import math
 | |
| import os
 | |
| 
 | |
| 
 | |
| import mimetypes
 | |
| from io import FileIO, SEEK_SET
 | |
| from typing import Union, TYPE_CHECKING
 | |
| 
 | |
| import click
 | |
| from hachoir.metadata.metadata import RootMetadata
 | |
| from hachoir.metadata.video import MP4Metadata
 | |
| from telethon.tl.types import DocumentAttributeVideo, DocumentAttributeFilename
 | |
| 
 | |
| from telegram_upload.caption_formatter import CaptionFormatter, FilePath
 | |
| from telegram_upload.exceptions import TelegramInvalidFile, ThumbError
 | |
| from telegram_upload.utils import scantree, truncate
 | |
| from telegram_upload.video import get_video_thumb, video_metadata
 | |
| 
 | |
| mimetypes.init()
 | |
| 
 | |
| 
 | |
| if TYPE_CHECKING:
 | |
|     from telegram_upload.client import TelegramManagerClient
 | |
| 
 | |
| 
 | |
| def is_valid_file(file, error_logger=None):
 | |
|     error_message = None
 | |
|     if not os.path.lexists(file):
 | |
|         error_message = 'File "{}" does not exist.'.format(file)
 | |
|     elif not os.path.getsize(file):
 | |
|         error_message = 'File "{}" is empty.'.format(file)
 | |
|     if error_message and error_logger is not None:
 | |
|         error_logger(error_message)
 | |
|     return error_message is None
 | |
| 
 | |
| 
 | |
| def get_file_mime(file):
 | |
|     return (mimetypes.guess_type(file)[0] or ('')).split('/')[0]
 | |
| 
 | |
| 
 | |
| def metadata_has(metadata: RootMetadata, key: str):
 | |
|     try:
 | |
|         return metadata.has(key)
 | |
|     except ValueError:
 | |
|         return False
 | |
| 
 | |
| 
 | |
| def get_file_attributes(file):
 | |
|     attrs = []
 | |
|     mime = get_file_mime(file)
 | |
|     if mime == 'video':
 | |
|         metadata = video_metadata(file)
 | |
|         video_meta = metadata
 | |
|         meta_groups = None
 | |
|         if hasattr(metadata, '_MultipleMetadata__groups'):
 | |
|             # Is mkv
 | |
|             meta_groups = metadata._MultipleMetadata__groups
 | |
|         if metadata is not None and not metadata.has('width') and meta_groups:
 | |
|             video_meta = meta_groups[next(filter(lambda x: x.startswith('video'), meta_groups._key_list))]
 | |
|         if metadata is not None:
 | |
|             supports_streaming = isinstance(video_meta, MP4Metadata)
 | |
|             attrs.append(DocumentAttributeVideo(
 | |
|                 (0, metadata.get('duration').seconds)[metadata_has(metadata, 'duration')],
 | |
|                 (0, video_meta.get('width'))[metadata_has(video_meta, 'width')],
 | |
|                 (0, video_meta.get('height'))[metadata_has(video_meta, 'height')],
 | |
|                 False,
 | |
|                 supports_streaming,
 | |
|             ))
 | |
|     return attrs
 | |
| 
 | |
| 
 | |
| def get_file_thumb(file):
 | |
|     if get_file_mime(file) == 'video':
 | |
|         return get_video_thumb(file)
 | |
| 
 | |
| 
 | |
| class UploadFilesBase:
 | |
|     def __init__(self, client: 'TelegramManagerClient', files, thumbnail: Union[str, bool, None] = None,
 | |
|                  force_file: bool = False, caption: Union[str, None] = None):
 | |
|         self._iterator = None
 | |
|         self.client = client
 | |
|         self.files = files
 | |
|         self.thumbnail = thumbnail
 | |
|         self.force_file = force_file
 | |
|         self.caption = caption
 | |
| 
 | |
|     def get_iterator(self):
 | |
|         raise NotImplementedError
 | |
| 
 | |
|     def __iter__(self):
 | |
|         self._iterator = self.get_iterator()
 | |
|         return self
 | |
| 
 | |
|     def __next__(self):
 | |
|         if self._iterator is None:
 | |
|             self._iterator = self.get_iterator()
 | |
|         return next(self._iterator)
 | |
| 
 | |
| 
 | |
| class RecursiveFiles(UploadFilesBase):
 | |
| 
 | |
|     def get_iterator(self):
 | |
|         for file in self.files:
 | |
|             if os.path.isdir(file):
 | |
|                 yield from map(lambda file: file.path,
 | |
|                                filter(lambda x: not x.is_dir(), scantree(file, True)))
 | |
|             else:
 | |
|                 yield file
 | |
| 
 | |
| 
 | |
| class NoDirectoriesFiles(UploadFilesBase):
 | |
|     def get_iterator(self):
 | |
|         for file in self.files:
 | |
|             if os.path.isdir(file):
 | |
|                 raise TelegramInvalidFile('"{}" is a directory.'.format(file))
 | |
|             else:
 | |
|                 yield file
 | |
| 
 | |
| 
 | |
| class LargeFilesBase(UploadFilesBase):
 | |
|     def get_iterator(self):
 | |
|         for file in self.files:
 | |
|             if os.path.getsize(file) > self.client.max_file_size:
 | |
|                 yield from self.process_large_file(file)
 | |
|             else:
 | |
|                 yield self.process_normal_file(file)
 | |
| 
 | |
|     def process_normal_file(self, file: str) -> 'File':
 | |
|         return File(self.client, file, force_file=self.force_file, thumbnail=self.thumbnail, caption=self.caption)
 | |
| 
 | |
|     def process_large_file(self, file):
 | |
|         raise NotImplementedError
 | |
| 
 | |
| 
 | |
| class NoLargeFiles(LargeFilesBase):
 | |
|     def process_large_file(self, file):
 | |
|         raise TelegramInvalidFile('"{}" file is too large for Telegram.'.format(file))
 | |
| 
 | |
| 
 | |
| class File(FileIO):
 | |
|     force_file = False
 | |
| 
 | |
|     def __init__(self, client: 'TelegramManagerClient', path: str, force_file: Union[bool, None] = None,
 | |
|                  thumbnail: Union[str, bool, None] = None, caption: Union[str, None] = None):
 | |
|         super().__init__(path)
 | |
|         self.client = client
 | |
|         self.path = path
 | |
|         self.force_file = self.force_file if force_file is None else force_file
 | |
|         self._thumbnail = thumbnail
 | |
|         self._caption = caption
 | |
| 
 | |
|     @property
 | |
|     def file_name(self):
 | |
|         return os.path.basename(self.path)
 | |
| 
 | |
|     @property
 | |
|     def file_size(self):
 | |
|         return os.path.getsize(self.path)
 | |
| 
 | |
|     @property
 | |
|     def short_name(self):
 | |
|         return '.'.join(self.file_name.split('.')[:-1])
 | |
| 
 | |
|     @property
 | |
|     def is_custom_thumbnail(self):
 | |
|         return self._thumbnail is not False and self._thumbnail is not None
 | |
| 
 | |
|     @property
 | |
|     def file_caption(self) -> str:
 | |
|         """Get file caption. If caption parameter is not set, return file name.
 | |
|         If caption is set, format it with CaptionFormatter.
 | |
|         Anyways, truncate caption to max_caption_length.
 | |
|         """
 | |
|         if self._caption is not None:
 | |
|             formatter = CaptionFormatter()
 | |
|             caption = formatter.format(self._caption, file=FilePath(self.path), now=datetime.datetime.now())
 | |
|         else:
 | |
|             caption = self.short_name
 | |
|         return truncate(caption, self.client.max_caption_length)
 | |
| 
 | |
|     def get_thumbnail(self):
 | |
|         thumb = None
 | |
|         if self._thumbnail is None and not self.force_file:
 | |
|             try:
 | |
|                 thumb = get_file_thumb(self.path)
 | |
|             except ThumbError as e:
 | |
|                 click.echo('{}'.format(e), err=True)
 | |
|         elif self.is_custom_thumbnail:
 | |
|             if not isinstance(self._thumbnail, str):
 | |
|                 raise TypeError('Invalid type for thumbnail: {}'.format(type(self._thumbnail)))
 | |
|             elif not os.path.lexists(self._thumbnail):
 | |
|                 raise TelegramInvalidFile('{} thumbnail file does not exists.'.format(self._thumbnail))
 | |
|             thumb = self._thumbnail
 | |
|         return thumb
 | |
| 
 | |
|     @property
 | |
|     def file_attributes(self):
 | |
|         if self.force_file:
 | |
|             return [DocumentAttributeFilename(self.file_name)]
 | |
|         else:
 | |
|             return get_file_attributes(self.path)
 | |
| 
 | |
| 
 | |
| class SplitFile(File, FileIO):
 | |
|     force_file = True
 | |
| 
 | |
|     def __init__(self, client: 'TelegramManagerClient', file: Union[str, bytes, int], max_read_size: int, name: str):
 | |
|         super().__init__(client, file)
 | |
|         self.max_read_size = max_read_size
 | |
|         self.remaining_size = max_read_size
 | |
|         self._name = name
 | |
| 
 | |
|     def read(self, size: int = -1) -> bytes:
 | |
|         if size == -1:
 | |
|             size = self.remaining_size
 | |
|         if not self.remaining_size:
 | |
|             return b''
 | |
|         size = min(self.remaining_size, size)
 | |
|         self.remaining_size -= size
 | |
|         return super().read(size)
 | |
| 
 | |
|     def readall(self) -> bytes:
 | |
|         return self.read()
 | |
| 
 | |
|     @property
 | |
|     def file_name(self):
 | |
|         return self._name
 | |
| 
 | |
|     @property
 | |
|     def file_size(self):
 | |
|         return self.max_read_size
 | |
| 
 | |
|     def seek(self, offset: int, whence: int = SEEK_SET, split_seek: bool = False) -> int:
 | |
|         if not split_seek:
 | |
|             self.remaining_size += self.tell() - offset
 | |
|         return super().seek(offset, whence)
 | |
| 
 | |
|     @property
 | |
|     def short_name(self):
 | |
|         return self.file_name.split('/')[-1]
 | |
| 
 | |
| 
 | |
| class SplitFiles(LargeFilesBase):
 | |
|     def process_large_file(self, file):
 | |
|         file_name = os.path.basename(file)
 | |
|         total_size = os.path.getsize(file)
 | |
|         parts = math.ceil(total_size / self.client.max_file_size)
 | |
|         zfill = int(math.log10(10)) + 1
 | |
|         for part in range(parts):
 | |
|             size = total_size - (part * self.client.max_file_size) if part >= parts - 1 else self.client.max_file_size
 | |
|             splitted_file = SplitFile(self.client, file, size, '{}.{}'.format(file_name, str(part).zfill(zfill)))
 | |
|             splitted_file.seek(self.client.max_file_size * part, split_seek=True)
 | |
|             yield splitted_file
 |