当前位置:   article > 正文

自动化技术之airtest安装使用教程_airtest+poco下载

airtest+poco下载

参考链接:

https://github.com/AirtestProject/Airtest/blob/master/README_zh.md

 

Airtest框架安装

使用 pip 安装Airtest框架

  1. pip install -U airtest

可视化工具安装:

工具下载链接:

http://airtest.netease.com/

 

解压安装之后运行exe出现如图所示安装成功:

 

log窗口显示adb命令被占用了:

window杀死该进程

taskkill /pid 21584 -t -f

所有的安装完事,下面我们操作一番:

 

首先连接上手机:

 

运行之后,airtest在手机上会安装两个插件,安装完成后我们就可以搞事情了。

connect_device('android:///串号1')

在python程序导包的时候会报错:

from poco.drivers.android.uiautomation import AndroidUiautomationPoco

查看poco官方文档

https://poco.readthedocs.io/en/latest/source/doc/poco_drivers.html

根据自己的需要我们安装

pip install pocoui

演示代码,抓取抖音中出现的广告,信息抓取,并截图,视频地址抓取

eg:

代码:

  1. # -*- encoding=utf8 -*-
  2. __author__ = "yongxinboy"
  3. from airtest.core.api import *
  4. from poco.drivers.android.uiautomation import AndroidUiautomationPoco
  5. device_1 = connect_device('android:///你的设备号')
  6. # 设置抖音图标,操作直接打开抖音
  7. # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
  8. start_app("com.ss.android.ugc.aweme")
  9. poco = AndroidUiautomationPoco(device=device_1,use_airtest_input=True,screenshot_each_action=False)
  10. # 设置抖音图标,操作直接打开抖音
  11. #touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
  12. def spider_info():
  13. marks = ['d6r', 'cg_']
  14. while True:
  15. time.sleep(4)
  16. for mark in marks:
  17. mark_k = 'com.ss.android.ugc.aweme:id/{}'.format(mark)
  18. is_true = poco(name=mark_k).exists()
  19. if is_true == True:
  20. try:
  21. mark = poco(name=mark_k).get_text()
  22. title = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
  23. content = poco(name='com.ss.android.ugc.aweme:id/a12').get_text()
  24. click_count = poco(name='com.ss.android.ugc.aweme:id/a2_').get_text()
  25. comment = poco(name='com.ss.android.ugc.aweme:id/t2').get_text()
  26. if title and content and click_count and comment and mark:
  27. print(mark)
  28. print(title)
  29. print(content)
  30. print(click_count)
  31. print(comment)
  32. return mark,title,content,click_count,comment
  33. except Exception as err:
  34. poco.swipe([0.5, 0.8], [0.5, 0.2])
  35. time.sleep(4)
  36. poco.swipe([0.5, 0.8], [0.5, 0.2])
  37. else:
  38. poco.swipe([0.5, 0.8], [0.5, 0.2])
  39. return []
  40. while True:
  41. print(spider_info())

 如何连接多机:

  1. from airtest.core.api import connect_device
  2. from poco.drivers.android.uiautomation import AndroidUiautomationPoco
  3. device_1 = connect_device('android:///串号1')
  4. device_2 = connect_device('android:///串号2')
  5. device_3 = connect_device('android:///串号3')
  6. poco_1 = AndroidUiautomationPoco(device_1, use_airtest_input=True, screenshot_each_action=False)
  7. poco_2 = AndroidUiautomationPoco(device_2, use_airtest_input=True, screenshot_each_action=False)
  8. poco_3 = AndroidUiautomationPoco(device_3, use_airtest_input=True, screenshot_each_action=False)
  9. poco_1(text='微信').click()
  10. poco_2(text='微信').click()
  11. poco_3(text='微信').click()

使用这种方式,就不需要来回切换,并且还可以使用多线程直接同时控制每一台手机。

类似于xpath的写法:

  1. poco("android.widget.LinearLayout").offspring("com.ss.android.ugc.aweme:id/c9a").offspring(
  2. "com.ss.android.ugc.aweme:id/b43")[0].child("android.widget.LinearLayout")[2].offspring(
  3. "com.ss.android.ugc.aweme:id/cek").click()

 

airtest还有很多高级写法,使用什么去官方文档查看就好了

附带完整代码:

 

  1. '''
  2. 单个手机
  3. 参考:https://github.com/AirtestProject
  4. 程序运行
  5. mitmdump -s douyin_spider.py
  6. mitmweb -s douyin_spider.py
  7. '''
  8. # -*- encoding=utf8 -*-
  9. import sys
  10. sys.path.append('../')
  11. __author__ = "YongXinYang"
  12. from airtest.core.api import *
  13. auto_setup(__file__)
  14. from douyin_spider_v3.spider_video_info import *
  15. from poco.drivers.android.uiautomation import AndroidUiautomationPoco
  16. import time
  17. import pymongo
  18. import re
  19. client = pymongo.MongoClient("localhost:27017")
  20. db = client.douyin_info
  21. device_1 = connect_device('android:///你的设备号')
  22. # 设置抖音图标,操作直接打开抖音
  23. # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
  24. start_app("com.ss.android.ugc.aweme")
  25. poco = AndroidUiautomationPoco(device=device_1,use_airtest_input=True, screenshot_each_action=False)
  26. # 设置抖音图标,操作直接打开抖音
  27. # touch(Template(r"q.jpg", record_pos=(0.122, -0.269), resolution=(1080, 2340)))
  28. def re_sting(str):
  29. str = re.sub("[\!\%\[\]\,\。\?\'\"\@\.\*\&\、\:\;\$\\\]", "", str)
  30. return str
  31. def spider_info():
  32. try:
  33. ad1 = poco(text='查看详情').exists()
  34. ad2 = poco(text='视频同款商品').exists()
  35. if ad1 or ad2 == True:
  36. try:
  37. name = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
  38. except:
  39. name = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
  40. try:
  41. title = poco(desc='视频').child(type='android.widget.FrameLayout').child(type='android.widget.RelativeLayout').child(type='android.widget.LinearLayout').child(type='android.widget.FrameLayout').child(type='android.widget.LinearLayout').child(type='android.widget.LinearLayout').child(type='android.widget.TextView').get_text()
  42. except:
  43. title = poco(name='com.ss.android.ugc.aweme:id/title').get_text()
  44. print(name)
  45. print(title)
  46. # 截图操作
  47. #'E:\www\douyin_parser\douyin_spider_v3\info\\
  48. try:
  49. snapshot('E:\www\douyin_parser\douyin_spider_v3\image\{}.png'.format(re_sting(title)), msg=title)
  50. except Exception as err:
  51. raise err
  52. #点击分享链接
  53. poco(desc='分享,按钮').click()
  54. #左侧滑动,寻找复制链接
  55. poco.swipe([0.8268518518518518, 0.8064102564102564] ,[0.11203703703703703, 0.8064102564102564])
  56. time.sleep(2)
  57. #触发复制链接,这一部容易出现bug
  58. poco(text='复制链接').click()
  59. time.sleep(3)
  60. content_infos = db.douyin_url.find({'read_status': 0}, limit=1, sort=[("record_time", 1)])[0]
  61. url = content_infos['url']
  62. video_url = get_video(url)
  63. print(video_url)
  64. db.douyin_url.update({"_id": content_infos['_id']}, {'$set': {"read_status": 1}}, multi=True)
  65. if title and video_url:
  66. info_path = os.getcwd() + '\\信息\\{}.txt'.format(re_sting(name))
  67. with open(info_path, 'a+', encoding='utf-8') as f:
  68. f.write('视频标题:' + title + '\n' + '视频地址:' + video_url)
  69. f.flush()
  70. poco.swipe([0.5, 0.8], [0.5, 0.2])
  71. except Exception as err:
  72. print(err)
  73. poco.swipe([0.5, 0.8], [0.5, 0.2])
  74. if __name__ == '__main__':
  75. while True:
  76. time.sleep(10)
  77. spider_info()

mitmproxy代码:

  1. # -*- encoding=utf8 -*-
  2. '''
  3. 管道过滤启动方式:
  4. mitmdump -s mitm_get.py | python test.py
  5. '''
  6. import sys
  7. sys.path.append('../')
  8. import requests
  9. import time
  10. import mitmproxy
  11. from mitmproxy import http
  12. from mitmproxy import flow, proxy, controller, options
  13. from mitmproxy.proxy.server import ProxyServer
  14. import time
  15. import utils.tools as tools
  16. def response(flow):
  17. import pymongo
  18. client = pymongo.MongoClient("localhost:27017")
  19. db = client.douyin_info
  20. # download_url = tools.get_info(str(flow.request.url), '^(http://v\d{1}-[a-zA-Z]{2}.*?xigua.*?$)', fetch_one=True)
  21. download_url = tools.get_info(str(flow.request.url), '(https:\/\/lf.snssdk.com\/shorten.*?target.*?video.*)', fetch_one=True)
  22. if download_url:
  23. # print(f'---{download_url}---')
  24. record_time=tools.get_current_date()
  25. db.douyin_url.insert({"url": download_url,'record_time':record_time,'read_status':0})

效果截图:

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/276284
推荐阅读
相关标签
  

闽ICP备14008679号