当前位置:   article > 正文

Python 使用 WeChatFerry 搭建部署微信机器人详细教程(更新中)_pip install wcferry

pip install wcferry

下载安装 wcferry 库

通过 pip 快速安装 wcferry

pip install wcferry
  • 1

免责声明:仅供学习和技术研究使用,不得用于任何商业或非法行为,否则后果自负。

基本原理

当微信收到消息时,抢在微信处理(显示到页面)前,先让工具处理,处理完之后再交还给原来的处理模块。需要发送消息时,模拟微信发送消息,组装好消息体,调用微信发送消息的模块。获取联系人,则是遍历一块特定的内存空间。通过好友验证,则是组装好验证信息,调用微信的验证模块。数据库相关功能,则是通过获取到数据库句柄,基于 sqlite3 的接口来执行。

在这里插入图片描述
博客地址:微信机器人 DIY 从 0 到 1

Wcf 函数说明

WeChatFerry:一个玩微信的工具(函数说明)

函数名称描述返回类型
cleanup关闭连接 回收资源None
keep_running阻塞进程(让 RPC 一直维持连接)
is_receiving_msg是否已启动接收消息功能bool
get_qrcode获取登录二维码(已经登录则返回空字符串)str
is_login检查登录状态bool
get_self_wxid获取登录账号的 wxidstr
get_msg_types获取所有消息类型Dict
get_contacts获取所有联系人List[Dict]
get_friends获取所有好友List[Dict]
get_dbs获取数据库List[str]
get_tables获取某数据库下的表List[Dict]
get_user_info获取登录账号个人信息Dict
get_audio_msg取语音消息并转成 MP3str
send_text发送文本消息(可 @)int
_download_file下载文件str
_process_path处理路径(如果是网络路径则下载文件)str
send_image发送图片(非线程安全)int
send_file发送文件(非线程安全)int
send_xml发送 XMLint
send_emotion发送表情int
send_rich_text发送富文本消息int
send_pat_msg拍一拍群友int
forward_msg转发消息int
get_msg从消息队列中获取消息WxMsg
enable_receiving_msg允许接收消息bool
enable_recv_msg允许接收消息(旧接口)bool
disable_recv_msg停止接收消息bool
query_sql执行 SQL 查询List[Dict]
accept_new_friend接受好友申请int
refresh_pyq刷新朋友圈int
download_attach下载附件int
get_info_by_wxid通过 wxid 查询微信号昵称等信息dict
revoke_msg撤回消息int
decrypt_image解密图片str
get_ocr_result获取 OCR 结果str
download_image下载图片str
add_chatroom_members添加群成员int
del_chatroom_members删除群成员int
invite_chatroom_members邀请群成员int
get_chatroom_members获取群成员Dict
get_alias_in_chatroom获取群名片str
receive_transfer接收转账int

电脑端检测登录微信

from wcferry import Wcf

wcf = Wcf()
  • 1
  • 2
  • 3

检测微信登陆状态

检查当前 PC 端微信登陆状态?

from wcferry import Wcf

wcf = Wcf()
print(wcf.is_login())
  • 1
  • 2
  • 3
  • 4

获取登录账号信息

获取当前 PC 端微信账号信息?

from wcferry import Wcf

wcf = Wcf()
print(wcf.get_user_info())
  • 1
  • 2
  • 3
  • 4

运行结果

{'wxid': 'wxid_***', 'name': '字里行间', 'mobile': '195********', 'home': 'C:\\Users\\Administrator\\Documents\\WeChat Files\\'}
  • 1

开辟线程监听群消息

开启线程监听消息:判断是否是群消息?

from queue import Empty
from threading import Thread

from wcferry import Wcf, WxMsg

wcf = Wcf()


def processMsg(msg: WxMsg):
    if msg.from_group():
        print(msg.content)


def enableReceivingMsg():
    def innerWcFerryProcessMsg():
        while wcf.is_receiving_msg():
            try:
                msg = wcf.get_msg()
                processMsg(msg)
            except Empty:
                continue
            except Exception as e:
                print(f"ERROR: {e}")

    wcf.enable_receiving_msg()
    Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()


enableReceivingMsg()

wcf.keep_running()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

微信消息属性说明

class WxMsg() 微信消息属性说明
  • 1

属性说明

字段名类型描述
typeint消息类型 可通过 get_msg_types 获取
idstr消息 id
xmlstr消息 xml 部分
senderstr消息发送人
roomidstr(仅群消息有)群 id
contentstr消息内容
thumbstr视频或图片消息的缩略图路径
extrastr视频或图片消息的路径

消息类型

from wcferry import Wcf

wcf = Wcf()
print(wcf.get_msg_types())
  • 1
  • 2
  • 3
  • 4
消息类型编号消息类型描述属性
0朋友圈消息int
1文字int
3图片int
34语音int
37好友确认int
40POSSIBLEFRIEND_MSGint
42名片int
43视频int
47石头剪刀布表情图片
48位置int
49共享实时位置、文件、转账、链接int
50VOIPMSGint
51微信初始化int
52VOIPNOTIFYint
53VOIPINVITEint
62小视频int
66微信红包int
9999SYSNOTICEint
10000红包、系统消息int
10002撤回消息int
1048625搜狗表情int
16777265链接int
436207665微信红包int
536936497红包封面int
754974769视频号视频int
771751985视频号名片int
822083633引用消息int
922746929拍一拍int
973078577视频号直播int
974127153商品链接int
975175729视频号直播int
1040187441音乐链接int
1090519089文件int

根据群名称查询群 wxid

特别注意:Wcf 没有提供根据群名称查询群 wxid 功能。我们可以先获取全部联系人数据(微信好友、微信群等等),基于 wxid 进行区分,因为微信群 wxid 后缀都是 “chatroom” 结尾。

from wcferry import Wcf, WxMsg

wcf = Wcf()

wcf_rooms = []

for contact in wcf.get_contacts():
    if contact['wxid'].endswith("chatroom"):
        wcf_rooms.append(contact)

def get_chatroom_roomid(wcf_rooms: list, room_name: str):
    for room in wcf_rooms:
        if room['name'] == room_name:
            return room['wxid']
    return None

room_id = get_chatroom_roomid(wcf_rooms=wcf_rooms, room_name="测试群")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

定时发送群文件

如何进行定时发送群文件?通过 aspschedule 第三方库实现定时任务,然后调用 wcf.send_file 函数执行发送文件的消息。

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

from wcferry import Wcf

wcf = Wcf()

wcf_rooms = []

for contact in wcf.get_contacts():
    if contact['wxid'].endswith("chatroom"):
        wcf_rooms.append(contact)


def get_chatroom_roomid(wcf_rooms: list, room_name: str):
    for room in wcf_rooms:
        if room['name'] == room_name:
            return room['wxid']
    return None


def schedule_task_job(room_id: str, wcf: Wcf):
    wcf.send_file(path="test.txt", receiver=room_id)


customize_time = "2024-05-09 09:10:10"
customize_room = "唤醒手腕测试群"
run_date = datetime.strptime(customize_time, "%Y-%m-%d %H:%M:%S")
room_id = get_chatroom_roomid(wcf_rooms, customize_room)

scheduler = BackgroundScheduler()
scheduler.add_job(schedule_task_job, args=(room_id, wcf), run_date=run_date)
scheduler.start()

wcf.keep_running()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

运行结果

在这里插入图片描述

监听保存语音消息

from queue import Empty
from threading import Thread

from wcferry import Wcf, WxMsg

wcf = Wcf()


def processMsg(msg: WxMsg):
    if msg.from_group():
        response = wcf.get_audio_msg(id=msg.id, dir=f"audio")
        print("语音地址:" + response)


def enableReceivingMsg():
    def innerWcFerryProcessMsg():
        while wcf.is_receiving_msg():
            try:
                msg = wcf.get_msg()
                processMsg(msg)
            except Empty:
                continue
            except Exception as e:
                print(f"ERROR: {e}")

    wcf.enable_receiving_msg()
    Thread(target=innerWcFerryProcessMsg, name="ListenMessageThread", daemon=True).start()


enableReceivingMsg()

wcf.keep_running()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

更新中······

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号