赞
踩
## HADOOP 安装步骤
````
1. tar -zxvf hadoop_2.7.1 -- 解压hadoop
2. pwd -- 查看当前路径
3. ln -s hadoop_2.7.1 hadoop -- 创建软连接
4. vi ~/.bashrc -- 设置环境变量
5. 编辑并保存 wq | shift ZZ
export HADOOP_HOME='hadoop的安装位置'
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
6. source /.bashrc -- 环境变量生效
7. -- 校验环境变量是否配置成功
whereis hdfs && whereis start-all.sh
8. hadoop 的目录结构
bin -- 存放操作命令
etc/hadoop 存放所有的配置文件
lib -- 本地库 java 程序员都懂
logs -- 日志
sbin -- 集群的命令,如启动停止
share /doc|hadoop 所有依赖的jar包
````
## hadoop 单机模式安装
```
vi hadoop-env.sh -- 编辑配置
export JAVA_HOME='jdk 安装路径'
```
## hadoop 伪分布式安装
```
1. hostname -- 查看主机名称
2. vi /etc/hostname -- 修改主机名称
3. reboot -- 重启生效
4. vi /ect/hosts -- 编辑ip [ip地址 192.168.1.101 node1]
5. 免密登录设置
1) ssh-keygen -t rsa -- 生成免密登录
2) ll ~/.ssh/ -- 查看免密文件位置
3) ssh-copy-id -i ~/.ssh/id_rsa.pub node1 -- 追加免密 node1 为自己的主机名
6\. ssh node1 -- 免密登录验证
7. 设置hadoop配置文件 hadoop-env.sh / core-site.xml / hdfs-site.xml / mapred-site.xml / yarn-site.xml
1) cd ${HADOOP_HOME}/etc/hadoop 进入配置文件目录
2) vi hadoop-env.sh 设置hadoop-env.sh 与单机一样 配置jdk路径
3) vi core-site.xml 配置core-site.xml 文件
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://node1:9000</value> -- ip 按照实际情况更改
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/hadoop/tmp</value>
</property>
4) vi hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>1</value>
-- 默认为3 默认伪分布式只有一个节点所以改为1
</property>
// 允许rest方式访问
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
5) cp mapred.site.xml.tmplate mapred.site.xm -- 复制文件
vi mapred.site.xm
<property>
<name>mapred.job.tracker</name>
<value>node1:9001</value>
</property>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
6) vi yarn-site.xml
<property>
<name>yarn.resourcemanager.hostname</name>
<value>haizhuangdeMacBook-Pro.local</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
7) hdfs namenode -format -- 格式化 hdfs 只需格式化一次 下次启动再次格式化会丢失dataNode
8) 启动hadoop
start-dfs.sh -- 启动hdfs
start-yarn.sh -- 启动yarn
start-all.sh 启动所有
stop-all.sh 停止所有
jps 查看是否启动
9) 浏览器 查看hadoop http:ip:50070 namenode datanode 信息 50090 查看 secondNamenode信息 8088 查看集群所有的应用信息
10) 开启日志记录功能
vim yarn-site.xml
<property>
<name>yarn.resourcemanager.hostname</name>
<value>haizhuangdeMacBook-Pro.local</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 配置日志查看地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://haizhuangdeMacBook-Pro.local:19888/jobhistory/logs</value>
</property>
<!-- 配置保存时长 -->
<property>
<name>yarn.log.server.retain</name>
<value>604800</value>
</property>
```
## 安装完全分布式
```
1\. node1 node2 node3 同分布式一样 修改hostname 文件 reboot 生效
2. xxx.xxx.x..xxx node1 添加hosts 映射 将三台机器都加进来
3. 免密登录设置
1) ssh-keygen -t rsa 在node1 生成密钥
2) 将node1 的公钥 复制到 node1 node2 node3 的主机上
ssh-copy-id -i ~/.ssh/id_rsa.pub node1
ssh-copy-id -i ~/.ssh/id_rsa.pub node2
ssh-copy-id -i ~/.ssh/id_rsa.pub node3
4. 安装ntp 防止分布式服务器时间不同步
yum install ntp
5. 配置文件同伪分布式一样 hdfs-site.xml 不同 及 salves
vi hdfs-site.xml hdfs.replication ---- value 为2 节点数
vi slavers 将原有内容删除 添加 node2 node3
6\. 分发配置
cd ~/hadoop/etc
scp -r hadoop root@node2:~/hadoop/etc/
scp -r hadoop root@node3:~/hadoop/etc/
7\. 格式化 hdfs namenode -format
```
## hdfs命令
```
hadoop fs -help -- 查看帮助
hadoop fs moveFromLocal '文件名' '上传路径名称' --将文件上传到某个目录下
hadoop fs copyFromLocal 'xxx.txt ' 'copy路径' -- 将文件copy到目录下
Hadoop fs -rm '文件路径'。-- 删除文件
Hadoop fs -rm -r '文件路径下的所有文件'。 -- 删除某个文件夹下的所有文件
Hadoop fs -du -s -h '文件路径'。 -- 查看文件夹下的所有文件及文件夹大小
Hadoop fs -setrep 10 '文件'。-- 设置副本 一台机器只会存储一个副本
hdfs dfs -ls / -- 查看当前文件系统下的所有数据
hdfs dfs -put '文件' /路径 -- 上传文件
hdfs dfs -mkdir '目录' -- 创建目录
hdfs dfs -cat '文件路径' | head -- 查看文件
hdfs dfs -get '文件路径' -- 下载文件
hdfs dfs -rmdir '文件路径' -- 删除文件
hdfs getconf -namenodes -- 获取hdfs 路径
hdfs dfs -cat '文件路径' | wc -l -- 查看指定文件的大小
```
## java - api操作hadoop
```
FIleSystem
get(COnfiguration conf) -- 根据配置获取实例
get(URI uri,Configuration conf) -- 根据URI的模式和权限获取实例
get(Configuration conf,URI uri ,String user) -- 根据uri,配置和用户获取filesystem实例
###获取输入流
FSDataInputStream
open(Path path) -- 在指定路径上打开FSDataInputStream
### 创建输出流
FSDataOutputStream
create(FileSystem fs,Path file,FsPermission fs) -- 制定一个路径和权限创建一个文件,并返回FSDataOutputStream
create(Path path) -- 指定路径创建文件并返回 fsDataOutputSream
create(Path path,boolean overwrite) -- 指定路径创建一个文件,overwrite 是否覆盖源文件 并返回FSDataOutputStream
create(Path path,boolean overwrite,int buffersize) -- 指定路径创建一个文件,buffersize 表示缓存区大小,并返回FSDataOutputStream
## 创建目录
mkdirs(FileSystem fs,Path dir,FsPermission permisson) -- 使用提供的权限创建目录
mkdir(Path path) -- 使用默认的权限来调用mkdirs(Path ,FsPermission)接口
## 删除文件
delete(Path path) -- 删除文件
delete(Path path,boolean b) -- 删除文件,b表示是否递归
## 列出子目录或子文件
isStatus(Path p) -- 如果路径是一个目录,列出他子文件或者子目录的状态信息
## 设置文件拓展属性
setXAttr(Path p,String name,byte[] val) -- 设置文件或者目录的拓展属性
## 获取文件拓展属性
getXAttr(Path path,String name) -- 传入属性名称,获取文件或目录中扩展属性的值
```
main
package com.bu.sec_o.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class MyMapredurce {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. 处理输入和输出路径
if (args == null || args.length != 2) {
System.out.println("Usage: yarn wc.jar com/bu/sec_o/hadoop/MyMapredurce <input Path> <output path>");
System.exit(1);
}
// 2. 创建配置文件的对象
Configuration conf = new Configuration(true);
// 3 . 创建Job对象
Job job = Job.getInstance(conf);
// 4. 设置Job作业的名称
job.setJobName("单词统计");
// 5。 设置jar的入口类
job.setJarByClass(MyMapredurce.class);
// 6. 设置文件输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 7. 设置文件输出路径
// 7.1 创建文件输出的对应Path对象
Path outPath = new Path(args[1]);
FileSystem fileSystem = outPath.getFileSystem(conf);
// 7.2 判断输出路径是否存在,存在则删除
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
// 指定用到的Mapper类
job.setMapperClass(null);
// 指定map输出的key的类型
job.setMapOutputKeyClass(Text.class);
// 指定map输出的value的类型
job.setMapOutputValueClass(IntWritable.class);
// 指定用到的reducer类
job.setReducerClass(null);
// 指定输出的reducer的key的类型
job.setOutputKeyClass(Text.class);
// 指定输出的reducer的value的类型
job.setOutputValueClass(IntWritable.class);
// 8. 提交job
job.waitForCompletion(true);
}
}
## 多数据Mapper。输入使用MultipleInputs.addInputPath();
## 多文件输出。MultipleOutputFormat
## MapReduce 数据处理. 分布式运算框架
```
mapper. ----- suffer ---- redurce
优点:
1. 易于编程。用户只关心,业务逻辑。实现框架接口
2. 良好的扩展性: 可以动态增加服务器,解决资源不够问题
3. 高容错性。任何一台服务器挂掉。可以将任务转移到其他节点
4. 适合海量数据的计算,几千台服务器共同计算
缺点:
1. 不擅长实时计算。
2. 不擅长流计算。
3. 不擅长DAG有向无环图计算。
输入类 继承 Mapper
重写map方法
输出类 继承Reducer
重写reducer方法
总执行类Driver
System.setProperty("hadoop.home.dir", "/Applications/tools/hadoop");
// 1. 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3. 关联map和reduce
job.setMapperClass(MineMapper.class);
job.setReducerClass(MineReducer.class);
// 4. 设置输出的的k v 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 更改切片规则
job.setInputFormatClass(.class);
// 6. 设置输入路径和输出路径
FileInputFormat.setInputPaths(job,new Path("/user/a.txt"));
FileOutputFormat.setOutputPath(job,new Path("/usr/b.txt"));
// 7. 提交job true并打印源码
boolean result = job.waitForCompletion(true);
System.exit(result? 0 :1);
```
## YARN 资源管理,程序调度
```
yarn application -list --- 查看所有任务
yarn application -kill 'task _ id'. -- 杀死任务
yarn logs -applicationId 'application-id' -- 查询任务日志. --containerid 查看位置
yarn application attempt -status 'application_id' -- 查看任务状态
yarn container -list -- 查看容器
yarn container -status 'container_id ' -- 查看容器状态
yarn node -list -all -- 查看节点
yarn rmadmin -refreshQueues -- 更新配置
yarn queue -status default -- 查看队列
继承tools接口
package com.bu.dir;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
public class t1 implements Tool {
private Configuration configuration;
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance();
job.setJarByClass(WorldCountDriver.class);
// map reducer
job.setMapperClass(MineMapper.class);
job.setReducerClass(MineReducer.class);
return 0;
}
@Override
public void setConf(Configuration configuration) {
this.configuration = configuration;
}
@Override
public Configuration getConf() {
return configuration;
}
// map
public static class MineMapper extends Mapper {
@Override
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
String s = value.toString();
String[] list = s.split(" ");
for (String s1 : list) {
}
}
}
// reducer
public static class MineReducer extends Reducer {
@Override
protected void reduce(Object key, Iterable values, Context context) throws IOException, InterruptedException {
}
}
}
编写启动类:
package com.bu.dir;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
public class WorldCountDriver {
private static Tool tool;
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
switch (args[0]) {
case "wordCount":
tool = new t1();
break;
default:
throw new RuntimeException("no such tools" + tool);
}
//
ToolRunner.run(configuration, tool, Arrays.copyOfRange(args, 1, args.length));
}
}
```
## hbase 安装
vi hbase-env.sh # 编辑配置
![image](https://upload-images.jianshu.io/upload_images/23995008-7280cfe137ad7b27.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![image](https://upload-images.jianshu.io/upload_images/23995008-6ac87ac6c1fe0b54.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
## vi hbase-site.xml
![image](https://upload-images.jianshu.io/upload_images/23995008-05d26a2974908adb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
启动
./start-hbase.sh
## 概念
```
namespace: 类似关系数据库中的database
table:
-- rowkey 检索记录的主键 , 默认有序
-- column family
## hbase shell 命令
create ' 表名','列族' -- 创建表
put 插入
get 获取
scan 获取全部
disable '表名' --- drop '表名' 删除表之前需将表设置为禁用
```
### 错误总结
```
2021-04-28 19:25:15,861 DEBUG [main] client.RpcRetryingCallerImpl: Call exception, tries=6, retries=16, started=11286 ms ago, cancelled=false, msg=Call to node2/192.168.1.102:60020 failed on connection exception: org.apache.hbase.thirdparty.io.netty.chann
问题原因: 之前创建后之后在Zookeeper中注册,导致集群起不来
解决方法: 删除集群中 hbase/meta-register 节点
```
## java编程
````
## hbase shell '文件路径' 执行当前文件的所有命令
## 创建表 create "表名" ,"列簇",可以包含多个列簇
create "order_info","c1"
## 指定某个列簇的压缩算法
create "table_name",{NAME=>"列簇",COMPRESSION=>"GZ(压缩算法)"}
## 数据域分区
create "namespace:tablename",{NAME =>"c1",COMPRESSION=>"GZ"},{NUMREGIONS => 6,SPLITALGO => "HexStringSplit"}
## 查看表
list
## 删除表 disable "表名" 禁用表 drop "表名" 删除表
disable "order_info";
drop "order_info";
## 添加数据 put "表名","rowkey","列簇:列","value"
put "order_info","00002","c1:STATUS","已提交";
put "order_info","00002","c1:PAY_MONEY",4000
put "order_info","00002","c1:PAY_WAY",1
put "order_info","00002","c1:USER_ID","90822"
put "order_info","00002","c1:OPERATION_DATE","2020-04-25 12:09:11"
put "order_info","00002","c1:GATEGORY","手机";
## 查询数据 get "表名","rowkey"
get "order_info","00002";
## 将数据中的中文正确显示
get "order_info","00002",{FORMATTER => 'toString'};
## 更新指定列 put "table_name","rowkey","列簇:类","值"
put "order_info","00002","c1:STATUS","已完成";
## 删除整行 deleteall "order_info","rowkey"
deleteall "order_info","00002";
## 删除指定列 delete "table_name","rowkey","列簇:列"
delete "order_info","00002","c1:GATEGORY";
## 统计一个表中的所有数据 count "表名" 生产慎用
count "order_info"
## 查询一张表的所有数据 scan "表名"
scan "order_info"
## 只查询几条数据
scan "order_info",{LIMIT => 3}
## 只查询几个列
scan "order_info",{COLUMNS => 'c1:STATUS',FORMATTER=>'toString'}
## 查询指定rowkey的列
scan "order_info",{ROWPREFIXFILTER => '00002'}
## 过滤器 filter
## 查看所有过滤器 show_filter
## 查询指定rowkey 过滤 rowkey 的列 = 比较运算符 binary 二进制判断 COLUMNS 显示指定列
scan "order_info",{FILTER => "RowFilter(=,'binary:00002')",COLUMNS => 'c1:STATUS'}
## 过滤列值的过滤器 SingleColumnValueFilter 某个列过滤
scan "order_info",{FILTER => "SingleColumnValueFilter('c1','STATUS',=,'binary:已提交')"}
## 组合查询
scan "order_info",{FILTER => "SingleColumnValueFilter('c1','STATUS',=,'binary:已提交') AND SingleColumnValueFilter('c1','PAY_MONEY',>,'binary:3000') AND SingleColumnValueFilter('c1','USER_ID',>,'binary:0')"}
## 对某个列进行累加
create "info","c1"
incr "info","00001","c1:incr1",0
put "info","00001","c1:name","访问次数"
get_counter "info","00001",'c1:incr1'
## shell 管理命令
status 查看节点状态
whoami 当前操作的用户
describe "表名" 查看表结构
exists "表名" 查看表是否存在
is_enabled "表名" 查看当前表是否可用
enable "表名" 启用表
truncate "表名" 删除表里面的数据
## alter 更改表和列簇的模式
alter "表名","列簇" 新增列簇
alter "表名", "delete"=>"列簇" 删除列簇
## java 编程
## 1. 导入jar包
<!-- hbase java客户端 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
## 2. 测试用例
package com.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
// 建立hbase 连接
public class TableAdminTest {
private Connection connection;
private Admin admin;
@Before
public void beforeTest() throws IOException {
// 使用HbaseConfiguration.create 创建hbase配置文件
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", "hdfs://192.168.1.101:9000/HBase");
conf.set("hbase.zookeeper.quorum","192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181");
// conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// 使用ConnectionFactory 创建hbase连接
connection = ConnectionFactory.createConnection(conf);
// 要创建表 , 需要基于hbase连接获取admin管理对象
admin = connection.getAdmin();
}
@Test
public void crateTableTest() throws IOException {
TableName tableName = TableName.valueOf("WATER_BALL");
if (admin.tableExists(tableName)) {
return;
}
// 获取 表描述器
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
// 获取列簇描述器
ColumnFamilyDescriptorBuilder columnFamily = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c1"));
ColumnFamilyDescriptor build = columnFamily.build();
tableDescriptorBuilder.setColumnFamily(build);
TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
// 创建表
admin.createTable(tableDescriptor);
}
@After
public void afterTest() throws IOException {
admin.close();
connection.close();
}
}
## 插入数据
// 提交数据
@Test
public void exe() throws IOException {
admin = connection.getAdmin();
if (admin.tableExists(tableName)) {
Table table = connection.getTable(tableName);
// 构建rowkey
String rowKey = "55522";
// 构建列簇
String columnFamily = "c1";
// 构建列名
String columnName = "Name";
// 构建put对象
Put put = new Put(Bytes.toBytes(rowKey));
// 添加列族 列名 列值
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes("Mr.Bu"));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("ADDRESS"), Bytes.toBytes("山西太原"));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("SEX"), Bytes.toBytes("男"));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("PAY_DATE"), Bytes.toBytes("2020-05-11"));
// 提交 执行put操作
table.put(put);
// 关闭table 对象
table.close();
} else {
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptor c1 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c1")).build();
tableDescriptorBuilder.setColumnFamily(c1);
admin.createTable(tableDescriptorBuilder.build());
exe();
}
}
## 获取数据
// 获取数据
@Test
public void get() throws IOException {
Table table = connection.getTable(tableName);
Get get = new Get(Bytes.toBytes("55522"));
Result result = table.get(get);
List<Cell> cells = result.listCells();
byte[] row = result.getRow();
System.out.println(Bytes.toString(row));
if (cells == null || cells.size() == 0 || cells.equals("null")) {
System.out.println("没有数据");
table.close();
return;
}
cells.forEach(i -> {
// 获取列簇名称
System.out.println(Bytes.toString(i.getFamilyArray(), i.getFamilyOffset(), i.getFamilyLength()));
// 获取列名称
System.out.println(Bytes.toString(i.getQualifierArray(), i.getQualifierOffset(), i.getQualifierLength()));
// 获取值
System.out.println(Bytes.toString(i.getValueArray(), i.getValueOffset(), i.getValueLength()));
});
table.close();
}
## 删除数据
// 删除数据
@Test
public void delete() throws IOException {
Table table = connection.getTable(tableName);
Delete delete = new Delete(Bytes.toBytes("55522"));
table.delete(delete);
table.close();
}
## 过滤查询
@Test
public void scanFilter() throws IOException {
//
Scan scan = new Scan();
Table table = connection.getTable(tableName);
// 设置过滤条件
SingleColumnValueFilter singleColumnValueFilter =
new SingleColumnValueFilter(Bytes.toBytes("c1"), Bytes.toBytes("date"), CompareOperator.EQUAL, Bytes.toBytes("2020"));
SingleColumnValueFilter singleColumnValueFilter2 =
new SingleColumnValueFilter(Bytes.toBytes("c1"), Bytes.toBytes("date"), CompareOperator.EQUAL, Bytes.toBytes("2020"));
// 组装多个过滤条件
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, singleColumnValueFilter, singleColumnValueFilter2);
// 装配过滤条件
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
scanner.close();
table.close();
}
````
## Hive 概念
```
1. 下载hive 解压
2. cd conf
mv hive-env.xml.sample hive-env.xml
vi hive-env.xml 修改HADOOP_HOME HIVE_CONF_DIR
3. ./bin/schematool -dbType derby -initSchema 初始化数据
{
初始化若是出错,可能是hadoop guava。jar 版本 问题 拷贝高版本jar包解决,hadoop share 下的guava 和 hive lib下的比较
}
-- 命令
show databases; -- 查看所有数据库
use databaseName; 进入数据库
alter database 'db_name' set dbproperties("createTime" = "2012-10-26")修改数据库
drop database 'db_name'; 删除数据库 只可以删除空的
drop database 'db_name' cascade; 强制删除
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY(col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY(col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
二、参数说明
CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXISTS 选项来忽略这个异常。
EXTERNAL 关键字可以让用户创建一个外部表,默认是内部表。外部表在建表的必须同时指定一个指向实际数据的路径(LOCATION),Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的s时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
COMMENT 是给表字段或者表内容添加注释说明的。
PARTITIONED BY 给表做分区,决定了表是否是分区表。
CLUSTERED BY 对于每一个表(table)或者分区, Hive 可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分,Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',', 这里指定表存储中列的分隔符,默认是 \001,这里指定的是逗号分隔符,还可以指定其他列的分隔符。
STORED AS SEQUENCEFILE|TEXTFILE|RCFILE,如果文件数据是纯文本,可以使用 STORED AS TEXTFILE,如果数据需要压缩,使用 STORED AS SEQUENCEFILE。
LOCATION 定义 hive 表的数据在 hdfs 上的存储路径,一般管理表(内部表不不要自定义),但是如果定义的是外部表,则需要直接指定一个路径。
例子:
hive -e "create table dummy (value string); \
load data local inpath '/tmp/dummy.txt(格式相同的数据文件)'
overwrite into table dummy"
hive -e "create table table_name (字段 字段类型,.....) \
row format delimited \
fields terminated by '\t' " -- 按照制表符分隔
hive -e "load data local inpath '数据文件路径' overwrite into table 表名称";
## 创建分区表'
create table table_name(id int)
partitioned by (分区字段 字段类型...);
分区表中插入数据
load data local inpath 'xxx.txt' into table table_name partition (分区字段名称='分区字段值',.....);
## 桶
create table table_name (id int) clustered by (id) into 4 buckets;
查询部分桶的数据
select * from table_name tablesample (bucket 1 out of 4 on id);
## 删除表内所有数据
truncate table table_name;
|| create table table_new like table_name;
## 排序
from table_name select year,name distribute by year sort by year asc,temperatue desc;
## 多表连接语句
select a.*,b.* from a join b on (a.id = b.id) -- 内连接
select a.*,b.* from a left outer join b on (a.id = b.id) -- 外连接
## 子查询
select * from (select * from table_name)
## 视图
create view view_name as select * from table_name
## 与spring整合
导入 hive-jdbc。mybatis-spring-boot-starter
配置yml
spring.datasource
```
## sqoop
```
1. 配置 sqoop-env.sh
./sqoop eval --drive com.mysql.cj.jdbc.Driver --connect jdbc:mysql://localhost:3306/test?serverTimezone=UTC\&useSSL=false --username root --password root --query "show tables"
2. 生成表结构
sqoop create-hive-table --connect jdbc:mysql://localhost/test --table widgets --fields-terminated-by ','
3. 列出mysql 数据库中的所有数据库
sqoop list-databases -connect jdbc:mysql://192.168.1.10:3306 -username root -password root
4. 列出所有数据表
sqoop list-tables -connect jdbc:mysql:///sqoop -username root -password root
5. 通过Sqoop执行SQL语句
sqoop eval -connect jdbc:mysql:///sqoop -username root -password root -query "select * from employee where id=5"
6.1.将sqoop.employee表中的数据导入HDFS的/sqfs目录下
sqoop import -connect jdbc:mysql://192.168.10.71:3306/t2 -username=root -password=root -table employee -m 1 -target-dir /output/1
mysql只认ip地址
叠加
追加模式
sqoop import -connect jdbc:mysql://192.168.10.71:3306/t2 -username root -password root -table employee -m 1 -target-dir /output/3 -incremental append -check-column id -last-value "5"
## **1.将关系型数据的employee表结构复制到H****ive****中**
sqoop create-hive-table -connect jdbc:mysql://192.168.11.51:3306/big1806 -username root -password root -table t1 -hive-table sqoop.t1 -fields-terminated-by "\0001" -lines-terminated-by "\n"注:
-hive-table emp指定在Hive中创建的表名为emp(默认数据库default)
-hive-table sqoop.emp指定在Hive中的sqoop数据库下创建emp表
-fields-terminated-by "\0001" 是设置每列之间的分隔符,"\0001"是ASCII码中的1,是hive的默认行内分隔符,而sqoop的默认行内分隔符为","
-lines-terminated-by "\n" 设置的是每行之间的分隔符,此处为换行符,也是默认的分隔符;
## **2.将关系数据库中的employee表的数据导入文件到H****ive****表中**
sqoop import -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -table employee -hive-table sqoop.emp -m 1 -fields-terminated-by "\0001" -hive-import
注:
-fields-terminated-by "\0001" 需同创建Hive表时保持一致
-hive-import 指定是Hive导入数据
-split-by id employee中没有主键时,用于指定Mapper时的Key
**追加1**
sqoop import -append -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -target-dir /user/hive/warehouse/sqoop.db/emp/ -fields-terminated-by "\0001" -query "select * from employee where \$CONDITIONS" -m 1
注:
可以添加-columns,-where参数,同时使用时-where参数会失效
**追加2**
sqoop import -append -connect jdbc:mysql://192.168.1.10:3306/sqoop -username root -password root -table employee -columns "id,name,birthday" -where "id=2" -m 1 -target-dir /user/hive/warehouse/sqoop.db/emp/ -fields-terminated-by "\0001"
注:
-target-dir /user/hive/warehouse/sqoop.db/emp 可用-hive-table sqoop.emp -hive-import替换,但是要去掉 -append 参数。
在导入大对象,比如BLOB和CLOB列时需要特殊处理,小于16MB的大对象可以和别的数据一起存储,超过这个值就存储在_lobs的子目录当中,它们采用的是为大对象做过优化的存储格式,最大能存储2^63字节的数据,我们可以用-inline-lob-limit参数来指定每个lob文件最大的限制是多少,如果设置为0,则大对象使用外部存储。
## **3\. H****ive导入参数**
-hive-home <dir> 重写$HIVE_HOME
-hive-import 插入数据到hive当中,使用hive的默认分隔符
-hive-overwrite 重写插入
-create-hive-table 建表,如果表已经存在,该操作会报错!
-hive-table <table-name> 设置到hive当中的表名
-hive-drop-import-delims 导入到hive时删除 \n, \r, and \0001
-hive-delims-replacement 导入到hive时用自定义的字符替换掉 \n, \r, and \0001
-hive-partition-key hive分区的key
-hive-partition-value <v> hive分区的值
-map-column-hive <map> 类型匹配,sql类型对应到hive类型
**hive空值处理**
sqoop会自动把NULL转换为null处理,但是hive中默认是把\N来表示null,因为预先处理不会生效的,我们需要使用 -null-string 和 -null-non-string来处理空值 把\N转为\\N
例句:sqoop import ... -null-string '\\N' 或-null-non-string '\\N'
**sqoop导入hive数据到MySql碰到hive表中列的值为null的情况:**
在导入数据的过程中,如果碰到列值为null的情况,hive中为null的是以\N代替的,所以你在导入到MySql时,需要加上两个参数:--input-null-string '\\N' --input-null-non-string '\\N',多加一个'\',是为转义。如果你通过这个还不能解决字段为null的情况,还是报什么NumberFormalt异常的话,那就是比较另类的了,没有关系,我们还是要办法解决。
你应该注意到每次通过sqoop导入MySql的时,都会生成一个以MySql表命名的.java文件,然后打成JAR包,给sqoop提交给hadoop 的MR来解析Hive表中的数据。那我们可以根据报的错误,找到对应的行,改写该文件,编译,重新打包,sqoop可以通过 -jar-file ,--class-name 组合让我们指定运行自己的jar包中的某个class。来解析该hive表中的每行数据。脚本如下:一个完整的例子如下:
sqoop export --connect "jdbc:mysql://localhost/aaa?useUnicode=true&characterEncoding=utf-8"
--username aaa --password bbb --table table
--export-dir /hive/warehouse/table --input-fields-terminated-by '\t'
--input-null-string '\\N' --input-null-non-string '\\N'
--class-name com.chamago.sqoop.codegen.bi_weekly_sales_item
--jar-file /tmp/sqoop-chamago/bi_weekly_sales_item.jar
上面--jar-file 参数指定jar包的路径。--class-name 指定jar包中的class。
这样就可以解决所有解析异常了。
## **4.将H****ive****中的表数据导入到****mysql****数据库employee表中**
sqoop export -connect "jdbc:mysql://192.168.11.51:3306/big1806?useUnicode=true&characterEncoding=utf-8" -username root -password root -table t3 -export-dir /user/hive/warehouse/sqoop.db/t1/ part-m-00000 -input-fields-terminated-by '\0001'注:
在进行导入之前,mysql中sqoop数据库中employee表必须已经提起创建好了。
jdbc:mysql://192.168.1.10:3306/sqoop中的IP地址改成localhost会报异常
指定/user/hive/warehouse/sqoop.db/emp/part-m-00000,只加载该文件
指定/user/hive/warehouse/sqoop.db/emp/,加载该目录下的所有文件
```
## kafka
```
1. 解压 Kafka
2. 配置 server.property
3. 发送topic消息
./kafka-topic.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test
4. 查看Kafka 所有消息
./kafka-topic.sh --list --zookeeper node1:2181
5. 查看具体信息
./kafka-topic.sh --describe --zookeeper node1:2181
```
#Flink
```
flink on yarm
第一种 方式:
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -d 独立开辟一个集群,一直占有资源
bin/yarn-session.sh -id applicationId(集群id)
第二种方式:
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./xxxx.jar
--- yarn-session.sh 命令
-D<arg> 动态属性
-d,--detached 后台独立运行
-jm,--jobManagerMermory<arg> 设置jobManager的内存,单位时MB
-nm,--name 在yarn上为又一个自定义的应用设置一个名字
-q,--query, 显示yarn中可用资源的(内存,cpu核数)
-qu,--query<arg> : 指定yarn队列
-s,--slots<arg> : 每个taskManager使用的slot的数量
-tm,--taskManagermemory<arg>. 每个taskManager的内存,单位时MB
-z,--zookeeperNamespace<arg>. 针对HA模式再zookeeper上创建nameSpace
-id,--applicaition<yarnAppId> 指定yarn集群上任务id,附到一个后台独立运行yarn session中
--- flink run 命令
-c,--class. 如果jar中没有指定入口类。则通过这个参数指定。一定在jar后面使用
-m,--jobManager<host:port>。指定连接的jobManager的地址
-p,--parallelism<parallelism>。指定任务的并行度,可以覆盖配置文件中的默认值
flink run ./xxx.jar -input hdfs://node1:9000/hello.txt -output hfs://node2:9000/result1
```
## flink dataStream(流处理) 常用的api
DataStream 主要分为三块。dataSource / Transformation / Sink
```
## dataSource 是程序的数据源输入
可以通过StreamExecutionEnvironment.addSource(sourceFunction)
为程序添加一个数据源
## api
readTextFile(Path) 基于文件读取数据
socketTextStream() 基于socket读取数据
fromCollection(collection)。基于集合读取数据
addSource(). 自定义输入。implement 实现 SourceFunction接口 并行度设置实现parallelSourceFunction接口
RishParallelSourceFunction 增加了open。和close 方法
```
```
Transformation 具体操作,他对一个或多个输入数据源进行计算处理,map .flatMap 和 filter
## api
map 输入一个元素。返回一个元素中间可执行清洗转换的操作
flatMap 输入一个原属可以返回零个或者多个元素
filter 过滤函数对数据进行判断 符合条件的就会被留下
keyBy。根据指定的key进行分组。key相同的数据会进入同一个分区
-- DataStream.keyBy("xx") 根据某个字段分组
-- DataStream.keyBy(0). 根据第一个元素分组
Reduce 对数据进行聚合操作,结合当前元素和上一次Reducer返回的值进行聚合操作,然后返回一个新的值
Aggreations: sum.min.max
union 合并多个流。新的流会包含所有流中的数据但是每条流的返回类型必须一致
Connect 和 union类似。但是只能连接两个流 两个流的数据可以不通
coMap , coFlatMap ConnectedStream中需要这种函数
split. 根据规则把一个流切换为多个流
Select 和split 配置使用。选择切分后的流
Random partitioning 随机分区
Rebalancing 对数据集进行再平衡,重分区,消除数据倾斜
R escaping 重新调节
Custom partitioning 自定义分区
```
```
Sink 是程序饿输出,它可以把Transformation 处理之后的数据输出到指定的存储介质中
writeAsText(). 将元素以字符串形式逐行写入
print() / printToErr(). 打印每个元素的toString方法值
addSink()。自定义输出。实现SinkFunction。继承RichSinkFunction
```
## DataSet(批处理)。主要分为三块 datasource Transtormation。Sink
```
## datasource
fromCollection. -- 基于集合
readTextFile(Path)。-- 基于HDFS数据文件
## Transformation
map 输入一个元素。返回一个元素中间可执行清洗转换的操作
flatMap 输入一个原属可以返回零个或者多个元素
filter 过滤函数对数据进行判断 符合条件的就会被留下
mapPartition。类似 map。一次处理一个分区的数据
Distinct。返回一个数据集中去重之后的元素
join. 内连接
outerJoin. 外连接
Cross 获取两个数据集的笛卡尔积
union。返回两个数据集的总和。 数据类型需要一致
First-n。返回集合中的前N个元素
SortPartition。在本地对数据集的所有 分区进行怕需要
Rebalance。对数据集进行再平衡
HashPartition。更具指定key的散列值对数据集进行分区
Range-Partition。更具指定的key对数据集进行范围分区
Custom Partition。自定义分区
## sink
writeAsCsv
writeAsText
print
```
## flink table api。和 sql api
```
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建table执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 读取信息
CsvTableSource csvTableSource = new CsvTableSource("", new String[]{"name", "age"}, new TypeInformation[]{Types.STRING, Types.INT});
// 创建table
tableEnv.registerTableSource("csv", csvTableSource);
// 获取table
Table csv = tableEnv.scan("csv");
Table student = csv.select("name,age");
// 转换为对应的pojo对象
DataStream<Object> stream = tableEnv.toAppendStream(student, Object.class);
// 设置并行度为1
stream.print().setParallelism(1);
env.execute();
```
## 累加器
```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource = env.fromElements("a", "b", "c", "d");
DataSet<String> result = dataSource.map(new RichMapFunction<String, String>() {
// 创建累加器
private IntCounter intCounter = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.getRuntimeContext().addAccumulator("num", intCounter);
}
@Override
public String map(String s) throws Exception {
this.intCounter.add(1);
return s;
}
}).setParallelism(3);
result.writeAsText("/Applications/tools/java/consumer/neo4j/src/main/resources/result.txt");
JobExecutionResult execute = env.execute("abc" ;
int num = execute.getAccumulatorResult("num");
System.out.println("nun : " + num);
```
```
-- 修改任务并行度的方式
1. yml
2. 启动flink设置
3. jar包代码修改
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bu</groupId>
<artifactId>neo4j</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>neo4j</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<flink.version>1.10.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-neo4j</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.3</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- els -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
-- flink流处理api
'Environment'
getExecutionEnvironment
'--- 创建一个执行环境,表示当前执行程序的上下文.
如果程序是独立调用的,则此方法返回本地执行环境 ;如果从命令行客户端调用程序以
提交到集群,则此方法返回此集群的执行环境 parallelism.default:1 设置并行度'
createLocalEnvironment
' 单独设置本地执行环境 需要在调用时指定默认的并行度'
createRemoteEnvironment
'设置远程执行环境,将jar提交到远程服务器,需要在调用时指定jobManager的IP和端口号,并指定要在集群中运行的jar包
代码提示: StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment('jobManager-hostname',port,'执行jar包位置')'
'Source'
-- 从集合中读取数据
DataStream<SensorReading> sensorDataStream = env.fromCollection(Arrays.asList(new SensorReading('oneDay',123,111)));
-- 从文件中读取数据 readTextFile("filePath");
-- kafka 读取数据 引入flink连接kafka的jar
flink-connector-kafka
Properties properties = new Properties();
properties.setProperty("bootstrap","ocalhost:9092");
Datasource data = env.addSource("topic",new SimpleString(),Properties);
-- 自定义数据源 实现sourceFunction接口
// 实现自定义的数据源
public static class MySource implements SourceFunction<String> {
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
// 代码实现
sourceContext.collect("发送数据");
}
@Override
public void cancel() {
}
'Transform. 转换算子'
== 基本转换算子
-- map
// 转换算子 id 转换为 length
SingleOutputStreamOperator<Object> map = dataSource.map(new MapFunction<SensorReading, Object>() {
@Override
public Object map(SensorReading sensorReading) throws Exception {
sensorReading.setId(sensorReading.getId().length() + "");
return sensorReading;
}
});
-- flatmap
SingleOutputStreamOperator<Object> map1 = dataSource.flatMap(new FlatMapFunction<SensorReading, Object>() {
@Override
public void flatMap(SensorReading sensorReading, Collector<Object> collector) throws Exception {
sensorReading.setId(sensorReading.getId().length() + "");
collector.collect(sensorReading);
}
});
-- filter
// 筛选
SingleOutputStreamOperator<SensorReading> filter = dataSource.filter(new FilterFunction<SensorReading>() {
@Override
public boolean filter(SensorReading sensorReading) throws Exception {
if (sensorReading.getId().length() > 3)
return true;
else {
return false;
}
}
});
== 聚合算子
keyBy
// 分组
KeyedStream<SensorReading, String> key = dataSource.keyBy(SensorReading::getId);
// 求最大值
SingleOutputStreamOperator<SensorReading> timesamp = key.max("Timesamp");
timesamp.print();
-- rolling Aggregation 滚动聚合操作
sum
max
min
maxBy
minBy
-- reducer 自定义
// 自定义reducer
SingleOutputStreamOperator<SensorReading> reduce = key.reduce(new ReduceFunction<SensorReading>() {
// sensorReading 当前 t1 最新
@Override
public SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {
return new SensorReading(sensorReading.getId(), t1.getTimesamp(), Math.max(sensorReading.getIsFlag(), t1.getIsFlag()));
}
});
== 多流转换算子
-- split。select。
// 按照id是否大于30 切分为两条流
SplitStream<SensorReading> split = reduce.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
return Integer.parseInt(sensorReading.getId()) > 30 ? Collections.singletonList("hight") : Collections.singletonList("small");
}
});
DataStream<SensorReading> hight = split.select("hight");
DataStream<SensorReading> small = split.select("small");
DataStream<SensorReading> all = split.select("small", "hight");
all.print();
hight.print();
small.print();
-- 合流
// 合流
ConnectedStreams<Tuple2<String, Double>, SensorReading> connect = map2.connect(small);
SingleOutputStreamOperator<Object> hight1 = connect.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
return new Tuple3<>(stringDoubleTuple2.f0, stringDoubleTuple2.f1, "hight");
}
@Override
public Object map2(SensorReading sensorReading) throws Exception {
return new Tuple3<>(sensorReading.getTimesamp(), sensorReading.getId(), sensorReading.getIsFlag());
}
});
// union. 必须返回的数据类型相同
// union
DataStream<SensorReading> union = hight.union(small);
union.print();
// shuffle 乱序
// gobal
== 输出 sink
dataStream.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "xxx", new SimpleStringSchema()));
== 输出redis
// 定义jedis 连接配置
FlinkJedisPoolConfig localhost = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
dataStream.addSink(new RedisSink<>(localhost, new MyMapper()));
public static class MyMapper implements RedisMapper<String> {
// 定义保存数据到redis的命令
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "senser");
}
@Override
public String getKeyFromData(String s) {
return s;
}
@Override
public String getValueFromData(String s) {
return s;
}
}
== 输出es
List<HttpHost> list = new ArrayList<>();
list.add(new HttpHost("localhost", 9200));
DataStreamSink dataStreamSink = dataStream.addSink(new ElasticsearchSink.Builder<String>(list, new EsSinkFanction()).build());
public static class EsSinkFanction implements ElasticsearchSinkFunction<String> {
@Override
public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
IndexRequest source = Requests.indexRequest()
.index("es")
.type("xx")
.source(s);
requestIndexer.add(source);
}
}
==。自定义mysql。sink
// mysql
DataStreamSink<String> stringDataStreamSink = dataStream.addSink(new MySinkFucJdbc());
public static class MySinkFucJdbc extends RichSinkFunction<String> {
private Connection connection = null;
private PreparedStatement preparedStatement = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
}
@Override
public void invoke(String value, Context context) throws Exception {
preparedStatement = connection.prepareStatement("insert into senser (id,name) value (?,?)");
// 插入值
preparedStatement.setDouble(0, 123);
preparedStatement.setString(1, value);
preparedStatement.execute();
}
@Override
public void close() throws Exception {
connection.close();
preparedStatement.close();
}
}
== 自定义窗口.
public static class MySinkFucJdbc extends RichSinkFunction<String> {
private Connection connection = null;
private PreparedStatement preparedStatement = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
}
@Override
public void invoke(String value, Context context) throws Exception {
preparedStatement = connection.prepareStatement("insert into senser (id,name) value (?,?)");
// 插入值
preparedStatement.setDouble(0, 123);
preparedStatement.setString(1, value);
preparedStatement.execute();
}
@Override
public void close() throws Exception {
connection.close();
preparedStatement.close();
}
}
时间窗口。 timeWindow. 按照传递参数区分 一个参数为滚动窗口。两个参数为滑动窗口
data.keyBy("id")
.timeWindow(Time.seconds(15))
.apply(new WindowFunction<String, Object, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<String> iterable, Collector<Object> collector) throws Exception {
}
});
计数窗口。 countWindow
.countWindow(Time.seconds(15))
.aggregate(new AggregateFunction<String, Tuple2<Double, Integer>, Object>() {
@Override
public Tuple2<Double, Integer> createAccumulator() {
return new Tuple2<>(0.0, 1);
}
@Override
public Tuple2<Double, Integer> add(String s, Tuple2<Double, Integer> doubleIntegerTuple2) {
return new Tuple2<>(doubleIntegerTuple2.f0 + Double.parseDouble(s), doubleIntegerTuple2.f1 + 1);
}
@Override
public Object getResult(Tuple2<Double, Integer> doubleIntegerTuple2) {
return null;
}
@Override
public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> doubleIntegerTuple2, Tuple2<Double, Integer> acc1) {
return null;
}
});
== Trigger 触发器
== evictor。过滤器
-- allowedLateness(). 允许处理延时数据
-- sideOutputLateData() 将迟到的数据放入侧输入流
-- getSideOutput(). 获取侧输入流
OutputTag<String> tag = new OutputTag<String>("id");
SingleOutputStreamOperator<String> sum = data.keyBy("id")
.timeWindow(Time.seconds(15))
// 允许处理15秒后的延时数据
.allowedLateness(Time.seconds(15))
.sideOutputLateData(tag)
.sum("id");
// 获取侧输出流
sum.getSideOutput(tag).print("late");
sum.print("mine");
== waterMark. 水平线。设置延时
// 升序实现
/* .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
@Override
public long extractAscendingTimestamp(SensorReading sensorReading) {
return sensorReading.getTimesamp() * 1000L;
}
})*/
// assignTimestampsAndWatermarks waterMark
// BoundedOutOfOrdernessTimestampExtractor 参数1 最大乱序程度 一般为毫秒级别
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading sensorReading) {
return sensorReading.getTimesamp() * 1000L;
}
});
== flink 状态管理
-- 算子状态
-- 键控状态
1. 值状态
2. 列表状态
3. 映射状态
4. 聚合状态
== 状态后端
/*env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend(""));*/
== ProcessFuncationApi
== tableApi. && flinkSqlApi
引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
---
package com.bu.neo4j.flinktable;
import com.bu.neo4j.flink.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class Example {
public static void main(String[] args) throws Exception {
// 1. 创建运行时执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 设置分片数量,防止乱序
env.setParallelism(1);
// 3。 读取数据
DataStream<String> inputStream = env.readTextFile("/Applications/tools/java/consumer/neo4j/src/main/resources/Hello.txt");
// 4。 转换数据类型
DataStream<SensorReading> datas = inputStream.map(lone -> {
String[] data = lone.split(" ");
return new SensorReading(data[0], new Long(data[1]), new Double(data[2]));
});
// 5. use table api
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 6. create table by stream
Table dataTable = tableEnv.fromDataStream(datas);
// use table api to transformation
Table result = dataTable.select("id,timestamp").where("id = 'kafka4'");
tableEnv.toAppendStream(result, Row.class).print("result");
env.execute();
}
}
-- 另一种方式连接
// 通过文件路径路径连接
String path = "/Applications/tools/java/consumer/neo4j/src/main/resources/Hello.txt";
tableEnvironment.connect(new FileSystem().path(path))
// 设置文件格式 csv , 分割
.withFormat(new Csv())
// 设置各自段属性
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.BIGINT())
.field("tran", DataTypes.DOUBLE()))
.createTemporaryTable("inputTable");
Table inputTable = tableEnvironment.from("inputTable").where("id = 'kafka1'");
tableEnvironment.toAppendStream(inputTable, Row.class).print("bhz::input");
-- kafka 连接
Kafka kafka = new Kafka()
.version("0.10")
.topic("user_behavior")
.property("bootstrap.servers", "node2.hadoop:9092")
.property("zookeeper.connect", "node2.hadoop:2181");
tableEnv.connect(kafka)
.withFormat(
new Json().failOnMissingField(true).deriveSchema()
)
.withSchema(
new Schema()
.field("user_id", Types.INT)
.field("item_id", Types.INT)
.field("category_id", Types.INT)
.field("behavior", Types.STRING)
.field("ts", Types.STRING)
)
.inAppendMode()
.registerTableSource("tmp_table");
-- 注册mysql
导入jar包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
String sinkDDL = "create table jdbcOutputTable (" +
"id varchar(20) not null" +
"cnt bigint not null" +
") with (" +
"'connector.type' = 'jdbc'," +
"'connector.url' = 'jdbc:mysql://localhost:3306/test'," +
"'connector.table' = 'sensor'," +
"'connector.driver' = 'com.mysql.jdbc.Driver'," +
"'connector.username' = 'root'" +
"'connector.password' = '123456' )";
-- waterTime
.field("user_id", DataTypes.INT())
.field("item_id", DataTypes.INT())
.rowtime(new Rowtime()
.timestampsFromField("item_id")
.watermarksPeriodicBounded(1000))
.field("category_id", DataTypes.INT())
.field("behavior", DataTypes.INT())
.field("ts", DataTypes.STRING())
-- group by. window 操作
-- 自定义函数
```
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。