当前位置:   article > 正文

PyFlink 读取 Kafka

PyFlink 读取 Kafka

# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from abc import ABC, abstractmethod
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types, TypeInformation
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.common.serialization import SimpleStringSchema
import json
import re
from datetime import datetime
from elasticsearch import Elasticsearch
from pyflink.datastream.functions import RuntimeContext, FlatMapFunction
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema


import re
import redis


# Connection settings for the Kafka source used by this job.
TEST_KAFKA_SERVERS = "127.0.0.1:9092"
TEST_KAFKA_TOPIC = "topic_elink"
TEST_GROUP_ID = "pyflink_group"

# Create the StreamExecutionEnvironment and register the Kafka connector jar
# so the Kafka consumer classes are available to the job.
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

def get_kafka_customer_properties(kafka_servers: str, group_id: str):
    """Build the property map handed to the Kafka consumer.

    Args:
        kafka_servers: Value stored under ``bootstrap.servers``.
        group_id: Value stored under ``group.id``.

    Returns:
        A new dict of Kafka consumer property names to string values.
    """
    string_deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    return {
        "bootstrap.servers": kafka_servers,
        "fetch.max.bytes": "67108864",
        "key.deserializer": string_deserializer,
        "value.deserializer": string_deserializer,
        # Kafka auto-commit is switched off. The value has to be the string
        # "false": per the original author's note, passing a bool raises an error.
        "enable.auto.commit": "false",
        "group.id": group_id,
    }
# Consumer configuration for the Kafka source created below.
properties = get_kafka_customer_properties(TEST_KAFKA_SERVERS, TEST_GROUP_ID)

# Set the default parallelism BEFORE any operator is created: operators pick up
# the environment's default parallelism when they are added, so calling this
# after add_source()/print() would not apply to them.
env.set_parallelism(1)

# Kafka source: read string records from TEST_KAFKA_TOPIC, starting at the
# latest offsets. The topic constant is used both here and in the operator
# name so the two cannot drift apart.
# NOTE(review): offsets are configured to be committed on checkpoints, but
# checkpointing is not enabled anywhere in the visible code, so presumably no
# offsets are ever committed back to Kafka -- confirm whether
# env.enable_checkpointing(...) should be added.
kafka_consumer = (
    FlinkKafkaConsumer(
        topics=TEST_KAFKA_TOPIC,
        properties=properties,
        deserialization_schema=SimpleStringSchema(),
    )
    .set_commit_offsets_on_checkpoints(True)
    .set_start_from_latest()
)

data_stream = env.add_source(kafka_consumer).name(f"消费{TEST_KAFKA_TOPIC}主题数据")

# Print every consumed record to stdout.
data_stream.print()

# Submit the job and block until it finishes.
env.execute('Add "bus_seq" to each line')

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/789284
推荐阅读
相关标签
  

闽ICP备14008679号