{"id":12,"date":"2025-12-20T08:28:52","date_gmt":"2025-12-20T08:28:52","guid":{"rendered":"https:\/\/wordpress-fz3fv.wasmer.app\/?p=12"},"modified":"2026-06-03T06:30:56","modified_gmt":"2026-06-03T06:30:56","slug":"tg-forward-videos","status":"publish","type":"post","link":"https:\/\/wp.iii.nn.kg\/?p=12","title":{"rendered":"TG-forward-videos"},"content":{"rendered":"\n<pre class=\"wp-block-code\"><code>import asyncio\nimport os\nimport random\nimport re\nimport logging\nfrom logging.handlers import RotatingFileHandler\nfrom typing import List, Dict, Optional\nimport aiosqlite\nfrom telethon import TelegramClient\nfrom telethon.tl.types import MessageMediaDocument, Message\nfrom telethon.errors import FloodWaitError\nfrom dotenv import load_dotenv\n\n# ==================== 1. \u73af\u5883\u521d\u59cb\u5316 ====================\nDB_FOLDER = \"database\"\nSESSION_FOLDER = \"session\"\nfor folder in &#91;DB_FOLDER, SESSION_FOLDER]:\n    if not os.path.exists(folder):\n        os.makedirs(folder)\n\nlogging.basicConfig(\n    level=logging.INFO,\n    format=\"%(asctime)s - %(levelname)s - &#91;%(name)s] - %(message)s\",\n    handlers=&#91;\n        RotatingFileHandler(os.path.join(DB_FOLDER, \"bot_work.log\"), maxBytes=10*1024*1024, backupCount=5, encoding=\"utf-8\"),\n        logging.StreamHandler()\n    ]\n)\nlogger = logging.getLogger(\"SuperForwarder\")\n\nload_dotenv()\nAPI_ID = int(os.getenv(\"API_ID\", 0))\nAPI_HASH = os.getenv(\"API_HASH\", \"\")\nPHONE_NUMBER = os.getenv(\"PHONE_NUMBER\", \"\")\nTWO_STEP_PASSWORD = os.getenv(\"TWO_STEP_PASSWORD\", \"\")\nTARGET_CHANNEL = int(os.getenv(\"TARGET_CHANNEL\", 0))\nSOURCE_CHANNELS = &#91;int(x.strip()) for x in os.getenv(\"SOURCE_CHANNELS\", \"\").split(\",\") if x.strip()]\n\n# \u914d\u7f6e\u53c2\u6570\nMIN_INTERVAL = float(os.getenv(\"MIN_INTERVAL\", 3.0))\nMAX_INTERVAL = float(os.getenv(\"MAX_INTERVAL\", 7.0))\nMIN_SIZE_MB = float(os.getenv(\"MIN_SIZE_MB\", 0))\nALBUM_WAIT_TIME = 5.0  # \u7a0d\u5fae\u62c9\u957f\uff0c\u786e\u4fdd\u5927\u76f8\u518c\u63a5\u6536\u5b8c\u6574\n\nAD_PATTERNS = &#91;\n    r\"https?:\/\/\\S+\", \n    r\"t\\.me\/\\S+\",     \n    r\"@\\w+\",         \n    r\"Via .*\",        \n    r\"\\&#91;.*?\\]\\(https?:\/\/.*?\\)\" \n]\n\n# ==================== 2. \u6570\u636e\u5e93\u7ba1\u7406 ====================\nclass AsyncDB:\n    def __init__(self, path):\n        self.path = path\n        self.conn: Optional&#91;aiosqlite.Connection] = None\n\n    async def connect(self):\n        self.conn = await aiosqlite.connect(self.path)\n        await self.conn.execute(\"PRAGMA journal_mode=WAL;\")\n        await self.conn.execute(\"\"\"\n            CREATE TABLE IF NOT EXISTS progress (\n                channel_id TEXT PRIMARY KEY, \n                last_msg_id INTEGER DEFAULT 0, \n                min_msg_id INTEGER DEFAULT 0\n            )\"\"\")\n        await self.conn.execute(\"CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)\")\n        await self.conn.execute(\"CREATE INDEX IF NOT EXISTS idx_vkey ON videos (video_key);\")\n        await self.conn.commit()\n\n    async def is_seen(self, key: str) -> bool:\n        async with self.conn.execute(\"SELECT 1 FROM videos WHERE video_key=?\", (key,)) as cursor:\n            return await cursor.fetchone() is not None\n\n    async def mark_seen(self, key: str):\n        await self.conn.execute(\"INSERT OR IGNORE INTO videos (video_key) VALUES (?)\", (key,))\n        await self.conn.commit()\n\n    async def get_prog(self, cid) -> tuple:\n        async with self.conn.execute(\"SELECT last_msg_id, min_msg_id FROM progress WHERE channel_id=?\", (str(cid),)) as cursor:\n            r = await cursor.fetchone()\n            return r if r else (0, 0)\n\n    async def update_prog(self, cid, last_id=None, min_id=None):\n        if last_id is not None:\n            await self.conn.execute(\"INSERT INTO progress (channel_id, last_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET last_msg_id=?\", (str(cid), last_id, last_id))\n        if min_id is not None:\n            await self.conn.execute(\"INSERT INTO progress (channel_id, min_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET min_msg_id=?\", (str(cid), min_id, min_id))\n        await self.conn.commit()\n\n# ==================== 3. \u5de5\u5177\u903b\u8f91 ====================\ndef clean_caption(text: str) -> str:\n    if not text: \n        return \"\"\n    # \u6309\u884c\u8fc7\u6ee4\u5e7f\u544a\uff0c\u4fdd\u7559\u5e72\u51c0\u7684\u6587\u672c\u884c\n    lines = text.split(\"\\n\")\n    clean_lines = &#91;]\n    for line in lines:\n        filtered = line\n        for p in AD_PATTERNS:\n            filtered = re.sub(p, \"\", filtered, flags=re.I)\n        if filtered.strip():\n            clean_lines.append(line) # \u5982\u679c\u4f60\u5e0c\u671b\u4fdd\u7559\u6574\u884c\uff0c\u6216\u8005\u7528 filtered\n    return \"\\n\".join(clean_lines).strip()\n\ndef is_video(msg: Message) -> bool:\n    if not msg or not msg.media or not isinstance(msg.media, MessageMediaDocument): \n        return False\n    if not msg.media.document or not msg.media.document.mime_type.startswith(\"video\"): \n        return False\n    if MIN_SIZE_MB > 0 and (msg.media.document.size \/ 1048576) &lt; MIN_SIZE_MB: \n        return False\n    return True\n\n# ==================== 4. \u76f8\u518c\u805a\u5408\u903b\u8f91 ====================\nforward_queue = asyncio.Queue(maxsize=500)\npending_albums: Dict&#91;int, List&#91;Message]] = {}\n\nasync def push_to_queue_later(grouped_id):\n    await asyncio.sleep(ALBUM_WAIT_TIME)\n    if grouped_id in pending_albums:\n        msgs = pending_albums.pop(grouped_id)\n        if msgs:\n            msgs.sort(key=lambda x: x.id)\n            await forward_queue.put(msgs)\n\nasync def handle_incoming(msg: Message):\n    if msg.grouped_id:\n        if msg.grouped_id not in pending_albums:\n            pending_albums&#91;msg.grouped_id] = &#91;]\n            asyncio.create_task(push_to_queue_later(msg.grouped_id))\n        pending_albums&#91;msg.grouped_id].append(msg)\n    else:\n        await forward_queue.put(&#91;msg])\n\n# ==================== 5. \u8f6c\u53d1 Worker (\u5355 Worker \u4e32\u884c\uff0c\u9632\u4e71\u5e8f\u98ce\u63a7) ====================\nasync def worker():\n    logger.info(\"\u8f6c\u53d1 Worker \u7ebf\u7a0b\u5df2\u542f\u52a8...\")\n    while True:\n        batch = await forward_queue.get()\n        try:\n            to_send = &#91;]\n            caption = \"\"\n            \n            for m in batch:\n                # \u5b89\u5168\u6821\u9a8c\uff1a\u786e\u4fdd\u6709 media \u548c document\n                if not is_video(m):\n                    continue\n                \n                v_key = str(m.media.document.id)\n                if not await db.is_seen(v_key):\n                    to_send.append(m)\n                \n                if m.text and not caption:\n                    caption = clean_caption(m.text)\n\n            if to_send:\n                files = &#91;m.media for m in to_send]\n                # \u53d1\u9001\u63a7\u5236\n                await client.send_file(TARGET_CHANNEL, file=files, caption=caption, supports_streaming=True)\n                \n                for m in to_send:\n                    await db.mark_seen(str(m.media.document.id))\n                \n                logger.info(f\"\u6210\u529f\u642c\u8fd0 {len(to_send)} \u4e2a\u89c6\u9891\u5230\u76ee\u6807\u9891\u9053\")\n                # \u968f\u673a\u51b7\u5374\uff0c\u4fdd\u62a4\u8d26\u53f7\n                await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))\n                \n        except FloodWaitError as e:\n            logger.warning(f\"\u89e6\u53d1 Telegram \u98ce\u63a7\uff0c\u5f3a\u5236\u7b49\u5f85 {e.seconds} \u79d2...\")\n            await asyncio.sleep(e.seconds + 5)\n        except Exception as e:\n            logger.error(f\"Worker \u5904\u7406\u642c\u8fd0\u65f6\u53d1\u751f\u672a\u77e5\u9519\u8bef: {e}\", exc_info=True)\n        finally:\n            forward_queue.task_done()\n\n# ==================== 6. \u626b\u63cf\u4efb\u52a1 ====================\nasync def scan_latest_task(cid):\n    last_id, min_id = await db.get_prog(cid)\n    \n    # \u9996\u6b21\u8fd0\u884c\u521d\u59cb\u5316\n    if last_id == 0:\n        logger.info(f\"\u9891\u9053 {cid} \u521d\u6b21\u8fd0\u884c\uff0c\u5f00\u59cb\u6293\u53d6\u6700\u65b0 2000 \u6761\u5386\u53f2...\")\n        max_seen_id = 0\n        min_seen_id = float('inf')\n        \n        async for msg in client.iter_messages(cid, limit=2000):\n            if is_video(msg): \n                await handle_incoming(msg)\n            if msg.id > max_seen_id: max_seen_id = msg.id\n            if msg.id &lt; min_seen_id: min_seen_id = msg.id\n        \n        last_id = max_seen_id\n        # \u5982\u679c\u5386\u53f2\u8865\u5168\u6307\u9488\u4e5f\u662f0\uff0c\u540c\u6b65\u66f4\u65b0\u5b83\n        if min_id == 0 and min_seen_id != float('inf'):\n            min_id = min_seen_id\n            await db.update_prog(cid, last_id=last_id, min_id=min_id)\n        else:\n            await db.update_prog(cid, last_id=last_id)\n\n    logger.info(f\"\u9891\u9053 {cid} \u5b9e\u65f6\u76d1\u63a7\u5df2\u5c31\u7eea\uff0c\u5f53\u524d last_id: {last_id}\")\n    while True:\n        try:\n            # reverse=True \u8ba9\u6d88\u606f\u4ece\u65e7\u5230\u65b0\u5b9e\u65f6\u6d41\u5165\n            async for msg in client.iter_messages(cid, min_id=last_id, reverse=True):\n                if is_video(msg): \n                    await handle_incoming(msg)\n                last_id = msg.id\n                await db.update_prog(cid, last_id=last_id)\n            await asyncio.sleep(30) # \u6bcf 30 \u79d2\u68c0\u67e5\u4e00\u6b21\u662f\u5426\u6709\u65b0\u52a8\u6001\n        except FloodWaitError as e:\n            await asyncio.sleep(e.seconds + 5)\n        except Exception as e:\n            logger.error(f\"\u5b9e\u65f6\u76d1\u63a7\u6545\u969c ({cid}): {e}\")\n            await asyncio.sleep(30)\n\nasync def backfill_history_task(cid):\n    logger.info(f\"\u9891\u9053 {cid} \u5386\u53f2\u5f02\u6b65\u8865\u5168\u961f\u5217\u542f\u52a8...\")\n    while True:\n        try:\n            _, min_id = await db.get_prog(cid)\n            if min_id &lt;= 1: \n                logger.info(f\"\u9891\u9053 {cid} \u5386\u53f2\u6570\u636e\u5df2\u5168\u90e8\u8865\u5168\u5b8c\u6bd5\uff0c\u9000\u51fa\u8be5\u4efb\u52a1\u3002\")\n                break\n            \n            # \u53ea\u6709\u5f53\u7f13\u51b2\u533a\u961f\u5217\u4e0d\u62e5\u5835\u65f6\uff0c\u624d\u7ee7\u7eed\u5f80\u524d\u5228\u5386\u53f2\n            if forward_queue.qsize() &lt; 50:\n                current_min_id = min_id\n                has_msg = False\n                \n                async for msg in client.iter_messages(cid, offset_id=min_id, limit=40):\n                    has_msg = True\n                    if is_video(msg): \n                        await handle_incoming(msg)\n                    current_min_id = msg.id # \u9010\u6b65\u9012\u51cf\n                \n                # \u5982\u679c\u63a8\u8fdb\u4e0d\u52a8\u4e86\uff0c\u8bf4\u660e\u524d\u9762\u771f\u7684\u6ca1\u6d88\u606f\u4e86\n                if not has_msg or current_min_id == min_id:\n                    await db.update_prog(cid, min_id=1)\n                    logger.info(f\"\u9891\u9053 {cid} \u5386\u53f2\u5df2\u89e6\u5e95\u3002\")\n                    break\n                    \n                await db.update_prog(cid, min_id=current_min_id)\n                logger.info(f\"\u5386\u53f2\u8865\u5168\u63a8\u8fdb\u4e2d... \u9891\u9053: {cid}, \u5f53\u524d\u8fdb\u5ea6 msg_id: {current_min_id}\")\n                await asyncio.sleep(random.randint(15, 30))\n            else:\n                await asyncio.sleep(20)\n        except FloodWaitError as e:\n            await asyncio.sleep(e.seconds + 5)\n        except Exception as e:\n            logger.error(f\"\u5386\u53f2\u8865\u5168\u6545\u969c ({cid}): {e}\")\n            await asyncio.sleep(60)\n\n# ==================== 7. \u542f\u52a8\u5165\u53e3 ====================\ns_tag = str(SOURCE_CHANNELS&#91;0]) if SOURCE_CHANNELS else \"unknown\"\nt_tag = str(TARGET_CHANNEL)\ndb_filename = f\"{s_tag}to{t_tag}.db\"\ndb_path = os.path.join(DB_FOLDER, db_filename)\n\nsession_path = os.path.join(SESSION_FOLDER, \"forwarder_session\")\nclient = TelegramClient(session_path, API_ID, API_HASH)\ndb = AsyncDB(db_path)\n\nasync def main():\n    await db.connect()\n    await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)\n    logger.info(f\"--- \u767b\u5f55\u6210\u529f | \u8fd0\u884c\u6570\u636e\u5e93: {db_filename} ---\")\n\n    # \u542f\u52a8\u5355 Worker \u4e32\u884c\u53d1\u9001\u673a\u5236\uff0c\u786e\u4fdd\u5b89\u5168\u548c\u65f6\u5e8f\n    asyncio.create_task(worker())\n\n    for cid in SOURCE_CHANNELS:\n        asyncio.create_task(scan_latest_task(cid))\n        asyncio.create_task(backfill_history_task(cid))\n\n    await client.run_until_disconnected()\n\nif __name__ == \"__main__\":\n    try:\n        asyncio.run(main())\n    except KeyboardInterrupt:\n        logger.info(\"\u7a0b\u5e8f\u5df2\u88ab\u7528\u6237\u624b\u52a8\u7ec8\u6b62\u3002\")<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-12","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=\/wp\/v2\/posts\/12","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=12"}],"version-history":[{"count":0,"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=\/wp\/v2\/posts\/12\/revisions"}],"wp:attachment":[{"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=12"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=12"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/wp.iii.nn.kg\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=12"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}