import asyncio
import os
import sys
from telethon import TelegramClient, errors
from telethon.tl.types import MessageMediaDocument, DocumentAttributeFilename
from dotenv import load_dotenv
# ================== 1. 读取配置 ==================
load_dotenv()
try:
api_id = int(os.getenv("API_ID"))
api_hash = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD") or None
# --- 修复:自动判断频道 ID 是数字还是用户名 ---
raw_target = os.getenv("TARGET_CHANNEL")
try:
# 如果是数字(如 -100xxx),转换为整数
TARGET_CHANNEL = int(raw_target)
except (ValueError, TypeError):
# 如果不是数字(如 @username),保持字符串
TARGET_CHANNEL = raw_target
# 扫描限制:设置为想要扫描的【视频数量】
scan_env = os.getenv("SCAN_LIMIT", "2000")
TARGET_VIDEO_COUNT = int(scan_env)
except Exception as e:
print(f"❌ 配置错误: {e}")
sys.exit(1)
client = TelegramClient("user_session", api_id, api_hash)
# ================== 2. 工具函数 ==================
def get_video_info(message):
"""
解析视频信息
返回: (is_video, file_id, file_name)
"""
if not message.media or not isinstance(message.media, MessageMediaDocument):
return False, None, None
doc = message.media.document
# 判定是否为视频 mime 类型
if not (doc.mime_type and doc.mime_type.startswith("video/")):
return False, None, None
# 获取文件名(仅用于显示,不用于判重)
file_name = "未知文件名"
for attr in doc.attributes:
if isinstance(attr, DocumentAttributeFilename):
file_name = attr.file_name
break
# Telethon document.id 是该文件在 TG 系统内的唯一标识
return True, doc.id, file_name
# ================== 3. 主逻辑 ==================
async def main():
print("🔐 正在登录 Telegram...")
# 自动处理登录,如果是第一次运行,控制台会要求输入验证码
await client.start(phone=PHONE_NUMBER, password=TWO_STEP_PASSWORD)
print("✅ 登录成功")
try:
# 获取频道实体对象
target = await client.get_entity(TARGET_CHANNEL)
target_name = getattr(target, "title", TARGET_CHANNEL)
except Exception as e:
print(f"❌ 无法获取频道信息: {TARGET_CHANNEL}")
print(f" 原因: {e}")
print(" 提示: 请确保你已经加入了该频道,且 ID 填写正确(ID必须是整数,不带引号)。")
return
# 显示当前任务模式
mode_str = "无限 (直到扫描完所有历史)" if TARGET_VIDEO_COUNT == 0 else f"最近 {TARGET_VIDEO_COUNT} 个视频"
print(f"\n📺 目标频道:{target_name}")
print(f"🎯 扫描目标:{mode_str}")
print(f"⚙️ 判重策略:保留【最新】发布的视频,删除旧的重复项")
print("-" * 40)
seen_keys = set() # 记录已出现的视频 ID
duplicates = [] # 存储待删除的消息 [(msg_id, file_name), ...]
scanned_msgs = 0 # 扫描过的消息总数(含文字/图片)
found_videos = 0 # 找到的视频数
print("⏳ 正在扫描消息 (顺序:从新 -> 旧)...")
# limit=None 表示如果不手动 break,就一直扫描下去
async for msg in client.iter_messages(target, limit=None):
scanned_msgs += 1
is_vid, file_id, file_name = get_video_info(msg)
if not is_vid:
continue
# 找到一个视频
found_videos += 1
# 核心判重逻辑
if file_id in seen_keys:
# 已经在 seen_keys 里,说明之前扫描到了(即更新的消息里有这个视频)
# 所以当前这条较旧的消息是重复的
duplicates.append((msg.id, file_name))
else:
seen_keys.add(file_id)
# 打印进度条
if found_videos % 20 == 0:
print(f" 已检索 {found_videos} 个视频 (总扫描消息 {scanned_msgs} 条)...")
# 达到数量限制,退出循环
if TARGET_VIDEO_COUNT != 0 and found_videos >= TARGET_VIDEO_COUNT:
print(f"✅ 已达到设定的 {TARGET_VIDEO_COUNT} 个视频目标,停止扫描。")
break
print("-" * 40)
print("📊 扫描结果统计")
print(f" 总扫描消息数:{scanned_msgs}")
print(f" 检索视频总数:{found_videos}")
print(f" 发现重复视频:{len(duplicates)}")
if not duplicates:
print("✅ 没有发现需要删除的重复视频")
return
print(f"\n⚠️ 即将删除 {len(duplicates)} 条【旧的重复】视频")
# 等待用户确认
confirm = input("❓ 确认删除?(输入 y 确认,其他键取消): ").strip().lower()
if confirm != "y":
print("🚫 已取消操作")
return
print("🗑️ 开始执行删除任务...")
# 提取所有要删除的消息 ID
delete_ids = [d[0] for d in duplicates]
batch_size = 50 # 每次删除 50 条,防止请求过大
for i in range(0, len(delete_ids), batch_size):
batch = delete_ids[i:i + batch_size]
try:
await client.delete_messages(target, batch)
print(f" 已删除 {min(i + batch_size, len(delete_ids))}/{len(delete_ids)}")
# 适当延时,保护账号安全
await asyncio.sleep(1.5)
except errors.FloodWaitError as e:
print(f"⏳ 触发 Telegram 流控 (FloodWait),需等待 {e.seconds} 秒...")
await asyncio.sleep(e.seconds + 2)
except errors.MessageIdInvalidError:
print(f"⚠️ 某些消息可能已经被删除,跳过该批次")
except Exception as e:
print(f"❌ 删除出错: {e}")
print(f"\n✅ 清理完成!")
# ================== 4. 程序入口 ==================
if __name__ == "__main__":
# 使用 with 语法自动管理连接和断开
with client:
client.loop.run_until_complete(main())