
Spark Streaming Cases and Applications

Case 1: implement word count with Spark Streaming by listening on a network port. The drawback is that the counts are not accumulated globally, only within each individual batch.

Create a Maven project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aura.spark</groupId>
    <artifactId>1711spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>




        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>

        </dependency>




    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create the Scala code NetWordCount.scala (note that it must be an object).

Be careful: local[2] must be at least 2, otherwise nothing happens. The receiver runs as a task as soon as it starts and occupies one thread; with only one thread (busy receiving data) there would be no thread left to process the data, so locally you need at least 2.

Also note that this is micro-batch processing rather than exact real-time computation: with Seconds(2), one batch of data is processed every two seconds.

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created as NetWordCount.scala by jenrey on 2018/5/11 21:58
 */
object NetWordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Initialize the entry point
     */
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    /* The StreamingContext source calls createNewSparkContext itself, so the explicit
       SparkContext can be skipped:
       val ssc = new StreamingContext(conf, Seconds(2)) */

    /**
     * Get a DStream from the entry point.
     * We receive data by listening on a host and port; as soon as data arrives it is picked up,
     * essentially a socket connection.
     */
    // ReceiverInputDStream is a DStream: it extends InputDStream which extends DStream
    // (an abstract class that is essentially a HashMap(Time, RDD[T]) - one RDD per batch time)
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
     * Operate on the DStream
     */
    // the following are all transformations
    val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(line => line.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
    // output operations are analogous to RDD actions
    wordCountDStream.print() // print the results

    /**
     * Start the application (boilerplate)
     */
    ssc.start()
    // wait for termination
    ssc.awaitTermination()
    // release resources on shutdown
    ssc.stop()
  }
}

Run the following command on the hadoop04 node:

[hadoop@hadoop04 ~]$ nc -lk 9999

================================================

(To install nc:)

[hadoop@hadoop04 ~]$ sudo yum install nc


Run the nc command to check that the installation succeeded.


==================================================

First get the code running.

Then, in nc on hadoop04, type: hadoop,hadoop

The word counts appear in the IDEA console (screenshot omitted).



Case 2: a WordCount program over HDFS

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Run WordCount with Spark Streaming against HDFS in HA mode
 */
object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HDFSWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    /**
     * Data input
     */
    // monitor a directory; newly added files are picked up
    val fileDStream: DStream[String] = ssc.textFileStream("hdfs://myha01/streaming")

    /**
     * Data processing
     */
    val wordCountDStream: DStream[(String, Int)] = fileDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    /**
     * Data output
     */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}


1) Create a directory on HDFS to monitor; as soon as a file is placed into the monitored directory, its data is picked up.

Note: if HDFS runs in HA mode, the two config files (core-site.xml and hdfs-site.xml) must be placed under resources.


Create an empty directory named streaming on HDFS for testing:

[hadoop@hadoop04 ~]$ hadoop fs -mkdir /streaming

Run the code in IDEA.

Then create hello.txt with vim and write:

you,jump

i,jump

Then upload it to the streaming directory:

[hadoop@hadoop04 ~]$ hadoop fs -put hello.txt /streaming




Case 3: WordCount with updateStateByKey

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by jenrey on 2018/5/13 15:09
 */
object UpdateStateByKeyWordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Initialize the entry point
     */
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    // A checkpoint directory must be set, otherwise the program fails. You need write
    // permission on this HDFS path; the directory itself does not have to exist in advance,
    // it is created automatically.
    ssc.checkpoint("hdfs://myha01/StreamingCheckPoint")

    /**
     * Data input
     */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
     * Data processing
     */
    val wordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))
      // updateStateByKey(updateFunc: (Seq[V], Option[S]) => Option[S]) - the argument is a function
      // Option: Some = has a value, None = no value
      // ByKey: the operation groups by key, e.g.
      //   you,1
      //   you,1   =>  you,{1,1} and jump,{1}
      //   jump,1
      // The function below is called once per key.
      // values: Seq[Int] is the list of counts in this batch, e.g. List(1, 1);
      // state: Option[Int] is how many times the word appeared before - None if it never
      // appeared, Some(n) if it appeared n times.
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        val current: Int = values.sum
        // previous count, or 0 if there is none
        val lastCount: Int = state.getOrElse(0)
        // since this key was seen, it appeared at least once, so current + previous
        // is the total number of occurrences
        Some(current + lastCount)
      })

    /**
     * Data output
     */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Run the code.

Two things commonly cause errors here:

1. Forgetting to set the checkpoint directory.

2. Not having permission on the checkpoint path.

Grant the permission:

[hadoop@hadoop04 ~]$ hdfs dfs -chmod 777 /

Then run the code again, and send data from the hadoop04 node:

[hadoop@hadoop04 ~]$ nc -lk 9999




Case 4: a WordCount program that, after being stopped and restarted, continues counting from the previous results

Why can't the ordinary version resume after a stop? Because restarting creates a new entry point (new StreamingContext), i.e. a completely different one. What really matters is the Driver service, and the two Driver services are different.

Checkpointing saves the Driver's state into the checkpoint directory, so on the next start the Driver can be restored from that data exactly as it was last time; further computation then accumulates on top of the previous results.

import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverHAWordCount {
  def main(args: Array[String]): Unit = {
    // Run the program once and feed it some data, then stop it and run it again:
    // it continues accumulating on top of the previous results.
    val checkpointDirectory: String = "hdfs://myha01/StreamingCheckPoint3"

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("DriverHAWordCount")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint(checkpointDirectory)
      val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
      val wordCountDStream = dstream.flatMap(_.split(","))
        .map((_, 1))
        .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum
          val lastCount = state.getOrElse(0)
          Some(currentCount + lastCount)
        })
      wordCountDStream.print()
      // the last expression is the return value; do not start the context here,
      // it is started once outside after getOrCreate
      ssc
    }

    // Get an entry point: if the checkpoint directory already holds one, restore it from there;
    // otherwise create a new one (i.e. a new Driver) via functionToCreateContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Run the program and type some data.

Then stop the program, run it again, wait a moment and type more data: the counts continue on top of the previous totals.




Case 5: a word blacklist

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordBlack {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordBlack")
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    /**
     * Data input
     */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
     * Simulate a blacklist here (normally it would be read from MySQL, HBase or Redis)
     */
    // build it directly as an RDD
    val wordBlackList: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(List("?", "!", "*"))
      .map(param => (param, true))
    /**
     * (?,true)
     * (!,true)
     * (*,true)
     */
    val blackList: Array[(String, Boolean)] = wordBlackList.collect()
    // broadcast it
    val blackListBroadcast: Broadcast[Array[(String, Boolean)]] = ssc.sparkContext.broadcast(blackList)

    /**
     * Data processing
     */
    val wordOneDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))
    // transform exposes the underlying RDD of the DStream; it must return an RDD
    val wordCountDStream: DStream[(String, Int)] = wordOneDStream.transform(rdd => {
      val filterRDD: RDD[(String, Boolean)] = rdd.sparkContext.parallelize(blackListBroadcast.value)
      val resultRDD: RDD[(String, (Int, Option[Boolean]))] = rdd.leftOuterJoin(filterRDD)
      /**
       * String, (Int, Option[Boolean])
       * String: the word
       * Int: 1
       * Option: the record may or may not have joined with the blacklist
       *
       * Idea: we want the records that did NOT join, i.e. Option[Boolean] = None
       * filter: true means keep
       */
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._1)
    }).map((_, 1)).reduceByKey(_ + _)

    /**
     * Data output
     */
    wordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Run the code first, then send data from hadoop04.




Case 6: window operations

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Requirement: every 4 seconds, count the words seen in the last 6 seconds
 * reduceByKeyAndWindow
 */
object WindowOperatorTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowOperatorTest")
    val sc: SparkContext = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))

    /**
     * Data input
     * (still not wired to a production source at this point)
     */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)

    /**
     * Data processing
     */
    val resultWordCountDStream: DStream[(String, Int)] = dstream.flatMap(_.split(","))
      .map((_, 1))
      /**
       * reduceFunc: (V, V) => V    - a function that accumulates the word counts
       * windowDuration: Duration   - how many seconds of data to aggregate (window size)
       * slideDuration: Duration    - how often to compute (slide interval)
       * // numPartitions: Int      - optional partition count (otherwise derived from the cores)
       * Note: both durations must be multiples of the batch interval Seconds(2)
       */
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(6), Seconds(4))

    /**
     * Data output
     */
    resultWordCountDStream.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}


Using foreachRDD

Note that the code directly inside foreachRDD is executed on the Driver.

1. The following version will fail:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

The connection object would have to be shipped to the executors, which means network transfer and therefore serialization; but this object is not serializable, so it cannot be sent and the job fails with a serialization error.


2. This version does not fail, but a connection object is created for every single record; creating and destroying database connections this frequently puts a lot of pressure on the database.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

3. This version does not fail: it takes one partition of data at a time and creates one connection per partition, then sends the partition's records to the database one by one.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

Tuning: use a connection pool and batch the writes, for example committing to the database once every hundred records.

4. This version does not fail and uses a connection pool:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}

Note: the connection is taken from the pool and returned to it after use, which removes the cost of creating connections, but records are still submitted one at a time. To batch them we also have to think about transactions.
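As a preview of what that looks like (the complete variants appear later in the "save the results to MySQL" listing), here is a minimal sketch that batches the inserts inside a transaction and commits every 100 records, using the ConnectionPool object defined just below. It assumes the DStream carries (word, count) pairs and that a MySQL table wordcount(word varchar, count int) exists; both are assumptions for illustration, not part of the original setup.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    // turn off auto-commit so each batch is committed as one transaction
    connection.setAutoCommit(false)
    // hypothetical table wordcount(word, count) used only for this sketch
    val statement = connection.prepareStatement("insert into wordcount(word, count) values (?, ?)")
    partitionOfRecords.zipWithIndex.foreach { case ((word, count), index) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.addBatch()
      // flush and commit every 100 records, as suggested above
      if (index != 0 && index % 100 == 0) {
        statement.executeBatch()
        connection.commit()
      }
    }
    // commit whatever is left in the final partial batch
    statement.executeBatch()
    connection.commit()
    statement.close()
    ConnectionPool.returnConnection(connection)
  }
}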

Now for the hands-on part:

First create a table in MySQL (the screenshot of the table definition is omitted; judging from the insert statement used below, the table is aura.1711wordcount with a timestamp column, a word column and a count column).


Write a connection pool as a Scala file:

import java.sql.{Connection, DriverManager}

object ConnectionPool {
  private val max = 8              // maximum number of connections in the pool
  private val connectionNum = 10   // number of connections created each time the pool is refilled
  private var conNum = 0           // number of connections created so far
  import java.util
  private val pool = new util.LinkedList[Connection]() // the pool itself

  // load the JDBC driver when the object is initialized
  Class.forName("com.mysql.jdbc.Driver")

  /**
   * Return a connection to the pool
   */
  def returnConnection(conn: Connection): Unit = {
    pool.push(conn)
  }

  /**
   * Get a connection from the pool
   */
  def getConnection(): Connection = {
    // synchronized block
    AnyRef.synchronized({
      if (pool.isEmpty()) {
        for (i <- 1 to connectionNum) {
          val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
          pool.push(conn)
          conNum += 1
        }
      }
      pool.poll()
    })
  }
}

Then the main program that uses it:

/**
 * Created by jenrey on 2018/5/13 20:27
 */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

/**
 * Receive data from nc and store the results into a MySQL table
 */
object OutputTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("OutputTest")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("hdfs://myha01/StreamingCheckPoint3")

    /**
     * Data input
     */
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop04", 9999)
    val wordCountDStream = dstream.flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val lastCount = state.getOrElse(0)
        Some(currentCount + lastCount)
      })

    /**
     * Data output
     */
    wordCountDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // take a connection from the pool
        val connection = ConnectionPool.getConnection()
        // get a Statement object (used to send SQL)
        val statement = connection.createStatement()
        partition.foreach {
          case (word, count) => {
            val sql = s"insert into aura.1711wordcount values(now(),'$word',$count)"
            print(sql)
            // send the SQL via the Statement
            statement.execute(sql)
          }
        }
        // give the connection back to the pool
        ConnectionPool.returnConnection(connection)
      })
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Start the program first, then send data through nc.



The drawback is that a new row is inserted for every batch, so every intermediate total ends up in the table.

If you only want the latest value, store the results in HBase instead; writing to the same rowkey makes HBase overwrite the previous value automatically.
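For reference, a minimal sketch of such an HBase sink, written against the standard HBase 1.x client API. The table name "wordcount", the column family "cf" and the configuration available on the classpath are assumptions for illustration; they are not part of the original article.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

wordCountDStream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // one HBase connection per partition, opened on the executor
    val hbaseConf = HBaseConfiguration.create()
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    // assumed table "wordcount" with column family "cf"
    val table = hbaseConn.getTable(TableName.valueOf("wordcount"))
    partition.foreach { case (word, count) =>
      // the word is the rowkey, so writing the same word again overwrites the old count
      val put = new Put(Bytes.toBytes(word))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
      table.put(put)
    }
    table.close()
    hbaseConn.close()
  })
})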


Below is the complete collection of code for saving the results to MySQL:
import java.sql.DriverManager
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount program: Spark Streaming consumes real-time data sent by a TCP server.
 *
 * 1. Start a Netcat server on the master machine:
 *    `$ nc -lk 9998` (if the nc command is missing, install it with yum install -y nc)
 *
 * MySQL table:
 *    create table wordcount(ts bigint, word varchar(50), count int);
 *
 * spark-shell --total-executor-cores 4 --executor-cores 2 --master spark://master:7077 --jars mysql-connector-java-5.1.44-bin.jar,c3p0-0.9.1.2.jar,spark-streaming-basic-1.0-SNAPSHOT.jar
 */
object NetworkWordCountForeachRDD {
  def main(args: Array[String]) {
    // Note: this listing collects several alternative ways of writing the results;
    // in a real program you would keep only one of the foreachRDD variants below.
    val sparkConf = new SparkConf().setAppName("NetworkWordCountForeachRDD")
    val sc = new SparkContext(sparkConf)
    // Create the context with a 5 second batch size
    val ssc = new StreamingContext(sc, Seconds(5))
    // Create a receiver (ReceiverInputDStream) that receives data sent over a socket
    // from a port on one machine and processes it
    val lines = ssc.socketTextStream("hadoop1", 9998, StorageLevel.MEMORY_AND_DISK_SER)
    // the processing logic is just a word count
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // Save the results to MySQL (1)
    /**
     * This code will fail! (the connection is created on the driver but used on the executors)
     */
    wordCounts.foreachRDD { (rdd, time) =>
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
      val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
      rdd.foreach { record =>
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
      }
      statement.close()
      conn.close()
    }
    // start the streaming computation
    ssc.start()
    ssc.stop(false)

    // Save the results to MySQL (2)
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreach { record =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        statement.setLong(1, time.milliseconds)
        statement.setString(2, record._1)
        statement.setInt(3, record._2)
        statement.execute()
        statement.close()
        conn.close()
      }
    }

    // Save the results to MySQL (3)
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        Class.forName("com.mysql.jdbc.Driver")
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/aura", "root", "root")
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        conn.close()
      }
    }

    // Save the results to MySQL (4): use a connection pool
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.execute()
        }
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // Save the results to MySQL (5): batching
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        ConnectionPool.returnConnection(conn)
      }
    }

    // Save the results to MySQL (6): batching inside a transaction
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        // turn off auto-commit
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.foreach { case (word, count) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // Save the results to MySQL (7): cap the batch size, committing every 500 records
    wordCounts.foreachRDD { (rdd, time) =>
      rdd.foreachPartition { partitionRecords =>
        val conn = ConnectionPool.getConnection
        conn.setAutoCommit(false)
        val statement = conn.prepareStatement(s"insert into wordcount(ts, word, count) values (?, ?, ?)")
        partitionRecords.zipWithIndex.foreach { case ((word, count), index) =>
          statement.setLong(1, time.milliseconds)
          statement.setString(2, word)
          statement.setInt(3, count)
          statement.addBatch()
          if (index != 0 && index % 500 == 0) {
            statement.executeBatch()
            conn.commit()
          }
        }
        statement.executeBatch()
        statement.close()
        conn.commit()
        conn.setAutoCommit(true)
        ConnectionPool.returnConnection(conn)
      }
    }

    // wait for the streaming program to terminate
    ssc.awaitTermination()
  }
}

Case 7: Spark Streaming with Kafka as the data source

First make sure the spark-streaming-kafka-0-8_2.11 dependency is declared in pom.xml (it is already included in the pom shown at the top of this article).

Then write the following code:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaTest")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    // A checkpoint directory is needed when using Kafka, because Spark Streaming has to
    // persist some of its own state; keeping it only in memory would make it easy to lose.
    ssc.checkpoint("hdfs://myha01/streamingkafka")

    /**
     * Data input: KafkaUtils.createDirectStream
     *
     * def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
     *   ssc: StreamingContext,
     *   kafkaParams: Map[String, String],
     *   topics: Set[String])   // several topics can be read at once; here we read just one
     */
    // the Kafka broker address(es)
    val kafkaParams = Map("metadata.broker.list" -> "hadoop03:9092")
    val topics = Set("aura")
    // Kafka returns key-value pairs: String = key type (the key carries offset information),
    // String = value type (each record in Kafka), StringDecoder = the decoder for each.
    // createDirectStream returns InputDStream[(String, String)]; since the key (offset info)
    // is not needed here, we keep only the value with .map(_._2)
    val kafkaDStream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    /**
     * Data processing (by now fairly standard)
     */
    kafkaDStream.flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}


Start the Kafka high-availability cluster (note: start ZooKeeper first):

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[hadoop@hadoop04 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[hadoop@hadoop05 kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

Create the topic:

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic aura


Start a console producer:

[hadoop@hadoop03 kafka_2.11-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 -topic aura


Then run the code, and type some comma-separated words into the producer on hadoop03.





Case 8: blacklist filtering of ad-click data

import java.sql.Date
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Input record fields:
 * timestamp: when the user clicked the ad
 * province:  the province the click came from
 * city:      the city the click came from
 * userid:    unique id of the user
 * advid:     id of the ad that was clicked
 */
object AdvApplicationTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("AdvApplicationTest")
    // serializer configuration would go here, e.g.
    // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // updateStateByKey (used in step 4) requires a checkpoint directory;
    // the path follows the pattern used in the earlier cases
    ssc.checkpoint("hdfs://myha01/StreamingCheckPoint_adv")
    // getOrCreate(): reuse an existing session if there is one, otherwise create it (similar to a singleton)
    val spark: SparkSession = SparkSession.builder()
      .config(conf).getOrCreate()

    /**
     * Step 1: read data from Kafka (direct approach)
     * createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
     *   ssc: StreamingContext,
     *   kafkaParams: Map[String, String],
     *   topics: Set[String])
     */
    val kafkaParams = Map("metadata.broker.list" -> "hadoop3:9092")
    val topics = Set("aura")
    val logDstream: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    /**
     * Step 2: filter out blacklisted users
     */
    val filterLogDStream: DStream[String] = blackListFilter(logDstream, ssc)

    /**
     * A user who clicks the same ad more than 100 times within one day is a blacklist user.
     *
     * zhangsan:  A:50 B:60       -> not blacklisted
     * lisi:      A:50 A:20 A:40  -> blacklisted
     * Whether a user who is blacklisted today is still blacklisted tomorrow depends on the business.
     *
     * Step 3: dynamically generate the blacklist, in real time
     */
    DynamicGenerationBlacklists(filterLogDStream, spark)

    /**
     * Step 4: count ad clicks per day, per province and city, in real time
     */
    val dateProvinceCityAdvClick_Count = ProvinceCityAdvClick_Count(filterLogDStream)

    /**
     * Step 5: top ads per province per day, in real time
     * grouped TopN
     * transform / foreachRDD, rdd => dataframe, then Spark SQL
     */

    /**
     * Step 6: click trend of each ad over the last hour, using a sliding window
     */

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  /**
   * Filter the input against the blacklist
   * logDstream: the data read from Kafka
   * returns the data with blacklisted users removed
   */
  def blackListFilter(logDstream: DStream[String], ssc: StreamingContext): DStream[String] = {
    /**
     * In a real system the blacklist (black_list) would be read from a persistent store;
     * the three commonly used options are Redis, HBase and MySQL (the MySQL variant was
     * demonstrated earlier: SparkCore, or SparkSQL -> dataframe -> rdd).
     * Here it is hard-coded for the example.
     */
    val blackList = List((1L, true), (2L, true), (3L, true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList)
    val blackListBroadcast = ssc.sparkContext.broadcast(blackListRDD.collect())

    logDstream.transform(rdd => {
      val user_lineRDD = rdd.map(line => {
        val fields = line.split(",")
        (fields(3).toLong, line)
      })
      val blackRDD = rdd.sparkContext.parallelize(blackListBroadcast.value)
      // only key-value RDDs can be joined, hence the map above
      val resultRDD: RDD[(Long, (String, Option[Boolean]))] = user_lineRDD.leftOuterJoin(blackRDD)
      // keep the records that did NOT join with the blacklist
      resultRDD.filter(tuple => {
        tuple._2._2.isEmpty
      }).map(_._2._1)
    })
  }

  /**
   * Dynamically generate the blacklist
   * @param filterLogDStream the data that already passed the blacklist filter
   * A user who clicks the same ad more than 100 times within one day is a blacklist user.
   *
   * The idea is very similar to word count: count occurrences per key and flag keys above 100.
   * Option 1: key = date_userid_advid, count with updateStateByKey (needs more memory),
   *           then filter counts above 100 and write them to MySQL / Redis / HBase.
   * Option 2: key = date_userid_advid, count only the current batch with reduceByKey
   *           (needs less memory) and accumulate per rowkey in HBase or Redis.
   * Option 3: accumulate in MySQL, which is what this example does.
   */
  def DynamicGenerationBlacklists(filterLogDStream: DStream[String], spark: SparkSession): Unit = {
    val date_userid_advid_ds = filterLogDStream.map(line => {
      val fields = line.split(",")
      val time = new Date(fields(0).toLong)
      val date = DateUtils.formatDateKey(time)
      val userid = fields(3)
      val advid = fields(4)
      // e.g. 20180512_userid_advid
      (date + "_" + userid + "_" + advid, 1L)
    }).reduceByKey(_ + _)

    date_userid_advid_ds.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        val connection = ConnectionPool.getConnection()
        val statement = connection.createStatement()
        partition.foreach {
          case (date_userid_advid, count) => {
            val fields = date_userid_advid.split("_")
            val date = fields(0)
            val userid = fields(1).toLong
            val advid = fields(2).toLong
            val sql = s"insert into aura.tmp_advclick_count values($date,$userid,$advid,$count)"
            statement.execute(sql)
          }
        }
        ConnectionPool.returnConnection(connection)
      })
    })

    /**
     * Generate the blacklist from the accumulated click counts
     */
    val df: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/aura")
      .option("user", "aura")
      .option("password", "aura")
      .option("dbtable", "tmp_advclick_count")
      .load()
    df.createOrReplaceTempView("tmp_advclick_count")
    val sql =
      """
        SELECT
            userid
        FROM
        (
            SELECT
                date, userid, advid, sum(click_count) c_count
            FROM
                tmp_advclick_count
            GROUP BY
                date, userid, advid
        ) t
        WHERE
            t.c_count > 100
      """
    // the resulting blacklist
    val blacklistdf = spark.sql(sql).distinct()
    val properties = new Properties()
    properties.put("user", "aura")
    properties.put("password", "aura")
    blacklistdf.write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/aura", "black_list", properties)
  }

  /**
   * Count ad clicks per day, per province and city, in real time
   * @param filterLogDStream
   */
  def ProvinceCityAdvClick_Count(filterLogDStream: DStream[String]): DStream[(String, Long)] = {
    /**
     * Idea:
     * map => (k, v) where k = date_province_city_advid and v = 1
     * then updateStateByKey
     */
    val f = (input: Seq[Long], state: Option[Long]) => {
      val current_count = input.sum
      val last_count = state.getOrElse(0L)
      Some(current_count + last_count)
    }
    filterLogDStream.map(line => {
      val fields = line.split(",")
      val time = fields(0).toLong
      val mydate = new Date(time)
      val date = DateUtils.formatDateKey(mydate)
      val province = fields(1)
      val city = fields(2)
      val advid = fields(4)
      (date + "_" + province + "_" + city + "_" + advid, 1L)
    }).updateStateByKey(f)
    /**
     * If needed, these results could also be written to MySQL or HBase.
     */
  }

  /**
   * Top ads per province, in real time
   *
   * transform: rdd -> dataframe -> table -> sql
   * @param date_province_city_advid_count
   */
  def ProvinceAdvClick_Count(date_province_city_advid_count: DStream[(String, Long)], spark: SparkSession): Unit = {
    date_province_city_advid_count.transform(rdd => {
      val date_province_advid_count = rdd.map {
        case (date_province_city_advid, count) => {
          val fields = date_province_city_advid.split("_")
          val date = fields(0)
          val province = fields(1)
          val advid = fields(3)
          (date + "_" + province + "_" + advid, count)
        }
      }.reduceByKey(_ + _)

      val rowRDD = date_province_advid_count.map(tuple => {
        val fields = tuple._1.split("_")
        val date = fields(0)
        val province = fields(1)
        val advid = fields(2).toLong
        val count = tuple._2
        Row(date, province, advid, count)
      })
      val schema = StructType(
        StructField("date", StringType, true) ::
        StructField("province", StringType, true) ::
        StructField("advid", LongType, true) ::
        StructField("count", LongType, true) :: Nil
      )
      val df = spark.createDataFrame(rowRDD, schema)
      df.createOrReplaceTempView("temp_date_province_adv_count")
      val sql =
        """
          select
              *
          from
          (
              select
                  date, province, advid, count,
                  row_number() over(partition by province order by count desc) rank
              from
                  temp_date_province_adv_count
          ) temp
          where temp.rank < 10
        """
      /**
       * Persist the result to a database here.
       */
      spark.sql(sql)
      rdd
    })
  }
}
The DateUtils helper class (Java) used by AdvApplicationTest above:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * DateTime Utils
 *
 * Created by XuanYu on 2016/5/31.
 */
public class DateUtils {

    public static final SimpleDateFormat TIME_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static final SimpleDateFormat DATE_FORMAT =
            new SimpleDateFormat("yyyy-MM-dd");
    public static final SimpleDateFormat DATEKEY_FORMAT =
            new SimpleDateFormat("yyyyMMdd");

    /**
     * Check whether one time is before another
     * @param time1 first time
     * @param time2 second time
     * @return result of the comparison
     */
    public static boolean before(String time1, String time2) {
        try {
            Date dateTime1 = TIME_FORMAT.parse(time1);
            Date dateTime2 = TIME_FORMAT.parse(time2);
            if (dateTime1.before(dateTime2)) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Check whether one time is after another
     * @param time1 first time
     * @param time2 second time
     * @return result of the comparison
     */
    public static boolean after(String time1, String time2) {
        try {
            Date dateTime1 = TIME_FORMAT.parse(time1);
            Date dateTime2 = TIME_FORMAT.parse(time2);
            if (dateTime1.after(dateTime2)) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Difference between two times, in seconds
     * @param time1 time 1
     * @param time2 time 2
     * @return difference
     */
    public static int minus(String time1, String time2) {
        try {
            Date datetime1 = TIME_FORMAT.parse(time1);
            Date datetime2 = TIME_FORMAT.parse(time2);
            long millisecond = datetime1.getTime() - datetime2.getTime();
            return Integer.valueOf(String.valueOf(millisecond / 1000));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    /**
     * Extract the date and hour
     * @param datetime time string (yyyy-MM-dd HH:mm:ss)
     * @return result (yyyy-MM-dd_HH)
     */
    public static String getDateHour(String datetime) {
        String date = datetime.split(" ")[0];
        String hourMinuteSecond = datetime.split(" ")[1];
        String hour = hourMinuteSecond.split(":")[0];
        return date + "_" + hour;
    }

    /**
     * Today's date (yyyy-MM-dd)
     */
    public static String getTodayDate() {
        return DATE_FORMAT.format(new Date());
    }

    /**
     * Yesterday's date (yyyy-MM-dd)
     */
    public static String getYesterdayDate() {
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DAY_OF_YEAR, -1);
        Date date = cal.getTime();
        return DATE_FORMAT.format(date);
    }

    /**
     * Format a date (yyyy-MM-dd)
     */
    public static String formatDate(Date date) {
        return DATE_FORMAT.format(date);
    }

    /**
     * Format a time (yyyy-MM-dd HH:mm:ss)
     */
    public static String formatTime(Date date) {
        return TIME_FORMAT.format(date);
    }

    /**
     * Parse a time string
     */
    public static Date parseTime(String time) {
        try {
            return TIME_FORMAT.parse(time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Format a date key (yyyyMMdd)
     */
    public static String formatDateKey(Date date) {
        return DATEKEY_FORMAT.format(date);
    }

    /**
     * Parse a date key (yyyyMMdd)
     */
    public static Date parseDateKey(String datekey) {
        try {
            return DATEKEY_FORMAT.parse(datekey);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Format a time down to minute precision (yyyyMMddHHmm)
     */
    public static String formatTimeMinute(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
        return sdf.format(date);
    }
}
The ConnectionPool object used here is the same one shown earlier in the foreachRDD section, so it is not repeated.






