
Log Project: Writing Kafka Data into HBase

This post walks through consuming the log project's Kafka topics (user_friends, users, train, events, event_attendees) with the plain Kafka consumer client and writing each of them into an HBase table.

Contents

1. Add the dependencies
2. Create the namespace and tables in the HBase shell
3. UserFriendToHB
4. UsersToHB
5. TrainToHB
6. EventsToHB
7. EventAttendeToHb


1. Add the dependencies

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- HBase dependencies -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.3.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.3.5</version>
    </dependency>
</dependencies>
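
Before running the consumers below, it is worth confirming that the source topics already exist on the broker. A minimal check, assuming the Kafka 2.8 CLI scripts are on the PATH and using the broker address from the code that follows:

kafka-topics.sh --bootstrap-server 192.168.180.147:9092 --list
# expected to include: user_friends, users, train, events, event_attendees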

2. Create the namespace and tables in the HBase shell

create_namespace 'events_db'
create 'events_db:users','profile','region','registration'
create 'events_db:user_friend','uf'
create 'events_db:events','schedule','location','creator','remark'
create 'events_db:event_attendee','euat'
create 'events_db:train','eu'
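
If the statements ran successfully, the namespace and tables can be verified from the HBase shell before starting any consumer, for example:

list_namespace_tables 'events_db'
describe 'events_db:users'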

3. UserFriendToHB

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consume the user_friends data from Kafka and write it into HBase.
 */
public class UserFriendToHB {
    static int num = 0; // counter for the total number of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (unused while auto-commit is off)
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriend_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // Configure and connect to HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // userid,friendid
                    String[] split = record.value().split(",");
                    // Row key: 4-byte hash of userid + friendid
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
                    put.addColumn("uf".getBytes(), "user_id".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (!datas.isEmpty())
                    userFriendTable.put(datas);
                consumer.commitAsync(); // auto-commit is disabled, so commit offsets here
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
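
Once the consumer is running, a quick way to confirm that rows are arriving is to scan a few of them from the HBase shell, for example:

scan 'events_db:user_friend', {LIMIT => 5}
count 'events_db:user_friend'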

4. UsersToHB

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consume the users data from Kafka and write it into HBase.
 */
public class UsersToHB {
    static int num = 0; // counter for the total number of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (unused while auto-commit is off)
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "users_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("users"));

        // Configure and connect to HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table usersTable = connection.getTable(TableName.valueOf("events_db:users"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // user_id,locale,birthyear,gender,joinedAt,location,timezone
                    String[] user = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(user[0])); // row key: user_id
                    put.addColumn("profile".getBytes(), "birthyear".getBytes(), user[2].getBytes());
                    put.addColumn("profile".getBytes(), "gender".getBytes(), user[3].getBytes());
                    put.addColumn("region".getBytes(), "locale".getBytes(), user[1].getBytes());
                    // the trailing fields may be missing, so guard on the array length
                    if (user.length > 5)
                        put.addColumn("region".getBytes(), "location".getBytes(), user[5].getBytes());
                    if (user.length > 6)
                        put.addColumn("region".getBytes(), "timezone".getBytes(), user[6].getBytes());
                    if (user.length > 4)
                        put.addColumn("registration".getBytes(), "joinedAt".getBytes(), user[4].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (!datas.isEmpty())
                    usersTable.put(datas);
                consumer.commitAsync(); // auto-commit is disabled, so commit offsets here
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5. TrainToHB

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consume the train data from Kafka and write it into HBase.
 */
public class TrainToHB {
    static int num = 0; // counter for the total number of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (unused while auto-commit is off)
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "train_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("train"));

        // Configure and connect to HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table trainTable = connection.getTable(TableName.valueOf("events_db:train"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // user,event,invited,timestamp,interested,not_interested
                    String[] train = record.value().split(",");
                    // Append a random suffix so repeated (user, event) pairs do not collide on the same row key
                    double random = Math.random();
                    Put put = new Put(Bytes.toBytes(train[0] + train[1] + random));
                    put.addColumn("eu".getBytes(), "user".getBytes(), train[0].getBytes());
                    put.addColumn("eu".getBytes(), "event".getBytes(), train[1].getBytes());
                    put.addColumn("eu".getBytes(), "invited".getBytes(), train[2].getBytes());
                    put.addColumn("eu".getBytes(), "timestamp".getBytes(), train[3].getBytes());
                    put.addColumn("eu".getBytes(), "interested".getBytes(), train[4].getBytes());
                    put.addColumn("eu".getBytes(), "not_interested".getBytes(), train[5].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (!datas.isEmpty())
                    trainTable.put(datas);
                consumer.commitAsync(); // auto-commit is disabled, so commit offsets here
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
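
Appending Math.random() to the row key keeps duplicate (user, event) pairs in the train data from overwriting each other, but it also means that re-running the consumer writes brand-new rows rather than the same ones. A deterministic alternative (just a sketch, not part of the original code) is to suffix the key with the Kafka record's partition and offset, which stay the same when the topic is re-consumed:

// inside the for-loop, as an alternative to the random suffix
Put put = new Put(Bytes.toBytes(train[0] + "_" + train[1] + "_" + record.partition() + "_" + record.offset()));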

6. EventsToHB

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consume the events data from Kafka and write it into HBase.
 */
public class EventsToHB {
    static int num = 0; // counter for the total number of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (unused while auto-commit is off)
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "events_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("events"));

        // Configure and connect to HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // event_id,user_id,start_time,city,state,zip,country,lat,lng,commonwords
                    String[] events = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(events[0])); // row key: event_id
                    put.addColumn("creator".getBytes(), "userid".getBytes(), Bytes.toBytes(events[1]));
                    put.addColumn("schedule".getBytes(), "starttime".getBytes(), Bytes.toBytes(events[2]));
                    put.addColumn("location".getBytes(), "city".getBytes(), Bytes.toBytes(events[3]));
                    put.addColumn("location".getBytes(), "state".getBytes(), Bytes.toBytes(events[4]));
                    put.addColumn("location".getBytes(), "zip".getBytes(), Bytes.toBytes(events[5]));
                    put.addColumn("location".getBytes(), "country".getBytes(), Bytes.toBytes(events[6]));
                    put.addColumn("location".getBytes(), "lat".getBytes(), Bytes.toBytes(events[7]));
                    put.addColumn("location".getBytes(), "lng".getBytes(), Bytes.toBytes(events[8]));
                    put.addColumn("remark".getBytes(), "commonwords".getBytes(), Bytes.toBytes(events[9]));
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (!datas.isEmpty())
                    eventsTable.put(datas);
                consumer.commitAsync(); // auto-commit is disabled, so commit offsets here
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

7. EventAttendeToHb

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

/**
 * Consume the event_attendees data from Kafka and write it into HBase.
 */
public class EventAttendeToHb {
    static int num = 0; // counter for the total number of rows written

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit offsets manually
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // auto-commit interval (unused while auto-commit is off)
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventattendee_group");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("event_attendees"));

        // Configure and connect to HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.180.147:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("events_db:event_attendee"));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // eventid,friendid,yes/no/maybe
                    String[] eventattend = record.value().split(",");
                    // Row key: eventid + friendid + attendance state
                    Put put = new Put(Bytes.toBytes(eventattend[0] + eventattend[1] + eventattend[2]));
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), Bytes.toBytes(eventattend[0]));
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), Bytes.toBytes(eventattend[1]));
                    put.addColumn("euat".getBytes(), "state".getBytes(), Bytes.toBytes(eventattend[2]));
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("---------------------------------num:" + num);
                if (!datas.isEmpty())
                    table.put(datas);
                consumer.commitAsync(); // auto-commit is disabled, so commit offsets here
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
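
All five consumers share the same Kafka and HBase setup; only the topic, consumer group, target table, and row-mapping logic differ. One possible refactor (a sketch under my own naming, not part of the original project) pulls the shared plumbing into a single helper and lets each job supply just a function that turns one CSV line into a Put:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

/**
 * Hypothetical helper that factors out the Kafka/HBase plumbing shared by the
 * five consumers above. Callers pass the topic, the consumer group, the target
 * table, and a mapping from one CSV line to a Put.
 */
public class KafkaToHBaseWorker {
    public static void run(String topic, String groupId, String tableName,
                           Function<String, Put> toPut) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.180.147:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.180.147");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            consumer.subscribe(Collections.singleton(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    puts.add(toPut.apply(record.value()));
                }
                if (!puts.isEmpty()) {
                    table.put(puts); // batch write the polled records
                }
                consumer.commitAsync(); // manual offset commit, as in the classes above
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

With such a helper, UserFriendToHB would reduce to a single call (again, only a sketch):

KafkaToHBaseWorker.run("user_friends", "userfriend_group", "events_db:user_friend", value -> {
    String[] split = value.split(",");
    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
    put.addColumn("uf".getBytes(), "user_id".getBytes(), split[0].getBytes());
    put.addColumn("uf".getBytes(), "friend_id".getBytes(), split[1].getBytes());
    return put;
});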
