当前位置:   article > 正文

Intellij IDEA编写Spark应用程序的环境配置和操作步骤_idea spark

idea spark

本文介绍如何在win系统中使用IDEA开发spark应用程序,并将其打成jar包上传到虚拟机中的三个Ubuntu系统,然后在分布式环境中运行。

主要步骤包括:

  • 安装Scala插件:在Intellij IDEA中安装Scala插件,并重启IDEA。
  • 创建Maven项目:在Intellij IDEA中创建一个Maven项目,选择Scala语言,并添加Spark和HBase依赖。
  • 配置Scala SDK:在Intellij IDEA中添加Scala SDK,并给项目添加Scala支持。
  • 编写Spark应用程序:在src/main/scala目录下创建一个Scala对象,并编写Spark代码。
  • 打包和运行Spark项目:在本地模式下测试Spark应用程序,打包成jar包,上传到虚拟机中的master节点,使用spark-submit命令提交到集群。

 基础环境

 首先确保已经在虚拟机中安装配置好Hadoop,HBase和Spark,并且可以正常运行。本文假设已经按照之前文章的步骤搭建了一个三节点的Hadoop集群,其中scala版本为2.12,hbase版本为2.3.7,spark版本为3.2.3,hadoop版本为3.2.4

一、安装Scala插件

  • 在Intellij IDEA中,选择File->Settings->Plugins,在Marketplace中搜索scala进行安装,安装后根据提示重启IDEA。

二、创建Maven项目

  • 在Intellij IDEA中,选择File->New->Project,选择Maven作为项目类型,填写项目名称和位置。
  • 在pom.xml文件中添加Spark和HBase相关的依赖,注意要与虚拟机中的Spark版本和Scala版本保持一致。本文使用的是Spark 3.2.3和Scala 2.12。例如:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>sparkhbase</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. <hbase.version>2.3.7</hbase.version>
  13. <hadoop.version>3.2.4</hadoop.version>
  14. <spark.version>3.2.3</spark.version>
  15. <scala.version>2.12</scala.version>
  16. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>org.apache.spark</groupId>
  21. <artifactId>spark-core_${scala.version}</artifactId>
  22. <version>${spark.version}</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.apache.spark</groupId>
  26. <artifactId>spark-streaming_${scala.version}</artifactId>
  27. <version>${spark.version}</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.apache.spark</groupId>
  31. <artifactId>spark-sql_${scala.version}</artifactId>
  32. <version>${spark.version}</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.apache.spark</groupId>
  36. <artifactId>spark-hive_${scala.version}</artifactId>
  37. <version>${spark.version}</version>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.apache.hadoop</groupId>
  41. <artifactId>hadoop-client</artifactId>
  42. <version>${hadoop.version}</version>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.apache.hadoop</groupId>
  46. <artifactId>hadoop-common</artifactId>
  47. <version>${hadoop.version}</version>
  48. </dependency>
  49. <dependency>
  50. <groupId>org.apache.hadoop</groupId>
  51. <artifactId>hadoop-hdfs</artifactId>
  52. <version>${hadoop.version}</version>
  53. </dependency>
  54. <dependency>
  55. <groupId>org.apache.hbase</groupId>
  56. <artifactId>hbase-client</artifactId>
  57. <version>${hbase.version}</version>
  58. </dependency>
  59. <dependency>
  60. <groupId>org.apache.hbase</groupId>
  61. <artifactId>hbase-server</artifactId>
  62. <version>${hbase.version}</version>
  63. </dependency>
  64. <dependency>
  65. <groupId>org.apache.hbase</groupId>
  66. <artifactId>hbase-mapreduce</artifactId>
  67. <version>${hbase.version}</version>
  68. </dependency>
  69. <dependency>
  70. <groupId>org.apache.hbase</groupId>
  71. <artifactId>hbase-common</artifactId>
  72. <version>${hbase.version}</version>
  73. </dependency>
  74. <dependency>
  75. <groupId>org.apache.hbase</groupId>
  76. <artifactId>hbase</artifactId>
  77. <version>${hbase.version}</version>
  78. <type>pom</type>
  79. </dependency>
  80. <dependency>
  81. <groupId>org.apache.spark</groupId>
  82. <artifactId>spark-mllib_${scala.version}</artifactId>
  83. <version>${spark.version}</version>
  84. </dependency>
  85. </dependencies>
  86. </project>

三、配置Scala SDK

  • 在Intellij IDEA中,选择File->Project Structure->Global Libraries,添加Scala SDK,选择本地安装的Scala版本。
  • 在项目中右键选择Add Framework Support,在弹出的对话框中勾选Scala,并选择对应的SDK。

四、编写Spark应用程序

  • 在src/main/scala目录下创建一个包,例如com.spark.example,并在该包下创建一个Scala对象,例如WordCountFromHBase。编写Spark应用程序的代码,例如:
  1. import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  2. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan}
  3. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  4. import org.apache.hadoop.hbase.util.Bytes
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. object WordCountFromHBase {
  7. def main(args: Array[String]): Unit = {
  8. //创建Spark配置对象
  9. val conf = new SparkConf().setAppName("WordCountFromHBase").setMaster("local")
  10. //创建Spark上下文对象
  11. val sc = new SparkContext(conf)
  12. //创建HBase配置对象
  13. val hbaseConf = HBaseConfiguration.create()
  14. //设置HBase的Zookeeper地址
  15. hbaseConf.set("hbase.zookeeper.quorum", "hadoop100:2181,hadoop200:2181,hadoop201:2181")
  16. //设置HBase的Zookeeper端口
  17. hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  18. //设置要读取的HBase表名,提前通过hbase shell创建
  19. val tableName = "testtable"
  20. hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
  21. //创建HBase连接对象
  22. val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
  23. //获取HBase表对象
  24. val table = connection.getTable(TableName.valueOf(tableName))
  25. //创建一个扫描对象,指定要读取的列族和列名
  26. val scan = new Scan()
  27. scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("word"))
  28. //将扫描对象转换为字符串,设置到HBase配置对象中
  29. hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
  30. //从HBase中读取数据,返回一个RDD
  31. val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
  32. //对RDD进行单词统计
  33. val wordCount = hbaseRDD.map(tuple => {
  34. //获取Result对象
  35. val result = tuple._2
  36. //获取word列的值,转换为字符串
  37. val word = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("word")))
  38. //返回(word, 1)的元组
  39. (word, 1)
  40. }).reduceByKey((a, b) => a + b)
  41. //打印结果
  42. wordCount.foreach(println)
  43. //关闭Spark上下文和HBase连接
  44. sc.stop()
  45. connection.close()
  46. }
  47. }

五、打包和运行Spark项目

  • 在Intellij IDEA中右键运行WordCountFromHBase对象,可以在本地模式下测试Spark应用程序是否正确。如果没有问题,可以进行打包操作。
  • 在Intellij IDEA中打开Maven工具栏,双击lifecycle下的package命令,将项目打成jar包。打包完成后的jar包在target目录下,例如spark-example-1.0-SNAPSHOT.jar。
  • 将jar包上传到虚拟机中的hadoop100主节点,userjar/目录。
  • 在master节点上使用spark-submit命令提交Spark应用程序到集群,指定jar包路径和主类名。例如:
spark-submit --class com.spark.example.WordCountFromHBase spark-example-1.0-SNAPSHOT.jar
  • 查看Spark应用程序的运行结果,可以在终端中输出,也可以在Spark Web UI中查看。

打包方式二: 

File->Project Structure->artifacts->点击加号->JAR->from model->点击Main Class选项框后的文件夹->点击Projet->选择main方法->点击ok

仅保留类似红框中函数名的程序包,去掉多余依赖,打成比较小的jar包,需要linux中的软件环境与依赖版本相同才能运行

 

之后Build->Build artifacts->选中项目点击build即可


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/427233
推荐阅读
相关标签
  

闽ICP备14008679号