PyFlink: sinking a DataStream to a file

[root@master pyflink]# cat sink_file.py 
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

env = StreamExecutionEnvironment.get_execution_environment()
# A single parallel subtask, so only one part file is written.
env.set_parallelism(1)

# Small in-memory source: two rows of (INT, STRING).
ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

# Row-format sink: each record is written as one line via its string form.
# Checkpointing is not enabled here, so the part file is never finalized
# and stays in the in-progress state (see the file name in the output below).
ds.add_sink(StreamingFileSink
    .for_row_format('/tmp/output', Encoder.simple_string_encoder())
    .build())

env.execute("tutorial_job")

[root@master pyflink]# cat /tmp/output/2024-05-05--09/.part-0-0.inprogress.525d3b8a-c781-4fbb-a5ad-0303c9da32d6 
+I[1, aaa]
+I[2, bbb]
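
The part file above is easy to miss: it sits in a date-hour bucket directory (`2024-05-05--09`) and its name starts with a dot, so a plain `ls` does not show it. As a minimal sketch for inspecting the results, the standard-library helper below walks the output directory and reads every part file it finds, hidden or not. The function names `list_part_files` and `read_records` are illustrative and not part of PyFlink; the only assumption about the layout is that part file names begin with `part-` once any leading dot is stripped, as in the listing above.

```python
import os


def list_part_files(base_dir):
    """Return the paths of all part files under base_dir, sorted.

    Hidden in-progress files such as '.part-0-0.inprogress.<uuid>'
    are included, because os.walk does not skip dot-prefixed names.
    """
    found = []
    for root, _dirs, files in os.walk(base_dir):
        for name in files:
            # Strip the leading dot of in-progress/pending files before matching.
            if name.lstrip(".").startswith("part-"):
                found.append(os.path.join(root, name))
    return sorted(found)


def read_records(base_dir):
    """Return every non-empty line from every part file under base_dir."""
    records = []
    for path in list_part_files(base_dir):
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.rstrip("\n")
                if line.strip():
                    records.append(line)
    return records


if __name__ == "__main__":
    # '/tmp/output' is the base path passed to for_row_format in sink_file.py.
    for record in read_records("/tmp/output"):
        print(record)
```

Reading in-progress files like this is only reasonable for a finished toy job such as this one; in a long-running streaming job an in-progress file may still be appended to.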
