赞
踩
在一般情况下,在Flink中,我们都会将数据转换为对象来做逻辑处理。处理的结果可能保存到RabbitMQ等中间件中。但是一些中间件只能接受字符串或者二进制数据,这就需要我们将对象再转换成它能接受的类型。本文介绍的方法,就是定制RabbitMQ的Sink序列化器,让代码中的对象转成Json串,发布到RabbitMQ队列中。
我们在IntelliJ中新建一个工程SinkSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在pom.xml中新增RabbitMQ连接器
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>3.0.1-1.17</version>
</dependency>
新增Json库依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.17.1</version>
</dependency>
新增lombok库,主要是为了使用它的一些注解
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
<scope>provided</scope>
</dependency>
我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java
package org.example.vo; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class SampleData { private Long id; private String name; private int age; private Boolean married; private Double salary; public String toJson() throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(this); } public static SampleData fromJson(String json) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(json, SampleData.class); } }
这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。
我们借用《Java版Flink使用指南——自定义无界流生成器》 中的方法,在代码中自动生成SampleData 对象,将其作为数据源接入Flink系统。
src/main/java/org/example/generator/UnBoundedStreamGenerator.java
package org.example.generator; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.example.vo.SampleData; public class UnBoundedStreamGenerator extends RichSourceFunction<SampleData> { private volatile boolean isRunning = true; @Override public void run(SourceContext<SampleData> ctx) throws Exception { long count = 0L; while (isRunning) { Thread.sleep(1000); // Simulate delay ctx.collect(generateSampleData(count++)); // Emit data } } @Override public void cancel() { isRunning = false; System.out.println("UnBoundedStreamGenerator canceled"); } private SampleData generateSampleData(long index) { return new SampleData( index, "name-" + index, (int) index, index % 2 == 0, index * 100.11); } }
我们定义的序列化器SampleDataRabbitMQSerializer 需要实现SerializationSchema接口,主要是实现serialize方法,将SampleData对象转换成二进制数组。
src/main/java/org/example/serializer/SampleDataRabbitMQSerializer.java
package org.example.serializer; import org.apache.flink.api.common.serialization.SerializationSchema; import org.example.vo.SampleData; public class SampleDataRabbitMQSerializer implements SerializationSchema<SampleData> { private static final long serialVersionUID = 1L; @Override public byte[] serialize(SampleData element) { try { return element.toJson().getBytes(); } catch (Exception e) { return new byte[0]; } } }
下面代码是从UnBoundedStreamGenerator获取无界流数据,然后给新建的RMQSink对象传递SampleDataRabbitMQSerializer序列化器对象。这样我们就可以在RabbitMQ队列中看到Json字符串化后的数据了。
DataStreamSource<SampleData> sampleDataDataStreamSource = env.addSource(new UnBoundedStreamGenerator()); String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to String host = "172.21.112.140"; // IP of the rabbitmq server int port = 5672; String username = "admin"; String password = "fangliang"; String virtualHost = "/"; int parallelism = 1; RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder() .setHost(host) .setPort(port) .setUserName(username) .setPassword(password) .setVirtualHost(virtualHost) .build(); RMQSink<SampleData> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SampleDataRabbitMQSerializer()); sampleDataDataStreamSource.addSink(stringRMQSink).setParallelism(parallelism).name("Custom Sink");
在RabbitMQ后台,我们看到队列中数据在持续增加
数据的内容如下
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。