"""Telegram bot that converts HEIC images to JPEG.

Exposes a Prometheus metrics endpoint and an HTTP ``/healthz`` endpoint
alongside the long-polling bot.
"""

import asyncio
import io
import logging
import os
from typing import Any, Awaitable, Callable, Dict, List, Union

import pillow_heif
from aiogram import BaseMiddleware, Bot, Dispatcher, F, types
from aiogram.types import BufferedInputFile, Message
from aiogram.utils.media_group import MediaGroupBuilder
from aiohttp import web
from PIL import Image
from prometheus_client import Counter, start_http_server

# --- CONFIG & METRICS ---
API_TOKEN = os.getenv("BOT_TOKEN")
METRICS_PORT = 8000
HEALTH_PORT = 8080
CONVERSION_COUNT = Counter("heic_conversions_total", "Total HEIC to JPEG conversions")

logging.basicConfig(level=logging.INFO)
pillow_heif.register_heif_opener()

if not API_TOKEN:
    raise ValueError("No BOT_TOKEN found in environment variables")

# The dispatcher has to exist at import time: the ``@dp.message(...)``
# decorators below are evaluated when the module is loaded, so creating it
# only inside ``main()`` would raise NameError before the bot ever starts.
dp = Dispatcher()


# --- MIDDLEWARE FOR ALBUMS ---
class AlbumMiddleware(BaseMiddleware):
    """Groups multiple messages in a media group into a single list.

    The first message seen for a ``media_group_id`` waits ``latency`` seconds
    while later messages with the same id are appended to its list; the
    handler is then called once with the collected messages in
    ``data["album"]``. Messages without a ``media_group_id`` pass straight
    through.
    """

    def __init__(self, latency: float = 0.6):
        super().__init__()
        self.latency = latency
        # media_group_id -> messages collected so far for that group
        self.album_data: Dict[str, List[Message]] = {}

    async def __call__(
        self,
        handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
        event: Message,
        data: Dict[str, Any],
    ) -> Any:
        group_id = event.media_group_id
        if not group_id:
            return await handler(event, data)

        if group_id in self.album_data:
            # A later part of an album that is already being collected:
            # store it and do not invoke the handler for this message.
            self.album_data[group_id].append(event)
            return None

        # First part of the album: open the bucket, give the remaining parts
        # time to arrive, then hand the whole list to the handler.
        self.album_data[group_id] = [event]
        await asyncio.sleep(self.latency)
        data["album"] = self.album_data.pop(group_id)
        return await handler(event, data)


# --- HANDLERS ---
def _is_heic_document(msg: Message) -> bool:
    """Return True if *msg* carries a document whose name ends with ``.heic``."""
    document = msg.document
    # file_name may be missing, so guard before calling .lower().
    return bool(
        document
        and document.file_name
        and document.file_name.lower().endswith(".heic")
    )


async def convert_image(bot: Bot, msg: Message) -> BufferedInputFile:
    """Helper to download and convert a single HEIC to JPEG.

    Uses the message's document if present, otherwise the last entry of
    ``msg.photo``. The result keeps the original base name with a ``.jpg``
    extension and increments the conversion counter.
    """
    if msg.document:
        file_id = msg.document.file_id
        # Fall back to a synthetic name when the document has no file name.
        file_name = msg.document.file_name or f"{file_id}.heic"
    else:
        file_id = msg.photo[-1].file_id
        file_name = f"{file_id}.heic"

    file = await bot.get_file(file_id)
    heic_buffer = await bot.download_file(file.file_path)

    jpeg_buffer = io.BytesIO()
    with Image.open(heic_buffer) as img:
        # JPEG has no alpha channel, hence the RGB conversion before saving.
        img.convert("RGB").save(jpeg_buffer, format="JPEG", quality=95)

    new_name = file_name.rsplit(".", 1)[0] + ".jpg"
    CONVERSION_COUNT.inc()
    return BufferedInputFile(jpeg_buffer.getvalue(), filename=new_name)


@dp.message(F.media_group_id)
async def handle_album(message: Message, album: List[Message], bot: Bot):
    """Handles multiple HEIC images sent as an album.

    Converts every ``.heic`` document or photo in *album* and replies with
    the results. Nothing is sent when the album has no convertible items; a
    single result is sent as a plain document because a media group needs at
    least two items.
    """
    converted = [
        await convert_image(bot, msg)
        for msg in album
        if _is_heic_document(msg) or msg.photo
    ]

    if not converted:
        return

    if len(converted) == 1:
        await message.answer_document(
            document=converted[0], caption="Converted Images"
        )
        return

    media_group = MediaGroupBuilder(caption="Converted Images")
    for conv_file in converted:
        media_group.add_document(media=conv_file)
    await message.answer_media_group(media=media_group.build())


@dp.message(F.document.file_name.lower().endswith(".heic"))
async def handle_single_doc(message: Message, bot: Bot):
    """Handles a single HEIC file."""
    jpeg_file = await convert_image(bot, message)
    await message.reply_document(document=jpeg_file)


# --- K8S HEALTH CHECK ---
async def health_check(request):
    """Respond ``200 OK`` so an orchestrator probe can see the process is up."""
    return web.Response(text="OK", status=200)


# --- MAIN ---
async def main():
    """Start the metrics server, the health endpoint and bot polling."""
    bot = Bot(token=API_TOKEN)
    dp.message.middleware(AlbumMiddleware())

    # Start Prometheus
    start_http_server(METRICS_PORT)

    # Start Health Check Server
    server = web.Application()
    server.router.add_get("/healthz", health_check)
    runner = web.AppRunner(server)
    await runner.setup()
    try:
        site = web.TCPSite(runner, "0.0.0.0", HEALTH_PORT)
        await site.start()
        await dp.start_polling(bot)
    finally:
        # Release the health-check listener when polling stops or fails.
        await runner.cleanup()


if __name__ == "__main__":
    asyncio.run(main())