
PyFlink: consuming Kerberos-secured Kafka data in real time and writing it to MySQL

1. Environment setup

This setup uses the Kafka bundled with CDH 6.2.0, Flink 1.16.3, and Python 3.6.5.

# Install the Python dependency
python3 -m pip install apache-flink==1.16.3
# Configure the user environment variables
# Note: the Python path below must be on PATH, and its bin directory must contain an executable named "python"
vi ~/.bash_profile
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=/etc/hadoop/conf
export JAVA_HOME=/path/jdk
# Flink installation path
export FLINK_HOME=/path/flink1.16.3
export HADOOP_COMMON_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export PATH=/path/python3.6.5/bin/:$FLINK_HOME/bin:$JAVA_HOME/bin:$PATH
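Before moving on, it is worth confirming that the pip-installed PyFlink actually matches the cluster's Flink version; mismatches tend to surface later as confusing classpath or serialization errors. A quick sanity check (a minimal sketch, assuming the standard pyflink.version module):

# Sanity check: the pip-installed PyFlink should match the cluster's Flink 1.16.3
from pyflink.version import __version__ as pyflink_version

print("PyFlink version:", pyflink_version)
assert pyflink_version.startswith("1.16"), "expected a 1.16.x PyFlink to match Flink 1.16.3"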

2. Writing the code

Prepare the krb5.conf and jaas.conf files (needed for local testing; when submitting to the cluster, the security.kerberos.login.* options shown in section 3 handle this instead).
The contents of jaas.conf are as follows:

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/path/user.keytab"
   principal="user/<hostname>@HADOOP.COM";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/path/user.keytab"
   principal="user/<hostname>@HADOOP.COM";
};
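For local testing, the JVM that PyFlink launches has to be told where these two files live. The demos below do this by setting the standard Java system properties through PyFlink's Java gateway before the execution environment is created (the same lines appear commented out in the demo code; keep them commented when submitting to YARN, where the security.kerberos.login.* options take over):

# Local testing only: point the embedded JVM at the Kerberos config and the JAAS file.
# These properties must be set before the StreamExecutionEnvironment is created.
from pyflink.java_gateway import get_gateway

jvm_system = get_gateway().jvm.System
jvm_system.setProperty("java.security.krb5.conf", "/path/krb5.conf")
jvm_system.setProperty("java.security.auth.login.config", "/path/jaas.conf")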

A demo of PyFlink reading from Kafka and writing the data to MySQL (the logic is deliberately simple):

# -*- coding: UTF-8 -*-
import codecs
import json
import logging
import sys

from pyflink.common import Types, WatermarkStrategy, Row
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.java_gateway import get_gateway


# Keep this block commented out when submitting to YARN;
# uncomment it for local testing.
# System = get_gateway().jvm.System
# System.setProperty("java.security.krb5.conf", "/path/krb5.conf")
# System.setProperty("java.security.auth.login.config", "/path/jaas.conf")

# Avoid encoding problems when printing Chinese characters
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # Uncomment for local testing; can stay commented out when submitting to the cluster
    # env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    # env.add_jars("file:///path/mysql-connector-java-8.0.22.jar")
    # env.add_jars("file:///path/flink-connector-jdbc-1.16.3.jar")


    print("add kafka source")
    # Prefer reaching Kafka by hostname; with plain IPs, Kerberos may complain that
    # the principal user/ip@HADOOP.COM does not exist in its database.
    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers("xxxx:9092,xxxx:9092,xxxx:9092") \
        .set_topics("test1") \
        .set_group_id("test_group_1") \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_property("sasl.kerberos.service.name", "kafka") \
        .set_property("sasl.mechanism", "GSSAPI") \
        .set_property("security.protocol", "SASL_PLAINTEXT") \
        .build()

    data_stream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    # Parse the JSON string, keep records of type 'D' (deletes), and convert them to Rows
    output_type = Types.ROW_NAMED(['id', 'name'], [Types.STRING(), Types.STRING()])
    delete_stream = data_stream \
        .map(lambda data: json.loads(data)) \
        .filter(lambda data: data['type'] == 'D') \
        .map(lambda data: Row(id=data['id'], name=data['name']), output_type=output_type)
    # Same parsing, keeping records of type 'I' (inserts)
    insert_stream = data_stream \
        .map(lambda data: json.loads(data)) \
        .filter(lambda data: data['type'] == 'I') \
        .map(lambda data: Row(id=data['id'], name=data['name']), output_type=output_type)


    jdbc_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
        .with_user_name("username") \
        .with_password("password") \
        .with_driver_name("com.mysql.cj.jdbc.Driver") \
        .with_url("jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC") \
        .build()

    # batch_size controls how many records are buffered before a flush; it is set to 1 here so every
    # record is committed immediately. Without it, unflushed rows never show up in MySQL.
    jdbc_execution_options = JdbcExecutionOptions.builder().with_batch_size(1).with_batch_interval_ms(1000).build()

    # Print both streams for debugging
    print("delete")
    delete_stream.print()
    print("insert")
    insert_stream.print()

    type_info = Types.ROW_NAMED(['id', 'name'], [Types.STRING(), Types.STRING()])
    # Sink for inserts
    insert_stream.add_sink(JdbcSink.sink("insert into user_table(id, name) VALUES (?, ?)",
                                         type_info=type_info,
                                         jdbc_connection_options=jdbc_options,
                                         jdbc_execution_options=jdbc_execution_options))
    # Sink for deletes
    delete_stream.add_sink(JdbcSink.sink("delete from user_table where id = ? and name = ?",
                                         type_info=type_info,
                                         jdbc_connection_options=jdbc_options,
                                         jdbc_execution_options=jdbc_execution_options))

    env.execute()
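The job expects every Kafka message to be a flat JSON object with at least id, name, and type fields, the same shape the producer demo further down writes. A small illustration of the payload and how it is routed (the user_table DDL in the last comment is an assumption inferred from the insert statement, not taken from the original setup):

import json

# Example of the message shape this job consumes
sample = '{"id": "1", "name": "测试1", "type": "I"}'
record = json.loads(sample)

# type == 'I' -> insert_stream -> insert into user_table(id, name) values (?, ?)
# type == 'D' -> delete_stream -> delete from user_table where id = ? and name = ?
assert record["type"] in ("I", "D")

# Assumed MySQL target table (not shown in the original article), roughly:
#   CREATE TABLE user_table (id VARCHAR(64), name VARCHAR(255));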

Flink SQL version

# -*- coding: UTF-8 -*-
import codecs
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.java_gateway import get_gateway

# Avoid encoding problems when printing Chinese characters
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())


# Keep this block commented out when submitting to YARN;
# uncomment it for local testing.
# System = get_gateway().jvm.System
# System.setProperty("java.security.krb5.conf", "/path/krb5.conf")
# System.setProperty("java.security.auth.login.config", "/path/jaas.conf")


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)

    # Uncomment for local testing
    # env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    # env.add_jars("file:///path/mysql-connector-java-8.0.22.jar")
    # env.add_jars("file:///path/flink-connector-jdbc-1.16.3.jar")
    sourceKafkaDdl = """
        create table kafka_user(
            `name` varchar,
            `id` varchar,
            `content` varchar,
            `type` varchar
        ) with (
            'connector' = 'kafka',
            'topic' = 'demo2',
            'properties.bootstrap.servers' = 'xxxx:9092,xxxx:9092,xxxx:9092',
            'properties.group.id' = 'record-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json',
            'properties.security.protocol' = 'SASL_PLAINTEXT',
            'properties.sasl.mechanism' = 'GSSAPI',
            'properties.sasl.kerberos.service.name' = 'kafka'
        )
    """

    mysqlSinkDdl = """
        create table `user`(
            `id` varchar,
            `content` varchar
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&user=username&password=password',
            'table-name' = 'user_record'
        )
    """

    insert_sql = """
        insert into `user`(id, content) select id, content from kafka_user where `type` = 'I'
    """


    # According to the official docs, 1.16.3 does not support DELETE statements
    # (the latest release at the time of writing was 1.19).
    # For 1.17/1.18/1.19 the docs say: DELETE is currently only supported in batch mode and requires
    # the target table to implement the SupportsRowLevelDelete interface; running DELETE against a
    # table that does not implement it throws an exception, and none of Flink's built-in connectors
    # implement it yet.
    delete_sql = """
        DELETE FROM `user` where `id` = 'test'
    """

    print("registering the Kafka table")
    t_env.execute_sql(sourceKafkaDdl)
    print("registering the MySQL table")
    t_env.execute_sql(mysqlSinkDdl)
    print("starting the insert")

    # statement_set = t_env.create_statement_set()
    t_env.execute_sql(insert_sql).wait()
    # t_env.from_path('kafka_user').select(col('id'), col('content')).execute_insert('user').wait()
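Since the DELETE path is unavailable, it helps to be sure the type = 'I' predicate in insert_sql selects what you expect. The snippet below runs the same query entirely in memory with the Table API, so it needs neither Kafka nor MySQL nor any connector jars (a sketch; the table and column names mirror the DDL above):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Purely local table environment with a tiny in-memory stand-in for kafka_user
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
rows = t_env.from_elements(
    [('1', 'alice', 'hello', 'I'), ('2', 'bob', 'bye', 'D')],
    ['id', 'name', 'content', 'type'])
t_env.create_temporary_view("kafka_user", rows)

# Only the type = 'I' row should come out, i.e. what insert_sql would send to MySQL
t_env.sql_query("select id, content from kafka_user where `type` = 'I'").execute().print()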

Kafka producer demo

# -*- coding: utf-8 -*-
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema
from pyflink.java_gateway import get_gateway


# Local testing: point the JVM at the Kerberos config and the JAAS file
System = get_gateway().jvm.System
System.setProperty("java.security.krb5.conf", "/path/krb5.conf")
System.setProperty("java.security.auth.login.config", "/path/jaas.conf")


# Producer for the DataStream demo (writes id/name/type records to topic demo1)
def write_to_kafka(env):
    type_info = Types.ROW_NAMED(['id', 'name', 'type'], [Types.STRING(), Types.STRING(), Types.STRING()])
    ds = env.from_collection(
        [('1', '测试1', 'I')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='demo1',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'xxxx:9092,xxxx:9092,xxxx:9092', 'group.id': 'test_group',
                         'sasl.kerberos.service.name': 'kafka', 'sasl.mechanism': 'GSSAPI',
                         'security.protocol': 'SASL_PLAINTEXT'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


# Producer for the Flink SQL demo (writes id/name/content/type records to topic demo2)
def write_to_kafka2(env):
    type_info = Types.ROW_NAMED(['id', 'name', 'content', 'type'], [Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
    ds = env.from_collection(
        [('134', '测试134', '测试测试', 'I')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='demo2',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'xxxx:9092,xxxx:9092,xxxx:9092', 'group.id': 'test_group3',
                         'sasl.kerberos.service.name': 'kafka', 'sasl.mechanism': 'GSSAPI',
                         'security.protocol': 'SASL_PLAINTEXT'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")

    print("start writing data to kafka")
    write_to_kafka2(env)
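FlinkKafkaProducer is the legacy producer API; since Flink 1.14 the recommended replacement is KafkaSink. A rough equivalent of write_to_kafka2 built on KafkaSink is sketched below, serializing the record to a JSON string by hand. The broker addresses, topic, and Kerberos properties are the same placeholders as above; treat the builder calls as a sketch and check them against the PyFlink 1.16 API reference before relying on them.

# -*- coding: utf-8 -*-
# Sketch: the same producer written against the newer KafkaSink API
import json

from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema


def write_with_kafka_sink(env):
    # Serialize the payload to a JSON string ourselves instead of using JsonRowSerializationSchema
    ds = env.from_collection(
        [json.dumps({'id': '135', 'name': '测试135', 'content': '测试', 'type': 'I'}, ensure_ascii=False)],
        type_info=Types.STRING())

    kafka_sink = KafkaSink.builder() \
        .set_bootstrap_servers("xxxx:9092,xxxx:9092,xxxx:9092") \
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
                .set_topic("demo2")
                .set_value_serialization_schema(SimpleStringSchema())
                .build()) \
        .set_property("security.protocol", "SASL_PLAINTEXT") \
        .set_property("sasl.mechanism", "GSSAPI") \
        .set_property("sasl.kerberos.service.name", "kafka") \
        .build()

    ds.sink_to(kafka_sink)
    env.execute()


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    write_with_kafka_sink(env)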

Running the scripts locally produces the expected console output (the original post shows screenshots of the results here).

3. Submitting the job to the cluster from the command line

This is a CDH cluster, so the job is submitted on YARN.
Following the official documentation, application mode is used here. Per-job mode is similar: both start a dedicated Flink cluster for the job. The difference is that in application mode the job's entry point runs on an arbitrary machine in the cluster, whereas in per-job mode it runs on the machine that submits the job (frequent submissions involve distributing a lot of artifacts, and concentrating all of that on one server creates a network bottleneck).

Packaging the Python virtual environment

# Go to the directory one level above the Python installation
zip -r venv python3.6.5/
# This produces an archive named venv.zip
# Upload the archive to HDFS ahead of time so it is not re-uploaded on every submission; this avoids
# wasting resources on repeated uploads of a large package, and Flink will distribute the environment
# to the YARN machines that run the job

Uploading the jar dependencies to HDFS

# For the same reason (avoiding repeated uploads), the dependency jars are uploaded to HDFS ahead of
# time; here that means the connector jars referenced earlier, e.g. flink-sql-connector-kafka-1.16.3.jar,
# flink-connector-jdbc-1.16.3.jar and mysql-connector-java-8.0.22.jar

(Screenshot in the original post: listing of the dependency jars uploaded to HDFS.)

The submission command

# -Dyarn.provided.lib.dirs points at dependency jars already uploaded to HDFS. If you prefer not to
# pre-upload them, list the jars after -Dyarn.ship-files or drop them into the lib directory of the
# Flink distribution; all of these get distributed to the cluster.
# Note: passing dependencies with --jars alone is not enough, because in application mode the job
# starts on an arbitrary cluster machine and the jars are not shipped there; add them to
# -Dyarn.ship-files as well if you want to go that route.
# Note: likewise, a Python job cannot be submitted with just --python, because the entry script will
# not exist on the machine where the application starts; ship it with -Dyarn.ship-files, or upload it
# to HDFS ahead of time and point -pyfs at the HDFS path.
flink run-application -t yarn-application \
-Dyarn.application.name=pyFlinkDemo \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.queue=root.users.xxxx \
-Dstate.checkpoints.dir=hdfs:///user/flink/checkpoints \
-Dsecurity.kerberos.login.contexts=Client,KafkaClient \
-Dsecurity.kerberos.login.keytab=/path/user.keytab \
-Dsecurity.kerberos.login.principal=user/xxxx@HADOOP.COM \
-Dsecurity.kerberos.login.use-ticket-cache=false \
-Dyarn.provided.lib.dirs="hdfs:///user/flink/libs" \
-pyarch hdfs:///user/flink/venv.zip \
-pyclientexec venv.zip/python3.6.5/bin/python \
-pyexec venv.zip/python3.6.5/bin/python \
-Dyarn.ship-files=/tmp/pyflink_demo.py \
-pyfs pyflink_demo.py \
-pym pyflink_demo


Reference: the official Flink documentation
