import asyncio
import os
import random
import json
from datetime import datetime, timezone
from telethon import TelegramClient, events
from telethon.tl.types import (
MessageMediaDocument,
DocumentAttributeFilename,
DocumentAttributeVideo
)
from telethon.errors import FloodWaitError
from dotenv import load_dotenv
# ==================== 配置读取 ====================
load_dotenv()


def _require_env(name):
    """Return the non-empty value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing
            credential is reported by name instead of as ``int(None)``.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"缺少必需的环境变量: {name}")
    return value


def _env_int(name, default):
    """Read an integer setting, using *default* when it is unset or blank.

    A blank entry (``NAME=`` in a .env file) is treated like an unset one
    rather than crashing on ``int("")``.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    return int(raw)


def _parse_day(text, *, end_of_day=False):
    """Parse ``YYYY-MM-DD`` into a UTC-aware datetime; return None if empty.

    With ``end_of_day=True`` the result is the last microsecond of that day,
    which makes an upper bound inclusive of the whole day.
    """
    if not text:
        return None
    day = datetime.strptime(text, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if end_of_day:
        day = day.replace(hour=23, minute=59, second=59, microsecond=999999)
    return day


# Telegram API credentials (mandatory).
api_id = int(_require_env("API_ID"))
api_hash = _require_env("API_HASH")

# Login details; may be unset.
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")

# Comma-separated list of source channels; blank entries are dropped.
SOURCE_CHANNELS = [ch.strip() for ch in os.getenv("SOURCE_CHANNELS", "").split(",") if ch.strip()]
TARGET_CHANNEL = os.getenv("TARGET_CHANNEL")

# Filters and limits. For the *_COUNT / *_DURATION settings, 0 disables the check.
MIN_FILE_SIZE = _env_int("MIN_FILE_SIZE", 200 * 1024 * 1024)
MAX_FORWARD_COUNT = _env_int("MAX_FORWARD_COUNT", 0)
SCAN_LIMIT = _env_int("SCAN_LIMIT", 50)  # 0 = scan the full history
MIN_DURATION = _env_int("MIN_DURATION", 0)
MAX_DURATION = _env_int("MAX_DURATION", 0)
MAX_CAPTION_LENGTH = _env_int("MAX_CAPTION_LENGTH", 1024)

# Optional date window, interpreted in UTC.
START_DATE = os.getenv("START_DATE")
END_DATE = os.getenv("END_DATE")
start_date = _parse_day(START_DATE)
# END_DATE is inclusive: without end_of_day the bound would be 00:00 of that
# day and START_DATE == END_DATE would match nothing.
end_date = _parse_day(END_DATE, end_of_day=True)

client = TelegramClient("user_session", api_id, api_hash)

# On-disk state files (relative to the working directory).
VIDEO_KEYS_FILE = "video_keys.json"
PROGRESS_FILE = "channel_progress.json"
# ==================== JSON 持久化 ====================
def load_json(path, default):
    """Load and return the JSON document stored at *path*.

    Args:
        path: File to read (UTF-8 encoded JSON).
        default: Value returned when the file cannot be used.

    Returns:
        The decoded JSON value, or *default* if the file does not exist,
        cannot be read, or does not contain valid JSON.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # First run: no state has been written yet; this is not an error.
        return default
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Report it: silently falling back would hide that saved state was lost.
        print(f"⚠️ 读取 {path} 失败,使用默认值: {e}")
        return default
def save_json(path, data):
    """Serialize *data* as JSON to *path*, replacing the file atomically.

    The document is first written to ``<path>.tmp`` and then moved over the
    destination with ``os.replace``. An interrupted or failed write therefore
    never leaves a truncated file behind at *path*.

    Args:
        path: Destination file.
        data: JSON-serializable value.

    Returns:
        None. Failures are reported on stdout and not raised (best effort).
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except Exception as e:
        # Deliberately broad: persisting state must never abort forwarding.
        print(f"❌ 保存 {path} 失败: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # Nothing to clean up, or cleanup itself is impossible.
# Persistent state, restored from disk when the module is loaded.
# Maps a video key to the id of the message that was forwarded for it.
existing_video_keys = load_json(VIDEO_KEYS_FILE, default=dict())
# Maps a channel id (stored as a string) to the last recorded message id.
channel_progress = load_json(PROGRESS_FILE, default=dict())
# ==================== 工具函数 ====================
def get_duration_from_doc(doc):
    """Return the duration stored on the first video attribute of *doc*.

    Args:
        doc: Object exposing an ``attributes`` iterable.

    Returns:
        The ``duration`` of the first ``DocumentAttributeVideo`` found, or 0
        when *doc* has no attributes, no video attribute, or an empty duration.
    """
    for attr in getattr(doc, "attributes", None) or ():
        if isinstance(attr, DocumentAttributeVideo):
            return attr.duration or 0
    return 0
def get_video_key(message):
    """Build the deduplication key for the document attached to *message*.

    The key has the form ``<file name>_<size>_<document id>``. When the
    document carries no file-name attribute, ``video.mp4`` is used as the
    name. The format must stay stable: keys are persisted and compared
    across runs.

    Args:
        message: Message whose ``media.document`` is set.

    Returns:
        The key as a string.
    """
    doc = message.media.document
    filename = next(
        (a.file_name for a in doc.attributes if isinstance(a, DocumentAttributeFilename)),
        "video.mp4",
    )
    return f"{filename}_{doc.size}_{doc.id}"
def is_video_eligible(message):
    """Decide whether *message* carries a video that passes all filters.

    A message qualifies when all of the following hold:
      * its media is a ``MessageMediaDocument`` with a usable document;
      * the document's MIME type starts with ``video``;
      * the document is at least ``MIN_FILE_SIZE`` bytes;
      * its duration lies within ``MIN_DURATION`` / ``MAX_DURATION``
        (each bound is only enforced when greater than 0);
      * its date lies within ``start_date`` / ``end_date`` (when set).

    Returns:
        True if the message should be forwarded, False otherwise.
    """
    if not message.media or not isinstance(message.media, MessageMediaDocument):
        return False

    doc = message.media.document
    # The document may be absent or be a placeholder without a MIME type;
    # getattr keeps that case a plain "not eligible" instead of an error.
    mime_type = getattr(doc, "mime_type", None)
    if not mime_type or not mime_type.startswith("video"):
        return False

    if doc.size < MIN_FILE_SIZE:
        return False

    duration = get_duration_from_doc(doc)
    if MIN_DURATION > 0 and duration < MIN_DURATION:
        return False
    if MAX_DURATION > 0 and duration > MAX_DURATION:
        return False

    if start_date and message.date < start_date:
        return False
    if end_date and message.date > end_date:
        return False

    return True
# ==================== 转发函数 ====================
async def forward_message(message, video_key):
    """Send the media of *message* to ``TARGET_CHANNEL`` and record it.

    The caption is the message text, shortened with a trailing ellipsis so
    that it never exceeds ``MAX_CAPTION_LENGTH`` characters. On a
    ``FloodWaitError`` the function sleeps for the requested time and tries
    again. After a successful send, *video_key* is stored in
    ``existing_video_keys`` and persisted, followed by a short random pause.

    Args:
        message: Source message whose media is re-sent.
        video_key: Deduplication key to record once the send succeeded.

    Raises:
        Any error from ``client.send_file`` other than ``FloodWaitError``.
    """
    caption = message.message or ""
    if len(caption) > MAX_CAPTION_LENGTH:
        # Reserve one character for the ellipsis; appending it to a full-length
        # slice would push the caption one character over the limit.
        if MAX_CAPTION_LENGTH > 0:
            caption = caption[:MAX_CAPTION_LENGTH - 1] + "…"
        else:
            caption = ""

    # Retry in a loop rather than recursively, so repeated flood waits do not
    # grow the call stack.
    while True:
        try:
            await client.send_file(
                TARGET_CHANNEL,
                file=message.media,
                caption=caption,
                silent=True,
            )
        except FloodWaitError as e:
            print(f"⏳ FloodWait {e.seconds}s")
            await asyncio.sleep(e.seconds)
            continue
        break

    print(f"✅ 转发成功 message_id={message.id}")
    existing_video_keys[video_key] = message.id
    save_json(VIDEO_KEYS_FILE, existing_video_keys)
    # Random pause between sends to spread out requests.
    await asyncio.sleep(random.uniform(1, 4))
# ==================== 历史批量转发 ====================
async def batch_forward_latest_videos():
    """Scan each source channel from its checkpoint and forward new videos.

    For every entry in ``SOURCE_CHANNELS`` the messages after the stored
    checkpoint are read oldest-first (at most ``SCAN_LIMIT`` of them, or all
    when ``SCAN_LIMIT`` is 0). Eligible videos that were not forwarded before
    are sent via ``forward_message``. The per-channel checkpoint in
    ``channel_progress`` is persisted after each forward and once more at the
    end of the scan.

    The checkpoint also advances past messages that were merely skipped.
    Otherwise a scan window without any forwardable video would be re-read
    on every run and the scan would never move forward.

    A failed forward stops the scan of that channel; the checkpoint stays
    before the failed message so it is retried on the next run.
    """
    for source_channel in SOURCE_CHANNELS:
        source = await client.get_entity(source_channel)
        channel_id = str(source.id)
        last_id = channel_progress.get(channel_id, 0)
        limit = SCAN_LIMIT if SCAN_LIMIT > 0 else None
        # NOTE(review): with reverse=True and min_id the window starts at the
        # checkpoint and moves forward, so "最近" below is not literally the
        # newest messages. Wording left unchanged.
        mode = "全量历史" if limit is None else f"最近 {limit} 条"
        print(f"\n📦 扫描 {source_channel}({mode}),last_message_id={last_id}")

        count = 0
        scanned_id = last_id  # id of the last message fully dealt with
        async for message in client.iter_messages(
            source,
            min_id=last_id,
            limit=limit,
            reverse=True,  # oldest -> newest
        ):
            video_key = get_video_key(message) if is_video_eligible(message) else None
            if video_key is None or video_key in existing_video_keys:
                # Nothing to do for this message; it still counts as scanned.
                scanned_id = message.id
                continue

            try:
                await forward_message(message, video_key)
            except Exception as e:
                print(f"❌ message_id={message.id} 失败,中断本频道: {e}")
                break

            # Advance the checkpoint only after a successful forward.
            scanned_id = message.id
            channel_progress[channel_id] = scanned_id
            save_json(PROGRESS_FILE, channel_progress)
            count += 1
            if MAX_FORWARD_COUNT > 0 and count >= MAX_FORWARD_COUNT:
                print(f"⏹️ 达到最大转发数量 {MAX_FORWARD_COUNT}")
                break

        # Persist progress made by skipping messages, without lowering a
        # checkpoint that was raised elsewhere in the meantime.
        if scanned_id > channel_progress.get(channel_id, 0):
            channel_progress[channel_id] = scanned_id
            save_json(PROGRESS_FILE, channel_progress)

        print(f"📁 {source_channel} 本轮完成,转发 {count} 个视频")
# ==================== 实时监听 ====================
@client.on(events.NewMessage(chats=SOURCE_CHANNELS))
async def handler(event):
    """Forward a newly posted video from one of the source channels.

    Messages that are not eligible videos, or whose key is already recorded
    in ``existing_video_keys``, are ignored. After a successful forward the
    channel's checkpoint in ``channel_progress`` is raised to this message's
    id and persisted.
    """
    message = event.message
    if not is_video_eligible(message):
        return
    video_key = get_video_key(message)
    if video_key in existing_video_keys:
        return

    await forward_message(message, video_key)

    # Resolve the chat only now, so discarded messages cost no entity lookup.
    source = await event.get_chat()
    if source is None:
        return  # Cannot attribute the checkpoint to a channel; skip it.
    channel_id = str(source.id)
    # Handlers can finish out of order (forwarding sleeps); never move the
    # checkpoint backwards.
    # NOTE(review): this can also move the checkpoint past history that the
    # batch scan has not reached yet when SCAN_LIMIT > 0 — confirm intent.
    channel_progress[channel_id] = max(channel_progress.get(channel_id, 0), message.id)
    save_json(PROGRESS_FILE, channel_progress)
# ==================== 主程序 ====================
async def main():
    """Log in, run the historical batch scan, then listen for new messages.

    Login values that are not configured are left out of the ``start`` call
    instead of being passed as ``None``, so the client's own defaults apply
    for them.
    """
    start_kwargs = {}
    if PHONE_NUMBER:
        start_kwargs["phone"] = PHONE_NUMBER
    if TWO_STEP_PASSWORD:
        start_kwargs["password"] = TWO_STEP_PASSWORD
    await client.start(**start_kwargs)
    print("✅ Telegram 登录成功")

    await batch_forward_latest_videos()

    print("⏳ 开始实时监听新视频…")
    await client.run_until_disconnected()
if __name__ == "__main__":
    # Deliberately not `with client:` — entering the client's context manager
    # starts it with default arguments before main() runs, so the configured
    # phone number and password would not be used for the first login.
    try:
        client.loop.run_until_complete(main())
    finally:
        # Always close the connection, including on Ctrl+C or an error.
        client.disconnect()