赞
踩
我正在使用云编写器来协调ETL,以获取到达BigCS的GCS中到达的文件.我有一个云函数,当文件到达时会触发dag,而云函数会将文件名/位置传递给DAG.在我的DAG中,我有2个任务:
1)使用DataflowPythonOperator运行一个数据流作业,该作业从GCS中的文本读取数据并将其转换并将其输入到BQ中,以及2)根据该文件是失败还是成功将文件移动到失败/成功存储段.
每个文件都有一个文件ID,该文件ID是bigquery表中的一列.有时,文件将被编辑一次或两次(通常不是流媒体),而我希望能够首先删除该文件的现有记录.
我调查了其他气流运算符,但在运行数据流作业之前希望在DAG中有2个任务:
>根据文件名获取文件ID(现在,我有一个bigquery表映射文件名->文件ID,但我也可以引入一个用作地图的json,我想这是否更容易)
>如果bigquery表(从数据流作业输出转换后的数据的表)中已经存在文件ID,请删除它,然后运行数据流作业,以便获得最新信息.我知道一种选择是仅添加一个时间戳记,并且仅使用最新记录,但是因为每个文件可能有100万条记录,这与我每天要删除100个文件(可能是1-2个顶部)不同看起来可能是混乱和混乱的.
在完成数据流作业之后,理想情况下,在将文件移至成功/失败文件夹之前,我想附加到一些“记录”表中,以表明此时已输入了该游戏.这将是我查看发生的所有插入的方式.
我尝试寻找不同的方法来进行此操作,因为我是云作曲家的新手,所以我对10个小时的研究后如何工作尚不十分清楚,否则我会发布代码以供输入.
谢谢,我非常感谢大家的帮助,如果您不清楚您的意思,我们深表歉意.有关气流的文档非常强大,但是鉴于云作曲家和bigquery相对较新,因此很难彻底学习如何做一些GCP特定任务.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。