当前位置:   article > 正文

Java版Flink使用指南——定制RabbitMQ的Sink序列化器

Java版Flink使用指南——定制RabbitMQ的Sink序列化器

在一般情况下,在Flink中,我们都会将数据转换为对象来做逻辑处理。处理的结果可能保存到RabbitMQ等中间件中。但是一些中间件只能接受字符串或者二进制数据,这就需要我们将对象再转换成它能接受的类型。本文介绍的方法,就是定制RabbitMQ的Sink序列化器,让代码中的对象转成Json串,发布到RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

新增Json库依赖

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.17.1</version>
		</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

新增lombok库,主要是为了使用它的一些注解

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
            <scope>provided</scope>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

编码

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {
    private Long id;
    private String name;
    private int age;
    private Boolean married;
    private Double salary;

    public String toJson() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(this);
    }

    public static SampleData fromJson(String json) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, SampleData.class);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

接入数据源

我们借用《Java版Flink使用指南——自定义无界流生成器》 中的方法,在代码中自动生成SampleData 对象,将其作为数据源接入Flink系统。
src/main/java/org/example/generator/UnBoundedStreamGenerator.java

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.example.vo.SampleData;

public class UnBoundedStreamGenerator extends RichSourceFunction<SampleData> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<SampleData> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(generateSampleData(count++)); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }

    private SampleData generateSampleData(long index) {
        return new SampleData(
            index,
            "name-" + index,
            (int) index,
            index % 2 == 0,
            index * 100.11);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

自定义序列化器

我们定义的序列化器SampleDataRabbitMQSerializer 需要实现SerializationSchema接口,主要是实现serialize方法,将SampleData对象转换成二进制数组。
src/main/java/org/example/serializer/SampleDataRabbitMQSerializer.java

package org.example.serializer;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.example.vo.SampleData;

public class SampleDataRabbitMQSerializer implements SerializationSchema<SampleData>  {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(SampleData element) {
        try {
            return element.toJson().getBytes();
        } catch (Exception e) {
            return new byte[0];
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

主体逻辑

下面代码是从UnBoundedStreamGenerator获取无界流数据,然后给新建的RMQSink对象传递SampleDataRabbitMQSerializer序列化器对象。这样我们就可以在RabbitMQ队列中看到Json字符串化后的数据了。

DataStreamSource<SampleData> sampleDataDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;

RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
		.setHost(host)
		.setPort(port)
		.setUserName(username)
		.setPassword(password)
		.setVirtualHost(virtualHost)
		.build();

RMQSink<SampleData> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SampleDataRabbitMQSerializer());

sampleDataDataStreamSource.addSink(stringRMQSink).setParallelism(parallelism).name("Custom Sink");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

测试

打包、提交、运行

在这里插入图片描述
在RabbitMQ后台,我们看到队列中数据在持续增加
在这里插入图片描述
数据的内容如下
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/819998
推荐阅读
相关标签
  

闽ICP备14008679号