1. Requirement: compute the top three pages by visit count
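All three programs below assume a tab-separated access.log in which field 1 (counting from 0) is the URL. A minimal sketch of that assumption; the timestamp value here is made up for illustration:

//Hypothetical log line: only the tab separation and the position of the
//URL field are what the code below relies on.
val sampleLine = "20161123101523\thttp://java.xxxxxx.com/java/course/cloud.shtml"
val url: String = sampleLine.split("\t")(1)
//url == "http://java.xxxxxx.com/java/course/cloud.shtml"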
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Requirement: compute the top three pages by visit count.
  * Users favor the video and live-streaming pages, and knowing
  * this helps the business run operations and make decisions.
  *
  * Start by looking at the data.
  */
object UrlCount {
  def main(args: Array[String]): Unit = {
    //1. Load the data
    val conf: SparkConf = new SparkConf().setAppName("UrlCount").setMaster("local[2]")
    //Entry point of a Spark program
    val sc: SparkContext = new SparkContext(conf)
    //Read the log file
    val rdd1: RDD[String] = sc.textFile("e:/access.log")

    //2. Map each line to a pair such as (url, 1)
    val rdd2: RDD[(String, Int)] = rdd1.map(line => {
      val s: Array[String] = line.split("\t")
      //mark one occurrence
      (s(1), 1)
    })

    //3. Sum the counts of identical URLs, e.g. (url, 201)
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)

    //4. Sort descending and take the top three
    val rdd4: Array[(String, Int)] = rdd3.sortBy(_._2, false).take(3)

    //5. Print the results
    rdd4.foreach(x => {
      println("URL: " + x._1 + ", visits: " + x._2)
    })

    //6. Release resources
    sc.stop()
  }
}

Result:
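As a side note, the sortBy(...).take(3) pair in step 4 can also be expressed with RDD.top, which keeps only the k largest elements per partition instead of shuffling a fully sorted RDD. A minimal sketch, reusing rdd3 from above:

//Equivalent to sortBy(_._2, false).take(3), but implemented with a
//bounded per-partition priority queue rather than a full sort.
val top3: Array[(String, Int)] = rdd3.top(3)(Ordering.by(_._2))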
2. Requirement: find the most-visited URL for each college
import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Requirement: find the most-visited URL for each college, e.g.
  * bigdata: video (live streaming)
  * java: video
  * python: teacher
  */
object UrlGroupCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("UrlGroupCount").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    //2. Load the data
    val rdd1: RDD[String] = sc.textFile("e:/access.log")

    //3. Split each line
    val rdd2: RDD[(String, Int)] = rdd1.map(line => {
      val s: Array[String] = line.split("\t")
      //(url, 1)
      (s(1), 1)
    })

    //4. Total visits per URL: (url, totalVisits)
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ + _)

    //5. Extract the college
    val rdd4: RDD[(String, String, Int)] = rdd3.map(x => {
      //the url
      val url: String = x._1
      //use java.net.URL to get the host name; its first label is the college
      val host: String = new URL(url).getHost.split("[.]")(0)
      //emit a tuple
      (host, url, x._2)
    })

    //6. Group by college and keep the top entry of each group
    val rdd5: RDD[(String, List[(String, String, Int)])] = rdd4.groupBy(_._1).mapValues(it => {
      //descending order
      it.toList.sortBy(_._3).reverse.take(1)
    })

    //7. Print the results
    rdd5.foreach(x => {
      println("College: " + x._1 + ", most visited: " + x._2)
    })

    //8. Release resources
    sc.stop()
  }
}

Result:
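As a side note, groupBy in step 6 materializes every URL of a college in memory at once. A minimal alternative sketch, assuming the same rdd4 as above, that keeps only the current maximum per college via reduceByKey:

//Re-key by college and keep the (url, count) pair with the larger count;
//no more than one pair per key is ever held during the reduce.
val topPerCollege: RDD[(String, (String, Int))] = rdd4
  .map { case (college, url, cnt) => (college, (url, cnt)) }
  .reduceByKey((a, b) => if (a._2 >= b._2) a else b)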
3. Requirement: add a custom partitioner, partitioning by college so each college's records land in their own result file
import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Requirement: add a custom partitioner that partitions by college,
  * so all records of one college end up in the same result file.
  */
object UrlParCount {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("UrlParCount").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)

    //2. Load the data
    val rdd1 = sc.textFile("e:/access.log").map(line => {
      val s: Array[String] = line.split("\t")
      //emit a tuple
      (s(1), 1)
    })

    //3. Aggregate the counts per URL
    val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)

    //4. Re-key into the custom shape: (college, (url, totalVisits))
    val rdd3: RDD[(String, (String, Int))] = rdd2.map(t => {
      val url = t._1
      val host = new URL(url).getHost
      val xHost: String = host.split("[.]")(0)
      //emit a tuple
      (xHost, (url, t._2))
    })

    //5. Build the custom partitioner from the distinct colleges
    val xueyuan: Array[String] = rdd3.map(_._1).distinct().collect
    val xueYuanPartitioner: XueYuanPartitioner = new XueYuanPartitioner(xueyuan)

    //6. Apply the partitioning rule, keeping the top record of each partition
    val rdd4: RDD[(String, (String, Int))] = rdd3.partitionBy(xueYuanPartitioner).mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(1).iterator
    })

    //7. Write one output file per partition
    rdd4.saveAsTextFile("e://pout")

    //8. Release resources
    sc.stop()
  }
}

class XueYuanPartitioner(xy: Array[String]) extends Partitioner {
  //custom rule: college -> partition id
  val rules: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
  var number = 0

  //iterate over the colleges
  for (i <- xy) {
    //map each college to a partition id
    rules += (i -> number)
    //increment the partition id
    number += 1
  }

  //total number of partitions
  override def numPartitions: Int = xy.length

  //resolve the partition for a key
  override def getPartition(key: Any): Int = {
    rules.getOrElse(key.toString, 0)
  }
}

Result:
1. part-00000
(bigdata,(http://bigdata.xxxxxx.com/bigdata/video.shtml,503))
2. part-00001
(java,(http://java.xxxxxx.com/java/course/cloud.shtml,1028))
3. part-00002
(net,(http://net.xxxxxx.com/net/video.shtml,525))
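One caveat worth noting: getPartition falls back to partition 0 for any key missing from rules, so a college that was not present when the distinct colleges were collected would silently share a file with whichever college received partition 0. A minimal sketch of that behavior, using the three colleges from the result above:

val p = new XueYuanPartitioner(Array("bigdata", "java", "net"))
p.getPartition("java")   //1, so its records land in part-00001
p.getPartition("python") //unknown key, falls back to partition 0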
4. The pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo.spark</groupId>
    <artifactId>SparkWC</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.8.4</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- hadoop-client API -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.39</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Scala compiler plugin -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Java compiler plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Shade plugin for building the fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
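With this pom in place, running mvn package should produce a shaded jar under target/ that bundles the Scala library, Spark, and Hadoop client classes (with jar signature files excluded to avoid security errors at runtime), which can then be handed to spark-submit when running against a real cluster instead of local[2].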
