当前位置:   article > 正文

通过Dataworks的Pyodps连接Oracle库定时调度发送邮件报表Excel_python 连接 dataworks

python 连接 dataworks

问题拆解

一看这个标题就知道是一个极其麻烦的事,将一系列小问题串起来了,可以分解为以下几个问题:

  1. 使用dataworks连接oracle库获取数据 (以下统称DW)

  1. 用pyodps处理oracle的数据,并按时进行调度

  1. 将数据转换成Excel形式通过邮件发出

这么看来,一份工作干三份活真是刺激。

获取数据

首先得解决数据问题,怎样获取oracle的定制数据

  1. 开始考虑的是直接从pyodps脚本中直接建立oracle连接,因为需要运行特定的SQL,然而发现并不行,python连接oracle需要提供Oracle客户端才能连接,本地可以下载Oracle的客户端,连接Oracle从而执行pyodps,但是DW无法安装linux的Oracle客户端,因此这个方案被pass掉了

  1. 用数据集成的方式将所有的表同步到Maxcompute中(以下统称MC),但是通过过来后发现SQL中有Oracle的自定义函数(我真是服了),同步来的数据也没法使用(我再去写一个UDF来的时间比这个任务时间都长)。

  1. 实属是没办法了,想到了建立数据集成的方式,通过同步来将指定SQL的数据导入MC,但是通常都是向导模式进行的数据集成,也就是说最多只涵盖where条件,无法运行特定的join或者oracle自定义函数

  1. 考虑到之前同步神策的impala数据时用的脚本模式,心想底层DataX也是同Sqoop一样,通过SQL查询来同步数据的,因此考虑脚本模式查询Oracle数据,以下还有一个坑:查询的SQL的结尾不能带分号‘;’

  1. {
  2. "type": "job",
  3. "version": "2.0",
  4. "steps": [
  5. {
  6. "stepType": "oracle",
  7. "parameter": {
  8. "querySql": "你需要同步的SQL",
  9. "datasource": "medilink",
  10. "readMode": "jdbc"
  11. },
  12. "name": "Reader",
  13. "category": "reader"
  14. },
  15. {
  16. "stepType": "odps",
  17. "parameter": {
  18. "partition": "pt=${bizdate}",
  19. "truncate": true,
  20. "datasource": "odps_first",
  21. "envType": 0,
  22. "column": [
  23. "id"
  24. ],
  25. "emptyAsNull": false,
  26. "table": "report_1"
  27. },
  28. "name": "Writer",
  29. "category": "writer"
  30. }
  31. ],
  32. "setting": {
  33. "errorLimit": {
  34. "record": "100"
  35. },
  36. "locale": "zh",
  37. "speed": {
  38. "throttle": false,
  39. "concurrent": 2
  40. }
  41. },
  42. "order": {
  43. "hops": [
  44. {
  45. "from": "Reader",
  46. "to": "Writer"
  47. }
  48. ]
  49. }
  50. }

通过脚本调度后这样就将数据同步到了MC中。

处理数据

由于业务需求,要将数据处理成.xlsx格式,也就是Excel表格,这里给出代码

  1. from odps import ODPS
  2. import xlsxwriter # 导入模块
  3. import re
  4. today = args['bizdate']
  5. file_name = today + '-文件名' + '.xlsx'
  6. table_name = '*****' # 表名称
  7. head_is_comment = False # 表头是否为备注
  8. # 从maxcompute 下载数据
  9. def download_file(table_name, file_name, head_is_comment):
  10. # 对应odps配置
  11. o = ODPS('access_id', 'access_key', 'project_name',
  12. endpoint='http://service.cn-hangzhou.maxcompute.aliyun.com/api')
  13. # 获取表字段
  14. tableSchema = o.get_table(table_name).schema
  15. # 将字段转换成列名
  16. head = list(map(lambda x: x.comment, tableSchema.columns)) if head_is_comment else tableSchema.names
  17. data = []
  18. # 执行SQL获取数据
  19. reader = o.execute_sql(' select * from ' + table_name +' where pt='+ today).open_reader(tunnel=True, limit=False)
  20. # 数据装载到data中
  21. for record in reader:
  22. tmp_value = []
  23. for name in tableSchema.names:
  24. if isinstance(record[name], str):
  25. ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
  26. tmp_value.append(ILLEGAL_CHARACTERS_RE.sub(r'', record[name]))
  27. else:
  28. tmp_value.append(record[name])
  29. data.append(tmp_value)
  30. # 新建excel表
  31. workbook = xlsxwriter.Workbook(file_name)
  32. worksheet = workbook.add_worksheet('sheet1')
  33. worksheet.write_row('A1', head)
  34. row = 2
  35. # 迭代数据并逐行写入
  36. for i in range(0, len(data)):
  37. worksheet.write_row('A' + str(row), data[i])
  38. row += 1
  39. workbook.close()
  40. pass

有一个点就是pyodps怎么去自定义调度参数的问题:

发送邮件

该需求要群发邮件,下面给出邮件发送模块的代码:

  1. import smtplib
  2. from email.mime.application import MIMEApplication
  3. from email.mime.multipart import MIMEMultipart
  4. receivers = ['xxx','xxx@xxx.com','xxx@xxx.com.cn'] # 接受者
  5. def send_email(file_name,receivers):
  6. # 邮件一些设置
  7. mail_host = 'smtp.qiye.163.com'
  8. mail_username = '****.com' #写你的邮箱地址
  9. mail_password = 'EffPHKMwyC6WT1E7' # 写你的邮箱授权码,不是密码!
  10. mail_sender = '*****.com' #写你的邮箱地址
  11. mail_receivers = receivers ##收件人邮箱地址
  12. message = MIMEMultipart()
  13. # 邮件抬头
  14. message['Subject'] = '写你的邮件主题'
  15. message['From'] = mail_sender
  16. message['To'] = ','.join(mail_receivers)
  17. # 构造附件,传送当前目录下的 Excel 文件
  18. part = MIMEApplication(open(file_name, 'rb').read())
  19. part.add_header('Content-Disposition', 'attachment', filename=file_name)
  20. message.attach(part)
  21. try:
  22. smtpObj = smtplib.SMTP_SSL(mail_host + ':465')
  23. smtpObj.login(mail_username, mail_password)
  24. # for receiver in mail_receivers:
  25. smtpObj.sendmail(mail_sender, mail_receivers, message.as_string())
  26. smtpObj.quit()
  27. print('mail send success')
  28. except smtplib.SMTPException as e:
  29. print('mail send error', e)

这里有个两个点,第一个是需要去开通邮箱的SMTP服务,这里不做赘述;第二个点是邮件群发的问题,群发对象不能用list表示,要将其通过逗号','进行分隔成字符串的形式装入邮件收件人中

调用函数

直接运行就好了

  1. download_file(table_name, file_name, head_is_comment)
  2. send_email(file_name, receivers)

然后剩下的就是设置任务依赖,将之前的数据集成任务设置成上游,定时调度等等

完成了就可以得到对应的邮件了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/591730
推荐阅读
相关标签
  

闽ICP备14008679号