当前位置:   article > 正文

Kafka实现应用日志实时上报统计分析_使用kafka streams分析数据 统计在给定的时间段(从2023年1月1日到2023年6月30

使用kafka streams分析数据 统计在给定的时间段(从2023年1月1日到2023年6月30日)

目录

 

1、Flume插件

1.1 简介

1.2 安装

1.3 配置

1.4 测试

2、Flume集成Kafka

2.1 配置kafka信息

2.2 启动zookeeper,kafka,flume

2.3 测试

3、Flume生产日志收集

3.1 日志收集配置

3.2 测试

4、Fink安装和简单实用

4.1 概述

4.2 安装配置

4.3 启动测试

4.4 体验Flink

5、Flink集成Kafka

5.1 引入pom依赖

5.2 创建一个Flink任务执行类

5.3 打包上传启动

5、搭建一个完整的实时日志统计平台

5.1 架构图

5.2 服务器矩阵图

5.2 Web服务和负载均衡节点

5.3 配置keepalived VIP漂移

5.4 搭建Flume数据采集

5.5 部署Kafka集群

5.6 Flink集群搭建

5.7 Kafka+Flink整合

5.8 Flink+Redis整合


1、Flume插件

1.1 简介

Apache Flume 是一种分布式的、高可靠的、高可用的日志收集聚合系统,将不同来源海量的日志数据传输到集中的数据存储。

Flume agent 负责把外部事件流(数据流)传输到指定下一跳,agent包括source(数据源)、channel(传输通道)、sink(接收端)。Flume agent可以多跳级联,组成复杂的数据流。 Flume 支持多种类型的source:Avro数据源(序列化数据格式)、Thrift数据源(通讯协议格式)、Kafka数据源(消息队列)、NetCat数据源(网络访问)、Syslog数据源(系统日志)、文件数据源(普通文本)、自定义数据源等,可灵活地与应用系统集成,需要较少的开发代价。 Flume 能够与常见的大数据工具结合,支持多种sink:HDFS、Hive、HBase、Kafka等,将数据传输到这些系统,进行进一步分析处理。

1.2 安装

  1. #下载Apache Flume压缩包(需要JDK环境)
  2. wget https://mirror.bit.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
  3. tar zxvf apache-flume-1.9.0-bin.tar.gz
  4. mv apache-flume-1.9.0-bin flume-1.9.0

1.3 配置

  1. #拷贝配置文件,从监听端口获取数据,保存到本地文件
  2. cd flume-1.9.0
  3. cp conf/flume-conf.properties.template conf/flume-conf.properties
  4. vim conf/flume-conf.properties
  5. #编辑配置如下
  6. ---------------------------------------------------------------------------------
  7. # The configuration file needs to define the sources,
  8. # the channels and the sinks.
  9. # Sources, channels and sinks are defined per agent,
  10. # in this case called 'agent'
  11. agent.sources = r1
  12. agent.channels = c1
  13. agent.sinks = s1
  14. # For each one of the sources, the type is defined
  15. agent.sources.r1.type = netcat
  16. agent.sources.r1.bind = 192.168.223.128 #PS:这个地方配成localhost默认会转127.0.0.1
  17. agent.sources.r1.port = 8888
  18. # The channel can be defined as follows.
  19. agent.sources.r1.channels = c1
  20. # Each sink's type must be defined
  21. agent.sinks.s1.type = file_roll
  22. agent.sinks.s1.sink.directory = /usr/local/flume-1.9.0/logs
  23. #Specify the channel the sink should use
  24. agent.sinks.s1.channel = c1
  25. agent.sinks.s1.hdfs.roundUnit = minute
  26. agent.sinks.s1.hdfs.rollInterval = 2
  27. # Each channel's type is defined.
  28. agent.channels.c1.type = memory
  29. # Other config values specific to each type of channel(sink or source)
  30. # can be defined as well
  31. # In this case, it specifies the capacity of the memory channel
  32. agent.channels.c1.capacity = 100

1.4 测试

1.4.1 编写Flume启动/停止/重启脚本

flume的启动,停止,重启脚本比较麻烦,我们这里使用shell脚本编写一个一键启动:

  1. vim bin/flume.sh
  2. ---------------------------输入如下配置----------------------------------------
  3. #!/bin/bash
  4. #echo "begin start flume..."
  5. #flume的安装根目录(根据自己情况,修#!/bin/bash
  6. #echo "begin start flume..."
  7. #flume的安装根目录(根据自己情况,修改为自己的安装目录)
  8. path=/usr/local/flume-1.9.0
  9. echo "flume home is :$path"
  10. #flume的进程名称,固定值(不用修改)
  11. JAR="flume"
  12. #flume的配置文件名称(根据自己的情况,修改为自己的flume配置文件名称)
  13. Flumeconf="flume-conf.properties"
  14. #定义的soure名称
  15. agentname="agent"
  16. function start(){
  17.        echo "begin start flume process ...."
  18.        #查找flume运行的进程数
  19.        num=$(ps -ef|grep flume-conf|grep -v grep|wc -l)
  20.        echo $num
  21.        #判断是否有flume进程运行,如果没有则运行执行启动命令
  22.        if [ $num -lt 1 ] ;then
  23.                $path/bin/flume-ng agent --conf conf -f $path/conf/$Flumeconf --name $agentname -Dflume.root.logger=INFO,console &
  24.                echo "start success...."
  25.                echo "日志路径: $path/logs/flume.log"
  26.        else
  27.                echo "进程已经存在,启动失败,请检查....."
  28.                exit 0
  29.        fi
  30. }
  31. function stop(){
  32.        echo "begin stop flume process.."
  33.        num=$(ps -ef|grep flume|wc -l)
  34.        echo $num
  35.        #echo "$num...."
  36.        if [ $num -gt 0 ];then
  37.                #停止flume
  38.                ps -ef|grep flume-conf|awk '{print $2;}'|xargs kill
  39.                echo "进程已经关闭..."
  40.        else
  41.                echo "服务未启动,无须停止..."
  42.        fi
  43. }
  44. function restart(){
  45.        echo "begin stop flume process .."
  46.        #判断程序是否彻底停止
  47.        num=$(ps -ef|grep flume|wc -l)
  48.        #stop完成之后,查找flume的进程数,判断进程数是否为0,如果不为0,则休眠5秒,再次查看,直到进程数为0
  49.        if [ $num -gt 0 ];then
  50.                stop
  51.                echo "flume process stoped,and starting..."
  52.        fi
  53.        #执行start
  54.        start
  55.        echo "started...."
  56. }
  57. #case 命令获取输入的参数,如果参数为start,执行start函数,如果参数为stop执行stop函数,如果参数为restart,执行restart函数
  58. case "$1" in
  59.    "start")
  60.      start $@
  61.      exit 0
  62.   ;;
  63.    "stop")
  64.      stop
  65.      exit 0
  66.     ;;
  67.    "restart")
  68.       restart $@
  69.       exit 0
  70.     ;;
  71.   *)
  72.       echo "用法: $0 {start|stop|restart}"
  73.       exit 1
  74.   ;;
  75. esac

命令介绍

参数作用举例
–conf 或 -c指定配置文件夹,包含flume-env.sh和log4j的配置文件–conf conf
–conf-file 或 -f配置文件地址–conf-file conf/flume-conf.properties
–name 或 -nagent名称–name agent

启动命令:./bin/flume.sh start

1.4.2 启动客户端测试

  1. [root@ydt1 logs]# telnet 127.0.0.1 8888
  2. Trying 127.0.0.1...
  3. Connected to 127.0.0.1.
  4. Escape character is '^]'.
  5. hello laohu
  6. OK
  7. [root@ydt1 logs]# ll -s
  8. 总用量 12
  9. 4 -rw-r--r--. 1 root root 25 10月 10 20:47 1602333960297-3
  10. 4 -rw-r--r--. 1 root root 23 10月 10 20:47 1602333960297-4
  11. 0 -rw-r--r--. 1 root root  0 10月 10 20:48 1602333960297-5
  12. 0 -rw-r--r--. 1 root root  0 10月 10 20:48 1602333960297-6
  13. 0 -rw-r--r--. 1 root root  0 10月 10 20:49 1602333960297-7
  14. 0 -rw-r--r--. 1 root root  0 3月   3 14:09 1614751795117-1
  15. 4 -rw-r--r--. 1 root root 13 3月   3 14:10 1614751795117-2
  16. 0 -rw-r--r--. 1 root root  0 3月   3 14:10 1614751795117-3
  17. [root@ydt1 logs]# pwd
  18. /usr/local/flume-1.9.0/logs
  19. #查看生成的日志信息:
  20. [root@ydt1 logs]# cat 1614751795117-2
  21. hello laohu

 

2、Flume集成Kafka

2.1 配置kafka信息

  1. # Name the components on this agent
  2. agent.sources = r1
  3. agent.sinks = k1
  4. agent.channels = c1
  5. # Describe/configure the source
  6. agent.sources.r1.type = netcat
  7. agent.sources.r1.bind = 192.168.223.128
  8. agent.sources.r1.port = 8888
  9. # Describe the sink
  10. agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  11. agent.sinks.k1.kafka.bootstrap.servers=192.168.223.128:9092
  12. agent.sinks.k1.kafka.topic=log4j-flume-kafka
  13. agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
  14. agent.sinks.k1.kafka.producer.acks=1
  15. agent.sinks.k1.custom.encoding=UTF-8
  16. # Use a channel which buffers events in memory
  17. agent.channels.c1.type = memory
  18. agent.channels.c1.capacity = 1000
  19. agent.channels.c1.transactionCapacity = 100
  20. # Bind the source and sink to the channel
  21. ## 配置传输通道
  22. agent.sources.r1.channels = c1
  23. agent.sinks.k1.channel = c1

2.2 启动zookeeper,kafka,flume

  1. #启动单台zookeeper
  2. [root@ydt1 zookeeper-3.4.6]# ./bin/zkServer.sh start
  3. JMX enabled by default
  4. Using config: /usr/local/zookeeper-3.4.6/bin/../conf/zoo.cfg
  5. Starting zookeeper ... STARTED
  6. #现在我们只启动单台kafka,所以当前kafka建立过集群,请先删除kafka日志信息,主要是元数据,再启动kafka之前
  7. rm /tmp/kafka-logs/meta.properties -f
  8. [root@ydt1 zookeeper-3.4.6]# cd ../kafka_2.12-2.5.0/
  9. [root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-server-start.sh config/server.properties
  10. cd /usr/local/flume-1.9.0
  11. #启动flume服务
  12. ./bin/flume.sh start

2.3 测试

  1. cd /usr/local/kafka
  2. #创建kafka topic,有就算了
  3. [root@ydt1 kafka]# ./bin/kafka-topics.sh --bootstrap-server ydt1:9092 --create --topic log4j-flume-kafka
  4. OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
  5. Created topic log4j-flume-kafka.
  6. #启动kafka消费者,消费log4j-flume-kafka数据
  7. [root@ydt1 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server ydt1:9092 --topic log4j-flume-kafka --from-beginning
  8. OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
  9. #连接flume监听,写入数据信息
  10. [root@ydt1 kafka_2.12-2.5.0]# telnet 192.168.223.128 8888
  11. Trying 192.168.223.128...
  12. Connected to 192.168.223.128.
  13. Escape character is '^]'.
  14. gebilaowang
  15. OK
  16. 隔壁老王是一个热心的邻居
  17. OK
  18. #可以看到kafka消费者已经可以收到数据
  19. [root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server ydt1:9092 --topic log4j-flume-kafka --from-beginning
  20. OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
  21. gebilaowang
  22. 隔壁老王是一个热心的邻居3、

 

3、Flume生产日志收集

3.1 日志收集配置

实际应用中,flume常和应用程序部署在同一机器上,应用程序将日志写入文件中,flume再以监听命令的方式(tail命令打开文件)对该文件进行监听,再把其传入到Kafka集群中。flume的配置为:

  1. # Name the components on this agent
  2. agent.sources = r1
  3. agent.sinks = k1
  4. agent.channels = c1
  5. #指定源类型为Linux 命令(单个文件)
  6. #agent.sources.r1.type = exec
  7. #agent.sources.r1.command = tail -f /usr/local/redis-4.0.6/log/redis.log
  8. #多个文件(这个地方的目录格式有点怪,注意一下)
  9. agent.sources.r1.type = TAILDIR
  10. agent.sources.r1.filegroups = f1
  11. agent.sources.r1.filegroups.f1 = /usr/local/redis-4.0.6/log/.*log
  12. #指定事件不包括头信息
  13. #agent.sources.r1.fileHeader = false
  14. # Describe the sink
  15. agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  16. agent.sinks.k1.kafka.bootstrap.servers=192.168.223.128:9092
  17. agent.sinks.k1.kafka.topic=log4j-flume-kafka
  18. agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
  19. agent.sinks.k1.kafka.producer.acks=1
  20. agent.sinks.k1.custom.encoding=UTF-8
  21. # Use a channel which buffers events in memory
  22. #直接使用内存做数据的临时缓存,虽然快,但是安全性不高,不过我们这里只是记录日志,就算了,如果是那种重要的实时统计,还是需要使用文件数据临时缓存的形式
  23. agent.channels.c1.type = memory
  24. agent.channels.c1.capacity = 1000
  25. agent.channels.c1.transactionCapacity = 100
  26. #agent.channels.c1.type = file
  27. #agent.channels.c1.checkpointDir = /usr/local/flume-1.9.0/log/checkpoint
  28. #agent.channels.c1.dataDirs = /usr/local/flume-1.9.0/log/data
  29. # Bind the source and sink to the channel
  30. agent.sources.r1.channels = c1
  31. agent.sinks.k1.channel = c1

这样,Kafka和flume集群故障时,都不会影响到应用程序的正常运行。flume成了Kafka的一个Producer,因为flume是一个轻服务应用,可在每台应用服务器上都部署一个。

3.2 测试

  1. #重新启动flume,用之前写的脚本也可以
  2. ./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

再次打开kafka消费者客户端,可以看到redis.log文件中所有信息!

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

4、Fink安装和简单实用

4.1 概述

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。

官方下载地址:Apache Flink: Downloads

4.2 安装配置

  1. #下载安装,如果这个版本链接找不到了,去官网重新获取新版本的链接
  2. wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.11.3/flink-1.11.3-bin-scala_2.11.tgz
  3. tar -xvf flink-1.11.3-bin-scala_2.11.tgz
  4. #配置
  5. cd flink-1.11.3/
  6. vim conf/flink-conf.yaml
  7. #修改该配置为主机名或者ip:
  8. jobmanager.rpc.address: ydt1

4.3 启动测试

  1. [root@ydt1 flink-1.11.3]# ./bin/start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host ydt1.
  4. Starting taskexecutor daemon on host ydt1.

关闭防火墙或者开启8081端口:service firewalld stop

访问IP地址为:http://192.168.223.128:8081/

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

4.4 体验Flink

开启一个终端,在机器上执行以下命令连接本机9000端口:

nc -l 9000

执行以下示例程序命令,即可监听本机9000端口,等待该端口的数据:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

此时再去刷新web页面,可见如下信息,新增了一个Job,这个job只是计数的样例

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

然后在第一个终端发送数据:

20210304161302343.png

点进去可以看到接收的数据条数,想看详情继续点进去看:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5、Flink集成Kafka

Flink 提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafkatopic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。

上一章节,我们知道flink主要是通过执行jar包任务来调度日志消息,所以我们需要定义一个maven项目来获取Kafka主题消息

5.1 引入pom依赖

  1. <dependencies>
  2.        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
  3.        <dependency>
  4.            <groupId>org.apache.flink</groupId>
  5.            <artifactId>flink-java</artifactId>
  6.            <version>1.11.1</version>
  7.        </dependency>
  8.        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
  9.        <dependency>
  10.            <groupId>org.apache.flink</groupId>
  11.            <artifactId>flink-clients_2.11</artifactId>
  12.            <version>1.11.1</version>
  13.        </dependency>
  14.        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
  15.        <dependency>
  16.            <groupId>org.apache.flink</groupId>
  17.            <artifactId>flink-streaming-java_2.11</artifactId>
  18.            <version>1.11.1</version>
  19.            <!-- <scope>provided</scope>-->
  20.        </dependency>
  21.        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
  22.        <dependency>
  23.            <groupId>org.apache.flink</groupId>
  24.            <artifactId>flink-connector-kafka_2.11</artifactId>
  25.            <version>1.11.1</version>
  26.        </dependency>
  27.        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
  28.        <dependency>
  29.            <groupId>org.slf4j</groupId>
  30.            <artifactId>slf4j-log4j12</artifactId>
  31.            <version>1.7.25</version>
  32.        </dependency>
  33.        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
  34.        <dependency>
  35.            <groupId>log4j</groupId>
  36.            <artifactId>log4j</artifactId>
  37.            <version>1.2.17</version>
  38.        </dependency>
  39.    </dependencies>
  40.    <build>
  41.        <plugins>
  42.            <plugin>
  43.                <groupId>org.apache.maven.plugins</groupId>
  44.                <artifactId>maven-compiler-plugin</artifactId>
  45.                <version>3.2</version>
  46.                <configuration>
  47.                    <source>1.8</source>
  48.                    <target>1.8</target>
  49.                </configuration>
  50.            </plugin>
  51.            <plugin>
  52.                <groupId>org.apache.maven.plugins</groupId>
  53.                <artifactId>maven-dependency-plugin</artifactId>
  54.                <executions>
  55.                    <execution>
  56.                        <id>copy-dependencies</id>
  57.                        <phase>test</phase>
  58.                        <goals>
  59.                            <goal>copy-dependencies</goal>
  60.                        </goals>
  61.                        <configuration>
  62.                            <outputDirectory>
  63.                               target/classes/lib
  64.                            </outputDirectory>
  65.                        </configuration>
  66.                    </execution>
  67.                </executions>
  68.            </plugin>
  69.            <plugin>
  70.                <groupId>org.apache.maven.plugins</groupId>
  71.                <artifactId>maven-jar-plugin</artifactId>
  72.                <configuration>
  73.                    <archive>
  74.                        <manifest>
  75.                            <addClasspath>true</addClasspath>
  76.                            <mainClass>
  77.                               com.ydt.flinkkafka.FlinkKafka
  78.                            </mainClass>
  79.                            <classpathPrefix>lib/</classpathPrefix>
  80.                        </manifest>
  81.                        <manifestEntries>
  82.                            <Class-Path>.</Class-Path>
  83.                        </manifestEntries>
  84.                    </archive>
  85.                </configuration>
  86.            </plugin>
  87.        </plugins>
  88.    </build>

5.2 创建一个Flink任务执行类

创建一个Flink任务执行类,将Kafka数据转为转为flink的dataStream类型

  1. package com.ydt.flinkkafka;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.TimeCharacteristic;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSink;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  7. import java.util.Properties;
  8. public class FlinkKafka {
  9.    public static void main(String[] args) throws Exception {
  10.        try {
  11.            // 获取上下文环境StreamExecutionEnvironment对象
  12.            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.            env.enableCheckpointing(5000); // 要设置启动检查点
  14.            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置事件触发时写入流
  15.            // 配置kafka的ip和端口,以及消费者组
  16.            Properties properties = new Properties();
  17.            properties.setProperty("bootstrap.servers", "ydt1:9092");
  18.            properties.setProperty("group.id", "flume-kafka");
  19.            //将消费者数据对象加入到上下文环境StreamExecutionEnvironment对象中,并生成DataStream对象;
  20.            DataStreamSink<String> dataStream =env.
  21.                    addSource(new FlinkKafkaConsumer<>("log4j-flume-kafka", new SimpleStringSchema(), properties))
  22.                   .print();
  23.            //设置job名称
  24.            env.execute("consumer from kafka data");
  25.       } catch (Exception e) {
  26.            e.printStackTrace();
  27.       }
  28.   }
  29. }

5.3 打包上传启动

将以上项目打成jar包包上传到flink,并且启动

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

执行项目中的任务类:com.ydt.flinkkafka.FlinkKafka

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

可以看到刚刚上传的job包已经在运行了:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

点进去可以看到日志信息:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5、搭建一个完整的实时日志统计平台

在互联网应用中,不管是哪一种处理方式,其基本的数据来源都是日志数据,例如对于web应用来说,则可能是用户的访问日志、用户的点击日志等。

如果对于数据的分析结果在时间上有比较严格的要求,则可以采用在线处理的方式来对数据进行分析,如使用Flink进行处理。比较贴切的一个例子是天猫双十一的成交额,在其展板上,我们看到交易额是实时动态进行更新的,对于这种情况,则需要采用在线处理。下面要介绍的是实时数据处理方式,即基于Flink的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作:

  • 1.如何一步步构建我们的实时处理系统(Flume+Kafka+Flink+Redis)

  • 2.实时处理网站的用户访问日志,并统计出该网站的PV(访问量)、UV(独立访客),IP

  • 3.将实时分析出的PV、UV动态地展示在我们的前端页面上

5.1 架构图

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

即从上面的架构中我们可以看出,其由下面的几部分构成:

  • Flume集群

  • Kafka集群

  • Flink集群

从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地说明这一点),即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Flink的整合。当然,各个环境是否使用集群,依个人的实际需要而定,在我们的环境中,Flume、Kafka、Flink都使用集群。

5.2 服务器矩阵图

Web服务节点:两台提供应用服务,一台提供负载均衡服务

Flume:两台采集数据

Zookeeper:三台服务搭建集群

Kafka:三台服务搭建集群

Flink:三台服务搭建集群

Redis:三台服务搭建集群

Keepalived:负责将负载均衡服务器漂移VIP(这里就不备份了,你高兴可以继续搞一个主备服务来漂)

VIP:虚拟IP

 192.168.223.128192.168.223.129192.168.223.130192.168.223.131
Flume 11 
Zookeeper111 
Kafka111 
Flink1(主)11 
Redis111 
VIP   1
Web服务节点1(负载均衡)11 
Keppalived1(VIP漂移)   

5.2 Web服务和负载均衡节点

我们的目标是是是统计访客数据,这里以Nginx作为Web服务器(你高兴可以用tomcat),主要是统计访问日志access.log进行分析

因为我们重点不在nginx的使用,所以仅仅只是访问一个静态图片而已,你高兴你可以弄的更复杂!

想获取用户真实IP,我们需要安装realip模块,否则得到的是代理服务器IP

  1. #129,130节点需要安装realip模块
  2. cd /usr/local/nginx-1.12.2
  3. ./configure --prefix=/usr/local/nginx --with-http_stub_status_module --with-http_ssl_module --with-http_realip_module
  4. make && make install

129,130节点 Nginx服务器nginx.conf配置:

  1. server {
  2.     listen 80;
  3.     location / {
  4.         root html;
  5.         set_real_ip_from 192.168.223.128; #指接受从哪个信任前代理处获得真实用户ip
  6.         real_ip_header X-Real-IP; #存储X-Real-IP变量名称
  7.     }
  8.   }
  9. #访问日志格式
  10. log_format main  '$remote_addr~$time_local~$request~$status~$http_user_agent';
  11. #开启访问日志记录                    
  12. access_log logs/access.log main;
  13. #弄张图片到nginx/html目录下,启动nginx,访问http://192.168.223.129/mv.jpg,可以看到美女就行

128节点Nginx服务器nginx.conf负载均衡配置:

  1. upstream test {
  2.   server 192.168.223.129:80; #内部服务器1
  3.   server 192.168.223.130:80; #内部服务器2
  4. }
  5. server {
  6.   listen 80;
  7.   location / {
  8.   proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;#记录来访IP
  9.     proxy_pass http://test;
  10.   }
  11. }
  12. #访问负载均衡代理节点http://192.168.223.128/mv.jpg可以看到两台机器上的美女即可

 

5.3 配置keepalived VIP漂移

编辑keepalived的配置如下:

  1. vim /etc/keepalived/keepalived.conf
  2. ------------------------------------------------------------------------
  3. ! Configuration File for keepalived
  4. global_defs {
  5.    #不与其他节点重名即可
  6.   router_id flume-kafka-flink-redis
  7. }
  8. vrrp_instance kafka {
  9.   state MASTER
  10.   interface eth0      #指定虚拟ip的网卡接口
  11.   mcast_src_ip 192.168.223.128
  12.   virtual_router_id 51    #路由器标识,MASTER和BACKUP必须是一致的
  13.   priority 100            #定义优先级,数字越大,优先级越高,在同一个vrrp_instance下,MASTER的优先级必须大于BACKUP的优先级。这样MASTER故障恢复后,就可以将VIP资源再次抢回来
  14.   authentication {
  15.       auth_type PASS
  16.       auth_pass 1111
  17.   }
  18.   virtual_ipaddress {
  19.        192.168.223.132
  20.   }
  21. }
  22. #访问VIP地址:http://192.168.223.131/mv.jpg可以看到两台机器上的美女即可

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5.4 搭建Flume数据采集

129,130节点Flume安装略!

129,130节点Flume配置文件flume-conf.properties如下:

  1. ##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro
  2. ##   注意:Flume agent的运行,主要就是配置source channel sink
  3. ## 下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
  4. #########################################################
  5. a1.sources = r1
  6. a1.sinks = k1
  7. a1.channels = c1
  8. #对于source的配置描述 监听文件中的新增数据 exec
  9. a1.sources.r1.type = exec
  10. a1.sources.r1.command  = tail -f /usr/local/nginx/logs/access.log
  11. #对于sink的配置描述 使用kafka做数据的消费
  12. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  13. #需要提前创建该topic
  14. a1.sinks.k1.topic = flume-kafka-flink-redis
  15. a1.sinks.k1.brokerList = ydt1:9092,ydt2:9092,ydt3:9092
  16. a1.sinks.k1.requiredAcks = 1
  17. a1.sinks.k1.batchSize = 1
  18. #对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
  19. a1.channels.c1.type = file
  20. a1.channels.c1.checkpointDir = /usr/local/flume-1.9.0/checkpoint
  21. a1.channels.c1.dataDirs = /usr/local/flume-1.9.0/data
  22. #通过channel c1将source r1和sink k1关联起来
  23. a1.sources.r1.channels = c1
  24. a1.sinks.k1.channel = c1
  1. #启动Flume Agent,对日志文件进行监听
  2. ./bin/flume-ng agent --conf conf -n a1 -f conf/flume-conf.properties >/dev/null 2>&1 &

 

5.5 部署Kafka集群

Kafka集群部署(略,参照之前课程),注意需要先启动zookeeper集群,再启动kafka集群(如果kafka集群结构有变化,需要删除元数据)

启动成功:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

创建Flume下沉节点(kafka主题):

  1. #创建kafka主题
  2. [root@ydt1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --bootstrap-server ydt1:9092 --create --topic flume-kafka-flink-redis
  3. OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
  4. Created topic flume-kafka-flink-redis.
  5. #启动kafka消费者,消费flume-kafka-flink-redis数据
  6. ./bin/kafka-console-consumer.sh --bootstrap-server ydt1:9092 --topic flume-kafka-flink-redis --from-beginning

可以看到原始数据采集到了kafka集群:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5.6 Flink集群搭建

Flink可在所有类UNIX环境中运行,例如Linux,Mac OS X和Cygwin(适用于Windows),并期望集群由一个主节点和一个或多个工作节点组成。在开始设置系统之前,请确保在每个节点上安装了以下软件:

  • Java 1.8.x或更高版本,

  • ssh(必须运行sshd才能使用管理远程组件的Flink脚本)

    如果您的群集不满足这些软件要求,则需要安装/升级它。

    在所有群集节点上使用无密码SSH和相同的目录结构将允许使用我们的脚本来控制所有内容。

  • 无密码SSH配置

    1. #注意:以下master,worker1,worker2分别代表flink集群中128,129,130节点所在的服务器
    2. #1. 过程为对每个节点,生成密钥对,然后将生成的所有公钥都追加 authorized_keys 文件中,再将authorized_keys文件放到每个节点 ~/.ssh/ 下
    3. #2. 在每个节点上生成密钥对,一路回车,生成密钥对:id_rsa 和 d_rsa.pub,默认存储在 ~/.ssh 下:
    4. # 例:在master节点上
    5. # 生成密钥对
    6. [root@ydt1 ~]$ ssh-keygen -t rsa -P ''
    7. # 将 id_rsa.pub 追加到授权的key中
    8. [root@ydt1 ~]$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    9. #3. 每个节点修改配置文件 /etc/ssh/sshd_config
    10. [root@ydt1 ~]# vim /etc/ssh/sshd_config
    11. #修改内容如下:
    12. RSAAuthentication yes # 启用 RSA 认证
    13. PubkeyAuthentication yes # 启用公钥私钥配对认证方式
    14. AuthorizedKeysFile .ssh/authorized_keys # 公钥文件路径
    15. #4. 使每个 authorized_keys 包含所有公钥
    16. # 在master节点上
    17. # 复制 authorized_keys 到 worker1,会提示输入worker1的密码,下面的 root 为用户名,三台节点均有相同的用户名 root
    18. [root@ydt1 ~]$ scp ~/.ssh/authorized_keys root@ydt2:~/.ssh
    19. #然后切换到 worker1,追加master的公钥到worker1的authorized_keys,如下:
    20. # 在 worker1 上,授予目录访问权限
    21. [root@ydt2 ~]$ chmod 700 ~/.ssh
    22. #此时authorized_keys包含master和worker1的公钥,将该authorized_keys复制到worker2,追加worker2的公钥
    23. # 在 worker1 上远程拷贝到worker2
    24. [root@ydt2 ~]$ scp ~/.ssh/authorized_keys root@ydt3:~/.ssh
    25. #注:若果还有节点,以此类推,一直到最后一个节点,最后节点上的authorized_keys就拥有所有节点的公钥,然后再把该authorized_keys传到其他所有节点对应位置
    26. # 切换到worker2 上将授权Key回传到worker1和master
    27. [root@ydt3 ~]$ scp ~/.ssh/authorized_keys root@ydt2:~/.ssh/authorized_keys
    28. [root@ydt3 ~]$ scp ~/.ssh/authorized_keys root@ydt1:~/.ssh/authorized_keys
    29. #另外,将所有节点的authorized_keys改一下权限 !!!非常重要!!!
    30. chmod 600 ~/.ssh/authorized_keys
    31. #测试
    32. #需要先重启sshd服务,不行就重启服务器试试
    33. service sshd restart
    34. #在命令行直接输入 ssh ydt2(服务名) 看是否不需要输入密码就能登陆
    35. # 在master节点上
    36. [root@ydt1 flink-1.11.1]# ssh ydt2
    37. Last login: Wed Aug 26 22:21:24 2020 from ydt1
    38. [root@ydt2 ~]#
    39. #说明master节点可以直接操作worker1,worker2两个节点flink了

我们只使用Standalone 模式集群

5.6.1 主节点128上修改

  1. /usr/local/flink-1.11.1
  2. #master配置
  3. vim conf/masters  #输入
  4. ---------------------------------------------------------------------
  5. ydt1:8081
  6. #slaves配置
  7. vim conf/workers  #输入
  8. ---------------------------------------------------------------------
  9. ydt2
  10. ydt3
  11. #wq保存退出
  12. #flink-conf.yaml配置
  13. vim conf/flink-conf.yaml #输入
  14. ---------------------------------------------------------------------
  15. #每个TaskManager提供的任务槽数。每个插槽运行一个并行管道,有两个从节点,配置两个插槽
  16. taskmanager.numberOfTaskSlots: 2
  17. jobmanager.rpc.address: ydt1

5.6.2 拷贝安装包

scp将安装包复制到129,130节点

  1. scp -r flink-1.11.1/ ydt2:/usr/local/ #弄到129
  2. scp -r flink-1.11.1/ ydt3:/usr/local/ #弄到130

5.6.3 配置环境变量

配置所有节点Flink的环境变量

  1. # vim /etc/profile 增加以下配置
  2. export FLINK_HOME=/usr/local/flink-1.11.1
  3. export PATH=$PATH:$FLINK_HOME/bin
  4. #立马生效
  5. source /etc/profile

5.6.4 启动Flink集群

  1. #master节点启动
  2. [root@ydt1 flink-1.11.1]# ./bin/start-cluster.sh
  3. Starting cluster.
  4. Starting standalonesession daemon on host ydt1.
  5. Starting taskexecutor daemon on host ydt2.
  6. Starting taskexecutor daemon on host ydt3.

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5.7 Kafka+Flink整合

该章节主要是将Kafka中的byte[]数据转换为对象,所以不能使用之前Flink提供的简单类型的schema了,需要自己定义一个转换类:

5.7.1 转换的实体对象

  1. package com.ydt.flinkkafka;
  2. import java.io.Serializable;
  3. public class MyAccess implements Serializable {
  4.    private String ip;
  5.    private String browser;
  6.    private String date;
  7.    private String status;
  8.    public MyAccess(String ip, String browser, String date, String status) {
  9.        this.ip = ip;
  10.        this.browser = browser;
  11.        this.date = date;
  12.        this.status = status;
  13.   }
  14.    public String getIp() {
  15.        return ip;
  16.   }
  17.    public void setIp(String ip) {
  18.        this.ip = ip;
  19.   }
  20.    public String getBrowser() {
  21.        return browser;
  22.   }
  23.    public void setBrowser(String browser) {
  24.        this.browser = browser;
  25.   }
  26.    public String getDate() {
  27.        return date;
  28.   }
  29.    public void setDate(String date) {
  30.        this.date = date;
  31.   }
  32.    public String getStatus() {
  33.        return status;
  34.   }
  35.    public void setStatus(String status) {
  36.        this.status = status;
  37.   }
  38.    @Override
  39.    public String toString() {
  40.        return "MyAccess{" +
  41.                "ip='" + ip + '\'' +
  42.                ", browser='" + browser + '\'' +
  43.                ", date='" + date + '\'' +
  44.                ", status='" + status + '\'' +
  45.                '}';
  46.   }
  47. }

5.7.2 自定义转换类

  1. package com.ydt.flinkkafka;
  2. import org.apache.flink.api.common.serialization.DeserializationSchema;
  3. import org.apache.flink.api.common.typeinfo.TypeInformation;
  4. import org.apache.flink.api.java.typeutils.TypeExtractor;
  5. import java.io.IOException;
  6. import java.nio.ByteBuffer;
  7. import java.nio.ByteOrder;
  8. import java.nio.CharBuffer;
  9. import java.nio.charset.Charset;
  10. import java.nio.charset.CharsetDecoder;
  11. public class ConsumerDeserializationSchema implements DeserializationSchema<MyAccess> {
  12.    private Class<MyAccess> clazz;
  13.    public ConsumerDeserializationSchema(Class<MyAccess> clazz) {
  14.        this.clazz = clazz;
  15.   }
  16.    @Override
  17.    public MyAccess deserialize(byte[] message) throws IOException {
  18.        ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
  19.        String mess = byteBuffertoString(buffer);
  20.        //封装为POJO类
  21.        String[] split = mess.split("~");
  22.        MyAccess myAccess = new MyAccess(split[0],split[4],split[1],split[3]);
  23.        return myAccess;
  24.   }
  25.    public static String byteBuffertoString(ByteBuffer buffer) {
  26.        Charset charset = null;
  27.        CharsetDecoder decoder = null;
  28.        CharBuffer charBuffer = null;
  29.        try {
  30.            charset = Charset.forName("UTF-8");
  31.            decoder = charset.newDecoder();
  32.            // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
  33.            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
  34.            return charBuffer.toString();
  35.       } catch (Exception ex) {
  36.            ex.printStackTrace();
  37.            return "";
  38.       }
  39.   }
  40.    @Override
  41.    public boolean isEndOfStream(MyAccess myAccess) {
  42.        return false;
  43.   }
  44.    @Override
  45.    public TypeInformation<MyAccess> getProducedType() {
  46.        return TypeExtractor.getForClass(clazz);
  47.   }
  48. }

5.7.3 消费者处理类

  1. package com.ydt.flinkkafka;
  2. import org.apache.flink.streaming.api.TimeCharacteristic;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Properties;
  7. public class FlinkKafka {
  8.    public static void main(String[] args) throws Exception {
  9.        try {
  10.            // 获取上下文环境StreamExecutionEnvironment对象
  11.            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.            env.enableCheckpointing(5000); // 要设置启动检查点
  13.            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置事件触发时写入流
  14.            // 配置kafka的ip和端口,以及消费者组
  15.            Properties properties = new Properties();
  16.            properties.setProperty("bootstrap.servers", "ydt1:9092");
  17.            properties.setProperty("group.id", "kafka-flink");
  18.            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");//注意月份是MM
  19.            FlinkKafkaConsumer<MyAccess> consumer
  20.                    = new FlinkKafkaConsumer<MyAccess>("flume-kafka-flink-redis"
  21.                   , new ConsumerDeserializationSchema(MyAccess.class), properties);
  22.            //将消费者数据对象加入到上下文环境StreamExecutionEnvironment对象中,并生成DataStream对象;
  23.            env.addSource(consumer).print();
  24.            //设置job名称
  25.            env.execute("consumer from kafka data");
  26.       } catch (Exception e) {
  27.            e.printStackTrace();
  28.       }
  29.   }
  30. }

5.7.4 打包上传到Flink

打包上窜到Flink,并启动任务

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

5.8 Flink+Redis整合

在很多大数据场景下,要求数据形成数据流的形式进行计算和存储。上面介绍了Flink消费Kafka数据实现对象转换,该章节需要完成的是将实时计算的结果写到redis。当kafka从其他端获取数据立刻到Flink计算,Flink计算完后结果写到Redis,整个过程就像流水一样形成了数据流的处理

5.8.1 增加POM依赖

  1. <dependency>
  2.            <groupId>org.apache.flink</groupId>
  3.            <artifactId>flink-connector-redis_2.10</artifactId>
  4.            <version>1.1.5</version>
  5.        </dependency>

5.8.2 修改消费者处理类

增加flink sink代码,将数据实时刷入redis

  1. package com.ydt.flinkkafka;
  2. import org.apache.flink.streaming.api.TimeCharacteristic;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  6. import org.apache.flink.streaming.connectors.redis.RedisSink;
  7. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  8. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  9. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  10. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  11. import redis.clients.jedis.Jedis;
  12. import java.text.DateFormat;
  13. import java.text.SimpleDateFormat;
  14. import java.util.Date;
  15. import java.util.Properties;
  16. public class FlinkKafka {
  17.    private static Jedis jedis = new Jedis("ydt1",6379);
  18.    private static DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
  19.    public static void main(String[] args) throws Exception {
  20.        try {
  21.            // 获取上下文环境StreamExecutionEnvironment对象
  22.            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.            // 为了打印到控制台的结果不乱序,我们配置全局的并发为1,这里改变并发对结果正确性没有影响
  24.            env.setParallelism(1);
  25.            /*ProcessingTime:事件被处理的时间。也就是由机器的系统时间来决定。(默认)
  26.            EventTime:事件发生的时间。一般就是数据本身携带的时间。*/
  27.            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  28.            // 配置kafka的ip和端口,以及消费者组
  29.            Properties properties = new Properties();
  30.            properties.setProperty("bootstrap.servers", "ydt1:9092");
  31.            properties.setProperty("group.id", "kafka-flink");
  32.            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");//注意月份是MM
  33.            FlinkKafkaConsumer<MyAccess> consumer
  34.                    = new FlinkKafkaConsumer<MyAccess>("flume-kafka-flink-redis"
  35.                   , new ConsumerDeserializationSchema(MyAccess.class), properties);
  36.            //将消费者数据对象加入到上下文环境StreamExecutionEnvironment对象中,并生成DataStream对象;
  37.            DataStreamSource<MyAccess> streamSource = env.addSource(consumer);
  38.            streamSource.print();
  39.            //实例化Flink和Redis关联类FlinkJedisPoolConfig,设置Redis端口
  40.            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
  41.                   .setHost("192.168.223.128")
  42.                   .setPort(6379)
  43.                   .build();
  44.            //实例化RedisSink,并通过flink的addSink的方式将flink转换的结果插入到redis
  45.            //ip
  46.            streamSource.addSink(new RedisSink(conf,new RedisExampleMapperIp()));
  47.            //浏览器
  48.            streamSource.addSink(new RedisSink(conf,new RedisExampleMapperBrowser()));
  49.            //日期
  50.            streamSource.addSink(new RedisSink(conf,new RedisExampleMapperDate()));
  51.            //设置job名称
  52.            env.execute("consumer from kafka data");
  53.       } catch (Exception e) {
  54.            e.printStackTrace();
  55.       }
  56.   }
  57.    //指定Redis key并将flink数据类型映射到Redis数据类型
  58.    public static final class RedisExampleMapperIp implements RedisMapper<MyAccess>{
  59.        @Override
  60.        public RedisCommandDescription getCommandDescription() {
  61.            return new RedisCommandDescription(RedisCommand.HSET,"my_access_ip");
  62.       }
  63.        @Override
  64.        public String getKeyFromData(MyAccess myAccess) {
  65.            return myAccess.getIp();
  66.       }
  67.        @Override
  68.        public String getValueFromData(MyAccess myAccess) {
  69.            String value = jedis.hget("my_access_ip", myAccess.getIp());
  70.            if(value == null){
  71.                return 1+"";
  72.           }
  73.            return String.valueOf(Integer.valueOf(value)+1);
  74.       }
  75.   }
  76.    //指定Redis key并将flink数据类型映射到Redis数据类型
  77.    public static final class RedisExampleMapperBrowser implements RedisMapper<MyAccess>{
  78.        @Override
  79.        public RedisCommandDescription getCommandDescription() {
  80.            return new RedisCommandDescription(RedisCommand.HSET,"my_access_browser");
  81.       }
  82.        @Override
  83.        public String getKeyFromData(MyAccess myAccess) {
  84.            return getBrowserType(myAccess.getBrowser());
  85.       }
  86.        @Override
  87.        public String getValueFromData(MyAccess myAccess) {
  88.            String browserType = getBrowserType(myAccess.getBrowser());
  89.            String value = jedis.hget("my_access_browser", browserType);
  90.            if(value == null){
  91.                return 1+"";
  92.           }
  93.            return String.valueOf(Integer.valueOf(value)+1);
  94.       }
  95.   }
  96.    public static String getBrowserType(String browser){
  97.        if(browser.indexOf("AppleWebKit") != -1){
  98.            return "Google";
  99.       }else if(browser.indexOf("QQBrowser") != -1){
  100.            return "QQ";
  101.       }else if(browser.indexOf("Trident") != -1){
  102.            return "IE";
  103.       }else if(browser.indexOf("Firefox") != -1){
  104.            return "Firefox";
  105.       }
  106.        return "other";
  107.   }
  108.    //指定Redis key并将flink数据类型映射到Redis数据类型
  109.    public static final class RedisExampleMapperDate implements RedisMapper<MyAccess>{
  110.        @Override
  111.        public RedisCommandDescription getCommandDescription() {
  112.            return new RedisCommandDescription(RedisCommand.HSET,"my_access_date");
  113.       }
  114.        @Override
  115.        public String getKeyFromData(MyAccess myAccess) {
  116.            String format = FlinkKafka.format.format(new Date());
  117.            return format;
  118.       }
  119.        @Override
  120.        public String getValueFromData(MyAccess myAccess) {
  121.            String format = FlinkKafka.format.format(new Date());
  122.            String value = jedis.hget("my_access_date", format);
  123.            if(value == null){
  124.                return 1+"";
  125.           }
  126.            return String.valueOf(Integer.valueOf(value)+1);
  127.       }
  128.   }
  129. }

5.8.3 打包上传测试

切换浏览器统计结果如下:

20210304161616141.png

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/497349
推荐阅读
相关标签
  

闽ICP备14008679号