The author uses the Kafka that ships with CDH 6.2.0; the Flink version is 1.16.3 and the Python version is 3.6.5.
# Install the Python dependency
python3 -m pip install apache-flink==1.16.3
# Configure the user's environment variables
# ps: the Python path here must be set, and the Python bin directory must contain an executable binary named "python"
vi ~/.bash_profile
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=/etc/hadoop/conf
export JAVA_HOME=/path/jdk
# Configure the Flink path
export FLINK_HOME=/path/flink1.16.3
export HADOOP_COMMON_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export PATH=/path/python3.6.5/bin/:$FLINK_HOME/bin:$JAVA_HOME/bin:$PATH
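With the dependency installed and the environment variables in place, a tiny local job is a quick way to confirm that PyFlink works with the Python interpreter configured on PATH above. This smoke test is a minimal sketch added for convenience (not part of the original setup); it should print the numbers 1, 2, 3.

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Trivial bounded source; if the three numbers are printed, the local PyFlink install is usable
env.from_collection([1, 2, 3]).print()
env.execute("pyflink_smoke_test")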
Prepare the krb5.conf and jaas.conf files (they are needed for local testing).
The content of jaas.conf is as follows:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/path/user.keytab"
  principal="user/hostname@HADOOP.COM";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/path/user.keytab"
  principal="user/hostname@HADOOP.COM";
};
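The content of krb5.conf is not shown here; on a kerberized CDH node it usually already exists as /etc/krb5.conf and can be reused as-is. If you have to assemble one by hand, a minimal sketch might look like the following, where the realm matches the principals above and the kdc/admin_server hosts are placeholders for your own KDC:

[libdefaults]
  default_realm = HADOOP.COM

[realms]
  HADOOP.COM = {
    kdc = xxxx
    admin_server = xxxx
  }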
# -*- coding: UTF-8 -*-
import codecs
import logging
import sys, json

from pyflink.common import Types, WatermarkStrategy, Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer, KafkaSource, KafkaOffsetsInitializer, KafkaSink
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
from pyflink.java_gateway import get_gateway
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions

# Comment this block out when submitting to YARN;
# uncomment it for local testing
# System = get_gateway().jvm.System
# System.setProperty("java.security.krb5.conf", "/path/krb5.conf");
# System.setProperty("java.security.auth.login.config", "/path/jaas.conf");

# Work around Chinese character encoding issues when printing
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # Uncomment for local testing; can stay commented out when submitting to the cluster
    # env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    # env.add_jars("file:///path/mysql-connector-java-8.0.22.jar")
    # env.add_jars("file:///path/flink-connector-jdbc-1.16.3.jar")

    print("add kafka source")
    # It is best to reach Kafka via hostnames; otherwise Kerberos may complain that
    # no credentials for user/ip@HADOOP.COM exist in its database
    kafka_source = KafkaSource.builder() \
        .set_bootstrap_servers("xxxx:9092,xxxx:9092,xxxx:9092") \
        .set_topics("test1") \
        .set_group_id("test_group_1") \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_property("sasl.kerberos.service.name", "kafka") \
        .set_property("sasl.mechanism", "GSSAPI") \
        .set_property("security.protocol", "SASL_PLAINTEXT") \
        .build()

    data_stream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    # Parse the string as JSON, keep records whose type is 'D', then map each one to a Row
    output_type = Types.ROW_NAMED(['id', 'name'], [Types.STRING(), Types.STRING()])
    delete_stream = data_stream \
        .map(lambda data: json.loads(data)) \
        .filter(lambda data: data['type'] == 'D') \
        .map(lambda data: Row(id=data['id'], name=data['name']), output_type=output_type)
    # Same for records whose type is 'I'
    insert_stream = data_stream \
        .map(lambda data: json.loads(data)) \
        .filter(lambda data: data['type'] == 'I') \
        .map(lambda data: Row(id=data['id'], name=data['name']), output_type=output_type)

    jdbc_options = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
        .with_user_name("user") \
        .with_password("password") \
        .with_driver_name("com.mysql.cj.jdbc.Driver") \
        .with_url("jdbc:mysql://ip:3306/database_name?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC") \
        .build()

    # batch_size controls how many records are flushed per commit; the author commits one record
    # at a time. If it is not set, uncommitted rows never become visible in MySQL.
    jdbc_execution_options = JdbcExecutionOptions.builder() \
        .with_batch_size(1) \
        .with_batch_interval_ms(1000) \
        .build()

    # Print both streams
    print("delete")
    delete_stream.print()
    print("insert")
    insert_stream.print()

    type_info = Types.ROW_NAMED(['id', 'name'], [Types.STRING(), Types.STRING()])
    # Insert stream
    insert_stream.add_sink(JdbcSink.sink(
        "insert into user_table(id, name) VALUES(?, ?)",
        type_info=type_info,
        jdbc_connection_options=jdbc_options,
        jdbc_execution_options=jdbc_execution_options))
    # Delete stream
    delete_stream.add_sink(JdbcSink.sink(
        "delete from user_table where id = ? and name = ?",
        type_info=type_info,
        jdbc_connection_options=jdbc_options,
        jdbc_execution_options=jdbc_execution_options))

    env.execute()
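For reference, the job above expects flat JSON strings with id, name and type fields on topic test1. The records below are hypothetical examples (values made up for illustration) showing how a message is routed:

# Hypothetical examples of the JSON strings the job above consumes from topic test1
sample_messages = [
    '{"id": "1", "name": "test1", "type": "I"}',  # type 'I' -> insert_stream -> INSERT into user_table
    '{"id": "1", "name": "test1", "type": "D"}',  # type 'D' -> delete_stream -> DELETE from user_table
]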
# -*- coding: UTF-8 -*-
import codecs
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.java_gateway import get_gateway

# Work around Chinese character encoding issues when printing
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

# Comment this block out when submitting to YARN;
# uncomment it for local testing
# System = get_gateway().jvm.System
# System.setProperty("java.security.krb5.conf", "/path/krb5.conf");
# System.setProperty("java.security.auth.login.config", "/path/jaas.conf");

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)
    # Uncomment for local testing
    # env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    # env.add_jars("file:///path/mysql-connector-java-8.0.22.jar")
    # env.add_jars("file:///path/flink-connector-jdbc-1.16.3.jar")

    sourceKafkaDdl = """
        create table kafka_user(
            `name` varchar,
            `id` varchar,
            `content` varchar,
            `type` varchar
        ) with (
            'connector' = 'kafka',
            'topic' = 'demo2',
            'properties.bootstrap.servers' = 'xxxx:9092,xxxx:9092,xxxx:9092',
            'properties.group.id' = 'record-group',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json',
            'properties.security.protocol' = 'SASL_PLAINTEXT',
            'properties.sasl.mechanism' = 'GSSAPI',
            'properties.sasl.kerberos.service.name' = 'kafka'
        )
    """

    mysqlSinkDdl = """
        create table `user`(
            `id` varchar,
            `content` varchar
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://ip:3306/database_name?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&user=user&password=password',
            'table-name' = 'user_record'
        )
    """

    insert_sql = """
        insert into `user`(id, content)
        select id, content from kafka_user where `type` = 'I'
    """

    # The official docs say that 1.16.3 does not support DELETE statements; as of this writing the
    # latest release is 1.19. For 1.17/1.18/1.19 the docs state: DELETE is currently only supported
    # in batch mode and requires the target table to implement the SupportsRowLevelDelete interface;
    # executing DELETE against a table that does not implement it throws an exception, and none of
    # Flink's built-in connectors implement the interface yet.
    delete_sql = """
        DELETE FROM `user` where `id` = 'test'
    """

    print("register the kafka table")
    t_env.execute_sql(sourceKafkaDdl)
    print("register the mysql table")
    t_env.execute_sql(mysqlSinkDdl)
    print("start inserting data")
    # statement_set = t_env.create_statement_set()
    # t_env.from_path('sourceKafka').select(col('timeStamp'), col('id'), col('content')).execute_insert('mysqlSink').wait()
    t_env.execute_sql(insert_sql).wait()
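Before relying on the INSERT into MySQL, it can help to confirm that rows are actually arriving in the kafka_user table. The snippet below is a sketch of such a check, assuming the t_env and the kafka_user DDL registered in the script above; because the source is unbounded it keeps printing until the query is cancelled.

# Preview the Kafka-backed table (sketch; assumes t_env and kafka_user from the script above)
preview = t_env.execute_sql("select `id`, `name`, `content`, `type` from kafka_user")
with preview.collect() as rows:
    for row in rows:
        print(row)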
# -*- coding: utf-8 -*-
import logging
import sys, json

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer, KafkaSource, KafkaOffsetsInitializer, KafkaSink
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
from pyflink.java_gateway import get_gateway
from pyflink.common.serialization import SimpleStringSchema

# Point the embedded JVM at the Kerberos configuration (local testing)
System = get_gateway().jvm.System
System.setProperty("java.security.krb5.conf", "/path/krb5.conf");
System.setProperty("java.security.auth.login.config", "/path/jaas.conf");


# Produce a test record for the PyFlink DataStream job
def write_to_kafka(env):
    type_info = Types.ROW_NAMED(['id', 'name', 'type'],
                                [Types.STRING(), Types.STRING(), Types.STRING()])
    ds = env.from_collection(
        [('1', '测试1', 'I')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='demo1',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'xxxx:9092,xxxx:9092,xxxx:9092',
                         'group.id': 'test_group',
                         'sasl.kerberos.service.name': 'kafka',
                         'sasl.mechanism': 'GSSAPI',
                         'security.protocol': 'SASL_PLAINTEXT'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


# Produce a test record for the PyFlink SQL job
def write_to_kafka2(env):
    type_info = Types.ROW_NAMED(['id', 'name', 'content', 'type'],
                                [Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
    ds = env.from_collection(
        [('134', '测试134', '测试测试', 'I')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='demo2',
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': 'xxxx:9092,xxxx:9092,xxxx:9092',
                         'group.id': 'test_group3',
                         'sasl.kerberos.service.name': 'kafka',
                         'sasl.mechanism': 'GSSAPI',
                         'security.protocol': 'SASL_PLAINTEXT'}
    )

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/flink-sql-connector-kafka-1.16.3.jar")
    print("start writing data to kafka")
    write_to_kafka2(env)
The results of running locally are as follows:
Since the author runs a CDH cluster, jobs are submitted through YARN.
Following the official documentation, the author chose application mode. Per-job mode and application mode are similar in that both start a dedicated Flink cluster for the job; the difference is that in application mode the job is launched on a random machine in the cluster, whereas in per-job mode it is launched on the server the job was submitted from (frequent Flink submissions involve distributing a lot of resources, and concentrating all of that on one server creates a network bottleneck).
# Go to the directory one level above the Python installation
zip -r venv python3.6.5/
# When this finishes you will have a venv.zip archive
# You can upload this virtual-environment archive to an HDFS path in advance so it does not have to be
# uploaded when the job runs; keeping such a large archive on HDFS also avoids the waste of re-uploading
# it on every submission. Flink distributes the virtual environment to the YARN machines that run the job.
# For the same reason of avoiding wasted re-uploads, the author has also uploaded the dependency jars to
# an HDFS directory in advance; it is referenced below via -Dyarn.provided.lib.dirs
# -Dyarn.provided.lib.dirs: the dependency jar directory, already uploaded to HDFS. If you do not want
#   to upload the jars in advance, put them behind the -Dyarn.ship-files parameter or drop them into the
#   lib directory of the Flink distribution; either way they are distributed across the cluster.
# ps: the dependency jars cannot be pulled in with --jar alone: in application mode the job starts on a
#   random machine in the cluster and the jars are not shipped there, so that parameter by itself does
#   not take effect. Readers can try adding the jars to -Dyarn.ship-files and then referencing them.
# ps: likewise, a Python job cannot be submitted in application mode with --python alone; because the
#   job starts on a random machine, --python raises a file-not-found error. Ship the script with
#   -Dyarn.ship-files, or upload it to HDFS in advance and point -pyfs at the HDFS path.
flink run-application -t yarn-application \
  -Dyarn.application.name=pyFlinkDemo \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  -Dyarn.application.queue=root.users.xxxx \
  -Dstate.checkpoints.dir=hdfs:///user/flink/checkpoints \
  -Dsecurity.kerberos.login.contexts=Client,KafkaClient \
  -Dsecurity.kerberos.login.keytab=/path/user.keytab \
  -Dsecurity.kerberos.login.principal=user/xxxx@HADOOP.COM \
  -Dsecurity.kerberos.login.use-ticket-cache=false \
  -Dyarn.provided.lib.dirs="hdfs:///user/flink/libs" \
  -pyarch hdfs:///user/flink/venv.zip \
  -pyclientexec venv.zip/python3.6.5/bin/python \
  -pyexec venv.zip/python3.6.5/bin/python \
  -Dyarn.ship-files=/tmp/pyflink_demo.py \
  -pyfs pyflink_demo.py \
  -pym pyflink_demo
Reference: the official Flink documentation