当前位置:   article > 正文

pyflink task并行度问题

pyflink task并行度问题
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
import json
import re
import logging
import sys
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, TypeInformation,FlinkKafkaProducer
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from  pyflink.datastream.connectors import  DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
from datetime import datetime




logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s-%(levelname)s-%(message)s")
logger = logging.getLogger(__name__)

# ���� StreamExecutionEnvironment ����
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")
from pyflink.datastream import DataStream, StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction, MapFunction
from pyflink.common.typeinfo import Types

env = StreamExecutionEnvironment.get_execution_environment()
data = DataStream(env._j_stream_execution_environment.socketTextStream('192.168.137.201', 8899))
#调用map算子,封装成一个task,并行度为8,有8个subtask
ds1=data.map(lambda s: s.upper()).set_parallelism(8)
##sink算子,并行度为4
ds1.print().set_parallelism(4)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/548816
推荐阅读
相关标签
  

闽ICP备14008679号