The previous section completed the implementation up to the point of saving to MySQL. Before going further, we write a quick test run to check that the code is correct:
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount1")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))
    lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect()
      .foreach(println)

    sc.stop()
  }
}

To write the results to MySQL we need the MySQL JDBC driver, so add the dependency to pom.xml:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
We also need to add the corresponding plugins to the build section, so that the driver is packaged into the jar and can be loaded:
<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>icu.wzk.SuperWordCount1</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Next we create a new database and, inside it, a new table. Note that the table has no primary key, so re-running the job will append duplicate rows.
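The database DDL itself is not shown in the post; judging from the JDBC URL used later, the database is named spark-test, so a sketch along these lines (names assumed from that URL) would create it:

CREATE DATABASE IF NOT EXISTS `spark-test` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE `spark-test`;

The table itself: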
CREATE TABLE `wordcount` (
`word` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
In the next version we save the data inside foreach, which opens and closes a MySQL connection for every single record, so it is quite inefficient.
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount2 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount2")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))
    val words: RDD[String] = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())
    // strip punctuation before filtering stop words and empty tokens
    val clearWords: RDD[String] = words
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.nonEmpty)
    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    result.foreach(println)

    // Write to MySQL: one connection per record, which is slow
    val username = "hive"
    val password = "hive@wzk.icu"
    val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    val sql = "insert into wordcount values(?, ?)"
    result.foreach { case (word, count) =>
      var conn: Connection = null
      var stmt: PreparedStatement = null
      try {
        conn = DriverManager.getConnection(url, username, password)
        stmt = conn.prepareStatement(sql)
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.executeUpdate() // execute the insert so the row is actually written
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (stmt != null) stmt.close()
        if (conn != null) conn.close()
      }
    }

    sc.stop()
  }
}
After this optimization we save the data with foreachPartition, creating one connection per partition.

Note: the result RDD is used twice (once to print, once to save), so it is cached before being reused:
package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object SuperWordCount3 {

  private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")
  private val punctuation = "[\\)\\.,:;'!\\?]"
  private val username = "hive"
  private val password = "hive@wzk.icu"
  private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSuperWordCount3")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] = sc.textFile(args(0))
    val words: RDD[String] = lines
      .flatMap(_.split("\\s+"))
      .map(_.trim.toLowerCase())
    // strip punctuation before filtering stop words and empty tokens
    val clearWords: RDD[String] = words
      .map(_.replaceAll(punctuation, ""))
      .filter(word => !stopWords.contains(word) && word.nonEmpty)
    val result: RDD[(String, Int)] = clearWords
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    result.cache() // the RDD is used twice, so cache it to avoid recomputation

    result.foreach(println)
    result.foreachPartition(saveAsMySQL)

    sc.stop()
  }

  def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var stmt: PreparedStatement = null
    val sql = "insert into wordcount values(?, ?)"
    try {
      // one connection per partition instead of one per record
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      iter.foreach { case (word, count) =>
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.executeUpdate() // execute the insert so the row is actually written
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }
}
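As a further optional optimization beyond what this post shows, JDBC batching reduces round trips to MySQL: rows are queued with addBatch and flushed with executeBatch. Below is a minimal sketch of such a variant of saveAsMySQL, reusing the url, username and password fields above; the batch size of 500 is an assumed, tunable value:

def saveAsMySQLBatched(iter: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var stmt: PreparedStatement = null
  val sql = "insert into wordcount values(?, ?)"
  try {
    conn = DriverManager.getConnection(url, username, password)
    conn.setAutoCommit(false) // commit once per batch instead of once per row
    stmt = conn.prepareStatement(sql)
    iter.grouped(500).foreach { batch => // batch size is an arbitrary, tunable choice
      batch.foreach { case (word, count) =>
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.addBatch() // queue the row instead of executing it immediately
      }
      stmt.executeBatch() // send all queued rows in one round trip
      conn.commit()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

It would be wired in exactly like the original method: result.foreachPartition(saveAsMySQLBatched).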
Build the project:

mvn clean package
After packaging, upload the jar to the server and submit the first job:
spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
The results of the run are shown in the figure below.

Submit the other two versions in the same way:
spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java
The results of the run are shown in the figure below:
Checking the database, the contents are as follows:
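For reference, a query along these lines (a minimal example against the wordcount table created above) lists the most frequent words and confirms that the counts were written:

SELECT `word`, `count` FROM wordcount ORDER BY `count` DESC LIMIT 10;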