当前位置:   article > 正文

Python+Flask实现全国、全球疫情大数据可视化(三):flask定时任务每日爬取数据,前后端交互展示_flask如何定时抓取数据

flask如何定时抓取数据

相关文章

Python+Flask实现全国、全球疫情大数据可视化(一):爬取疫情数据并保存至mysql数据库
Python+Flask实现全国、全球疫情大数据可视化(二):网页页面布局+echarts可视化中国地图、世界地图、柱状图和折线图

一、实现效果

最近简单学习了一下flask,决定来做一个疫情大数据的网页出来。
话不多说先上效果图。还是比较喜欢这样的排版的。
在这里插入图片描述

二、编写工具类utils从数据库中获取数据

我们首先要明确每个部分要获取哪些信息。
我们先看下数据格式
在这里插入图片描述
比如累计确诊的数据,我们只需要统计出当日的各地区确诊人数之和即能得到总的确诊人数,而不是对整列直接求和。因此每个表格的数据怎么获得需要简单思考一下。以下是工具类的代码。由于我们会处理两张结构一样的表,因此类的初始化参数定义为数据库库名称、、数据库用户名、数据库密码
关于查询,是通过pandas读取数据库表,由pandas来操作数据。个人觉得这样写起来更方便。

import datetime
import traceback

import pandas as pd
import pymysql
from sqlalchemy import create_engine


class utils:
    def __init__(self, db_name, user, password):
        self.db_name = db_name
        self.user = user
        self.password = password
        self.data = self.query("epidemic")

    def get_conn(self):
        '''

        :return:连接
        '''
        # 创建连接
        conn = pymysql.connect(host="127.0.0.1", port=3306, user=self.user, password=self.password,
                               db=self.db_name, charset="utf8")
        cursor = conn.cursor()
        return conn, cursor

    def close_conn(self, conn, cursor):
        cursor.close()
        conn.close()

    def query(self, table_name, use_sql=None):
        """
        :param sql:
        :return:epidemic: DataFrame
        """
        # 使用pandas从数据库中读取疫情数据
        try:
            conn = create_engine('mysql://{}:{}@localhost:3306/{}?charset=utf8'.format(self.user, self.password, self.db_name))
            sql = use_sql if use_sql else "select * from {}".format(table_name)
            epidemic = pd.read_sql(sql, con=conn)
            return epidemic
        except Exception as e:
            traceback.print_exc(e)
            return None

    def get_c1_data(self):
        '''
        获取c1的四个数据:累计确诊、累计治愈、累计死亡、新增死亡
        :return:
        '''
        sql = "select date, confirm, nowConfirm,heal,dead from china_day order by date desc limit 1"
        df = self.query("china_day", sql)
        confirm = int(df["confirm"][0])
        heal = int(df["heal"][0])
        dead = int(df["dead"][0])
        now_confirm = int(df["nowConfirm"][0])

        return [confirm, heal, dead, now_confirm]

    def get_c2_data(self):
        '''
        获取中国各省的疫情数据
        :return:
        '''
        # 将地区-确诊人数以键值对的形式保存
        dict = {}

        # 获取最新数据
        # 从数据库中获取最近一次更新的数据
        sql = "select distinct 疫情地区,日期,确诊 from china_total_epidemic order by 日期 desc limit 34"
        df = self.query("china_total_epidemic", sql)
        for p, v in zip(df.疫情地区, df.确诊):
            dict[p] = v
        return dict

    def get_l1_data(self):
        '''
        获取疫情期间每日累计数据
        :return:
        '''
        sql = "select date, confirm, heal,dead from china_day order by date desc"
        df = self.query("", sql)
        return df

    def get_l2_data(self):
        '''
        获取疫情期间每日新增数据
        :return:
        '''
        sql = "select date, confirm,heal,dead from china_day_add order by date desc"
        df = self.query("",sql)
        return df

    def get_r1_data(self):
        '''
        获取除湖北地区确诊人数最多的省份
        :return:
        '''
        sql = "select 疫情地区,确诊 from world_epidemic order by 日期 desc limit 5"
        df = self.query("",sql)
        return df

    def get_r2_data(self):
        '''
        获取世界各国的疫情数据
        :return:
        '''
        df = self.query("world_epidemic")
        return df[["name","疫情地区","确诊"]]


if __name__ == "__main__":
    u = utils("myspider", "root", "123456")
    u.get_c1_data()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115

三、编写app.py

接着我们来编写flask的核心类app.py。注册路由。我们的路由命名规则直接按我们定义的main.html中的各部分的id名来命名。即左边第一个为l1,中间第二个为c2等等…。在各个方法中,我们需要将从数据库中获取的数据转换为json格式,用以传入到前台页面。
此外,我们还需要注册定时任务,每天执行一次爬虫脚本,更新我们的数据库信息。

1、注册定时任务

flask有专门注册定时任务的库flask_apscheduler
我们定时任务的执行,只需要调用我们自定义的命令就行,即python spider.py即可
注册定时任务逻辑如下。在flask服务启动前运行即可注册成功。

from flask_apscheduler import APScheduler
def crawl_daily_data():
    """
    定时任务,每天爬取一次数据
    :return:
    """
    cur_path = os.path.dirname(os.path.abspath(__file__))
    os.system("python {}".format(os.path.join(cur_path, "spider.py")))

if __name__ == '__main__':
    # 定时任务 ,间隔一天执行
    scheduler=APScheduler()
    scheduler.add_job(crawl_daily_data,'interval',days=1)
    scheduler.init_app(app=app)
    scheduler.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

2、flask路由注册完整代码

代码如下:

import os
import time

from flask import Flask
from flask import jsonify
from flask import render_template
from flask_apscheduler import APScheduler

import utils

app = Flask(__name__)

# 工具类,初始化参数为数据库名,数据库表名,数据库账号,数据库密码
u = utils.utils("myspider", "root", "123456")


def crawl_daily_data():
    """
    定时任务,每天爬取一次数据
    :return:
    """
    cur_path = os.path.dirname(os.path.abspath(__file__))
    os.system("python {}".format(os.path.join(cur_path, "spider.py")))


@app.route('/')
def hello_world():
    return render_template("main.html")


@app.route('/time')
def get_time():
    time_str = time.strftime("%Y{}%m{}%d{}%X")
    return time_str.format("年", "月", "日")


@app.route('/c1')
def get_c1_data():
    data = u.get_c1_data()
    print(data)
    return jsonify({"dignose": data[0], "heal": data[1], "dead": data[2], "newly_add": data[3]})


@app.route('/c2')
def get_c2_data():
    res = []
    data = u.get_c2_data()
    for key, value in data.items():
        res.append({"name": key, "value": value})
    return jsonify({"data": res})


@app.route('/l1')
def get_l1_data():
    data = u.get_l1_data()
    day = data.date.tolist()
    dignose = data.confirm.tolist()
    heal = data.heal.tolist()
    dead = data.dead.tolist()
    return jsonify({"days": day, "dignose": dignose, "heal": heal, "dead": dead})


@app.route('/l2')
def get_l2_data():
    data = u.get_l2_data()
    day = data.date.tolist()
    dignose = data.confirm.tolist()
    heal = data.heal.tolist()
    dead = data.dead.tolist()
    return jsonify({"days": day, "dignose": dignose, "heal": heal, "dead": dead})


@app.route('/r1')
def get_r1_data():
    data = u.get_r1_data()
    keys = data.疫情地区.tolist()
    values = data.确诊.tolist()
    return jsonify({"keys": keys, "values": values})


@app.route('/r2')
def get_r2_data():
    res = []
    data = u.get_r2_data()
    for key, value in zip(data.name, data.确诊):
        res.append({"name": key, "value": value})
    # 还需要添加中国的总数据
    china = int(u.get_c1_data()[0])
    res.append({"name": "China", "value": china})
    return jsonify({"data": res})


if __name__ == '__main__':

    # 定时任务 ,间隔一天执行
    scheduler=APScheduler()
    scheduler.add_job(crawl_daily_data,'interval',days=1)
    scheduler.init_app(app=app)
    scheduler.start()
    app.run()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

四、ajax实现数据交互

这里我们定义一个控制器controller.js。用来获取当前时间,以及将数据库中得到的信息传递到echarts中并显示。


function gettime() {
    $.ajax({
        url:"/time",
        timeout:10000,
        success:function (data) {
            $("#time").html(data)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_c1_data(){
       $.ajax({
        url:"/c1",
        timeout:10000,
        success:function (data) {
            $(".num h1").eq(0).text(data.dignose)
            $(".num h1").eq(1).text(data.heal)
            $(".num h1").eq(2).text(data.dead)
            $(".num h1").eq(3).text(data.newly_add)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_c2_data(){
    $.ajax({
        url:"/c2",
        timeout:10000,
        success:function (data) {
            ec_center_option.series[0].data=data.data
            ec_center.setOption(ec_center_option)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_l1_data(){
    $.ajax({
        url:"/l1",
        timeout:10000,
        success:function (data) {
            ec_left1_option.xAxis.data=data.days
            ec_left1_option.series[0].data=data.dignose
            ec_left1_option.series[1].data=data.heal
            ec_left1_option.series[2].data=data.dead
            ec_left1.setOption(ec_left1_option)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_l2_data(){
    $.ajax({
        url:"/l2",
        timeout:10000,
        success:function (data) {
            ec_left2_option.xAxis.data=data.days
            ec_left2_option.series[0].data=data.dignose
            ec_left2_option.series[1].data=data.heal
            ec_left2_option.series[2].data=data.dead
            ec_left2.setOption(ec_left2_option)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_r1_data(){
    $.ajax({
        url:"/r1",
        timeout:10000,
        success:function (data) {
            ec_right1_option.xAxis.data=data.keys
            ec_right1_option.series[0].data=data.values
            ec_right1.setOption(ec_right1_option)
        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

function get_r2_data(){
    $.ajax({
        url:"/r2",
        timeout:10000,
        success:function (data) {
            ec_right2_option .series[0].data=data.data
            ec_right2.setOption(ec_right2_option)

        },
        error:function (xhr,type,errorThrown) {

        }
    })
}

gettime()

get_c1_data()
get_c2_data()
get_l1_data()
get_l2_data()
get_r1_data()
get_r2_data()
setInterval(gettime,1000)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117

五、完整项目获取

关注以下公众号,回复"0007"即可get完整项目源码

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/143341
推荐阅读
相关标签
  

闽ICP备14008679号