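This article uses the following components. The setup follows the official example, and the exact versions can be found in the GitHub repository and its pom file. The article assumes all of the environments below are already installed.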
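Since this is a test case, we write a script that generates data and writes it to the Kafka `source` topic. In a real deployment, data would be written to Kafka continuously (for example by Flume or another tool). For the full data script and template, see the official demo data.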
gen-data.sh
#!/bin/bash
#current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp
#create data
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ sline >> /opt/module/data/source.data
done
rm sline
rm /opt/module/data/source.tp
#import data
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data
rm /opt/module/data/source.data
echo "insert data at ${cur_time}"
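If you would rather generate test data from Java than from bash, the template-filling part of gen-data.sh can be sketched as below. This is a minimal illustration, not part of the original project: the class name `GenDataSketch` and the method `generateBatch` are hypothetical, and the Kafka import step is left out. As in the script, every line of a batch gets the same timestamp in `yyyy-MM-dd_HH:mm:ss` form and an independently chosen color, `red` or `yellow` with equal probability.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical Java counterpart of the template-filling part of gen-data.sh.
class GenDataSketch {
    // Same layout as `date +%Y-%m-%d_%H:%M:%S` in the shell script.
    static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");

    // Fills TIME and COLOR in every template line; one output line per record.
    static List<String> generateBatch(List<String> template, LocalDateTime now, Random rnd) {
        String time = now.format(TIME_FORMAT); // one timestamp for the whole batch
        List<String> batch = new ArrayList<>();
        for (String line : template) {
            // Mirrors `shuf -i1-2 -n1`: 1 or 2 with equal probability, 2 means yellow.
            int cnt = rnd.nextInt(2) + 1;
            String color = (cnt == 2) ? "yellow" : "red";
            batch.add(line.replace("TIME", time).replace("COLOR", color));
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> template = Arrays.asList(
                "{ \"id\": 1, \"name\": \"Apple\", \"color\": \"COLOR\", \"time\": \"TIME\"}",
                "{ \"id\": 2, \"name\": \"Banana\", \"color\": \"COLOR\", \"time\": \"TIME\"}");
        for (String record : generateBatch(template, LocalDateTime.now(), new Random())) {
            System.out.println(record);
        }
    }
}
```

`String.replace` substitutes every occurrence, whereas `sed s/TIME/.../` only replaces the first match on each line; the two agree here because each template line contains each placeholder exactly once.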
streaming-data.sh
#!/bin/bash
#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target
#every 90 seconds
set +e
while true
do
/opt/module/data/gen-data.sh
sleep 90
done
set -e
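The loop above runs gen-data.sh and then sleeps 90 seconds, so the gap between batches is 90 seconds plus the script's own runtime, and `set +e` keeps the loop going even if one run fails. A minimal Java sketch of the same semantics, using a hypothetical `FixedDelayLoopSketch` helper, relies on `ScheduledExecutorService.scheduleWithFixedDelay`, which likewise waits the full delay after each run ends. Task exceptions are caught because an uncaught exception would suppress all later executions. The `runs` parameter exists only so the demo terminates.

```java
import java.time.LocalTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical Java counterpart of `while true; do gen-data.sh; sleep 90; done`.
class FixedDelayLoopSketch {
    // Runs `task` until at least `runs` executions have completed,
    // waiting `delayMillis` after each execution finishes.
    static void runRepeatedly(Runnable task, long delayMillis, int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(runs);
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // Like `set +e`: report the failure but keep the loop alive.
                System.err.println("run failed: " + e);
            } finally {
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } finally {
            scheduler.shutdownNow(); // cancel the next scheduled run
        }
    }

    public static void main(String[] args) {
        // streaming-data.sh corresponds to a 90_000 ms delay with no run limit;
        // a short delay and three runs keep the demo quick.
        runRepeatedly(() -> System.out.println("insert data at " + LocalTime.now()), 100, 3);
    }
}
```

`scheduleAtFixedRate` would instead start runs on a fixed clock regardless of how long each run takes, which is closer to a cron entry than to the `sleep` loop.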
source.temp
{ "id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
{ "id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"}
{ "id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"}
{ "id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"}
{ "id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"}
{ "id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"}
{ "id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"}
{ "id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"}
{ "id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"}
{ "id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"}
The Flink streaming job consists of three parts: reading from Kafka, business processing, and writing back to Kafka.
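The three-stage structure can be illustrated without a Kafka cluster by replacing the Kafka source and sink with in-memory lists. In the real job these two ends would be Flink's Kafka connector (for example `FlinkKafkaConsumer` and `FlinkKafkaProducer` in older connector versions); the sketch below is plain Java only. The business step, a running count of records per `color` extracted with a regex, is purely hypothetical because the processing logic is not described here, and the class name `ThreeStagePipelineSketch` is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java stand-in for the job's three stages: read, process, write.
class ThreeStagePipelineSketch {
    // Hypothetical: pulls the color value out of the flat JSON produced by gen-data.sh.
    private static final Pattern COLOR = Pattern.compile("\"color\":\\s*\"([^\"]*)\"");

    // Stage 2 (hypothetical business logic): a running count per color,
    // emitting an updated "color,count" line for every incoming record.
    static List<String> process(List<String> records) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String record : records) {
            Matcher m = COLOR.matcher(record);
            if (!m.find()) {
                continue; // skip records without a color field
            }
            String color = m.group(1);
            int n = counts.merge(color, 1, Integer::sum);
            out.add(color + "," + n);
        }
        return out;
    }

    public static void main(String[] args) {
        // Stage 1 stand-in: records as they might arrive from the "source" topic.
        List<String> source = Arrays.asList(
                "{ \"id\": 1, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"2020-01-02_03:04:05\"}",
                "{ \"id\": 2, \"name\": \"Banana\", \"color\": \"yellow\", \"time\": \"2020-01-02_03:04:05\"}",
                "{ \"id\": 3, \"name\": \"Cherry\", \"color\": \"red\", \"time\": \"2020-01-02_03:04:05\"}");
        // Stage 3 stand-in: print instead of writing to a Kafka topic.
        for (String result : process(source)) {
            System.out.println(result);
        }
    }
}
```

For the three sample records in `main` (red, yellow, red) the output is `red,1`, `yellow,1`, `red,2`: each incoming record emits an updated count for its key, which is how a keyed running aggregate behaves in a streaming job.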
1. First, the dependencies declared in my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.