赞
踩
本系统主要使用了Flume、kafka、Spark、Mysql数据库、python的Django框架以及echarrts来实现整个功能
本系统按照实现功能不同,分为两个模块,一个是获取及处理流数据模块,一个是实时可视化模块
对于获取及处理流数据模块,本系统利用Flume监听一台虚拟机上的目录,将目录中的数据传到Kafka的某一Topic中,而后利用SparkStreaming来作为这一Topic的消费者,来实时分析数据并将分析统计结果存入到Mysql数据库中
而实时可视化模块则用Django框架固定时间间隔从数据库中取出所需数据,结合echarts画出相应的图,最后展示在页面上,实现固定时间间隔的页面刷新以及近似实时可视化
系统的框架大致如下,由于没有数据源,所以此处我自己编写了一个程序来模拟产生数据
spooldir.conf
#Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 #Describe/configue the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/flume/data a1.sources.r1.fileHeader = true #Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink #设置kafka的主题topic a1.sinks.k1.topic = sparkstreaming_1 #设置消费者编码为UTF-8 a1.sinks.k1.custom.encoding=UTF-8 #绑定kafka主机以及端口号 a1.sinks.k1.kafka.bootstrap.servers = xy8:9092,xy9:9092,xy10:9092 #设置kafka序列化方式 a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder #use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动Flume任务时,需要指定相应的conf文件,来确定Flume的source(从哪里取数据)、sink(数据发送到哪里)
此处的这个conf文件设定source为spooidir类型并指定监听的spooDir路径;指定sink为kafka并指定相应的Topic和kafka主机及端口号等信息
此处设定的spoolDir因为没有,所以需要提前创建,而kafka对于sparkstreaming_1这一topic会在kafka配置部分进行创建
在spooldir.conf中,我指定了这个conf对应的任务的sink为kafka的sparkstreaming_1这一Topic
对于kafka部分,需要在3台虚拟机上启动kafka(首先保证zookeeper都已经启动)
./bin/kafka-server-start.sh -daemon config/server.properties
然后再创建sparkstreaming_1这一topic
./kafka-topics.sh --create --zookeeper xy8:2181,xy9:2181,xy10:2181 --replication-factor 1 --partitions 1 --topic sparkstreaming_1
Django部分,主要就是数据库的迁移部分,首先在models.py中指定相应的表结构,并在setting.py中指定数据库的相关信息
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'online',
'USER': 'root',
'PASSWORD': 'xuyang',
'HOST': '127.0.0.1',
'PORT': 3306,
}
}
models.py中设定数据库的代码如下所示
class Province_time(models.Model): time_province = models.CharField(max_length=64, primary_key=True) time = models.CharField(max_length=64) province = models.CharField(max_length=64) num = models.IntegerField(default=1) def __str__(self): return self.time_province + ":" + self.time + ";" + self.province + ";" + str(self.num) def inf_time(self): return self.time class Machine_time(models.Model): time_machine = models.CharField(max_length=64, primary_key=True) time = models.CharField(max_length=64) machine = models.CharField(max_length=64) num = models.IntegerField(default=1) def __str__(self): return self.time_machine + ":" + self.time + ";" + self.machine + ";" + str(self.num) def inf_machine(self): return self.machine def inf_num(self): return self.num class Anchor_time(models.Model): time_anchor = models.CharField(max_length=64, primary_key=True) time = models.CharField(max_length=64) anchor = models.CharField(max_length=64) num = models.IntegerField(default=1) def __str__(self): return self.time_anchor + ":" + self.time + ";" + self.anchor + ";" + str(self.num) def inf_anchor(self): return self.anchor def inf_num(self): return self.num class Anchor_type_time(models.Model): time_anchor_type = models.CharField(max_length=64, primary_key=True) time = models.CharField(max_length=64) anchor_type = models.CharField(max_length=64)<
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。