当前位置:   article > 正文

Python 批量推送微信公众号模板消息_python微信公众号订阅通知

python微信公众号订阅通知

1. 依赖包

Django==3.1.3
django-redis==4.12.1
redis==3.5.3
requests==2.24.0
  • 1
  • 2
  • 3
  • 4

2. 消息推送的步骤

  1. 获取 access_token;
  2. 构建数据;
  3. 发送推送请求。

3. 功能描述

3.1. 注意事项

  1. 要注意模板消息的类型,一次性订阅消息长期订阅消息
  1. 一次性订阅消息

一次性订阅消息用于解决用户使用小程序后,后续服务环节的通知问题。用户自主订阅后,开发者可不限时间地下发一条对应的服务消息;每条消息可单独订阅或退订。

  1. 长期订阅消息

一次性订阅消息可满足小程序的大部分服务场景需求,但线下公共服务领域存在一次性订阅无法满足的场景,如航班延误,需根据航班实时动态来多次发送消息提醒。为便于服务,我们提供了长期性订阅消息,用户订阅一次后,开发者可长期下发多条消息。

目前长期性订阅消息仅向政务民生、医疗、交通、金融、教育等线下公共服务开放,后期将逐步支持到其他线下公共服务业务。

  1. access_token的有效期目前为2个小时,需定时刷新,重复获取将导致上次获取的access_token失效。

3.2. 消息推送模板的参数说明

参数是否必填说明
touser接收者openid
template_id模板ID
url模板跳转链接(海外帐号没有跳转能力)
miniprogram跳小程序所需数据,不需跳小程序可不用传该数据
appid所需跳转到的小程序appid(该小程序appid必须与发模板消息的公众号是绑定关联关系,暂不支持小游戏)
pagepath所需跳转到小程序的具体页面路径,支持带参数,(示例index?foo=bar),暂不支持小游戏
data模板数据
color模板内容字体颜色,不填默认为黑色

3.3. 消息推送模块代码

import copy
import json

import redis
import requests
from django.conf import settings
from django_redis import get_redis_connection


class WeChatMsg:

    def __init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None):
        self.app_id = app_id
        self.secret = secret
        self.template = {
            "touser": "",
            "template_id": "",
            "url": "",
            "data": {
                "first": {
                    "value": "",
                },
                "keyword1": {
                    "value": "",
                },
                "keyword2": {
                    "value": "",
                },
                "remark": {
                    "value": "",
                },
            }
        }
        if template:
            self.template = template
        self.access_token = None
        self.req_list = list()
        self.user_list = list()
        self.data_list = list()

    def _get_token(self, force_update=False):
        key_name = 'wechat_token_{}'.format(self.app_id)
        if force_update:
            self.access_token = None
        else:
            self.access_token = get_data(key_name)
        if not self.access_token:
            url = "https://api.weixin.qq.com/cgi-bin/token?"
            payload = {
                'grant_type': 'client_credential',
                'appid': self.app_id,
                'secret': self.secret,
            }
            response = requests.get(url, params=payload, timeout=50)
            access_token = response.json().get("access_token")
            if access_token:
                set_data(key_name, access_token, ex=7100)
                self.access_token = get_data(key_name)
        print('access_token', self.access_token)

    def make_data_list(self):
        from main.models import User
        user_openid_list = User.objects.filter(
            pk__in=self.user_list
        ).exclude(
            openid=''
        ).exclude(
            openid__isnull=True
        ).values_list('openid', flat=True)
        for openid in user_openid_list:
            user_template = copy.deepcopy(self.template)
            user_template['touser'] = openid
            self.data_list.append(user_template)

    def post_data(self):
        # 获取 token
        self._get_token()
        url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token)
        # 准备数据
        if self.user_list:
            self.make_data_list()
        # 发送请求
        for data in self.data_list:
            json_template = json.dumps(data)
            res = requests.post(url, data=json_template)
            print('post_data', res.text)


def get_redis():
    """
    获取 Redis 的连接
    """
    pool = get_redis_connection('default').connection_pool
    r = redis.Redis(connection_pool=pool)
    return r


def set_data(name, value, **kwargs):
    # 将单条数据存入redis缓存
    r = get_redis()
    value = json.dumps(value)
    r.set(name, value, **kwargs)


def get_data(name):
    # 取出单条数据
    r = get_redis()
    value = r.get(name)
    if value:
        value = json.loads(value)
    return value

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

3.4. 消息推送模块的调用

def send_wechat_msg():
    w = WeChatMsg()
    w.template = {
        "touser": "",
        "template_id": "",
        "url": "",
        "data": {
            "first": {
                "value": "",
            },
            "keyword1": {
                "value": "",
            },
            "keyword2": {
                "value": "",
            },
            "remark": {
                "value": "",
            },
        }
    }
    w.user_list = [1, 2]
    w.post_data()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/70638
推荐阅读
相关标签
  

闽ICP备14008679号