当前位置:   article > 正文

Hadoop编程——Elasticsearch for Apache Hadoop从HDFS导入数据到Elasticsearch_hadoop elasticsearch导入

hadoop elasticsearch导入

Hadoop编程——从HDFS导入数据到Elasticsearch

一、Elasticsearch for Apache Hadoop安装

1.1 官网下载zip安装包

1.2 maven方式下载

1.3 将ES-hadoop 的jar包加入环境变量

二、准备数据

三、从HDFS读取文档索引到ES

四、API分析


Hadoop编程——从HDFS导入数据到Elasticsearch

Elasticsearch for Apache Hadoop是一个开源,独立的,独立的小型库,允许Hadoop(无论是使用Map / Reduce还是基于它构建的库,如Hive,或者新的即将到来的库,如Apache Spark)与 Elasticsearch 互动。可以将其视为允许数据双向流动的连接器,以便应用程序可以透明地利用Elasticsearch引擎功能来显着丰富其功能并提高性能。

本示例软件环境简介:Hadoop-3.2.0 + Elasticsearch-6.4.3 + JDK1.8 + Elasticsearch for Hadoop-6.4.3

一、Elasticsearch for Apache Hadoop安装

1.1 官网下载zip安装包

官网给的Elasticsearch for Hadoop的地址https://www.elastic.co/downloads/hadoop,下载后解压后,把dist目录下jar包导入hadoop即可。

如图:

1.2 maven方式下载

使用maven管理jar包更加方便,在pom.xml文件中加入以下依赖:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop</artifactId>
  4. <version>6.4.3</version>
  5. </dependency>

PS:上面的依赖包含了MapReduce、Pig、Hive、Spark等完整的依赖,如果只想单独使用某一个功能,可以细化分别加入

Map/Reduce:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop-mr</artifactId>
  4. <version>6.4.3</version>
  5. </dependency>

Apache Hive:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop-hive</artifactId>
  4. <version>6.4.3</version>
  5. </dependency>

Apache Pig:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-hadoop-pig</artifactId>
  4. <version>6.4.3</version>
  5. </dependency>

Apache Spark:

  1. <dependency>
  2. <groupId>org.elasticsearch</groupId>
  3. <artifactId>elasticsearch-spark-20_2.10</artifactId>
  4. <version>6.4.3</version>
  5. </dependency>

1.3 将ES-hadoop 的jar包加入环境变量

把ES-HADOOP的jar包导入环境变量,编辑/etc/profile(或.bash_profile),我的路径为:

/opt/hadoop/hadoop-3.2.0/elasticsearch-hadoop-6.4.3

jar包位于elasticsearch-hadoop-6.4.3的dist目录下。编辑/etc/profile,加入一行:

  1. #ES-HADOOP HOME
  2. export ES_HADOOP_HOME=/opt/hadoop/hadoop-3.2.0/elasticsearch-hadoop-6.4.3
  3. export CLASSPATH=$CLASSPATH:$ES_HADOOP_HOME/dist/*

最后source一下使profile文件生效:

source /etc/profile

PS:如果是Hadoop集群的话每个节点都需要拥有eslasticsearch-hadoop的jar包 

Hadoop集群搭建可以参考此博客:Linux从零搭建Hadoop集群(CentOS7+hadoop 3.2.0+JDK1.8完全分布式集群)

 二、准备数据

准备一些测试数据,数据内容为json格式,每行是一条文档。把下列内容保存到test.json中。

  1. {"id":"1","title":"git简介","posttime":"2019-07-18","content":"svn与git的最主要区别..."}
  2. {"id":"2","title":"elasticsearch使用案例","posttime":"2019-07-18","content":"elasticsearch的CURD..."}
  3. {"id":"3","title":"SQL基本操作","posttime":"2019-07-18","content":"svn与git的最主要区别..."}
  4. {"id":"4","title":"Hibernate框架基础","posttime":"2019-07-18","content":"Hibernate框架基础..."}
  5. {"id":"5","title":"Shell基本知识","posttime":"2019-07-18","content":"Shell是什么..."}

启动Hadoop,将test.json文件上传至hdfs

hadoop fs -put test.json /work

 如图:

结果: 

三、从HDFS读取文档索引到ES

从HDFS读取文档索引到Elasticsearch的代码:

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  9. import org.elasticsearch.hadoop.mr.EsOutputFormat;
  10. import java.io.IOException;
  11. /**
  12. * Created by bee on 2019-07-18.
  13. * @Author WS
  14. */
  15. public class TestHDFSToEs {
  16. public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
  17. private Text line = new Text();
  18. public void map(Object key, Text value, Mapper<Object, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
  19. if(value.getLength()>0){
  20. line.set(value);
  21. context.write(NullWritable.get(), line);
  22. }
  23. }
  24. }
  25. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  26. Configuration conf = new Configuration();
  27. conf.setBoolean("mapred.map.tasks.speculative.execution", false);
  28. conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
  29. conf.set("es.nodes", "localhost:9200");
  30. //设置索引/类型
  31. conf.set("es.resource", "test/test");
  32. conf.set("es.mapping.id", "id");
  33. conf.set("es.input.json", "yes");
  34. Job job = Job.getInstance(conf, "hadoop es write test");
  35. job.setMapperClass(TestHDFSToEs.MyMapper.class);
  36. job.setInputFormatClass(TextInputFormat.class);
  37. job.setOutputFormatClass(EsOutputFormat.class);
  38. job.setMapOutputKeyClass(NullWritable.class);
  39. job.setMapOutputValueClass(Text.class);
  40. // 设置输入路径
  41. FileInputFormat.setInputPaths(job, new Path
  42. ("hdfs://192.168.217.128:9000/test/test.json"));
  43. job.waitForCompletion(true);
  44. }
  45. }

执行此Main函数,导入成功后会在Elasticsearch中生成test索引。查看内容如下: 

四、API分析

Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为Text类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为Text类型,把json字符串序列化。

因为只需要写入,没有Reduce过程。在main函数中,首先创Configuration()类的一个对象conf,通过conf配置一些参数。

  • conf.setBoolean("mapred.map.tasks.speculative.execution", false);

       关闭mapper阶段的执行推测

  • conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

       关闭reducer阶段的执行推测

  • conf.set("es.nodes", "192.168.1.111:9200");

       配置Elasticsearch的IP和端口

  • conf.set("es.resource", "blog/csdn");

       设置索引到Elasticsearch的索引名和类型名。

  • conf.set("es.mapping.id", "id");

       设置文档id,这个参数”id”是文档中的id字段

  • conf.set("es.input.json", "yes");

       指定输入的文件类型为json。

  • job.setInputFormatClass(TextInputFormat.class);

       设置输入流为文本类型

  • job.setOutputFormatClass(EsOutputFormat.class);

       设置输出为EsOutputFormat类型。

  • job.setMapOutputKeyClass(NullWritable.class);

       设置Map的输出key类型为NullWritable类型

  • job.setMapOutputValueClass(BytesWritable.class);

       设置Map的输出value类型为BytesWritable类型

最后给各位看官来波福利!

阿里云服务器代金券和折扣免费领:https://promotion.aliyun.com/ntms/yunparter/invite.html?userCode=ypbt9nme

性能级主机2-5折:https://promotion.aliyun.com/ntms/act/enterprise-discount.html?userCode=ypbt9nme

新用户云通讯专享8折:https://www.aliyun.com/acts/alicomcloud/new-discount?userCode=ypbt9nme

新老用户云主机低4折专项地址:https://promotion.aliyun.com/ntms/act/qwbk.html?userCode=ypbt9nme

680元即可注册商标专项地址:https://tm.aliyun.com/?userCode=ypbt9nme

17元/月云主机:https://promotion.aliyun.com/ntms/act/qwbk.html?spm=5176.11533447.1097531.13.22805cfaiTv7SN&userCode=ypbt9nme

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/693385
推荐阅读
相关标签
  

闽ICP备14008679号