当前位置:   article > 正文

Spark Streaming及示例_sparkstreaming例子

sparkstreaming例子

Spark Streaming及示例

一、Spark Streaming介绍

Spark Streaming是近实时(near real time)的小批处理系统 。Spark Streaming是Spark core API的扩展,支持实时数据流的处理,并且具有可扩展,高吞吐量,容错的特点。 数据可以从许多来源获取,如Kafka,Flume,Kinesis或TCP sockets,并且可以使用复杂的算法进行处理,这些算法使用诸如map,reduce,join和window等高级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等

可以从三点进行考虑:输入—–计算—–输出。正如下图所示:

在这里插入图片描述

从图中也能看出它将输入的数据分成多个batch进行处理,严格来说spark streaming 并不是一个真正的实时框架,因为他是分批次进行处理的。

Spark Streaming提供了一个高层抽象,称为discretized stream或DStream,它表示连续的数据流。 DStream可以通过Kafka,Flume和Kinesis等来源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列RDD。
Discretized Streams或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,即从源接收的输入数据流或通过转换输入流生成的已处理数据流。 在内部,DStream由连续的RDD系列表示,这是Spark对不可变的分布式数据集的抽象。 DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。

在这里插入图片描述

在DStream上应用的任何操作都会转化为对每个RDD的操作

二、Spark Streaming示例
1、WordCount

该示例只计算5秒批次内的统计结果

在这里插入图片描述

maven的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jiang</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.10</scala.version>
        <hadoop.version>3.2.0</hadoop.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.10</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>

         <!--https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>


        <!--mysql数据库访问-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>

    </dependencies>

    <build>
        <!--资源文件夹-->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
        <!--声明并引入构建的插件-->
            <!--用于编译Scala代码到class-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!--    -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

scala代码:

package com.jiang.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount01 {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    //TODO 1.加载数据
     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_)

    //TODO 3.输出结果
    resultDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

启动数据源:nc -lk 9999

启动程序

演示结果:

在这里插入图片描述

在这里插入图片描述

2、状态管理

该示例计算程序运行到结束的所有统计

scala代码:

package com.jiang.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @param null
*
*  @Description: 状态管理
*/

object WordCount02 {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    // state 存在checkpoint
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据
     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    // 定义一个函数用来处理状态
    // currentValues:表示该key(如spark)的当前批次的值,如:[1,1]
    // historyValues 表示该key(如spark)的历史值,第一次是0,后面就是之前的累加值
    val updateFunc = (currentValues:Seq[Int],historyValues:Option[Int]) =>{
      if(currentValues.size > 0){
        val currentResult = currentValues.sum + historyValues.getOrElse(0)
        Some(currentResult)
      }else{
        historyValues
      }
    }

    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_,1))
//      .reduceByKey(_+_)
        .updateStateByKey(updateFunc) //updateFunc: (Seq[Int], Option[NotInferedS]) => Option[NotInferedS]

    //TODO 3.输出结果
    resultDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
3、状态恢复

启动程序接着上次停止的数据继续统计

scala代码:

package com.jiang.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @param null
*
*  @Description: 状态恢复
*/

object WordCount03 {

  def creatingFun():StreamingContext = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    // state 存在checkpoint
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    // 定义一个函数用来处理状态
    // currentValues:表示该key(如spark)的当前批次的值,如:[1,1]
    // historyValues 表示该key(如spark)的历史值,第一次是0,后面就是之前的累加值
    val updateFunc = (currentValues:Seq[Int],historyValues:Option[Int]) =>{
      if(currentValues.size > 0){
        val currentResult = currentValues.sum + historyValues.getOrElse(0)
        Some(currentResult)
      }else{
        historyValues
      }
    }

    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_,1))
      //      .reduceByKey(_+_)
      .updateStateByKey(updateFunc) //updateFunc: (Seq[Int], Option[NotInferedS]) => Option[NotInferedS]

    //TODO 3.输出结果
    resultDS.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val ssc: StreamingContext = StreamingContext.getOrCreate("./ckp",creatingFun _)
    ssc.sparkContext.setLogLevel("WARN")

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
4、窗口计算

注意:窗口滑动长度和窗口长度一定要是SparkStreaming微批处理时间的整数倍,不然会报错.

scala代码:

package com.jiang.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object WordCountAndWindow {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    //TODO 1.加载数据
     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
        .map((_,1))

//      .reduceByKey(_+_)
      // windowDuration 窗口长度(大小),表示要计算最近多长时间的数据
      // slideDuration  滑动间隔,表示每隔多长时间计算一次
      // windowDuration和slideDuration必须是batchDuration(批次)的倍数
      .reduceByKeyAndWindow((a:Int,b:Int) => a+b,Seconds(10),Seconds(5))

    //TODO 3.输出结果
    resultDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
5、 模拟排行榜每隔10s计算最近20s的top3数据

scala代码:

package com.jiang.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @param null
*
*  @Description:  模拟排行榜每隔10s计算最近20s的top3数据
*/

object WordCountAndTopN {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    //TODO 1.加载数据
     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    // resultDS 是20s内统计的rdd集合
    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_, 1))
      //      .reduceByKey(_+_)
      // windowDuration 窗口长度(大小),表示要计算最近多长时间的数据
      // slideDuration  滑动间隔,表示每隔多长时间计算一次
      // windowDuration和slideDuration必须是batchDuration(批次)的倍数
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))

    val sortRDDs: DStream[(String, Int)] = resultDS.transform(rdd => { // rdd 是统计集合内的每个rdd个体
      val sortRdDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      val top3: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
      println("==========top3============")
      top3.foreach(println)
      println("==========top3============")
      sortRdDD
    })

    //TODO 3.输出结果
//    resultDS.print()
      sortRDDs.print()


    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
/*
* 测试数据
*
云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热
航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水
中国成功发射风云三号05星 中国成功发射风云三号05星 中国成功发射风云三号05星
BOSS直聘等被启动网络安全审查新 BOSS直聘等被启动网络安全审查新
菲律宾军机坠毁致47死49伤
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
6、自定义输出----foreachRDD

将结果自定义输出到控制台、本地文件、mysql等

Scala代码:

package com.jiang.streaming

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Timestamp}

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/*
 * @param null
*
*  @Description:  模拟排行榜每隔10s计算最近20s的top3数据
*  将统计的数据自定义输出。如:mysql、hdfs等
*/

object WordCountAndTopN02 {

  val url = "jdbc:mysql://localhost:3306/bigdata_test"
  var username = "root"
  val passwd = "123456"

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次

    //TODO 1.加载数据
     val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.89.15",9999)

    //TODO 2.处理数据
    // resultDS 是20s内统计的rdd集合
    val resultDS: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_, 1))
      //      .reduceByKey(_+_)
      // windowDuration 窗口长度(大小),表示要计算最近多长时间的数据
      // slideDuration  滑动间隔,表示每隔多长时间计算一次
      // windowDuration和slideDuration必须是batchDuration(批次)的倍数
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))

    val sortRDDs: DStream[(String, Int)] = resultDS.transform(rdd => { // rdd 是统计集合内的每个rdd个体
      val sortRdDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)
      val top3: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
      println("==========top3============")
      top3.foreach(println)
      println("==========top3============")
      sortRdDD
    })

    //TODO 3.输出结果
    // sortRDDs.print() // 默认输出
    // 自定义输出
    sortRDDs.foreachRDD((rdd,time) => {
      val milliseconds: Long = time.milliseconds
      println("=======自定义输出======")
      println("batchtime---->"+milliseconds)
      println("=======自定义输出======")
      // 将结果自定义输出
      // 控制台输出
      rdd.foreach(println)
      // 输出到hdfs/本地文件
      rdd.coalesce(1).saveAsTextFile("data/output/result-"+milliseconds)
      // 输出到Mysql
      rdd.foreachPartition(iter => {
        // 开启连接
        val conn: Connection = DriverManager.getConnection(url,username,passwd)
        val sql = "insert into `hotwords` (`time`,`word`,`count`) values (?,?,?)"
        val ps: PreparedStatement = conn.prepareStatement(sql)

        iter.foreach(t =>{
          val word: String = t._1
          val count: Int = t._2
          ps.setTimestamp(1,new Timestamp(milliseconds))
          ps.setString(2,word)
          ps.setInt(3,count)
          ps.addBatch()
        })
        ps.executeBatch()
        // 关闭连接
        conn.close()
        ps.close()
      })
    })


    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }

}
/*
* 测试数据
*
云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热 云南新增本土确诊病例3例热
航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水 航天员出舱七个小时怎么喝水
中国成功发射风云三号05星 中国成功发射风云三号05星 中国成功发射风云三号05星
BOSS直聘等被启动网络安全审查新 BOSS直聘等被启动网络安全审查新
菲律宾军机坠毁致47死49伤
*/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/975633
推荐阅读
相关标签
  

闽ICP备14008679号