赞
踩
参考链接:
https://github.com/AirtestProject/Airtest/blob/master/README_zh.md
Airtest框架安装
使用 pip
安装Airtest框架
- pip install -U airtest
-
可视化工具安装:
工具下载链接:
解压安装之后运行exe出现如图所示安装成功:
log窗口显示adb命令被占用了:
window杀死该进程
taskkill /pid 21584 -t -f
所有的安装完事,下面我们操作一番:
首先连接上手机:
运行之后,airtest在手机上会安装两个插件,安装完成后我们就可以搞事情了。
connect_device('android:///串号1')
在python程序导包的时候会报错:
from poco.drivers.android.uiautomation import AndroidUiautomationPoco
查看poco官方文档
https://poco.readthedocs.io/en/latest/source/doc/poco_drivers.html
根据自己的需要我们安装
pip install pocoui
演示代码,抓取抖音中出现的广告,信息抓取,并截图,视频地址抓取
eg:
代码:
- # -*- encoding=utf8 -*-
- __author__ = "yongxinboy"
-
- from airtest.core.api import *
- from poco.drivers.android.uiautomation import AndroidUiautomationPoco
-
- device_1 = connect_device('android:///你的设备号')
- # 设置抖音图标,操作直接打开抖音
- # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
- start_app("com.ss.android.ugc.aweme")
-
- poco = AndroidUiautomationPoco(device=device_1,use_airtest_input=True,screenshot_each_action=False)
- # 设置抖音图标,操作直接打开抖音
- #touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
- def spider_info():
- marks = ['d6r', 'cg_']
- while True:
- time.sleep(4)
- for mark in marks:
- mark_k = 'com.ss.android.ugc.aweme:id/{}'.format(mark)
- is_true = poco(name=mark_k).exists()
- if is_true == True:
-
- try:
- mark = poco(name=mark_k).get_text()
-
- title = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
-
- content = poco(name='com.ss.android.ugc.aweme:id/a12').get_text()
-
- click_count = poco(name='com.ss.android.ugc.aweme:id/a2_').get_text()
-
- comment = poco(name='com.ss.android.ugc.aweme:id/t2').get_text()
-
- if title and content and click_count and comment and mark:
- print(mark)
- print(title)
- print(content)
- print(click_count)
- print(comment)
-
- return mark,title,content,click_count,comment
-
- except Exception as err:
-
- poco.swipe([0.5, 0.8], [0.5, 0.2])
- time.sleep(4)
-
- poco.swipe([0.5, 0.8], [0.5, 0.2])
-
- else:
- poco.swipe([0.5, 0.8], [0.5, 0.2])
-
- return []
- while True:
- print(spider_info())
-
如何连接多机:
- from airtest.core.api import connect_device
-
- from poco.drivers.android.uiautomation import AndroidUiautomationPoco
-
- device_1 = connect_device('android:///串号1')
-
- device_2 = connect_device('android:///串号2')
-
- device_3 = connect_device('android:///串号3')
-
- poco_1 = AndroidUiautomationPoco(device_1, use_airtest_input=True, screenshot_each_action=False)
-
- poco_2 = AndroidUiautomationPoco(device_2, use_airtest_input=True, screenshot_each_action=False)
-
- poco_3 = AndroidUiautomationPoco(device_3, use_airtest_input=True, screenshot_each_action=False)
-
- poco_1(text='微信').click()
-
- poco_2(text='微信').click()
-
- poco_3(text='微信').click()
使用这种方式,就不需要来回切换,并且还可以使用多线程直接同时控制每一台手机。
类似于xpath的写法:
- poco("android.widget.LinearLayout").offspring("com.ss.android.ugc.aweme:id/c9a").offspring(
- "com.ss.android.ugc.aweme:id/b43")[0].child("android.widget.LinearLayout")[2].offspring(
- "com.ss.android.ugc.aweme:id/cek").click()
airtest还有很多高级写法,使用什么去官方文档查看就好了
附带完整代码:
- '''
- 单个手机
- 参考:https://github.com/AirtestProject
- 程序运行
- mitmdump -s douyin_spider.py
- mitmweb -s douyin_spider.py
- '''
- # -*- encoding=utf8 -*-
- import sys
- sys.path.append('../')
- __author__ = "YongXinYang"
- from airtest.core.api import *
- auto_setup(__file__)
- from douyin_spider_v3.spider_video_info import *
- from poco.drivers.android.uiautomation import AndroidUiautomationPoco
- import time
- import pymongo
- import re
- client = pymongo.MongoClient("localhost:27017")
- db = client.douyin_info
- device_1 = connect_device('android:///你的设备号')
- # 设置抖音图标,操作直接打开抖音
- # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
- start_app("com.ss.android.ugc.aweme")
- poco = AndroidUiautomationPoco(device=device_1,use_airtest_input=True, screenshot_each_action=False)
- # 设置抖音图标,操作直接打开抖音
- # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
-
- def re_sting(str):
- str = re.sub("[\!\%\[\]\,\。\?\'\"\@\.\*\&\、\:\;\$\\\]", "", str)
- return str
-
- def spider_info():
- try:
- ad1 = poco(text='查看详情').exists()
- ad2 = poco(text='视频同款商品').exists()
- if ad1 or ad2 == True:
- try:
- name = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
- except:
- name = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
-
- try:
- title = poco(desc='视频').child(type='android.widget.FrameLayout').child(type='android.widget.RelativeLayout').child(type='android.widget.LinearLayout').child(type='android.widget.FrameLayout').child(type='android.widget.LinearLayout').child(type='android.widget.LinearLayout').child(type='android.widget.TextView').get_text()
- except:
- title = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
-
- print(name)
- print(title)
-
- # 截图操作
- #'E:\www\douyin_parser\douyin_spider_v3\info\\
- try:
- snapshot('E:\www\douyin_parser\douyin_spider_v3\image\{}.png'.format(re_sting(title)), msg=title)
- except Exception as err:
- raise err
- #点击分享链接
- poco(desc='分享,按钮').click()
- #左侧滑动,寻找复制链接
- poco.swipe([0.8268518518518518, 0.8064102564102564] ,[0.11203703703703703, 0.8064102564102564])
- time.sleep(2)
-
- #触发复制链接,这一部容易出现bug
- poco(text='复制链接').click()
- time.sleep(3)
- content_infos = db.douyin_url.find({'read_status': 0}, limit=1, sort=[("record_time", 1)])[0]
- url = content_infos['url']
- video_url = get_video(url)
- print(video_url)
-
- db.douyin_url.update({"_id": content_infos['_id']}, {'$set': {"read_status": 1}}, multi=True)
- if title and video_url:
- info_path = os.getcwd() + '\\信息\\{}.txt'.format(re_sting(name))
- with open(info_path, 'a+', encoding='utf-8') as f:
- f.write('视频标题:' + title + '\n' + '视频地址:' + video_url)
- f.flush()
- poco.swipe([0.5, 0.8], [0.5, 0.2])
- except Exception as err:
- print(err)
- poco.swipe([0.5, 0.8], [0.5, 0.2])
-
- if __name__ == '__main__':
- while True:
- time.sleep(10)
- spider_info()
mitmproxy代码:
- # -*- encoding=utf8 -*-
- '''
- 管道过滤启动方式:
- mitmdump -s mitm_get.py | python test.py
- '''
- import sys
- sys.path.append('../')
- import requests
- import time
- import mitmproxy
- from mitmproxy import http
- from mitmproxy import flow, proxy, controller, options
- from mitmproxy.proxy.server import ProxyServer
- import time
- import utils.tools as tools
-
-
- def response(flow):
- import pymongo
- client = pymongo.MongoClient("localhost:27017")
- db = client.douyin_info
- # download_url = tools.get_info(str(flow.request.url), '^(http://v\d{1}-[a-zA-Z]{2}.*?xigua.*?$)', fetch_one=True)
- download_url = tools.get_info(str(flow.request.url), '(https:\/\/lf.snssdk.com\/shorten.*?target.*?video.*)', fetch_one=True)
-
- if download_url:
- # print(f'---{download_url}---')
- record_time=tools.get_current_date()
- db.douyin_url.insert({"url": download_url,'record_time':record_time,'read_status':0})
效果截图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。