The write path:
(Steps 1-3 are the same as in the read path: locate which region, and which region server, should hold the target rowkey.)
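For reference, a minimal client-side write looks like this (the table name T1, the rowkey, and the cell contents are placeholders for illustration):

Configuration conf = HBaseConfiguration.create();
try (Connection conn = ConnectionFactory.createConnection(conf);
     Table table = conn.getTable(TableName.valueOf("T1"))) {
    Put put = new Put(Bytes.toBytes("rowkey-0001"));
    put.addColumn(Bytes.toBytes("C1"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
    // The client locates the target region/region server via hbase:meta,
    // then sends the mutation there (WAL first, then MemStore)
    table.put(put);
}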
HBase 2.0+ in-memory compaction (the overall process is a merge across three stages)
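In-memory compaction is enabled per column family (the MemoryCompactionPolicy values are NONE, BASIC, EAGER, and ADAPTIVE). A hedged sketch using the HBase 2.x descriptor builders; the table name T1 and family C1 are placeholders, and the builder method name should be checked against your client version:

TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("T1"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1"))
                // BASIC merges in-memory segments without dropping versions;
                // EAGER also eliminates redundant cells while still in memory
                .setInMemoryCompaction(MemoryCompactionPolicy.BASIC)
                .build())
        .build();
admin.createTable(tableDescriptor);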
<property>
<name>hbase.hregion.majorcompaction</name>
<value>604800000</value>
<source>hbase-default.xml</source>
</property>
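This property is the interval between automatic major compactions: 604800000 ms is 7 days (setting it to 0 disables automatic major compaction, in which case it is triggered manually, usually off-peak). A manual trigger from the Java client can look like this; the table name is the one used later in this section:

try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
     Admin admin = conn.getAdmin()) {
    // The request is asynchronous: the region servers compact in the background
    admin.majorCompact(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
}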
public class BankRecordMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Parse the text line handed to the Mapper into a TransferRecord entity, e.g.:
        // 7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,
        // 卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0
        TransferRecord transferRecord = TransferRecord.parse(value.toString());
        // Take the ID from the entity and use it as the rowkey
        String rowkeyString = transferRecord.getId();
        byte[] rowkeyByteArray = Bytes.toBytes(rowkeyString);
        byte[] columnFamily = Bytes.toBytes("C1");
        byte[] colId = Bytes.toBytes("id");
        byte[] colCode = Bytes.toBytes("code");
        byte[] colRec_account = Bytes.toBytes("rec_account");
        byte[] colRec_bank_name = Bytes.toBytes("rec_bank_name");
        byte[] colRec_name = Bytes.toBytes("rec_name");
        byte[] colPay_account = Bytes.toBytes("pay_account");
        byte[] colPay_name = Bytes.toBytes("pay_name");
        byte[] colPay_comments = Bytes.toBytes("pay_comments");
        byte[] colPay_channel = Bytes.toBytes("pay_channel");
        byte[] colPay_way = Bytes.toBytes("pay_way");
        byte[] colStatus = Bytes.toBytes("status");
        byte[] colTimestamp = Bytes.toBytes("timestamp");
        byte[] colMoney = Bytes.toBytes("money");
        // Build the output key: new ImmutableBytesWritable(rowkey)
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(rowkeyByteArray);
        // Build a KeyValue cell for every field that needs to be written to the table
        KeyValue kvId = new KeyValue(rowkeyByteArray, columnFamily, colId, Bytes.toBytes(transferRecord.getId()));
        KeyValue kvCode = new KeyValue(rowkeyByteArray, columnFamily, colCode, Bytes.toBytes(transferRecord.getCode()));
        KeyValue kvRec_account = new KeyValue(rowkeyByteArray, columnFamily, colRec_account, Bytes.toBytes(transferRecord.getRec_account()));
        KeyValue kvRec_bank_name = new KeyValue(rowkeyByteArray, columnFamily, colRec_bank_name, Bytes.toBytes(transferRecord.getRec_bank_name()));
        KeyValue kvRec_name = new KeyValue(rowkeyByteArray, columnFamily, colRec_name, Bytes.toBytes(transferRecord.getRec_name()));
        KeyValue kvPay_account = new KeyValue(rowkeyByteArray, columnFamily, colPay_account, Bytes.toBytes(transferRecord.getPay_account()));
        KeyValue kvPay_name = new KeyValue(rowkeyByteArray, columnFamily, colPay_name, Bytes.toBytes(transferRecord.getPay_name()));
        KeyValue kvPay_comments = new KeyValue(rowkeyByteArray, columnFamily, colPay_comments, Bytes.toBytes(transferRecord.getPay_comments()));
        KeyValue kvPay_channel = new KeyValue(rowkeyByteArray, columnFamily, colPay_channel, Bytes.toBytes(transferRecord.getPay_channel()));
        KeyValue kvPay_way = new KeyValue(rowkeyByteArray, columnFamily, colPay_way, Bytes.toBytes(transferRecord.getPay_way()));
        KeyValue kvStatus = new KeyValue(rowkeyByteArray, columnFamily, colStatus, Bytes.toBytes(transferRecord.getStatus()));
        KeyValue kvTimestamp = new KeyValue(rowkeyByteArray, columnFamily, colTimestamp, Bytes.toBytes(transferRecord.getTimestamp()));
        KeyValue kvMoney = new KeyValue(rowkeyByteArray, columnFamily, colMoney, Bytes.toBytes(transferRecord.getMoney()));
        // Wrap each cell as new MapReduceExtendedCell(KeyValue) and emit it with context.write
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvId));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvCode));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvRec_account));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvRec_bank_name));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvRec_name));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvPay_account));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvPay_name));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvPay_comments));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvPay_channel));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvPay_way));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvStatus));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvTimestamp));
        context.write(immutableBytesWritable, new MapReduceExtendedCell(kvMoney));
    }
}
Exception: the job fails with a connection error to port 2181. Looking closely at the log, the client is trying to connect to ZooKeeper on localhost, but no ZK is running locally:
INFO - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
WARN - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
WARN - 0x59321afb to localhost:2181 failed for get of /hbase/hbaseid, code = CONNECTIONLOSS, retries = 1
INFO - Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
WARN - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
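Solution: HBaseConfiguration.create() only picks up the cluster address if hbase-site.xml is on the classpath; otherwise the client falls back to localhost:2181. Either copy the cluster's hbase-site.xml into the project's resources directory, or set the ZooKeeper quorum explicitly in code. A minimal sketch of the second option (the quorum host node1.itcast.cn is assumed from this cluster's HDFS paths; adjust to your environment):

Configuration configuration = HBaseConfiguration.create();
// Point the client at the cluster's ZooKeeper instead of the localhost default
configuration.set("hbase.zookeeper.quorum", "node1.itcast.cn"); // assumed host
configuration.set("hbase.zookeeper.property.clientPort", "2181");
Connection connection = ConnectionFactory.createConnection(configuration);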
public class BankRecordBulkLoadDriver {
    public static void main(String[] args) throws Exception {
        // 1. Load the configuration with HBaseConfiguration.create()
        Configuration configuration = HBaseConfiguration.create();
        // 2. Create the HBase connection
        Connection connection = ConnectionFactory.createConnection(configuration);
        // 3. Get the table
        Table table = connection.getTable(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
        // 4. Build the MapReduce job
        // a) Build a Job object with Job.getInstance
        Job job = Job.getInstance(configuration);
        // b) Set the class of the JAR to execute with setJarByClass
        job.setJarByClass(BankRecordBulkLoadDriver.class);
        // c) Set the input format to TextInputFormat.class
        job.setInputFormatClass(TextInputFormat.class);
        // d) Set the Mapper class
        job.setMapperClass(BankRecordMapper.class);
        // e) Set the output key class
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        // f) Set the output value class
        job.setOutputValueClass(MapReduceExtendedCell.class);
        // g) Set the HDFS paths: input /bank/input, output /bank/output
        // i. FileInputFormat.setInputPaths
        FileInputFormat.setInputPaths(job, new Path("hdfs://node1.itcast.cn:8020/bank/input"));
        // ii. FileOutputFormat.setOutputPath
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node1.itcast.cn:8020/bank/output"));
        // h) Use connection.getRegionLocator to get the region distribution of the table
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
        // i) Configure HFile output with HFileOutputFormat2.configureIncrementalLoad
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        // 5. Run the MapReduce job with job.waitForCompletion
        if (job.waitForCompletion(true)) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }
}
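Note that configureIncrementalLoad only makes the job produce HFiles under /bank/output; they still have to be loaded into the table afterwards. A hedged sketch of that final step, assuming the HBase 2.x BulkLoadHFiles tool (older versions use LoadIncrementalHFiles instead):

// After the job succeeds, hand the generated HFiles over to the region servers
BulkLoadHFiles loader = BulkLoadHFiles.create(configuration);
loader.bulkLoad(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"),
        new Path("hdfs://node1.itcast.cn:8020/bank/output"));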
LSM tree: this structure is a combination of several data structures.
To keep writes efficient, the whole structure is split into levels: C0, C1, C2, ...
All writes go into C0, so C0 must be very fast to write to. For example, HBase writes into the MemStore, a skip-list structure (other systems use red-black trees and the like); a minimal sketch follows below.
When C0 reaches a threshold, it is flushed into C1 and merged there (compaction).
When C1 in turn meets its condition, it is merged down into C2.
The on-disk C1/C2 levels are generally stored as B+ tree-like structures, which makes lookups efficient.
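A minimal sketch of the C0 idea using Java's ConcurrentSkipListMap, the same family of structure as HBase's MemStore; the threshold and the "flush" here are made up for the demo:

import java.util.concurrent.ConcurrentSkipListMap;

public class TinyMemStore {
    private static final int FLUSH_THRESHOLD = 4; // made-up threshold for the demo
    // C0: an in-memory sorted map; writes are O(log n) and iteration is key-ordered
    private ConcurrentSkipListMap<String, String> c0 = new ConcurrentSkipListMap<>();

    public void put(String rowkey, String value) {
        c0.put(rowkey, value);
        if (c0.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    private void flush() {
        // A real LSM tree would write a sorted immutable run to C1 on disk;
        // here we just print the keys to show they come out already sorted
        System.out.println("flushing sorted run: " + c0.keySet());
        c0 = new ConcurrentSkipListMap<>();
    }
}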
WAL (write-ahead log): to avoid losing data, every write is logged first. The WAL records all put/delete operations, so if something goes wrong, the data can be recovered by replaying the WAL.
LSM trees suit write-heavy, read-light workloads; if reads dominate, secondary indexes need to be built.
The two possible answers from a Bloom filter: "the element may exist" (this can be a false positive) and "the element definitely does not exist" (this is always reliable). HBase uses this to skip HFiles that certainly do not contain the queried rowkey.
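A toy illustration of those two answer types (the bit-array size and the two hash functions are arbitrary choices for the demo, not HBase's actual implementation):

import java.util.BitSet;

public class TinyBloomFilter {
    private static final int SIZE = 1024;
    private final BitSet bits = new BitSet(SIZE);

    private int h1(String s) { return Math.floorMod(s.hashCode(), SIZE); }
    private int h2(String s) { return Math.floorMod(s.hashCode() * 31 + 7, SIZE); }

    public void add(String s) {
        bits.set(h1(s));
        bits.set(h2(s));
    }

    public boolean mightContain(String s) {
        // false => definitely absent; true => possibly present (can be a false positive)
        return bits.get(h1(s)) && bits.get(h2(s));
    }

    public static void main(String[] args) {
        TinyBloomFilter filter = new TinyBloomFilter();
        filter.add("rowkey-0001");
        System.out.println(filter.mightContain("rowkey-0001")); // true: possibly present
        System.out.println(filter.mightContain("rowkey-9999")); // usually false: definitely absent
    }
}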
Every cluster ships with system-level defaults; the community chooses general-purpose, broadly applicable values, and many of them are compromises. So when problems appear, tuning these defaults is often where to look.