1. Add the Kafka and HBase client dependencies to the project's pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- HBase dependencies -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.3.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.3.5</version>
    </dependency>
</dependencies>
2. In the HBase shell, run the following statements one by one to create the namespace and the tables:

create_namespace 'events_db'

create 'events_db:users','profile','region','registration'

create 'events_db:user_friend','uf'

create 'events_db:events','schedule','location','creator','remark'

create 'events_db:event_attendee','euat'

create 'events_db:train','eu'
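
If you want a quick sanity check that the namespace and tables exist before moving on, the same shell can list and describe them (optional, not one of the required steps):

list_namespace_tables 'events_db'
describe 'events_db:users'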
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the user_friends data from Kafka and writes it into HBase.
 */
public class UserFriendToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only takes effect when auto-commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriend_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // userid,friendid
                    String[] split = record.value().split(",");
                    // Row key: hash of userid + friendid
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
                    put.addColumn("uf".getBytes(), "user_id".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (datas.size() != 0) {
                    userFriendTable.put(datas);
                }
                consumer.commitAsync(); // commit offsets manually after the batch is written
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the users data from Kafka and writes it into HBase.
 */
public class UsersToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only takes effect when auto-commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("users"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            Table usersTable = connection.getTable(TableName.valueOf("events_db:users"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // user_id,locale,birthyear,gender,joinedAt,location,timezone
                    String[] user = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(user[0])); // row key: user_id
                    put.addColumn("profile".getBytes(), "birthyear".getBytes(), user[2].getBytes());
                    put.addColumn("profile".getBytes(), "gender".getBytes(), user[3].getBytes());
                    put.addColumn("region".getBytes(), "locale".getBytes(), user[1].getBytes());
                    // trailing fields may be missing, so guard each optional column
                    if (user.length > 5)
                        put.addColumn("region".getBytes(), "location".getBytes(), user[5].getBytes());
                    if (user.length > 6)
                        put.addColumn("region".getBytes(), "timezone".getBytes(), user[6].getBytes());
                    if (user.length > 4)
                        put.addColumn("registration".getBytes(), "joinedAt".getBytes(), user[4].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (datas.size() != 0) {
                    usersTable.put(datas);
                }
                consumer.commitAsync(); // commit offsets manually after the batch is written
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the train data from Kafka and writes it into HBase.
 */
public class TrainToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only takes effect when auto-commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "train_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("train"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            Table trainTable = connection.getTable(TableName.valueOf("events_db:train"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // user,event,invited,timestamp,interested,not_interested
                    String[] train = record.value().split(",");
                    double random = Math.random();
                    // Row key: user + event plus a random suffix so repeated (user, event) pairs stay distinct
                    Put put = new Put(Bytes.toBytes(train[0] + train[1] + random));
                    put.addColumn("eu".getBytes(), "user".getBytes(), train[0].getBytes());
                    put.addColumn("eu".getBytes(), "event".getBytes(), train[1].getBytes());
                    put.addColumn("eu".getBytes(), "invited".getBytes(), train[2].getBytes());
                    put.addColumn("eu".getBytes(), "timestamp".getBytes(), train[3].getBytes());
                    put.addColumn("eu".getBytes(), "interested".getBytes(), train[4].getBytes());
                    put.addColumn("eu".getBytes(), "not_interested".getBytes(), train[5].getBytes());

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (datas.size() != 0) {
                    trainTable.put(datas);
                }
                consumer.commitAsync(); // commit offsets manually after the batch is written
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the events data from Kafka and writes it into HBase.
 */
public class EventsToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only takes effect when auto-commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "events_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("events"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value());
                    String[] events = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(events[0])); // row key: event id
                    put.addColumn("creator".getBytes(), "userid".getBytes(), Bytes.toBytes(events[1]));
                    put.addColumn("schedule".getBytes(), "starttime".getBytes(), Bytes.toBytes(events[2]));
                    put.addColumn("location".getBytes(), "city".getBytes(), Bytes.toBytes(events[3]));
                    put.addColumn("location".getBytes(), "state".getBytes(), Bytes.toBytes(events[4]));
                    put.addColumn("location".getBytes(), "zip".getBytes(), Bytes.toBytes(events[5]));
                    put.addColumn("location".getBytes(), "country".getBytes(), Bytes.toBytes(events[6]));
                    put.addColumn("location".getBytes(), "lat".getBytes(), Bytes.toBytes(events[7]));
                    put.addColumn("location".getBytes(), "lng".getBytes(), Bytes.toBytes(events[8]));
                    put.addColumn("remark".getBytes(), "commonwords".getBytes(), Bytes.toBytes(events[9]));

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (datas.size() != 0) {
                    eventsTable.put(datas);
                }
                consumer.commitAsync(); // commit offsets manually after the batch is written
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumes the event_attendees data from Kafka and writes it into HBase.
 */
public class EventAttendeToHb {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only takes effect when auto-commit is enabled
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventattendee_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("event_attendees"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        Connection connection = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // eventid,friendid,yes/no/maybe
                    String[] eventattend = record.value().split(",");
                    // Row key: eventid + friendid + attendance state
                    Put put = new Put(Bytes.toBytes(eventattend[0] + eventattend[1] + eventattend[2]));
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), Bytes.toBytes(eventattend[0]));
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), Bytes.toBytes(eventattend[1]));
                    put.addColumn("euat".getBytes(), "state".getBytes(), Bytes.toBytes(eventattend[2]));

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (datas.size() != 0) {
                    table.put(datas);
                }
                consumer.commitAsync(); // commit offsets manually after the batch is written
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
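
Once the consumers are running, a quick way to confirm that rows are actually landing in HBase is to scan and count the target tables from the HBase shell (the LIMIT value here is arbitrary; any of the tables created above can be checked the same way):

scan 'events_db:user_friend', {LIMIT => 5}
count 'events_db:users'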