Flume basics: https://yellow520.blog.csdn.net/article/details/112758144
Kafka basics: https://yellow520.blog.csdn.net/article/details/112701565
mkdir -p /opt/test/data
vi /opt/test/ffk
# Name the agent, source, and channel, and bind them together
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1

# source
a1.sources.r1.type = TAILDIR
# Name of the file group
a1.sources.r1.filegroups = f1
# Files watched by the group (regular expressions are supported)
a1.sources.r1.filegroups.f1 = /opt/test/data/.+txt
# Position file used to resume collection after a restart
a1.sources.r1.positionFile = /opt/test/data/position.json
# Number of events collected per batch
a1.sources.r1.batchSize = 100

# channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# Kafka broker addresses
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
# Topic
a1.channels.c1.kafka.topic = A1
# Whether to write data to Kafka in Flume event format; false means only the event body is written
a1.channels.c1.parseAsFlumeEvent = false
vi /opt/test/kfh
# Name the agent, source, channel, and sink, and bind them together
a2.sources = r2
a2.channels = c2
a2.sinks = k2
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

# source
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
# Kafka broker addresses
a2.sources.r2.kafka.bootstrap.servers = hadoop102:9092
# Topic
a2.sources.r2.kafka.topics = A1
# Consumer group
a2.sources.r2.kafka.consumer.group.id = g1
# Where the consumer group starts reading the first time it consumes the topic
a2.sources.r2.kafka.consumer.auto.offset.reset = earliest
# Batch size when the source pulls messages from Kafka
a2.sources.r2.batchSize = 50
# Interceptor
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = a.b.c.TimeInterceptor$Builder

# channel
# Channel type: memory (data may be lost on power failure)
a2.channels.c2.type = memory

# sink
a2.sinks.k2.type = hdfs
# Target directory in HDFS
a2.sinks.k2.hdfs.path = hdfs://hadoop100:8020/kf/%Y-%m-%d
# File name prefix
a2.sinks.k2.hdfs.filePrefix = log-
# Interval in seconds before rolling to a new file
a2.sinks.k2.hdfs.rollInterval = 30
# File size that triggers a roll (133169152 = 127*1024*1024, just under 128 MB)
a2.sinks.k2.hdfs.rollSize = 133169152
# Number of events before rolling a new file; 0 (the usual choice) disables rolling by count
a2.sinks.k2.hdfs.rollCount = 0
# File format: SequenceFile (serialized), DataStream (plain text), CompressedStream (compressed)
a2.sinks.k2.hdfs.fileType = DataStream
Note:
The KafkaSource's batchSize defaults to 1000, while the memory channel's transactionCapacity defaults to 100, and each source batch must fit into one channel transaction.
That is why the source's batchSize is lowered to 50 in the kfh config above; alternatively, the channel's transactionCapacity could be raised, as sketched below.
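A minimal sketch of that alternative, using standard memory-channel properties (the values here are illustrative, not from the original config):

# Keep the KafkaSource batchSize at its default (1000) and enlarge the memory channel instead
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000

Whichever side is adjusted, keep batchSize <= transactionCapacity <= capacity.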
<!-- Dependencies -->
<dependencies>
    <!-- Flume; scope is provided so Flume itself is not bundled, since the server's Flume installation supplies it -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- JSON parser -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<!-- Build plugins; the assembly plugin bundles the JSON dependency into the jar -->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
package a.b.c;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class TimeInterceptor implements Interceptor {
    // Initialization
    @Override
    public void initialize() {}

    // Process a single event
    @Override
    public Event intercept(Event event) {
        // Parse the JSON body
        String str = new String(event.getBody());
        JSONObject js = JSON.parseObject(str);
        // Business logic: read the timestamp by key
        Long ts = js.getLong("ts");
        // With hdfs.useLocalTimeStamp = false and hdfs.path = /kf/%Y-%m-%d,
        // the HDFS sink looks up the "timestamp" key in the event headers;
        // its value is in milliseconds and is used to resolve %Y-%m-%d
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", ts.toString());
        return event;
    }

    // Process a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    // Close
    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}
After packaging, upload the jar whose name contains with-dependencies to $FLUME_HOME/lib.
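A minimal sketch of the copy, assuming a hypothetical artifact name (substitute the jar actually produced under target/):

# Hypothetical jar name produced by the assembly plugin
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar $FLUME_HOME/lib/

The check below simply confirms the jar landed in $FLUME_HOME/lib.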
ll $FLUME_HOME/lib | grep SNAPSHOT
zookeeper.py start
kafka.py start
hadoop.py start
kafka-topics.sh \
--zookeeper hadoop102:2181/kafka \
--create \
--replication-factor 2 \
--partitions 3 \
--topic A1
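To confirm the topic exists (same --zookeeper address as above):

kafka-topics.sh \
--zookeeper hadoop102:2181/kafka \
--describe \
--topic A1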
vi /opt/test/a.py
from time import strptime, mktime
t0 = '2020-12-31'
for i in range(1, 200000, 2):
    t = int(i + mktime(strptime(t0, '%Y-%m-%d'))) * 1000  # milliseconds since the epoch
    js = {'ts': t, 'num': str(i)}
    print(js)
python /opt/test/a.py > /opt/test/data/a.txt
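Each generated line is a dict-style record such as {'ts': <epoch-ms>, 'num': '1'}; fastjson's default parser is lenient about single quotes, so the interceptor's JSON.parseObject call can read it. A quick sanity check:

head -n 2 /opt/test/data/a.txt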
cd /opt/test/
nohup \
flume-ng agent -n a1 -c $FLUME_HOME/conf/ -f ffk -Dflume.root.logger=INFO,console \
>/opt/test/f1.log 2>&1 &
nohup \
flume-ng agent -n a2 -c $FLUME_HOME/conf/ -f kfh -Dflume.root.logger=INFO,console \
>/opt/test/f2.log 2>&1 &
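Once both agents are running, a quick end-to-end check (topic A1 and the /kf path come from the configs above; adjust host names to your cluster):

# Messages that agent a1 wrote into Kafka
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic A1 --from-beginning

# Date-partitioned directories that agent a2 wrote into HDFS
hdfs dfs -ls /kf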