赞
踩
本章介绍使用DjCelery即Django+Celery框架开发定时任务功能,在Autotestplat平台上实现单一接口自动化测试脚本、业务场景接口自动化测试脚本、App自动化测试脚本、Web自动化测试脚本等任务的定时执行、调度、管理等,从而取代Jenkins上的定时执行脚本和发送邮件等功能。**
自动化测试逻辑流程图11.1所示。
11.1 环境搭建
1.安装
步骤1 安装Celery。pyramid_celery-3.0.0,
配置https://pypi.python.org/pypi/pyramid_celery/。
步骤2 安装django-clery。django-celery-3.2.2,
配置https://pypi.python.org/pypi/django- celery。 INSTALLED_APPS= []
加入2:
'djcelery', 运行 Python manage.py migrate
步骤 3 安装celery-with-redis-3.0,
地址为https://pypi.python.org/pypi/celery-with-redis/。
步骤 4 安装django-clery-beat。django-celery-beat-1.1.0,
https://pypi.python.org/pypi/ django_celery_beat。
步骤5 下载Redis-x64-3.2.100,
Redis-x64-3.2.100.zip github.com/MicrosoftAr…
2.配置
步骤1 在Settings.py中增加如下内容。
加入1:
import djcelery
djcelery.setup_loader() #加载djcelery
加入2:
#数据库调度
CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'
加入3:
BROKER_URL = 'redis://127.0.0.1:6379/0'
BROKER_TRANSPORT = 'redis'
步骤2 在应用Apitest目录下新建celery.py文件1,加入如下内容。
from future import absolute_import
import os
import django
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE','autotest.settings')
django.setup()
app = Celery('autotest')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)
步骤3 新建tasks.py文件,加入如下内容。
importrequests, time, sys, re
importurllib, zlib#,
importpymysql
importunittest
from traceimport CoverageResults
importjson
fromidlelib.rpc import response_queue
fromapitest.celery import app
from timeimport sleep
@app.task
def hello_world():
print('已运行')
步骤4 启动服务python manage.py runserver。
步骤5 解压缩后,运行CMD,切换到相应目录,输入启动Redis指令redis-server redis. windows.conf,成功后出现如图11.2所示界面。
▲图11.2
步骤6 启动指令python manage.py celery worker -l info。
步骤7 启动指令python manage.py celery beat。
11.2 前端功能实现
1.功能描述
完成实现单一接口测试用例、业务场景接口API测试用例、AppUI测试用例、WebUI测试用例的自动化定时任务。
2.程序清单
在autotest\apitest\templates目录下新建periodic_task.html文件,加入如下内容。
<html>
<head>
{% load bootstrap4 %}
{% bootstrap_css %}
{% bootstrap_javascript %}
<title>产品自动化测试平台</title>
<link rel="stylesheet"type="text/css" href="/static/admin/css/forms.css" />
<script type="text/javascript"src="/admin/jsi18n/"></script>
<script type="text/javascript"src="/static/admin/js/vendor/jquery/jquery.js"></script>
<script type="text/javascript"src="/static/admin/js/jquery.init.js"></script>
<script type="text/javascript"src="/static/admin/js/core.js"></script>
<script type="text/javascript"src="/static/admin/js/admin/RelatedObjectLookups.js"></script>
<script type="text/javascript"src="/static/admin/js/actions.js"></script>
<script type="text/javascript"src="/static/admin/js/urlify.js"></script>
<script type="text/javascript"src="/static/admin/js/prepopulate.js"></script>
<script type="text/javascript"src="/static/admin/js/vendor/xregexp/xregexp.js"></script>
<meta name="viewport"content="user-scalable=no, width=device-width, initial-scale=1.0,maximum-scale=1.0">
<link rel="stylesheet"type="text/css" href="/static/admin/css/responsive.css"/>
<meta name="robots"content="NONE,NOARCHIVE" />
</head>
<body role="document">
<!-- 导航栏-->
<nav class="navbar navbar-expand-smbg-dark navbar-dark fixed-top">
<div>
<ahref="#"> </a>
<ul>
</ul>
<ul>
<li><astyle='color:white' href="#"></a></li>
<li><astyle='color:white' href="/logout/"></a></li>
</ul>
</div>
</nav>
<!-- 搜索栏-->
<divstyle="padding-top: 70px;">
<formmethod="get" action="/tasksearch/">
{% csrf_token %}
<input type="search"name="task" placeholder="名称" required>
<button type="submit">搜索</button>
<!-- 增加定时任务-->
<div style="float:right;width:73%">
<select name="PeriodicTask"id="PeriodicTask">
<option value="" selected>----定时任务----</option>
</select>
<a id="change_id_PeriodicTask"data-href-template="/admin/djcelery/periodictask/fk/change/?_to_field=id&_popup=1"title="更改选中的定时任务">
<imgsrc="/static/admin/img/icon-changelink.svg" alt="修改"/>
</a>
<a style='color:light blue' id="add_id_PeriodicTask" href="/admin/djcelery/periodictask/add/?_to_field=id&_popup=1"title="增加另一个测试用例">
<imgsrc="/static/admin/img/icon-addlink.svg" alt="增加"/>增加
</a>
</form>
</div>
<!-- 任务计划列表-->
<divstyle="padding-top: 20px;">
<div>
<table class="table table-striped">
<thead>
<tr>
<th>ID</th><th>任务名称</th><th>任务模块</th><th>时间计划</th><th>修改时间</th><th>开启</th><th>立即</th><th>编辑</th><th>删除</th>
</tr>
</thead>
<tbody>
{% for task in tasks %}{% for periodic inperiodics %}
<tr>
{% if task.interval_id != null andtask.interval_id == periodic.id %}
<td>{{ task.id }}</td>
<td>{{ task.name }}</td>
<td>{{ task.task }}</td>
<td><a style='color:green'>每{{ periodic.period }} {{ periodic.every}}次</a></td>
<td>{{ task.date_changed }}</td>
<td>{{ task.enabled }}</td>
<td>{% if task.id == 1 %}
<a href="../task_apis"target="mainFrame">运行</a>
{% elif task.id == 2 %}
<a href="../task_apitest"target="mainFrame">运行</a>
{% else %}
{% endif %}
</td>
<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/change/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-changelink.svg"/></a></td>
<td><a style='color:light blue'class="related-widget-wrapper-link add-related" id="add_id_Apitest"href="../admin/djcelery/periodictask/{{ task.id}}/delete/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-deletelink.svg"/></a></td>
{% else %}
{% endif %}
{% for crontab in crontabs %}
{% if task.crontab_id != null and task.crontab_id ==crontab.id and task.interval_id == 1 %}
<td>{{ task.id }}</td>
<td>{{ task.name }}</td>
<td>{{ task.task }}</td>
<td><a style='color:green'>{{crontab.month_of_year }}年{{crontab.day_of_month }}月{{crontab.day_of_week }}日{{crontab.hour }}时{{ crontab.minute}}分</a></td>
<td>{{ task.date_changed }}</td>
<td>{{ task.enabled }}</td>
<td><a href="../task_apis"target="mainFrame">运行</a></td>
<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/change/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-changelink.svg"/></a></td>
<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/delete/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-deletelink.svg"/></a></td>
{% else %}
{% endif %}
{% endfor %}{% endfor %}{% endfor %}
</tbody>
</table>
</div>
</div>
<span style="position:absolute;right:100px; bottom:30px;"> {# 把翻页功能固定显示在右下角#}
<div style="position:absolute; right:100px; width:100px; ">
<tr><th>总数</th><td>{{ taskcounts }}</td></tr> {# 前端读取定义的变量#}
</div>
<div>
- <ulclass="pagination" id="pager">
-
- {#上一页链接开始#}
-
- {%if tasks.has_previous %}
-
- {# 如果有上一页,则正常显示上一页链接#}
-
- <li><ahref="/periodic_task/?page={{ tasks.previous_page_number }}">上一页</a></li> {# 上一页标签 #}
-
- {%else %}
-
- <li class="previous disabled"><ahref="#">上一页</a></li>{# 如果当前不存在上一页,则上一页的链接不可单击#}
-
- {%endif %}
-
- {# 上一页链接开始#}
-
-
-
- {%for num in tasks.paginator.page_range %}
-
-
-
- {% if num == currentPage %}
-
- <li><a href="/periodic_task/?page={{ num }}">{{ num}}</a></li> {#显示当前页数链接#}
-
- {% else %}
-
- <liclass="item"><a href="/periodic_task/?page={{ num}}">{{ num }}</a></li>
-
- {% endif %}
-
- {% endfor %}
-
-
-
- {# 下一页链接开始#}
-
- {% if tasks.has_next %} {# 如果有下一页,则正常显示下一页链接#}
-
- <liclass="next"><a href="/periodic_task/?page={{tasks.next_page_number }}">下一页</a></li>
-
- {% else %}
-
- <li><a href="#">下一页</a></li>
-
- {% endif %}
-
- {# 下一页链接结束#}
-
- </ul>
</div>
</body>
</html>
功能描述:实现自动化测试任务调度执行,包括单一接口、扫描、流程接口、业务场景、Web搜索、自动化平台测试开发、App登录,CSDN定时任务注册,定时任务执行等功能。
程序清单:在apitest/views.py中加入如下内容。
from .tasks importhello_world
from .tasks importtest_readSQLcase
from djcelery.modelsimport PeriodicTask,CrontabSchedule,IntervalSchedule
@login_required
defperiodic_task(request):
- username = request.session.get('user', '')
-
- task_list = PeriodicTask.objects.all()
-
- task_count =PeriodicTask.objects.all().count() #统计数
-
- periodic_list =IntervalSchedule.objects.all() # 周期任务(如每隔1小时执行1次)
-
- crontab_list =CrontabSchedule.objects.all() # 定时任务(如某年某月某日的某时,每# 天的某时)
-
- paginator = Paginator(task_list, 5) #生成paginator对象,设置每页显示5条记录
-
- page = request.GET.get('page',1) #获取当前的页码数,默认为第1页
-
- currentPage=int(page) #把获取的当前页码数转换成整数类型
-
- try:
-
- task_list = paginator.page(page)#获取当前页码数的记录列表
-
- except PageNotAnInteger:
-
- task_list = paginator.page(1)#如果输入的页数不是整数,则显示第1页内容
-
- except EmptyPage:
-
- task_list =paginator.page(paginator.num_pages)#如果输入的页数不在系统的页数中,# 则显示最后一页内容
-
- return render(request,"periodic_task.html", {"user": username,"tasks":task_list,"taskcounts": task_count, "periodics":periodic_list,"crontabs": crontab_list })
@login_required
deftasksearch(request):
- username = request.session.get('user', '')# 读取浏览器登录Session
-
- search_name =request.GET.get("task", "")
-
- task_list = PeriodicTask.objects.filter(task__icontains=search_name)
-
- periodic_list =IntervalSchedule.objects.all() # 周期任务(如每隔1小时执行1次)
-
- crontab_list =CrontabSchedule.objects.all() # 定时任务(如某年某月某日的某时,每# 天的某时)
-
- return render(request,'periodic_task.html',{"user": username,"tasks":task_list,"periodics":periodic_list,"crontabs": crontab_list })
在autotest/urls.py中加入:
path('periodic_task/',views.periodic_task),
path('tasksearch/', views.tasksearch),
在apitest/left.html中加入:
<tr> <td>
- <li>
-
- <a href="../periodic_task"target="mainFrame">
-
- <iclass="glyphicon glyphicon-fire"></i>
-
- 任务计划
-
- </a>
-
- </li>
<tr><td>&nbsp;</td></tr>
查看前端页面效果,如图11.3所示。
▲图11.3
行动吧,在路上总比一直观望的要好,未来的你肯定会感 谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时,请及时加入扣群: 320231853,里面有各种软件测试+开发资料和技术可以一起交流学习哦。
最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。