当前位置:   article > 正文

Flink实时计算运用(七)Flink 自定义序列化Protobuf接入实现方案

flink 自定义序列化

1. 自定义序列化接入方案(Protobuf)

在实际应用场景中, 会存在各种复杂传输对象,同时要求较高的传输处理性能, 这就需要采用自定义的序列化方式做相应实现, 这里以Protobuf为例做讲解。

功能: kafka对同一Topic的生产与消费,采用Protobuf做序列化与反序列化传输, 验证能否正常解析数据。

  1. 通过protobuf脚本生成JAVA文件

    syntax = "proto3";
    option java_package = "com.itcast.flink.connectors.kafka.proto";
    option java_outer_classname = "AccessLogProto";
    
    // 消息结构定义
    message AccessLog {
    
        string ip = 1;
    
        string time = 2;
    
        string type = 3;
    
        string api = 4;
    
        string num = 5;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    通过批处理脚本,生成JAVA文件:

    @echo off
    for %%i in (proto/*.proto) do (
      d:/TestCode/protoc.exe --proto_path=./proto  --java_out=../java  ./proto/%%i
      echo generate %%i to java file successfully!
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意, 路径要配置正确。

  2. 自定义序列化实现

    添加POM依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.1.8.RELEASE</version>
        </dependency>
    </dependencies>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    AccessLog对象:

    @Data
    public class AccessLog implements Serializable {
    
        private String ip;
    
        private String time;
    
        private String type;
    
        private String api;
    
        private Integer num;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    CustomSerialSchema:

    /**
     * 自定义序列化实现(Protobuf)
     */
    public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {
    
    	private static final long serialVersionUID = 1L;
    
    	private transient Charset charset;
    
    	public CustomSerialSchema() {
    		this(StandardCharsets.UTF_8);
    	}
    
    	public CustomSerialSchema(Charset charset) {
    		this.charset = checkNotNull(charset);
    	}
    
    	public Charset getCharset() {
    		return charset;
    	}
      
        /**
         * 反序列化实现
         * @param message
         * @return
         */
    	@Override
    	public AccessLog deserialize(byte[] message) {
            AccessLog accessLog = null;
            try {
                AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
                accessLog = new AccessLog();
                BeanUtils.copyProperties(accessLogProto, accessLog);
                return accessLog;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return accessLog;
    	}
    
    	@Override
    	public boolean isEndOfStream(AccessLog nextElement) {
    		return false;
    	}
    
        /**
         * 序列化处理
         * @param element
         * @return
         */
    	@Override
    	public byte[] serialize(AccessLog element) {
            AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
            BeanUtils.copyProperties(element, builder);
    		return builder.build().toByteArray();
    	}
    
        /**
         * 定义消息类型
         * @return
         */
    	@Override
    	public TypeInformation<AccessLog> getProducedType() {
    		return TypeInformation.of(AccessLog.class);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
  3. 通过flink对kafka消息生产者的实现

    public class KafkaSinkApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. 创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. 读取Socket数据源
            DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
            // 3. 转换处理流数据
            SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
                @Override
                public AccessLog map(String value) throws Exception {
                    System.out.println(value);
                    // 根据分隔符解析数据
                    String[] arrValue = value.split("\t");
                    // 将数据组装为对象
                    AccessLog log = new AccessLog();
                    log.setNum(1);
                    for(int i=0; i<arrValue.length; i++) {
                        if(i == 0) {
                            log.setIp(arrValue[i]);
                        }else if( i== 1) {
                            log.setTime(arrValue[i]);
                        }else if( i== 2) {
                            log.setType(arrValue[i]);
                        }else if( i== 3) {
                            log.setApi(arrValue[i]);
                        }
                    }
    
                    return log;
                }
            });
    
            // 3. Kakfa的生产者配置
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer(
                    "10.10.20.132:9092",            // broker 列表
                    "flink-serial",                  // 目标 topic
                    new CustomSerialSchema()                 // 序列化 方式
                    );   
    
            // 4. 添加kafka的写入器
            outputStream.addSink(kafkaProducer);
    
            socketStr.print().setParallelism(1);
    
            // 5. 执行任务
            env.execute("job");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    开启Kafka消费者命令行终端,验证生产者的可用性:

    [root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server  10.10.20.132:9092  --topic flink-serial	
    1601649380422GET"
    getAccount
    1601649381422POSTaddOrder
    1601649382422POST"
    
    • 1
    • 2
    • 3
    • 4
    • 5
  4. 通过flink对kafka消息订阅者的实现

    public class KafkaSourceApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. 创建运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2. 设置kafka服务连接信息
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");
    
            // 3. 创建Kafka消费端
            FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer(
                    "flink-serial",                  // 目标 topic
                    new CustomSerialSchema(),   // 自定义序列化
                    properties);
    
            // 4. 读取Kafka数据源
            DataStreamSource<AccessLog> socketStr = env.addSource(kafkaProducer);
    
            socketStr.print().setParallelism(1);
    
            // 5. 执行任务
            env.execute("job");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    通过flink的kafka生产者消息的发送, 对消费者的功能做测试验证。


本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/413121
推荐阅读
相关标签
  

闽ICP备14008679号