当前位置:   article > 正文

HBase表数据的读、写操作与综合操作_试使用 storm 完成 hbase 数据库的读取和写入操作

试使用 storm 完成 hbase 数据库的读取和写入操作

HBase表数据的读、写操作与综合操作

一、实验目标

  1. 熟练掌握通过HBase shell命令来设计HBase表结构实例
  2. 掌握使用HBase编程创建HBase表、删除HBase表、修改HBase表和查看HBase表和表结构。
  3. 掌握通过HBase 编程实现HBase表数据的读、写操作

二、实验要求及注意事项

  1. 给出每个实验的主要实验步骤、实现代码和测试效果截图。
  2. 对本次实验工作进行全面的总结分析。
  3. 建议工程名,类名、包名或表名显示个人学号或者姓名

三、实验内容及步骤

实验任务1:使用MapReduce批量将HBase表中数据导入到HDFS上。表名和表中数据自拟。

主要实现步骤和运行效果图:

完整程序

WjwReadMapper:

package hbase;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.io.*;
public class WjwReadMapper extends TableMapper<Writable, Writable> {
	private Text k=new Text();
	private Text v=new Text();
	public static final String F1="\u0001";
	protected void setup(Context c){
	}
	public void map(ImmutableBytesWritable row,Result r,Context c){
		String value=null;
		String rk=new String(row.get());
		byte[] family=null;
		byte[] column=null;
		long ts=0L;
		try{
			for(KeyValue kv:r.list()){
			value=Bytes.toStringBinary(kv.getValue());
			family=kv.getFamily();
			column=kv.getQualifier();
			ts=kv.getTimestamp();
			k.set(rk);
			v.set(Bytes.toString(family)+F1+Bytes.toString(column)+F1+value+F1+ts);
			c.write(k, v);
			}
		}catch(Exception e){
			e.printStackTrace();
			System.err.println();
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

WjwReadMain:

package hbase;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class WjwReadMain {
	public static final Log LOG = LogFactory.getLog(WjwMain.class);
	public static final String NAME = "Member Test1";
	public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
	public static String inputTable = "tb_wjw";
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = HBaseConfiguration.create();
		Scan scan = new Scan();
		scan.setBatch(0);
		scan.setCaching(10000);
		scan.setMaxVersions();
		scan.setTimeRange(System.currentTimeMillis() - 3*24*3600*1000L, System.currentTimeMillis());
		scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));
		conf.setBoolean("mapred.map.tasks.speculative.execution", false);
		conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
		Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(tmpIndexPath)){
			fs.delete(tmpIndexPath, true);
		}
		
		Job job = new Job(conf, NAME);
		job.setJarByClass(WjwMain.class);
		TableMapReduceUtil.initTableMapperJob(inputTable, scan, WjwMapper.class, Text.class, Text.class, job);
	   job.setNumReduceTasks(0);
	   job.setOutputFormatClass(TextOutputFormat.class);
	   FileOutputFormat.setOutputPath(job, tmpIndexPath);
	   boolean success = job.waitForCompletion(true);
	   System.exit(success?0:1);
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

运行结果

创建表,用于等会将数据传入hadoop里

1-1

运行map程序将表数据导入hadoop,并查看是否导入成功

1-2

实验任务2:使用MapReduce批量将HDFS上的数据导入到HBase表中。表名和数据自拟,建议体现个人学号或姓名。使用Java编程创建表和删除表,表名和列族自拟。

主要实现步骤和运行效果图:

完整程序

WjwWriteMapper:

package hbase;
import java.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
	private byte[] family=null;
	private byte[] qualifier=null;
	private byte[] val=null;
	private String rk=null;
	private long ts=System.currentTimeMillis();
	protected void map(LongWritable key,Text value,Context context) throws InterruptedException, IOException{
		try{
			String line=value.toString();
			String[] arr=line.split("\t",-1);
			if(arr.length==2){
				 rk=arr[0];
			    String[] vals=arr[1].split("\u0001",-1);
			    if(vals.length==4){
			        family=vals[0].getBytes();
					  qualifier=vals[1].getBytes();
					  val=vals[2].getBytes();
					  ts=Long.parseLong(vals[3]);
					  Put put=new Put(rk.getBytes(),ts);
					  put.add(family,qualifier,val);
			        context.write(new ImmutableBytesWritable(rk.getBytes()), put);
			    }
			}
		}catch(Exception e){
		    e.printStackTrace();
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

WjwWriteMain:

package hbase;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import java.io.IOException;
import org.apache.commons.logging.*;
public class WjwWriteMain extends Configured implements Tool{
	static final Log LOG=LogFactory.getLog(WjwWriteMain.class);
	public int run(String[] args)throws Exception{
		if(args.length!=2){
		    LOG.info("2 parameters needed!");
		}
		String input="hdfs://master:9000/tmp/tb_wjw/part-m-00000";
		String table="tb_wjw01";
		Configuration conf=HBaseConfiguration.create();
		Job job=new Job(conf,"Input from file "+input+" into table "+table);
		job.setJarByClass(WjwWriteMain.class);
		job.setMapperClass(WjwWriteMapper.class);
		job.setJarByClass(WjwWriteMain.class);
		job.setMapperClass(WjwWriteMapper.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE,table);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Waitable.class);
		job.setNumReduceTasks(0);
		FileInputFormat.addInputPath(job, new Path(input));
		return job.waitForCompletion(true)?0:1;
	}
	public static void main(String[] args) throws IOException {
	    Configuration conf=new Configuration();
		 String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
		try {
		    System.out.println(ToolRunner.run(conf, new WjwWriteMain(),otherArgs));
		}catch(Exception e) {
		    e.printStackTrace();
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

运行结果

创建一个空表tb_wjw01,用于等会将tb_wjw的数据导入tb_wjw01

2-1

配置yarn,并运行map程序

2-2
2-3

查看hadoop里的表tb_wjw

2-4

将hadoop里tb_wjw的数据导入hbase里的tb_wjw01里面

2-5

实验任务3:在实验任务1和实验任务2的基础上,通过HBase编程,实现创建HBase表,修改HBase表(包括增加列族和删除列族),向HBase表中写入数据,读取HBase表中数据,查看HBase数据库中所有表和表结构功能,建议在一个类中定义多个方法实现上述功能,并进行验证。表名和数据自拟。

主要实现步骤和运行效果图:

完整程序

package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WjwHbase{
    private static Configuration conf = HBaseConfiguration.create();

    public static void createTable(String tableName, String[] families)
            throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("Table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            for (String family : families) {
                tableDesc.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(tableDesc);
            System.out.println("Table created successfully!");
        }
        admin.close();
        conn.close();
    }

    public static void addRecord(String tableName, String rowKey,
                                  String family, String qualifier, String value) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        table.put(put);
        System.out.println("Record added successfully!");
        table.close();
        conn.close();
    }

    public static void deleteRecord(String tableName, String rowKey,
                                     String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table.delete(delete);
        System.out.println("Record deleted successfully!");
        table.close();
        conn.close();
    }

    public static void deleteTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            System.out.println("Table deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily);
            admin.addColumn(TableName.valueOf(tableName), columnDesc);
            System.out.println("Column family added successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void deleteColumnFamily(String tableName, String columnFamily) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
            System.out.println("Column family deleted successfully!");
        } else {
            System.out.println("Table does not exist!");
        }
        admin.close();
        conn.close();
    }

    public static void getRecord(String tableName, String rowKey,
                                  String family, String qualifier) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        System.out.println("Result: " + Bytes.toString(value));
        table.close();
        conn.close();
    }

    public static void scanTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println("Result: " + result);
        }
        table.close();
        conn.close();
    }

    public static void listTables() throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor[] tableDescs = admin.listTables();
        List<String> tableNames = new ArrayList<String>();
        for (HTableDescriptor tableDesc : tableDescs) {
            tableNames.add(tableDesc.getNameAsString());
        }
        System.out.println("Tables: " + tableNames);
        admin.close();
        conn.close();
    }

    public static void describeTable(String tableName) throws IOException {
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
        System.out.println("Table structure: " + tableDesc);
        admin.close();
        conn.close();
    }

    public static void main(String[] args) throws IOException {
        String tableName = "wjwtest";
        String rowKey = "row1";
        String family = "cf1";
        String qualifier = "q1";
        String value = "this is wjw!";
        String columnFamily = "cf2";
        String[] families = {family};
        createTable(tableName, families);
        addRecord(tableName, rowKey, family, qualifier, value);
        getRecord(tableName, rowKey, family, qualifier);
        scanTable(tableName);
        addColumnFamily(tableName, columnFamily);
        describeTable(tableName);
        deleteColumnFamily(tableName, columnFamily);
        deleteRecord(tableName, rowKey, family, qualifier);
        deleteTable(tableName);
        listTables();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163

运行结果

3-1
3-2
3-3
3-4
3-5
3-6

附:系列文章

实验文章目录直达链接
实验01Hadoop安装部署https://want595.blog.csdn.net/article/details/132767284
实验02HDFS常用shell命令https://want595.blog.csdn.net/article/details/132863345
实验03Hadoop读取文件https://want595.blog.csdn.net/article/details/132912077
实验04HDFS文件创建与写入https://want595.blog.csdn.net/article/details/133168180
实验05HDFS目录与文件的创建删除与查询操作https://want595.blog.csdn.net/article/details/133168734
实验06SequenceFile、元数据操作与MapReduce单词计数https://want595.blog.csdn.net/article/details/133926246
实验07MapReduce编程:数据过滤保存、UID 去重https://want595.blog.csdn.net/article/details/133947981
实验08MapReduce 编程:检索特定群体搜索记录和定义分片操作https://want595.blog.csdn.net/article/details/133948849
实验09MapReduce 编程:join操作和聚合操作https://want595.blog.csdn.net/article/details/133949148
实验10MapReduce编程:自定义分区和自定义计数器https://want595.blog.csdn.net/article/details/133949522
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/502392
推荐阅读
相关标签
  

闽ICP备14008679号