赞
踩
一看这个标题就知道是一个极其麻烦的事,将一系列小问题串起来了,可以分解为以下几个问题:
使用dataworks连接oracle库获取数据 (以下统称DW)
用pyodps处理oracle的数据,并按时进行调度
将数据转换成Excel形式通过邮件发出
这么看来,一份工作干三份活真是刺激。
首先得解决数据问题,怎样获取oracle的定制数据
开始考虑的是直接从pyodps脚本中直接建立oracle连接,因为需要运行特定的SQL,然而发现并不行,python连接oracle需要提供Oracle客户端才能连接,本地可以下载Oracle的客户端,连接Oracle从而执行pyodps,但是DW无法安装linux的Oracle客户端,因此这个方案被pass掉了
用数据集成的方式将所有的表同步到Maxcompute中(以下统称MC),但是通过过来后发现SQL中有Oracle的自定义函数(我真是服了),同步来的数据也没法使用(我再去写一个UDF来的时间比这个任务时间都长)。
实属是没办法了,想到了建立数据集成的方式,通过同步来将指定SQL的数据导入MC,但是通常都是向导模式进行的数据集成,也就是说最多只涵盖where条件,无法运行特定的join或者oracle自定义函数
考虑到之前同步神策的impala数据时用的脚本模式,心想底层DataX也是同Sqoop一样,通过SQL查询来同步数据的,因此考虑脚本模式查询Oracle数据,以下还有一个坑:查询的SQL的结尾不能带分号‘;’
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "oracle", "parameter": { "querySql": "你需要同步的SQL", "datasource": "medilink", "readMode": "jdbc" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "pt=${bizdate}", "truncate": true, "datasource": "odps_first", "envType": 0, "column": [ "id" ], "emptyAsNull": false, "table": "report_1" }, "name": "Writer", "category": "writer" } ], "setting": { "errorLimit": { "record": "100" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
通过脚本调度后这样就将数据同步到了MC中。
由于业务需求,要将数据处理成.xlsx格式,也就是Excel表格,这里给出代码
- from odps import ODPS
- import xlsxwriter # 导入模块
- import re
-
- today = args['bizdate']
- file_name = today + '-文件名' + '.xlsx'
- table_name = '*****' # 表名称
- head_is_comment = False # 表头是否为备注
-
-
- # 从maxcompute 下载数据
- def download_file(table_name, file_name, head_is_comment):
- # 对应odps配置
- o = ODPS('access_id', 'access_key', 'project_name',
- endpoint='http://service.cn-hangzhou.maxcompute.aliyun.com/api')
-
- # 获取表字段
- tableSchema = o.get_table(table_name).schema
-
- # 将字段转换成列名
- head = list(map(lambda x: x.comment, tableSchema.columns)) if head_is_comment else tableSchema.names
- data = []
- # 执行SQL获取数据
- reader = o.execute_sql(' select * from ' + table_name +' where pt='+ today).open_reader(tunnel=True, limit=False)
-
- # 数据装载到data中
- for record in reader:
- tmp_value = []
-
- for name in tableSchema.names:
- if isinstance(record[name], str):
- ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
- tmp_value.append(ILLEGAL_CHARACTERS_RE.sub(r'', record[name]))
- else:
- tmp_value.append(record[name])
- data.append(tmp_value)
-
- # 新建excel表
- workbook = xlsxwriter.Workbook(file_name)
- worksheet = workbook.add_worksheet('sheet1')
- worksheet.write_row('A1', head)
- row = 2
- # 迭代数据并逐行写入
- for i in range(0, len(data)):
- worksheet.write_row('A' + str(row), data[i])
- row += 1
- workbook.close()
- pass

有一个点就是pyodps怎么去自定义调度参数的问题:
该需求要群发邮件,下面给出邮件发送模块的代码:
- import smtplib
- from email.mime.application import MIMEApplication
- from email.mime.multipart import MIMEMultipart
-
- receivers = ['xxx','xxx@xxx.com','xxx@xxx.com.cn'] # 接受者
-
- def send_email(file_name,receivers):
- # 邮件一些设置
- mail_host = 'smtp.qiye.163.com'
- mail_username = '****.com' #写你的邮箱地址
- mail_password = 'EffPHKMwyC6WT1E7' # 写你的邮箱授权码,不是密码!
- mail_sender = '*****.com' #写你的邮箱地址
- mail_receivers = receivers ##收件人邮箱地址
- message = MIMEMultipart()
-
- # 邮件抬头
- message['Subject'] = '写你的邮件主题'
- message['From'] = mail_sender
- message['To'] = ','.join(mail_receivers)
-
- # 构造附件,传送当前目录下的 Excel 文件
- part = MIMEApplication(open(file_name, 'rb').read())
- part.add_header('Content-Disposition', 'attachment', filename=file_name)
- message.attach(part)
- try:
- smtpObj = smtplib.SMTP_SSL(mail_host + ':465')
- smtpObj.login(mail_username, mail_password)
- # for receiver in mail_receivers:
- smtpObj.sendmail(mail_sender, mail_receivers, message.as_string())
- smtpObj.quit()
- print('mail send success')
- except smtplib.SMTPException as e:
- print('mail send error', e)

这里有个两个点,第一个是需要去开通邮箱的SMTP服务,这里不做赘述;第二个点是邮件群发的问题,群发对象不能用list表示,要将其通过逗号','进行分隔成字符串的形式装入邮件收件人中
直接运行就好了
- download_file(table_name, file_name, head_is_comment)
- send_email(file_name, receivers)
然后剩下的就是设置任务依赖,将之前的数据集成任务设置成上游,定时调度等等
完成了就可以得到对应的邮件了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。