当前位置:   article > 正文

飞书机器人webhook调用_飞书webhook

飞书webhook

调用飞书机器人webhook进行打卡提醒

import yaml

import os,re
import requests
import json
import logging
import time
import urllib3
urllib3.disable_warnings()

import datetime
import schedule

def load_yaml(config_file):
    try:
        with open(config_file, 'r') as f:
            config=yaml.safe_load(f)
            return config
    except Exception as e:
        print(str(e))
    
    return None

try:
    JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
    JSONDecodeError = ValueError

class FeiShuBot(object):
    def __init__(self):
        '''
        机器人初始化
        :param webhook: 飞书群自定义机器人webhook地址
        :param secret: 机器人安全设置页面勾选“加签”时需要传入的密钥
        :param pc_slide: 消息链接打开方式,默认False为浏览器打开,设置为True时为PC端侧边栏打开
        :param fail_notice: 消息发送失败提醒,默认为False不提醒,开发者可以根据返回的消息发送结果自行判断和处理
        '''
        super(FeiShuBot, self).__init__()
        self._headers = {'Content-Type': 'application/json; charset=utf-8'}

    def _post(self, data):
        self._web_hook = "https://open.feishu.cn/open-apis/bot/v2/hook/c773d47a-d107-4674-9550-34086669bc9c"
        if self._web_hook is None:
            print('no valid web_hook or chat_group is selected')
            return
        
        try:
            post_data = json.dumps(data)
            response = requests.post(self._web_hook, headers=self._headers, data=post_data, verify=False)
            #print(response)
        except requests.exceptions.HTTPError as exc:
            logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
            raise
        except requests.exceptions.ConnectionError:
            logging.error("消息发送失败,HTTP connection error!")
            raise
        except requests.exceptions.Timeout:
            logging.error("消息发送失败,Timeout error!")
            raise
        except requests.exceptions.RequestException:
            logging.error("消息发送失败, Request Exception!")
            raise
        else:
            try:
                result = response.json()
            except JSONDecodeError:
                logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
                return {'errcode': 500, 'errmsg': '服务器响应异常'}
            else:
                logging.debug('发送结果:%s' % result)
                # 消息发送失败提醒(errcode 不为 0,表示消息发送异常),默认不提醒,开发者可以根据返回的消息发送结果自行判断和处理
                # if self._fail_notice and result.get('errcode', True):
                #     time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                #     error_data = {
                #         "msgtype": "text",
                #         "text": {
                #             "content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
                #                 time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
                #         },
                #         "at": {
                #             "isAtAll": False
                #         }
                #     }
                #     logging.error("消息发送失败,自动通知:%s" % error_data)
                #     requests.post(self._web_hook, headers=self._headers, data=json.dumps(error_data))
                return result

    # -----------------public function-----------------
    def send_notification(self):
        # 打印按指定格式排版的时间
        
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        message = "今天又是美好的一天"
        print(current_time)
        report_content=[
            [   {"tag": "text", "text": "当前时间: {}".format(current_time)} ],
            [   {"tag": "text", "text": "message: {}".format(message)} ],
        ]

        print('send to feishu:')

        self._post({
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "九点啦!快打卡!搞快点!",
                        "content": report_content,
                    }
                }
            }
        })


def job():
    print("I'm working...")
    fsb = FeiShuBot()
    fsb.send_notification()
    
if __name__ == "__main__":

    # schedule.every(5).seconds.do(job)
    # schedule.every(10).seconds.do(job)
    # schedule.every(0.25).minutes.do(job)
    # schedule.every().hour.do(job)
    schedule.every().day.at("21:00").do(job)
    # schedule.every().monday.do(job)
    # schedule.every().wednesday.at("13:15").do(job)
    
    while True:
        schedule.run_pending()
        time.sleep(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/498601
推荐阅读
相关标签
  

闽ICP备14008679号