import asyncio
import os
import random
import re
import logging
from logging.handlers import RotatingFileHandler
from typing import List, Dict, Optional
import aiosqlite
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument, Message
from telethon.errors import FloodWaitError
from dotenv import load_dotenv
# ==================== 1. Environment initialisation ====================
# Folder holding the SQLite database and the rotating log file.
DB_FOLDER = "database"
# Folder holding the Telethon session file.
SESSION_FOLDER = "session"
# exist_ok=True makes the creation idempotent and removes the window between
# "check that it is missing" and "create it" in which another process could
# create the folder and make makedirs() raise FileExistsError.
for folder in (DB_FOLDER, SESSION_FOLDER):
    os.makedirs(folder, exist_ok=True)
# Log to a size-capped rotating file inside DB_FOLDER and to the console.
_log_file_handler = RotatingFileHandler(
    os.path.join(DB_FOLDER, "bot_work.log"),
    maxBytes=10 * 1024 * 1024,  # rotate after 10 MiB
    backupCount=5,
    encoding="utf-8",
)
_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - [%(name)s] - %(message)s",
    handlers=[_log_file_handler, _console_handler],
)
# Logger shared by every task in this script.
logger = logging.getLogger("SuperForwarder")
load_dotenv()


def _env_number(name, default, cast=float):
    """Return environment variable *name* converted with *cast*.

    Falls back to *default* when the variable is unset or blank, so a line
    such as ``MIN_SIZE_MB=`` in the .env file no longer crashes the import
    with ``ValueError``. A non-blank value that *cast* cannot parse still
    raises ``ValueError``.
    """
    raw = os.getenv(name, "").strip()
    return cast(raw) if raw else cast(default)


API_ID = _env_number("API_ID", 0, int)
API_HASH = os.getenv("API_HASH", "")
PHONE_NUMBER = os.getenv("PHONE_NUMBER", "")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD", "")
TARGET_CHANNEL = _env_number("TARGET_CHANNEL", 0, int)
# Comma-separated list of channel ids; blank entries are skipped.
SOURCE_CHANNELS = [int(x.strip()) for x in os.getenv("SOURCE_CHANNELS", "").split(",") if x.strip()]
# Tuning parameters
MIN_INTERVAL = _env_number("MIN_INTERVAL", 3.0)  # lower bound of the pause after a send, seconds
MAX_INTERVAL = _env_number("MAX_INTERVAL", 7.0)  # upper bound of the pause after a send, seconds
MIN_SIZE_MB = _env_number("MIN_SIZE_MB", 0)  # videos smaller than this are ignored; 0 disables the check
ALBUM_WAIT_TIME = 5.0  # slightly generous so that large albums are collected completely
# Regular expressions treated as advertising; they are applied in list order.
AD_PATTERNS = [
    # Markdown links go first: if the bare-URL pattern below ran before this
    # one it would consume the "https://...)" tail and leave "[text](" behind,
    # so a line made only of a markdown link would never be seen as an ad.
    r"\[.*?\]\(https?://.*?\)",
    r"https?://\S+",
    r"t\.me/\S+",
    r"@\w+",
    r"Via .*",
]
# ==================== 2. Database management ====================
class AsyncDB:
    """Async wrapper around a single aiosqlite connection.

    Stores two things: per-channel scan progress (table ``progress``) and the
    keys of videos that were already forwarded (table ``videos``).
    """

    def __init__(self, path):
        self.path = path
        # Set by connect(); None before that and again after close().
        self.conn: Optional[aiosqlite.Connection] = None

    def _require_conn(self):
        """Return the open connection or raise RuntimeError if there is none."""
        if self.conn is None:
            raise RuntimeError("AsyncDB.connect() must be awaited before using the database")
        return self.conn

    async def connect(self):
        """Open the database file and create the tables if they are missing."""
        self.conn = await aiosqlite.connect(self.path)
        await self.conn.execute("PRAGMA journal_mode=WAL;")
        await self.conn.execute("""
            CREATE TABLE IF NOT EXISTS progress (
                channel_id TEXT PRIMARY KEY,
                last_msg_id INTEGER DEFAULT 0,
                min_msg_id INTEGER DEFAULT 0
            )""")
        # No explicit index on video_key: SQLite already backs a TEXT PRIMARY
        # KEY with an automatic unique index, so a second one only costs
        # space and write time.
        await self.conn.execute("CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)")
        await self.conn.commit()

    async def close(self):
        """Close the connection if it is open; safe to call more than once."""
        if self.conn is not None:
            conn, self.conn = self.conn, None
            await conn.close()

    async def is_seen(self, key: str) -> bool:
        """Return True if *key* is already recorded in the ``videos`` table."""
        conn = self._require_conn()
        async with conn.execute("SELECT 1 FROM videos WHERE video_key=?", (key,)) as cursor:
            return await cursor.fetchone() is not None

    async def mark_seen(self, key: str):
        """Record *key* in the ``videos`` table (no-op if already present) and commit."""
        conn = self._require_conn()
        await conn.execute("INSERT OR IGNORE INTO videos (video_key) VALUES (?)", (key,))
        await conn.commit()

    async def get_prog(self, cid) -> tuple:
        """Return ``(last_msg_id, min_msg_id)`` for channel *cid*, or ``(0, 0)`` if no row exists."""
        conn = self._require_conn()
        async with conn.execute("SELECT last_msg_id, min_msg_id FROM progress WHERE channel_id=?", (str(cid),)) as cursor:
            r = await cursor.fetchone()
            return r if r else (0, 0)

    async def update_prog(self, cid, last_id=None, min_id=None):
        """Upsert the progress pointers of channel *cid* and commit.

        Only the pointers passed as non-None are written; the other column
        keeps its stored value (or its default of 0 on first insert).
        """
        conn = self._require_conn()
        if last_id is not None:
            await conn.execute("INSERT INTO progress (channel_id, last_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET last_msg_id=?", (str(cid), last_id, last_id))
        if min_id is not None:
            await conn.execute("INSERT INTO progress (channel_id, min_msg_id) VALUES (?, ?) ON CONFLICT(channel_id) DO UPDATE SET min_msg_id=?", (str(cid), min_id, min_id))
        await conn.commit()
# ==================== 3. Utility logic ====================
def clean_caption(text: str, *, patterns: Optional[List[str]] = None, strip_inline: bool = False) -> str:
    """Remove advertising lines from a caption.

    Each line is tested by deleting every ad pattern from it
    (case-insensitively); a line with nothing but whitespace left is dropped.

    Args:
        text: Caption text; ``None`` or an empty string yields ``""``.
        patterns: Regular expressions to treat as ads. Defaults to the
            module-level ``AD_PATTERNS``.
        strip_inline: When False (the default, matching the previous
            behaviour) surviving lines are kept exactly as written, including
            any ad fragments inside them. When True the ad fragments are cut
            out of surviving lines as well and the gaps they leave are
            collapsed to a single space.

    Returns:
        The remaining lines joined with newlines, stripped of surrounding
        whitespace.
    """
    if not text:
        return ""
    active = AD_PATTERNS if patterns is None else patterns
    # Compile once per call instead of once per line.
    compiled = [re.compile(p, re.I) for p in active]
    kept = []
    for line in text.split("\n"):
        filtered = line
        for rx in compiled:
            filtered = rx.sub("", filtered)
        if not filtered.strip():
            continue  # the whole line was advertising
        if strip_inline:
            kept.append(re.sub(r"[ \t]{2,}", " ", filtered).strip())
        else:
            kept.append(line)
    return "\n".join(kept).strip()
def is_video(msg: Message) -> bool:
    """Return True if *msg* carries a video document that passes the size filter.

    A message qualifies when its media is a ``MessageMediaDocument`` whose
    document has a MIME type starting with ``"video"`` and, if ``MIN_SIZE_MB``
    is greater than 0, a size of at least that many MiB.
    """
    if not msg or not msg.media or not isinstance(msg.media, MessageMediaDocument):
        return False
    doc = msg.media.document
    # The document slot may hold an empty placeholder object that has neither
    # mime_type nor size; read both defensively instead of raising
    # AttributeError inside the scanning loops.
    mime_type = getattr(doc, "mime_type", None)
    if not doc or not mime_type or not mime_type.startswith("video"):
        return False
    size_bytes = getattr(doc, "size", 0) or 0
    if MIN_SIZE_MB > 0 and (size_bytes / 1048576) < MIN_SIZE_MB:
        return False
    return True
# ==================== 4. Album aggregation logic ====================
# Bounded hand-off queue filled by handle_incoming()/push_to_queue_later() and
# drained by worker(). Every item is a list of messages forwarded as one batch.
# NOTE(review): this is created at import time, before asyncio.run(); on
# Python < 3.10 asyncio.Queue binds to the loop current at construction —
# confirm the minimum supported Python version.
forward_queue = asyncio.Queue(maxsize=500)
# Albums still being collected: grouped_id -> messages received so far.
pending_albums: Dict[int, List[Message]] = {}
async def push_to_queue_later(grouped_id):
    """Flush one pending album to the forward queue after the collection window.

    Sleeps for ALBUM_WAIT_TIME seconds, then removes the album registered
    under *grouped_id* from pending_albums and, if it holds any messages,
    enqueues them ordered by ascending message id.
    """
    await asyncio.sleep(ALBUM_WAIT_TIME)
    album = pending_albums.pop(grouped_id, None)
    if not album:
        # Already flushed or nothing collected.
        return
    album.sort(key=lambda message: message.id)
    await forward_queue.put(album)
# Strong references to running album timers. The event loop only keeps weak
# references to tasks, so a task whose create_task() result is discarded can
# be garbage-collected before it finishes.
_album_timer_tasks: set = set()


async def handle_incoming(msg: Message):
    """Route one message towards the forward queue.

    A message without a grouped_id is enqueued immediately as a one-element
    batch. A message that belongs to an album is appended to
    pending_albums[grouped_id]; the first message of an album also starts the
    timer task (push_to_queue_later) that later enqueues the whole album.
    """
    if not msg.grouped_id:
        await forward_queue.put([msg])
        return
    if msg.grouped_id not in pending_albums:
        pending_albums[msg.grouped_id] = []
        timer = asyncio.create_task(push_to_queue_later(msg.grouped_id))
        _album_timer_tasks.add(timer)
        # Drop the reference again once the timer has run.
        timer.add_done_callback(_album_timer_tasks.discard)
    pending_albums[msg.grouped_id].append(msg)
# ==================== 5. Forwarding worker (single serial worker: keeps order, limits flood risk) ====================
# How many times one batch is attempted when Telegram answers with a flood wait.
_MAX_SEND_ATTEMPTS = 5


async def _forward_batch(batch):
    """Send the not-yet-forwarded videos of *batch* as one send_file call.

    Raises RuntimeError when every attempt ended in a flood wait.
    """
    to_send = []
    caption = ""
    for m in batch:
        # Safety check: guarantees media and document are present.
        if not is_video(m):
            continue
        v_key = str(m.media.document.id)
        if not await db.is_seen(v_key):
            to_send.append(m)
            # The first non-empty text in the batch becomes the caption.
            if m.text and not caption:
                caption = clean_caption(m.text)
    if not to_send:
        return
    files = [m.media for m in to_send]
    for _ in range(_MAX_SEND_ATTEMPTS):
        try:
            await client.send_file(TARGET_CHANNEL, file=files, caption=caption, supports_streaming=True)
            break
        except FloodWaitError as e:
            # Wait as instructed and retry the SAME batch. Dropping it here
            # would lose the videos for good, because the scan progress has
            # already moved past these messages.
            logger.warning(f"触发 Telegram 风控,强制等待 {e.seconds} 秒...")
            await asyncio.sleep(e.seconds + 5)
    else:
        raise RuntimeError(f"连续 {_MAX_SEND_ATTEMPTS} 次触发风控,放弃本批次")
    for m in to_send:
        await db.mark_seen(str(m.media.document.id))
    logger.info(f"成功搬运 {len(to_send)} 个视频到目标频道")
    # Random cool-down to protect the account.
    await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))


async def worker():
    """Consume batches from forward_queue forever and forward them one at a time.

    Any exception raised while handling a batch is logged with its traceback
    and the loop continues; task_done() is always called for the batch.
    """
    logger.info("转发 Worker 线程已启动...")
    while True:
        batch = await forward_queue.get()
        try:
            await _forward_batch(batch)
        except Exception as e:
            logger.error(f"Worker 处理搬运时发生未知错误: {e}", exc_info=True)
        finally:
            forward_queue.task_done()
# ==================== 6. Scanning tasks ====================
async def scan_latest_task(cid):
    """Follow channel *cid* and feed its videos to handle_incoming().

    On the first run (stored last_msg_id is 0) the newest 2000 messages are
    scanned and both progress pointers are initialised. After that the
    channel is polled every 30 seconds for messages newer than last_id, and
    last_id is persisted after every message.
    """
    last_id, min_id = await db.get_prog(cid)
    # First-run initialisation.
    if last_id == 0:
        logger.info(f"频道 {cid} 初次运行,开始抓取最新 2000 条历史...")
        newest_id = 0
        oldest_id = None  # None means "no message seen"
        async for msg in client.iter_messages(cid, limit=2000):
            if is_video(msg):
                await handle_incoming(msg)
            # Track every scanned message, not only videos, so the pointers
            # describe the range that was actually covered.
            newest_id = max(newest_id, msg.id)
            oldest_id = msg.id if oldest_id is None else min(oldest_id, msg.id)
        last_id = newest_id
        # If the backfill pointer is still 0, initialise it in the same write.
        if min_id == 0 and oldest_id is not None:
            min_id = oldest_id
            await db.update_prog(cid, last_id=last_id, min_id=min_id)
        else:
            await db.update_prog(cid, last_id=last_id)
    logger.info(f"频道 {cid} 实时监控已就绪,当前 last_id: {last_id}")
    while True:
        try:
            # reverse=True yields messages from oldest to newest.
            async for msg in client.iter_messages(cid, min_id=last_id, reverse=True):
                if is_video(msg):
                    await handle_incoming(msg)
                last_id = msg.id
                await db.update_prog(cid, last_id=last_id)
            await asyncio.sleep(30)  # poll for new messages every 30 seconds
        except FloodWaitError as e:
            logger.warning(f"触发 Telegram 风控 ({cid}),强制等待 {e.seconds} 秒...")
            await asyncio.sleep(e.seconds + 5)
        except Exception as e:
            logger.error(f"实时监控故障 ({cid}): {e}", exc_info=True)
            await asyncio.sleep(30)
async def backfill_history_task(cid):
    """Walk channel *cid* backwards from the stored min_msg_id, 40 messages at a time.

    Videos found are passed to handle_incoming(). The task pauses while
    forward_queue holds 50 or more batches, and ends once the stored pointer
    is <= 1 or a fetch makes no progress (the pointer is then set to 1).
    """
    logger.info(f"频道 {cid} 历史异步补全队列启动...")
    while True:
        try:
            last_id, min_id = await db.get_prog(cid)
            if last_id == 0 and min_id == 0:
                # Both pointers are still at their initial value: the
                # first-run scan in scan_latest_task() has not stored them
                # yet. Exiting here would report the history as complete
                # before anything was backfilled, so wait and look again.
                await asyncio.sleep(10)
                continue
            if min_id <= 1:
                logger.info(f"频道 {cid} 历史数据已全部补全完毕,退出该任务。")
                break
            # Only dig further into history while the queue is not congested.
            if forward_queue.qsize() >= 50:
                await asyncio.sleep(20)
                continue
            current_min_id = min_id
            has_msg = False
            async for msg in client.iter_messages(cid, offset_id=min_id, limit=40):
                has_msg = True
                if is_video(msg):
                    await handle_incoming(msg)
                current_min_id = msg.id  # decreases step by step
            # No progress means there really is nothing older left.
            if not has_msg or current_min_id == min_id:
                await db.update_prog(cid, min_id=1)
                logger.info(f"频道 {cid} 历史已触底。")
                break
            await db.update_prog(cid, min_id=current_min_id)
            logger.info(f"历史补全推进中... 频道: {cid}, 当前进度 msg_id: {current_min_id}")
            await asyncio.sleep(random.randint(15, 30))
        except FloodWaitError as e:
            logger.warning(f"触发 Telegram 风控 ({cid}),强制等待 {e.seconds} 秒...")
            await asyncio.sleep(e.seconds + 5)
        except Exception as e:
            logger.error(f"历史补全故障 ({cid}): {e}", exc_info=True)
            await asyncio.sleep(60)
# ==================== 7. Entry point ====================
# The database file is named after the first source channel and the target
# channel: "<source>to<target>.db".
if SOURCE_CHANNELS:
    s_tag = str(SOURCE_CHANNELS[0])
else:
    s_tag = "unknown"
t_tag = str(TARGET_CHANNEL)
db_filename = s_tag + "to" + t_tag + ".db"
db_path = os.path.join(DB_FOLDER, db_filename)
session_path = os.path.join(SESSION_FOLDER, "forwarder_session")
# Shared Telegram client and database handle used by every task.
client = TelegramClient(session_path, API_ID, API_HASH)
db = AsyncDB(db_path)
async def main():
    """Connect the database, log in, start all tasks and run until disconnected.

    One worker plus, for every source channel, a live-scan task and a
    history-backfill task are started. On exit (disconnect, error or
    cancellation) the tasks are cancelled and the database connection is
    closed.
    """
    await db.connect()
    # Keep references: the event loop holds tasks only weakly, so an
    # unreferenced task may be garbage-collected while still running.
    tasks = []
    try:
        await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
        logger.info(f"--- 登录成功 | 运行数据库: {db_filename} ---")
        # A single serial worker keeps sends ordered and rate-limited.
        tasks.append(asyncio.create_task(worker()))
        for cid in SOURCE_CHANNELS:
            tasks.append(asyncio.create_task(scan_latest_task(cid)))
            tasks.append(asyncio.create_task(backfill_history_task(cid)))
        await client.run_until_disconnected()
    finally:
        for task in tasks:
            task.cancel()
        # return_exceptions=True: collect cancellations/errors instead of raising.
        await asyncio.gather(*tasks, return_exceptions=True)
        if db.conn is not None:
            await db.conn.close()
def _run_from_cli():
    """Run main() to completion; Ctrl+C is reported as a normal log line."""
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("程序已被用户手动终止。")


if __name__ == "__main__":
    _run_from_cli()