赞
踩
目标:实现FineBI访问MySQL结果数据集的配置
实施
安装FineBI
参考《FineBI Windows版本安装手册.docx》安装FineBI
配置连接
数据连接名称:Momo
用户名:root
密码:自己MySQL的密码
数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
数据准备
/* Dataset for the FineBI dashboard: reads every row of momo_count and
   replaces the numeric group-type code with its display label. */
SELECT
    id,
    momo_totalcount,
    momo_province,
    momo_username,
    momo_msgcount,
    CASE momo_grouptype
        WHEN '1' THEN '总消息量'
        WHEN '2' THEN '各省份发送量'
        WHEN '3' THEN '各省份接收量'
        WHEN '4' THEN '各用户发送量'
        WHEN '5' THEN '各用户接收量'
    END AS momo_grouptype
FROM momo_count
小结
目标:实现FineBI实时报表构建
路径
实施
实时报表构建
新建仪表盘
添加标题
实时总消息数
发送消息最多的Top10用户
接收消息最多的Top10用户
各省份发送消息Top10
各省份接收消息Top10
各省份总消息量
小结
目标:实现实时报表测试
实施
实时报表配置
官方文档:https://help.fanruan.com/finebi/doc-view-363.html
添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下
添加JS文件
创建js文件:refresh.js
// refresh.js - periodically re-queries every widget of one specific FineBI dashboard.
// The check runs once, 2000 ms after this script is loaded
// (presumably to give the BI page objects time to initialise - confirm).
setTimeout(function () {
    // ID of the dashboard that should auto-refresh; replace with your own dashboard's ID.
    var TARGET_DASHBOARD_ID = "d574631848bd4e33acae54f986d34e69";
    // Refresh period in milliseconds.
    var REFRESH_INTERVAL_MS = 3000;

    // ID of the dashboard that is currently open.
    var currentDashboardId = BI.designConfigure.reportId;

    if (currentDashboardId == TARGET_DASHBOARD_ID) {
        setInterval(function () {
            // Store the current control calculations under "controlFilters" before refreshing.
            // NOTE(review): the original kept a commented-out variant that used
            // Data.SharingPool instead of BI.SharingPool - presumably for another
            // FineBI version; confirm which one your installation exposes.
            BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
            BI.Utils.broadcastAllWidgets2Refresh(true);
        }, REFRESH_INTERVAL_MS);
    }
}, 2000);
将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中
关闭FineBI缓存,然后关闭FineBI
修改jar包,添加js
<!-- Enable the auto-refresh feature: load refresh.js from the web root -->
<script type="text/javascript" src="/webroot/refresh.js"></script>
重启FineBI
实时刷新测试
清空MySQL结果表
启动Flink程序:运行MoMoFlinkCount
启动Flume程序
# Switch to the Flume installation directory so the relative paths below resolve.
cd /export/server/flume-1.9.0-bin
# Start agent "a1": configuration directory conf/, agent definition usercase/momo_mem_kafka.properties,
# root logger at INFO level writing to the console.
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
启动模拟数据
# Run the mock-data generator jar with three positional arguments.
# NOTE(review): presumably <source spreadsheet> <output directory> <numeric rate/interval>;
# the meaning and unit of the trailing 10 are not shown here - confirm against MoMo_DataGen.
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
- 观察报表
小结
## 附录一:Maven依赖
```xml
<!-- Remote repository. HTTPS is used because Maven 3.8.1+ blocks plain-HTTP repositories by default. -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases><enabled>true</enabled></releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- HBase client -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- Kafka client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- JSON parsing library -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- Required for Flink to access HDFS, Kafka, MySQL and Redis -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- HTTP client dependency -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>
```
package bigdata.itcast.cn.momo.offline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

/**
 * Offline scenario: consumes messages from the Kafka topic {@code MOMO_MSG} and writes every
 * well-formed record into the HBase table {@code MOMO_CHAT:MOMO_MSG}.
 *
 * <p>The class is written for a single consumer thread; its shared static state (date format,
 * HBase table handle) is not guarded by any synchronization.
 *
 * Created by Maynor
 */
public class MomoKafkaToHbase {

    /** Separator between the fields of one Kafka message value. */
    private static final String FIELD_SEPARATOR = "\001";

    /** Number of fields a message must split into to be written to HBase. */
    private static final int FIELD_COUNT = 20;

    /**
     * Column qualifiers in message-field order: field {@code i} of a record is stored under
     * {@code COLUMN_NAMES[i]}.
     */
    private static final String[] COLUMN_NAMES = {
        "msg_time",
        "sender_nickyname",
        "sender_account",
        "sender_sex",
        "sender_ip",
        "sender_os",
        "sender_phone_type",
        "sender_network",
        "sender_gps",
        "receiver_nickyname",
        "receiver_ip",
        "receiver_account",
        "receiver_os",
        "receiver_phone_type",
        "receiver_network",
        "receiver_gps",
        "receiver_sex",
        "msg_type",
        "distance",
        "message"
    };

    /**
     * Parses the message time (field 0). Kept as SimpleDateFormat so that parsing leniency and
     * the use of the JVM default time zone stay exactly as before; it is only touched from the
     * single consumer thread.
     */
    private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    private static final Connection conn;
    private static final Table table;
    private static final TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // table name
    private static final byte[] family = Bytes.toBytes("C1"); // column family

    // Step 2: build the HBase connection once, when the class is loaded, so that a single
    // connection and table handle are reused for every write.
    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(tableName);
        } catch (IOException e) {
            // Fail fast: continuing without a table would only surface later as a
            // NullPointerException on the first write.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Entry point: runs the consume-and-write loop and releases the HBase table and connection
     * when the loop terminates with an exception.
     *
     * @param args unused
     * @throws Exception if consuming, parsing, writing or closing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the table first, then the connection, and keeps the
        // original failure as the primary exception.
        try (Connection hbaseConn = conn; Table hbaseTable = table) {
            // Step 1: build the consumer and process the data.
            consumerKafkaToHbase();
        }
    }

    /**
     * Polls Kafka forever, prints every record, writes the valid ones to HBase and commits
     * offsets manually per partition after that partition's batch has been processed.
     *
     * @throws IOException    if an HBase write fails
     * @throws ParseException if the message time of a record cannot be parsed
     */
    private static void consumerKafkaToHbase() throws IOException, ParseException {
        Properties props = new Properties();
        // Broker addresses.
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        // Consumer group id.
        props.setProperty("group.id", "momo1");
        // Auto commit is disabled; offsets are committed explicitly below.
        props.setProperty("enable.auto.commit", "false");
        // Key and value deserializers.
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // The consumer is closed when the loop is left through an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("MOMO_MSG"));

            while (true) {
                // Wait up to 100 ms for the brokers to answer; an empty result simply leads
                // to the next poll.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // Step 3: handle the batch partition by partition.
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partRecords = records.records(partition);

                    long offset = 0;
                    for (ConsumerRecord<String, String> record : partRecords) {
                        offset = record.offset();
                        String value = record.value();
                        System.out.println(record.topic() + "\t" + record.partition() + "\t" + offset
                                + "\t" + record.key() + "\t" + value);

                        // Records that fail validation are skipped, not retried.
                        if (isValidRecord(value)) {
                            writeToHbase(value);
                        }
                    }

                    // Commit the position after the last record processed in this partition.
                    Map<TopicPartition, OffsetAndMetadata> offsets =
                            Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                    consumer.commitSync(offsets);
                }
            }
        }
    }

    /**
     * Tells whether a message value is non-empty and splits into exactly {@link #FIELD_COUNT}
     * fields.
     *
     * <p>NOTE(review): {@code String.split} drops trailing empty fields, so a record whose last
     * field(s) are empty is rejected here. That matches the previous behaviour; confirm it is
     * intended.
     *
     * @param value raw Kafka message value, may be {@code null}
     * @return {@code true} if the record should be written to HBase
     */
    private static boolean isValidRecord(String value) {
        return value != null
                && !value.isEmpty()
                && value.split(FIELD_SEPARATOR).length == FIELD_COUNT;
    }

    /**
     * Writes one validated message to HBase as a single {@link Put}: one column per field in
     * column family {@code C1}.
     *
     * @param value raw message value consisting of {@link #FIELD_COUNT} separated fields
     * @throws IOException    if the HBase write fails
     * @throws ParseException if the message time (field 0) cannot be parsed
     */
    private static void writeToHbase(String value) throws IOException, ParseException {
        String[] items = value.split(FIELD_SEPARATOR);

        // Rowkey inputs: message time (field 0), sender account (field 2), receiver account (field 11).
        String rowkey = getMomoRowkey(items[0], items[2], items[11]);

        Put put = new Put(Bytes.toBytes(rowkey));
        for (int i = 0; i < COLUMN_NAMES.length; i++) {
            put.addColumn(family, Bytes.toBytes(COLUMN_NAMES[i]), Bytes.toBytes(items[i]));
        }
        table.put(put);
    }

    /**
     * Builds the rowkey from message time, sender id and receiver id.
     *
     * <p>Layout: {@code <first 8 hex chars of MD5(suffix)>_<sender>_<receiver>_<epoch millis>},
     * where {@code suffix} is {@code <sender>_<receiver>_<epoch millis>}.
     *
     * @param stime              message time formatted as {@code yyyy-MM-dd HH:mm:ss}
     * @param sender_accounter   sender account id
     * @param receiver_accounter receiver account id
     * @return the rowkey string
     * @throws ParseException if {@code stime} cannot be parsed
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter)
            throws ParseException {
        // Convert the message time to an epoch-millisecond timestamp.
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
        // Hash-derived prefix computed from the suffix itself.
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
        return prefix + "_" + suffix;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。