赞
踩
# PyFlink DataStream demo script.
#
# Reads text lines from a TCP socket, upper-cases each line with a map
# operator, and prints the result. The map operator and the print sink are
# given different parallelism values to illustrate per-operator parallelism.
import json
import logging
import re
import sys
from datetime import datetime

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import DataStream, StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.elasticsearch import (
    Elasticsearch7SinkBuilder,
    ElasticsearchEmitter,
    FlushBackoffType,
)
from pyflink.datastream.connectors.kafka import (
    FlinkKafkaConsumer,
    FlinkKafkaProducer,
    TypeInformation,
)
from pyflink.datastream.functions import FlatMapFunction, MapFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s-%(levelname)s-%(message)s")
logger = logging.getLogger(__name__)

# Create the StreamExecutionEnvironment object.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

# NOTE(review): `env` is rebound here, so the parallelism and jar settings
# applied above presumably do not carry over to the environment used below --
# confirm whether a single shared environment was intended.
env = StreamExecutionEnvironment.get_execution_environment()

# The socket source is obtained through the environment's private
# `_j_stream_execution_environment` attribute and wrapped in a Python
# DataStream so the regular Python operators can be chained onto it.
data = DataStream(env._j_stream_execution_environment.socketTextStream('192.168.137.201', 8899))

# Apply the map operator; it is wrapped into one task with parallelism 8,
# i.e. 8 subtasks.
ds1 = data.map(lambda s: s.upper()).set_parallelism(8)

# Sink operator, with parallelism 4.
ds1.print().set_parallelism(4)

# NOTE(review): no `env.execute()` call is visible in this snippet; confirm
# the job is submitted elsewhere, otherwise the pipeline above is only
# declared and never run.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。