当前位置:   article > 正文

Flink 支持的数据类型、从kafka读取自定义POJO类型的数据流_flink获得source中pojo

flink获得source中pojo


1. Flink 的类型系统

  Flink 作为一个分布式处理框架,处理的是以数据对象作为元素的流。要分布式地处理这些数据,就不可避免地要面对数据的网络传输、状态的落盘和故障恢复等问题,这就需要对数据进行序列化和反序列化
在这里插入图片描述

  Flink 有自己一整套类型系统。Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation 类是 Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器


2. Flink 支持的数据类型

Basic types: All Java primitives and their boxed form, plus void, String, Date, BigDecimal, and BigInteger.

Primitive arrays and Object arrays

Composite types:
------Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported
------Scala case classes (including Scala tuples): null fields not supported
------Row: tuples with arbitrary number of fields and support for null fields
------POJOs: classes that follow a certain bean-like pattern

Auxiliary types (Option, Either, Lists, Maps, …)

Generic types: These will not be serialized by Flink itself, but by Kryo.

(1)基本类型
  所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和 BigInteger

(2)数组类型
  包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)

(3)复合数据类型
  Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是 Java API 的一部分。最多25 个字段,也就是从 Tuple0~Tuple25,不支持空字段

  Scala 样例类及 Scala 元组:不支持空字段

  行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段

  POJO:Flink 自定义的类似于 Java bean 模式的类

(4)辅助类型
  Option、Either、List、Map 等

(5)泛型类型(GENERIC)
  Flink 支持所有的 Java 类和 Scala 类。不过如果没有按照POJO 类型的要求来定义,就会被 Flink 当作泛型类来处理。Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。

  在项目实践中,往往会将流处理程序中的元素类型定为 Flink 的 POJO 类型。Flink 对 POJO 类型的要求如下:
  类是公共的(public)和独立的(没有非静态的内部类);
  类有一个公共的无参构造方法;
   类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

3. 类型提示(Type Hints)

  Flink 具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(如 Lambda 表达式中),自动提取的信息是不够精细的,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
  • 1
  • 2

  Flink 专门提供了 TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行
时提供足够的信息。同样可以通过.returns()方法,明确地指定转换之后的 DataStream 里元素的类型

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
  • 1

4. 从kafka读取自定义类型的数据流

在这里插入图片描述
创建 FlinkKafkaConsumer 时需要传入三个参数:
  第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去
   第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以可以自定义反序列化逻辑
  第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性

自定义反序列化器:
  需要实现反序列化方法以及指定TypeInformation

import cn.hutool.json.JSONUtil;
import com.cz.pojo.Event;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class EventSerializer implements DeserializationSchema<Event> {
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return JSONUtil.toBean(new String(message), Event.class);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

作业:

import com.cz.pojo.Event;
import com.cz.source.my.EventSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SourceKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.0.114:29092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<Event> stream = env.addSource(new FlinkKafkaConsumer<Event>(
                "clicks",
                new EventSerializer(),
                properties
        ));
        
        stream.print("Kafka");

        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/500524
推荐阅读
相关标签
  

闽ICP备14008679号