This article describes how to develop a Spark application with IDEA on Windows, package it as a jar, upload it to a three-node Ubuntu cluster running in virtual machines, and run it in that distributed environment.

The main steps are as follows:

First, make sure Hadoop, HBase, and Spark are already installed, configured, and running in the virtual machines. This article assumes a three-node Hadoop cluster has been set up following the earlier articles, with Scala 2.12, HBase 2.3.7, Spark 3.2.3, and Hadoop 3.2.4.
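As an optional sanity check, the installed versions can be verified on each node before going further (assuming the Hadoop, HBase, and Spark binaries are on the PATH):

hadoop version           # expect 3.2.4
hbase version            # expect 2.3.7
spark-submit --version   # expect Spark 3.2.3 built with Scala 2.12

Once the environment is confirmed, create a Maven project in IDEA; the pom.xml used for this example is shown below.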
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sparkhbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <hbase.version>2.3.7</hbase.version>
        <hadoop.version>3.2.4</hadoop.version>
        <spark.version>3.2.3</spark.version>
        <scala.version>2.12</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>
</project>
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan}
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object WordCountFromHBase {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration object
    val conf = new SparkConf().setAppName("WordCountFromHBase").setMaster("local")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the HBase configuration object
    val hbaseConf = HBaseConfiguration.create()
    // Set the ZooKeeper quorum used by HBase
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop100:2181,hadoop200:2181,hadoop201:2181")
    // Set the ZooKeeper client port
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    // Name of the HBase table to read; create it beforehand with the hbase shell
    val tableName = "testtable"
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Create an HBase connection
    val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
    // Get the HBase table object
    val table = connection.getTable(TableName.valueOf(tableName))
    // Create a Scan restricted to the column family and qualifier we need
    val scan = new Scan()
    scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("word"))
    // Serialize the Scan into the HBase configuration so TableInputFormat can use it
    hbaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))
    // Read the table from HBase as an RDD of (ImmutableBytesWritable, Result) pairs
    val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
    // Count the words stored in column f:word
    val wordCount = hbaseRDD.map(tuple => {
      // Get the Result object
      val result = tuple._2
      // Read the value of the f:word column as a string
      val word = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("word")))
      // Emit a (word, 1) pair
      (word, 1)
    }).reduceByKey((a, b) => a + b)
    // Print the result
    wordCount.foreach(println)
    // Stop the Spark context and close the HBase connection
    sc.stop()
    connection.close()
  }
}
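The program reads the column f:word from an HBase table named testtable, so that table must exist with a column family f and a few rows before the job is submitted. A minimal way to prepare it from the hbase shell could look like this (the row keys and words are just placeholder test data):

create 'testtable', 'f'
put 'testtable', 'r1', 'f:word', 'spark'
put 'testtable', 'r2', 'f:word', 'hbase'
put 'testtable', 'r3', 'f:word', 'spark'
scan 'testtable'

With this sample data, the job should print something like (spark,2) and (hbase,1).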
After packaging the project into a jar and uploading it to one of the cluster nodes, submit it with spark-submit, passing the fully qualified name of the main class and the jar produced by the build (with the pom and code above, that is WordCountFromHBase and sparkhbase-1.0-SNAPSHOT.jar):

spark-submit --class WordCountFromHBase sparkhbase-1.0-SNAPSHOT.jar
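Note that the example hardcodes setMaster("local"), and a master set directly on the SparkConf takes precedence over one passed to spark-submit, so the command above actually runs the job locally on the submitting node. To run it across the cluster, remove setMaster from the code and pass the master on the command line instead; a sketch, assuming a standalone Spark master on hadoop100 (use --master yarn instead if Spark runs on YARN):

spark-submit --class WordCountFromHBase --master spark://hadoop100:7077 sparkhbase-1.0-SNAPSHOT.jar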
Packaging method two:
File -> Project Structure -> Artifacts -> click the plus sign -> JAR -> From modules with dependencies -> click the folder icon next to the Main Class field -> click Project -> select the class containing the main method -> click OK.
Keep only your own program's packages (the ones highlighted in the red box of the screenshot), remove the redundant dependency jars, and build a much smaller jar; it will only run if the software environment and dependency versions on the Linux side match the ones used at build time (see the sketch after these steps).
Then go to Build -> Build Artifacts..., select the artifact, and click Build.
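The smaller jar from method two contains only your own classes, so every Spark, Hadoop, and HBase class it needs must come from the software already installed on the cluster, which is why the versions there have to match. In a typical installation Spark already puts its own and Hadoop's classes on the classpath of a submitted job, but the HBase client classes usually have to be added explicitly; one common approach, sketched here on the assumption that the hbase command is on the PATH of the submitting node, is to pass the classpath printed by hbase mapredcp as extra jars:

spark-submit --class WordCountFromHBase \
  --jars $(hbase mapredcp | tr ':' ',') \
  sparkhbase-1.0-SNAPSHOT.jar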