赞
踩
# Windows系统: pip3 install aliyun-python-sdk-core-v3 pip3 install aliyun-python-sdk-domain pip3 install aliyun-python-sdk-alidns pip3 install requests pip3 install apscheduler # CentOS 7 系统: pip3 install aliyun-python-sdk-core-v3==2.13.10 pip3 install aliyun-python-sdk-domain pip3 install aliyun-python-sdk-alidns pip3 install requests pip3 install apscheduler # 在系统下先执行 openssl version 查看ssl版本,如是低于1.1.1版本者需要安装指定ssl版本 # 安装指定版本的 urllib3 库,请确保指定的版本与您当前的环境和其他依赖项兼容 pip3 install urllib3==1.25.10
import json import sys import os import requests import logging from logging.handlers import RotatingFileHandler from apscheduler.schedulers.blocking import BlockingScheduler from aliyunsdkcore.client import AcsClient from aliyunsdkalidns.request.v20150109.DescribeSubDomainRecordsRequest import DescribeSubDomainRecordsRequest from aliyunsdkalidns.request.v20150109.AddDomainRecordRequest import AddDomainRecordRequest from aliyunsdkalidns.request.v20150109.UpdateDomainRecordRequest import UpdateDomainRecordRequest from aliyunsdkalidns.request.v20150109.DeleteSubDomainRecordsRequest import DeleteSubDomainRecordsRequest def record_log(): """日志记录模块""" # 获取当前脚本所在的目录路径 script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) log_filename = os.path.join(script_dir, "alyddns.log") # 日志格式 log_format = "%(asctime)s %(levelname)s: %(message)s" # 设置 apscheduler 调度器的日志级别 logging.getLogger('apscheduler').setLevel(logging.ERROR) # 创建 Logger 实例, 日志全局记录级别为 INFO logger = logging.getLogger() logger.setLevel(logging.INFO) # 配置控制台输出 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter(log_format)) logger.addHandler(console_handler) # 配置文件输出, maxBytes文件大小(1M), backupCount文件数量 file_handler = RotatingFileHandler( filename=log_filename, maxBytes=1*1024*1024, backupCount=2, encoding='utf-8' ) file_handler.setLevel(logging.ERROR) file_handler.setFormatter(logging.Formatter(log_format)) logger.addHandler(file_handler) return logger # 日志 LOGGER = record_log() # 阿里云DDNS class DnsController: def __init__(self, access_key_id, access_key_secret, region): """ 初始化 AcsClient Args: access_key_id (str): 阿里云访问密钥 ID access_key_secret (str): 阿里云访问密钥 密钥 region (str): 设置区域, 默认cn-shenzhen """ self.client = AcsClient(access_key_id, access_key_secret, region) def add(self, DomainName, RR, Type, Value): """ 添加新的域名解析记录 Args: set_DomainName: 传入域名 set_RR: 传入主机记录 set_Type: 传入记录类型 set_Value: 传入记录值 """ request = AddDomainRecordRequest() request.set_accept_format('json') request.set_DomainName(DomainName) request.set_RR(RR) request.set_Type(Type) request.set_Value(Value) response = self.client.do_action_with_exception(request) return json.loads(response) def update(self, RecordId, RR, Type, Value): """ 修改指定子域名解析记录 Args: set_RecordId: 传入指定子域名解析的RecordId set_RR: 传入主机记录 set_Type: 传入记录类型 set_Value: 传入记录值 """ request = UpdateDomainRecordRequest() request.set_accept_format('json') request.set_RecordId(RecordId) request.set_RR(RR) request.set_Type(Type) request.set_Value(Value) response = self.client.do_action_with_exception(request) return json.loads(response) def delete(self, DomainName, RR): """ 删除指定子域名所有解析记录 Args: set_DomainName: 传入域名 set_RR: 传入主机记录 """ request = DeleteSubDomainRecordsRequest() request.set_accept_format('json') request.set_DomainName(DomainName) request.set_RR(RR) response = self.client.do_action_with_exception(request) return json.loads(response) def get_domain_res_record(self, host_record, domain_name): """ 获取指定子域名解析记录 Args: set_SubDomain: 传入主机记录.域名, 例如blog.csdn.net API请求就会针对子域名blog.csdn.net进行操作, 获取该子域名的解析记录 """ request = DescribeSubDomainRecordsRequest() request.set_accept_format('json') request.set_SubDomain(host_record + "." + domain_name) response = self.client.do_action_with_exception(request) return json.loads(response) def obtain_public_ip(self, mode, Public_iP, json_param=None): """获取公网IP""" data = requests.get(Public_iP).text if json_param: # 利用split将字段名拆分成多个层级 field_names = json_param.split('.') field_value = json.loads(data) # 通过字典的get方法逐层获取字段值 for field_name in field_names: field_value = field_value.get(field_name) LOGGER.info(f"获取到{mode}地址 {field_value}") return field_value else: LOGGER.info(f"获取到{mode}地址 {data}") return data def domain_name_analysis(self, mode, domain_name, host_record, record_type, Public_iP, json_param=None): """公网IP解析至域名""" # 获取当前子域名的所有解析列表 domain_list = self.get_domain_res_record(host_record, domain_name) # 获取公网IP地址 get_ip = self.obtain_public_ip(mode, Public_iP, json_param) if domain_list['TotalCount'] == 0: self.add(domain_name, host_record, record_type, get_ip) LOGGER.info("新建域名解析成功") elif domain_list['TotalCount'] == 1: if domain_list['DomainRecords']['Record'][0]['Value'] != get_ip: self.update(domain_list["DomainRecords"]["Record"][0]["RecordId"], host_record, record_type, get_ip) LOGGER.info("修改域名解析成功—update") else: LOGGER.info(f"{mode}地址没变") else: self.delete(domain_name, host_record) self.add(domain_name, host_record, record_type, get_ip) LOGGER.info("修改域名解析成功—delete") if __name__ == "__main__": # 阿里云账号访问密钥ID access_key_id = "value" # 阿里云账号访问密钥 access_key_secret = "value" # 配置区域, 默认深圳, 可修改 region = "cn-shenzhen" # 解析类型:ipv4 & ipv6 mode = "ipv4" # 域名解析 主域名 domain_name = "csdn.net" # 域名解析 主机记录 ,如无需子域名填入@ host_record = "@" # 域名解析 记录类型, ipv4填A & ipv6 填AAAA record_type = "A" # 获取公网ip的地址 # 返回json格式,通过json_param提取相应的值 # 返回非json格式,返回值必须是公网IP,且json_param必须为空 # 常用获取IP地址: # https://ifconfig.me/ip # https://4.ipw.cn/api/ip/myip?json 提取值:IP # https://6.ipw.cn/api/ip/myip?json 提取值:IP Public_iP = "https://4.ipw.cn/api/ip/myip?json" # json参数: value 或 value.IP 或 None # 嵌套JSON对象用 . 获取,如 value.IP json_param = "IP" def execute(): run = DnsController(access_key_id, access_key_secret, region) run.domain_name_analysis(mode, domain_name, host_record, record_type, Public_iP, json_param) try: # 先执行一次任务 execute() # BlockingScheduler调度器,适用于小型应用程序或简单任务调度的场景 scheduler = BlockingScheduler(timezone='Asia/Shanghai') # 按间隔一定时间执行任务:seconds=秒;minutes=分钟;hours=小时,时间建议不要低于8分钟 scheduler.add_job(execute, 'interval', minutes=10) scheduler.start() except Exception as e: LOGGER.error(e)
1.把alyddns.py文件上传到服务器“/home”目录下,并赋予权限
cd /home
chmod +x alyddns.py
2.脚本添加到systemd服务管理器中
①创建一个新的服务单元文件 vi /etc/systemd/system/alyddns.service ②在该文件中,插入以下内容: ------------------- [Unit] Description=aly DDNS After=network.target network-online.target systemd-networkd-wait-online.service [Service] WorkingDirectory=/home/ ExecStart=/usr/bin/python3 /home/alyddns.py Restart=on-failure RestartSec=10s KillMode=mixed [Install] WantedBy=multi-user.target ------------------- Description:描述该服务的文本,可以随意命名,用于标识该服务的目的 WorkingDirectory: 指定服务的工作目录 ExecStart: 指定要执行的命令和脚本(/usr/bin/python3程序路径,根据实际情况替换。/home/alyddns.py脚本路径) 如是可执行程序,直接 ExecStart=/home/alyddns 即可。 ③保存并关闭文件,重新加载systemd配置 sudo systemctl daemon-reload ④启用服务以在开机时自动运行 sudo systemctl enable alyddns.service ⑤启动服务,使其立即生效 sudo systemctl start alyddns.service ⑥查看服务状态 sudo systemctl status alyddns.service # 停止服务 sudo systemctl stop alyddns.service
以上方法是需要在python环境下运行alyddns.py文件,我们也可以把脚本打包,直接运行打包后的程序。脚本打包文章链接
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。