赞
踩
- from datetime import datetime
- import time
- import threading
- from queue import Queue
- from queue import Empty
- import pymssql
- import zipfile
- import os
- from minio import Minio
- from minio.error import S3Error
- import schedule
-
-
- class SqlBak(object):
-
- def init(self):
- self.connect = pymssql.connect('127.0.0.1', 'cssoft', 'cssoft', 'master') #建立连接
- self.db_name = "abs_data"
- self.db_path = "c:\\sqlbak\\abs\\"
- self.bak_name = "c:\\sqlbak\\abs\\abs_data.bak"
- self.zip_name = "abs_data.zip"
- self.zip_path = "c:\\sqlbak\\abs_data.zip"
-
- if self.connect:
- print("连接成功!")
-
-
- def bak(self):
- print("开始备份!")
- cursor = self.connect.cursor()
- self.connect.autocommit(True)
- cursor.execute(f"BACKUP DATABASE {self.db_name} TO DISK = N'{self.bak_name}' WITH INIT , NOUNLOAD , NOSKIP , STATS = 10, NOFORMAT")
- self.connect.autocommit(False)
- print("备份成功!")
- self.zip_folder(self.db_path, self.zip_path)
- pass
-
-
- def zip_folder(self,folder_path, output_path):
- print("开始压缩!")
- zipf = zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED)
- for root, dirs, files in os.walk(folder_path):
- for file in files:
- zipf.write(os.path.join(root, file))
- zipf.close()
- print("压缩成功!")
-
- def minio(self):
- print("开始上传!")
- client = Minio(
- "minio.xxx.com.cn:9000",
- access_key="root",
- secret_key="xxxxxx",
- secure=False
- )
-
- found = client.bucket_exists("sqlbak")
- if not found:
- client.make_bucket("sqlbak")
- else:
- print("Bucket 'sqlbak' already exists")
-
- client.fput_object("sqlbak", self.zip_name, self.zip_path)
- print("上传成功")
-
- def job():
- try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- sqlbak = SqlBak()
- sqlbak.init()
- sqlbak.bak()
- sqlbak.minio()
- except Exception as reason:
- print(reason)
-
-
- schedule.every().day.at('23:20').do(job); # 每天在 23:20 时间点运行 job 函数
- print("计划任务已启动,每日23:20")
-
-
- while True:
- schedule.run_pending()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。