Flume: KafkaSource (interceptor) -> FileChannel -> HDFSSink, for log data (topic_log)
1) Create the Flume configuration file
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf
2) The content of the configuration file is as follows
## Components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TimestampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
# HDFS HA (high availability) path, addressed via the nameservice
a1.sinks.k1.hdfs.path = hdfs://mycluster/origin_data/edu/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Output files are written as gzip-compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Also copy the HA core-site.xml and hdfs-site.xml configuration files into Flume's conf directory.
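For example, assuming Hadoop is installed under /opt/module/hadoop (adjust the path to your installation):
[atguigu@hadoop104 flume]$ cp /opt/module/hadoop/etc/hadoop/core-site.xml /opt/module/hadoop/etc/hadoop/hdfs-site.xml conf/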
3) Write the interceptor code
Add the Flume dependency (org.apache.flume:flume-ng-core) to the project; the interceptor below also uses Gson's JsonParser, so the Gson dependency is needed as well.
package com.atguigu.interceptor;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    private JsonParser jsonParser;

    @Override
    public void initialize() {
        jsonParser = new JsonParser();
    }

    @Override
    public Event intercept(Event event) {
        // Parse the event body as JSON and copy its "ts" field into the
        // "timestamp" header so the HDFS sink partitions by event time.
        byte[] body = event.getBody();
        String line = new String(body, StandardCharsets.UTF_8);
        JsonElement element = jsonParser.parse(line);
        JsonObject jsonObject = element.getAsJsonObject();
        String ts = jsonObject.get("ts").getAsString();
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Package the project and put the jar into Flume's lib directory.
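A minimal sketch of the packaging step, assuming a Maven project; <interceptor-jar> is a placeholder for the jar name your build actually produces:
[atguigu@hadoop102 interceptor]$ mvn clean package
[atguigu@hadoop102 interceptor]$ scp target/<interceptor-jar>.jar hadoop104:/opt/module/flume/lib/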
1) Start the Zookeeper and Kafka clusters
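For reference, assuming the cluster wrapper scripts from the earlier environment setup are available (otherwise start Zookeeper and Kafka on each node with their native start scripts):
[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start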
2) Start the log collection Flume agents
[atguigu@hadoop102 ~]$ f1.sh start
3) Start the log consumer Flume agent on hadoop104
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4) Generate mock data
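Optionally, confirm that log events are reaching Kafka before checking HDFS (a quick check, assuming Kafka is installed under /opt/module/kafka):
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log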
5) Check whether data appears on HDFS
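For example, list the target path from the sink configuration (the date directory comes from the event timestamp set by the interceptor):
[atguigu@hadoop102 ~]$ hadoop fs -ls hdfs://mycluster/origin_data/edu/log/topic_log/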
1) On the hadoop102 node, create the script f2.sh in the /home/atguigu/bin directory
[atguigu@hadoop102 bin]$ vim f2.sh
Fill the script with the following content
#!/bin/bash
case $1 in
"start")
echo " --------启动 hadoop104 日志数据flume-------"
ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------停止 hadoop104 日志数据flume-------"
ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
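Make the script executable and use it to start or stop the consumer agent (assuming /home/atguigu/bin is on the PATH):
[atguigu@hadoop102 bin]$ chmod +x f2.sh
[atguigu@hadoop102 bin]$ f2.sh start
[atguigu@hadoop102 bin]$ f2.sh stop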
Flume: KafkaSource (interceptor) -> FileChannel -> HDFSSink, for business data (topic_db)
Sample record format in topic_db
{ "database": "edu", "table": "order_info", "type": "update", "ts": 1665298138, "xid": 781839, "commit": true, "data": { "id": 26635, "user_id": 849, "origin_amount": 400.0, "coupon_reduce": 0.0, "final_amount": 400.0, "order_status": "1002", "out_trade_no": "779411294547158", "trade_body": "Vue技术全家桶等2件商品", "session_id": "fd8d8590-abd3-454c-9d48-740544822a73", "province_id": 30, "create_time": "2022-10-09 14:48:58", "expire_time": "2022-10-09 15:03:58", "update_time": "2022-10-09 14:48:58" }, "old": { "order_status": "1001", "update_time": null } }
1) Create the Flume configuration file
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_db.conf
2) The content of the configuration file is as follows
## Components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TimestampAndTableNameInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior3
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior3/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://mycluster/origin_data/edu/db/%{table}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3) Write the interceptor code
Import the Flume and Gson dependencies, as for the log interceptor above.
package com.atguigu.interceptor;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    private JsonParser jsonParser;

    @Override
    public void initialize() {
        jsonParser = new JsonParser();
    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String line = new String(body, StandardCharsets.UTF_8);
        JsonElement jsonElement = jsonParser.parse(line);
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        // ts in topic_db is in seconds; the timestamp header must be in milliseconds
        long ts = jsonObject.get("ts").getAsLong() * 1000;
        String table = jsonObject.get("table").getAsString();
        Map<String, String> headers = event.getHeaders();
        // timestamp drives the %Y-%m-%d path escape, table drives %{table}
        headers.put("timestamp", String.valueOf(ts));
        headers.put("table", table);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Package the project and put the jar into Flume's lib directory.
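If both interceptors are built in the same Maven project (an assumption about the project layout), the jar deployed in the log-interceptor step already contains this class; otherwise rebuild the jar and copy it to /opt/module/flume/lib on hadoop104 in the same way as above.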
1) Start the Zookeeper and Kafka clusters
2) Start the Flume agent on hadoop104
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.logger=info,console
3) Generate mock data
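As before, the topic can be consumed directly to confirm that change records are arriving (assuming Kafka is installed under /opt/module/kafka):
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db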
4) Check whether data appears in the target path on HDFS
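For example, for the order_info table from the sample record above:
[atguigu@hadoop102 ~]$ hadoop fs -ls hdfs://mycluster/origin_data/edu/db/order_info_inc/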
1) On the hadoop102 node, create the script f3.sh in the /home/atguigu/bin directory
[atguigu@hadoop102 bin]$ vim f3.sh
Fill the script with the following content
#!/bin/bash
case $1 in
"start")
echo " --------启动 hadoop104 业务数据flume-------"
ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------停止 hadoop104 业务数据flume-------"
ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
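As with f2.sh, make the script executable, then start or stop the business data agent:
[atguigu@hadoop102 bin]$ chmod +x f3.sh
[atguigu@hadoop102 bin]$ f3.sh start
[atguigu@hadoop102 bin]$ f3.sh stop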