
Flink in Practice (Part 2): Supplementary Basic APIs and Using Sources


The Transformation class hierarchy


Function
    map    ==> MapFunction
    filter ==> FilterFunction
    xxx    ==> XxxFunction
    RichXxxFunction  ***** (the Rich variants add open()/close() lifecycle hooks and a runtime context; a hedged sketch follows)
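To make the Function / RichFunction distinction concrete, here is a hedged sketch (my own, not from the original post) using a RichMapFunction; the object name and log messages are made up:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object RichFunctionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements("ruozedata.com", "dongqiudi.com")
      .map(new RichMapFunction[String, String] {
        // open() runs once per parallel task, before any record is processed
        override def open(parameters: Configuration): Unit =
          println(s"open() on subtask ${getRuntimeContext.getIndexOfThisSubtask}")

        override def map(value: String): String = value.toUpperCase

        // close() runs once per parallel task, after the last record
        override def close(): Unit = println("close()")
      })
      .print()

    env.execute("RichFunctionDemo")
  }
}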

Streaming Programming


This time we look at Flink's Sources and Sinks. Flink can read from and write to files, sockets, collections, and so on, and it also ships with many built-in connectors, for example Kafka, Hadoop and Redis. Many of these built-in connectors take care of exactly-once semantics for you, whereas with Spark Streaming you have to maintain exactly-once semantics yourself when integrating with Kafka, Redis, etc. Next, let's look at how to write a custom Source in Flink; this article does not cover checkpointing and related mechanisms.

Anatomy of a Flink Program

No matter how complex a Flink job gets, it always follows the same flow: obtain an execution environment, load or create the initial data, apply transformations, specify where to put the results, and trigger the execution.
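To make that concrete, a minimal sketch (my own, not from the original post) that touches all five steps might look like this:

import org.apache.flink.streaming.api.scala._

object AnatomySketch {
  def main(args: Array[String]): Unit = {
    // 1. obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. load / create the initial data (the source)
    val lines = env.fromElements("ruozedata.com,2000", "dongqiudi.com,3000")
    // 3. specify transformations on the data
    val domains = lines.map(_.split(",")(0))
    // 4. specify where to put the results (the sink; print() is the simplest one)
    domains.print()
    // 5. trigger the program execution
    env.execute("AnatomySketch")
  }
}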

Test

package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.fromCollection(List(
      Access(201912120010L, "ruozedata.com", 2000),
      Access(201912120011L, "ruozedata.com", 3000)
    )
    ).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> Access(201912120010,ruozedata.com,2000)
1> Access(201912120011,ruozedata.com,3000)
/**
   * Creates a DataStream that contains the given elements. The elements must all be of the
   * same type.
   *
   * Note that this operation will result in a non-parallel data source, i.e. a data source with
   * a parallelism of one.
   */
  def fromElements[T: TypeInformation](data: T*): DataStream[T] = {
    fromCollection(data)
  }

Note: if you add an explicit type parameter, it constrains the element types.

// e.g. with fromElements[Int](...), the mixed-type call in the example below would no longer compile


package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.fromElements(1, 2L, 3D, 4F, "5").print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> 3.0
1> 4.0
3> 2
2> 1
2> 5



Custom Sources

A quick aside:

JSON is not necessarily a good choice performance-wise: every record has to carry the field names, which is a lot of overhead when large volumes of data go over the network. In production, plain text is commonly used instead, but then the field names, the field delimiter and so on must all be agreed on in advance.
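For instance, a hedged sketch (assuming the Access case class used throughout this series has the fields time, domain and traffic): with a schema and delimiter agreed on up front, each record carries only values, never field names:

case class Access(time: Long, domain: String, traffic: Long)

object PlainTextParsing {
  // schema agreed on in advance: time,domain,traffic separated by ","
  def parse(line: String): Access = {
    val Array(time, domain, traffic) = line.split(",")
    Access(time.toLong, domain, traffic.toLong)
  }

  def main(args: Array[String]): Unit = {
    // the line itself never repeats the field names
    println(parse("201912120010,ruozedata.com,2000"))
  }
}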

The official Flink documentation offers the following three interfaces for implementing the Source you need:

  • SourceFunction: the top-level interface for all stream sources. A source that implements this interface directly cannot have its parallelism set greater than 1; it does not run concurrently.
/**
 * run() is invoked when the source starts; cancel() is invoked when it is stopped.
 */
class MyNonParallelSource extends SourceFunction[Access]{

  private var isRunning = true
  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis()
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).map(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// custom data generator; its parallelism cannot be set greater than 1
val ds = env.addSource(new MyNonParallelSource)

This pattern matters: it is what we use to generate test data.

package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class AccessSource extends SourceFunction[Access] {
  override def run(sourceContext: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com", "zhibo8.cc", "dongqiudi.com")

    while (running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        sourceContext.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000) + x))
      })

      // pause for 5 s between batches
      Thread.sleep(5000)
    }
  }

  // on/off switch
  var running = true

  override def cancel(): Unit = {
    running = false
  }
}
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource).print()


//    env.fromElements(1, 2L, 3D, 4F, "5").print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:

4> Access(1588567173212,dongqiudi.com,284)
4> Access(1588567173212,ruozedata.com,901)
2> Access(1588567173212,dongqiudi.com,861)
2> Access(1588567173212,dongqiudi.com,346)
2> Access(1588567173212,ruozedata.com,236)
3> Access(1588567173212,zhibo8.cc,678)
1> Access(1588567173212,dongqiudi.com,621)
1> Access(1588567173212,ruozedata.com,95)
3> Access(1588567173212,zhibo8.cc,375)
3> Access(1588567173212,dongqiudi.com,927)
1> Access(1588567178270,ruozedata.com,630)
.........

Test: a plain SourceFunction has no parallelism to speak of

package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource).setParallelism(3).print()

    env.execute(this.getClass.getSimpleName)
  }
}


Result:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:31)
	at org.apache.flink.streaming.api.scala.DataStream.setParallelism(DataStream.scala:130)
	at com.ruozedata.flink.flink02.SourceApp$.main(SourceApp.scala:12)
	at com.ruozedata.flink.flink02.SourceApp.main(SourceApp.scala)


Check the source code (you can step through it with the debugger): DataStreamSource.setParallelism verifies that the source is a parallel source and throws the exception above otherwise.


So in big-data scenarios, where parallelism is required, this kind of source cannot be used.

  • ParallelSourceFunction: a source implementing this interface can have its parallelism set greater than 1
class MyParallelSource extends ParallelSourceFunction[Access] {
  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis()
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).map(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// custom data generator; parallelism can be set greater than 1
val ds2 = env.addSource(new MyParallelSource)
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

class AccessSource02 extends ParallelSourceFunction[Access] {

  var running = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com", "zhibo8.cc", "dongqiudi.com")

    while (running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        ctx.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000) + x))
      })

      // pause for 5 s between batches
      Thread.sleep(5000)
    }
  }


}
// Note: the code is identical to AccessSource; only the interface it extends changes
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource02).setParallelism(3).print()

    env.execute(this.getClass.getSimpleName)
  }
}


Result:
2> Access(1588584321030,dongqiudi.com,279)
1> Access(1588584321031,zhibo8.cc,57)
2> Access(1588584321030,ruozedata.com,377)
3> Access(1588584321030,ruozedata.com,334)
3> Access(1588584321030,dongqiudi.com,995)
3> Access(1588584321031,ruozedata.com,314)
3> Access(1588584321031,dongqiudi.com,385)
3> Access(1588584321031,ruozedata.com,363)
3> Access(1588584321031,ruozedata.com,872)
4> Access(1588584321030,dongqiudi.com,885)
4> Access(1588584321030,dongqiudi.com,130)
4> Access(1588584321030,ruozedata.com,95)
3> Access(1588584321031,dongqiudi.com,454)
2> Access(1588584321031,dongqiudi.com,141)
1> Access(1588584321031,ruozedata.com,525)
1> Access(1588584321030,dongqiudi.com,693)

Note:
Each source instance emits 10 records every 5 s, so with a parallelism of 3
the job receives 10 * 3 = 30 records every 5 s.
  • RichParallelSourceFunction: implementing this interface not only allows a parallelism greater than 1, it also gives you lifecycle methods such as open and close. This is the recommended choice.
/**
 * Using MySQL as a Flink source
 */
class MySQLSource extends RichParallelSourceFunction[City]{

  private var conn:Connection = _
  private var state:PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    val url = "jdbc:mysql://localhost:3306/g7"
    val user = "ruoze"
    val password = "ruozedata"
    conn = MySQLUtil.getConnection(url,user,password)
  }

  override def close(): Unit = {
    MySQLUtil.close(conn,state)
  }

  override def run(ctx: SourceFunction.SourceContext[City]): Unit = {
    val sql = "select * from city_info"
    state = conn.prepareStatement(sql)
    val rs = state.executeQuery()
    while(rs.next()){
      val id = rs.getInt(1)
      val name = rs.getString(2)
      val area = rs.getString(3)
      ctx.collect(City(id,name,area))
    }
  }

  override def cancel(): Unit = {}
}

// custom source reading data from MySQL; parallelism greater than 1 is supported
val ds3 = env.addSource(new MySQLSource)
Note:
	the core code is the same as above,
	but look at what this Rich variant gives you:

/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
		implements ParallelSourceFunction<OUT> {

	private static final long serialVersionUID = 1L;
}


1. It extends AbstractRichFunction,
and AbstractRichFunction provides the lifecycle methods
    open()
    close()

So:
	if you need to do things like
	1. file-system access
	2. initialization
	3. I/O
	4. MySQL, etc.
you should acquire the connection in open() and release it in close().

Why?
	open() and close() are executed once per task (there is an example in an earlier article), and the number of tasks is the parallelism.
	That is why you use a Rich function:
	its lifecycle is under your control.
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

class AccessSource03  extends RichParallelSourceFunction[Access] {
  var running = true

  override def cancel(): Unit = {
    running = false
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // acquire external resources here (e.g. a JDBC connection); called once per parallel task
  }

  override def close(): Unit = {
    super.close()
    // release those resources here; also called once per parallel task
  }

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com","zhibo8.cc","dongqiudi.com")

    while(running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        ctx.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000)+x))
      })

      // pause for 5 s between batches
      Thread.sleep(5000)
    }
  }

}



That covers the most basic usage of Sources.

Connecting to MySQL

Now suppose we have a requirement: connect to MySQL. The official site offers no MySQL connector, so you either look for one on GitHub or simply write a Source yourself. The requirement is just to read the MySQL data into Flink; to talk to MySQL you need to add the MySQL driver dependency to the pom.
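For reference, a hedged sketch of that pom entry (the version is an assumption; pick one that matches your MySQL server):

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>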

mysql> select * from mysql.student;
+------+--------+----------+------+
| id   | name   | password | age  |
+------+--------+----------+------+
|    1 | kairis | leo123   |   18 |
|    2 | tonny  | 222      |   19 |
|    3 | hello  | 333      |   20 |
+------+--------+----------+------+
3 rows in set (0.00 sec)
package com.ruozedata.flink.flink02

import java.sql.{Connection, PreparedStatement}

import com.ruozedata.flink.fink01.Domain.Student
import com.ruozedata.flink.utils.MySQLUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
 * Reads data from MySQL
 */

class MySQLSource extends RichSourceFunction[Student] {
  var connection: Connection = _
  var pstmt: PreparedStatement = _

  // establish the connection in open()
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connection = MySQLUtils.getConnection()
    pstmt = connection.prepareStatement("select * from student")
  }

  // release resources
  override def close(): Unit = {
    super.close()
    MySQLUtils.closeResource(connection, pstmt)
  }

  override def cancel(): Unit = {

  }

  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      val student = Student(rs.getInt("id"), rs.getString("name"), rs.getString("password"), rs.getInt("age"))
      ctx.collect(student)
    }
  }

}
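The code above relies on a MySQLUtils helper that the post never shows. A minimal sketch of what it might look like (the URL, user and password are placeholders):

package com.ruozedata.flink.utils

import java.sql.{Connection, DriverManager, PreparedStatement}

object MySQLUtils {

  def getConnection(): Connection = {
    // driver class of the 5.x connector; adjust to your driver version
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/mysql", "root", "password")
  }

  def closeResource(connection: Connection, pstmt: PreparedStatement): Unit = {
    if (pstmt != null) pstmt.close()
    if (connection != null) connection.close()
  }
}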
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
//    env.addSource(new AccessSource03).setParallelism(3).print()
    env.addSource(new MySQLSource ).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> Student(3,hello,333,20)
2> Student(1,kairis,leo123,18)
3> Student(2,tonny,222,19)

Of course, this implementation is fairly crude; there are better ways.

Note:
	In production a MySQL source runs with parallelism.
	Do not create the connection inside run(). Why? run() is where the main per-record logic executes,
	which is just like Spark's foreach vs. foreachPartition situation.
Use the open() method instead:
	 acquire the connection there; it is executed once per task.
env.addSource(new MySQLSource).setParallelism(3).print()
fails with:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
Why? Because MySQLSource extends RichSourceFunction, which is not a parallel source; to set a parallelism
greater than 1 the source has to implement (Rich)ParallelSourceFunction, as sketched below.
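A hedged sketch of such a parallel variant (my own, not from the original post): each subtask asks MySQL only for its own slice of the table, sharded on id modulo the number of subtasks:

import java.sql.{Connection, PreparedStatement}

import com.ruozedata.flink.fink01.Domain.Student
import com.ruozedata.flink.utils.MySQLUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class ParallelMySQLSource extends RichParallelSourceFunction[Student] {
  private var connection: Connection = _
  private var pstmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    connection = MySQLUtils.getConnection()
    // each parallel instance reads only the rows assigned to it
    pstmt = connection.prepareStatement("select * from student where id % ? = ?")
  }

  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    pstmt.setInt(1, getRuntimeContext.getNumberOfParallelSubtasks)
    pstmt.setInt(2, getRuntimeContext.getIndexOfThisSubtask)
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      ctx.collect(Student(rs.getInt("id"), rs.getString("name"),
        rs.getString("password"), rs.getInt("age")))
    }
  }

  override def cancel(): Unit = {}

  override def close(): Unit = MySQLUtils.closeResource(connection, pstmt)
}

With a source like this, env.addSource(new ParallelMySQLSource).setParallelism(3) is accepted.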


Even so, hand-rolled JDBC like this is clunky. Using ScalikeJDBC is more elegant:
the connection pool can be configured in a configuration file.
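A hedged sketch of that configuration file (DBs.setupAll() reads src/main/resources/application.conf; the keys below follow scalikejdbc-config's db.default.* convention as I remember it, and the URL and credentials are placeholders):

db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://localhost:3306/mysql"
db.default.user="root"
db.default.password="password"

# optional connection pool settings
db.default.poolInitialSize=5
db.default.poolMaxSize=10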
class ScalikeJDBCMySQLSource extends RichSourceFunction[Student] {
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    println("~~~run~~~~")
    DBs.setupAll() // parse configuration file
    DB.readOnly { implicit session => {
      SQL("select * from student").map(rs => {
        val student = Student(rs.int("id"), rs.string("name"), rs.string("password"), rs.int("age"))
        ctx.collect(student)
      }).list().apply()

    }
    }
  }

  override def cancel(): Unit = {

  }
}


Result:
~~~run~~~~
4> Student(1,kairis,leo123,18)
1> Student(2,tonny,222,19)
2> Student(3,hello,333,20)


Reading data from Kafka

Note: different Kafka versions require different Flink connector APIs.


After Kafka 0.11, however, the connector API was unified.


So the pom dependency I use is the following (my Flink version is 1.9.0):
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
Flink's Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.

Source:
	Flink acts as the consumer of Kafka.

Kafka Consumer


Example:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
    .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
    .print()

FlinkKafkaConsumer010 source code

	/**
	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
	 *
	 * @param topic
	 *           The name of the topic that should be consumed.
	 * @param valueDeserializer
	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
	 * @param props
	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
	 */
	public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(Collections.singletonList(topic), valueDeserializer, props);
	}
// The constructor takes three parameters:
1. topic
2. DeserializationSchema: the deserializer for incoming data; Kafka producers serialize records, so the consumer has to deserialize them
3. Properties
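If the topic carried delimited Access records instead of single letters, you could swap SimpleStringSchema for your own deserializer. A hedged sketch (assuming Access(time: Long, domain: String, traffic: Long)):

import java.nio.charset.StandardCharsets

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

class AccessDeserializationSchema extends DeserializationSchema[Access] {

  // turn Kafka's byte[] payload ("time,domain,traffic") back into an Access object
  override def deserialize(message: Array[Byte]): Access = {
    val Array(time, domain, traffic) = new String(message, StandardCharsets.UTF_8).split(",")
    Access(time.toLong, domain, traffic.toLong)
  }

  // the stream never ends based on the content of an element
  override def isEndOfStream(nextElement: Access): Boolean = false

  override def getProducedType: TypeInformation[Access] = createTypeInformation[Access]
}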
First, start the consumer:
package com.ruozedata.flink.flink02

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    //    env.addSource(new ScalikeJDBCMySQLSource).print()

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop001:9092") //有几个节点就配几个,不一定要配全
    // only required for Kafka 0.8
    //    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "ruozedata-flink-test")

    val consumer = new FlinkKafkaConsumer010[String]("ruozedata_offset", new SimpleStringSchema(), properties)

    env.addSource(consumer).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> c
4> f
4> d
4> e
4> a
4> d

Sending data to Kafka:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.Random

object DataGenerator {
  private val logger: Logger = LoggerFactory.getLogger(DataGenerator.getClass)

  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.put("bootstrap.servers", "hadoop001:9092")
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    for (i <- 0 to 5) {
      Thread.sleep(100)

      // pick a random letter from a to f
      val word: String = String.valueOf((new Random().nextInt(6) + 'a').toChar)
      val part = i % 3 // which partition to send to (the topic has three partitions)

      logger.error("word : {}", word)

      // send to the chosen partition; the key is left empty
      producer.send(new ProducerRecord[String, String]("ruozedata_offset", part, "", word))

    }

    producer.close()
    println("数据产生完毕..........")
  }

}

Kafka offsets are very easy to manage in Flink.
In Spark the offsets are managed per batch,
whereas Flink processes records one at a time as they arrive.
Controlling where to start consuming is just a matter of calling the API, which is simple:
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)

This snippet is from the official documentation.

For now we only cover basic usage; state, checkpointing, fault tolerance and so on will be covered in later articles.

package com.ruozedata.flink.flink02

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object SourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    //    env.addSource(new ScalikeJDBCMySQLSource).print()

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop001:9092") //有几个节点就配几个,不一定要配全
    // only required for Kafka 0.8
    //    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "ruozedata-flink-test")

    val consumer = new FlinkKafkaConsumer010[String]("ruozedata_offset", new SimpleStringSchema(), properties)

    // control where to start consuming from
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()

    env.execute(this.getClass.getSimpleName)
  }
}


Result:
4> c
4> f
4> d
4> e
4> a
4> d

See: this reads back the data we wrote to Kafka earlier.
So offsets in Flink are easy to work with; the runtime manages them well under the hood.

Question:
 Can I start consuming from a specific offset within a specific partition?
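Yes: besides the setStartFrom* calls shown earlier, the consumer also offers setStartFromSpecificOffsets. A hedged sketch (the partition numbers and offsets below are made up):

import java.{lang, util}

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

val specificStartOffsets = new util.HashMap[KafkaTopicPartition, lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 2), 43L)

consumer.setStartFromSpecificOffsets(specificStartOffsets)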

This kind of setup comes in handy when you need to repair or backfill data.

Summary

Data sources come in three flavors: built-in, connector-based, and custom.

DataStream Transformations

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html#datastream-transformations

Most of them are similar to Spark's; below we focus on the ones Spark does not have.

package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })


      stream.keyBy(_.domain).sum("traffic").print("sum")

    env.execute(this.getClass.getSimpleName)
  }


}

Result:
sum:3> Access(201912120010,dongqiudi.com,1000)
sum:3> Access(201912120010,zhibo8.com,5000)
sum:3> Access(201912120010,ruozedata.com,2000)
sum:3> Access(201912120010,dongqiudi.com,7000)
sum:3> Access(201912120010,ruozedata.com,6000)

Input data (data/access.log):

201912120010,ruozedata.com,2000
201912120010,dongqiudi.com,6000
201912120010,zhibo8.com,5000
201912120010,ruozedata.com,4000
201912120010,dongqiudi.com,1000

Why does the output look like this?
   Because the running sum is updated once for every incoming record.
Note the difference from Spark: in Spark the aggregation happens per batch.

Going a bit further

package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })


    stream.keyBy(_.domain).reduce((x, y) => {
      Access(x.time, x.domain, (x.traffic + y.traffic + 100))
    }).print()

    env.execute(this.getClass.getSimpleName)
  }


}


Result:
3> Access(201912120010,zhibo8.com,5000)
3> Access(201912120010,ruozedata.com,4000)
3> Access(201912120010,dongqiudi.com,1000)
3> Access(201912120010,ruozedata.com,6100)
3> Access(201912120010,dongqiudi.com,7100)

Note:
1. reduce is more flexible than sum: you can implement your own logic. After keyBy, records with the same domain are grouped together.
2. Why does zhibo8.com not get the +100?

The aggregation fires once per incoming record, and reduce combines each new record with the previous result for that key.
zhibo8.com only ever has one record, so there is nothing to reduce it with.

Splitting a stream (fanning one stream out):


This feature can come in handy.
Here we use split; later you would use side outputs instead, because split is deprecated, but that is fine for now (a hedged side-output sketch follows the split example below).
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })


    // after keyBy + sum the running totals reach 5000, 6000 and 7000
    val splitStream = stream.keyBy("domain").sum("traffic").split(x => {
      if (x.traffic > 6000) {
        Seq("big customer")
      } else {
        Seq("regular customer")
      }
    })

    splitStream.select("big customer").print("big customer")
    splitStream.select("regular customer").print("regular customer")

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
regular customer:3> Access(201912120010,dongqiudi.com,1000)
regular customer:3> Access(201912120010,zhibo8.com,5000)
regular customer:3> Access(201912120010,ruozedata.com,2000)
big customer:3> Access(201912120010,dongqiudi.com,7000)
regular customer:3> Access(201912120010,ruozedata.com,6000)

splitStream.select("big customer", "regular customer").print("ALL")

Result:
regular customer:3> Access(201912120010,dongqiudi.com,1000)
ALL:3> Access(201912120010,dongqiudi.com,1000)
regular customer:3> Access(201912120010,ruozedata.com,4000)
ALL:3> Access(201912120010,ruozedata.com,4000)
regular customer:3> Access(201912120010,zhibo8.com,5000)
ALL:3> Access(201912120010,zhibo8.com,5000)
regular customer:3> Access(201912120010,ruozedata.com,6000)
ALL:3> Access(201912120010,ruozedata.com,6000)
big customer:3> Access(201912120010,dongqiudi.com,7000)
ALL:3> Access(201912120010,dongqiudi.com,7000)
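For completeness, a hedged sketch of the side-output alternative mentioned above (split is deprecated); it is my own sketch, not code from the original post, and it drops into the same main method as the split example:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val bigCustomerTag = OutputTag[Access]("big-customer")

val mainStream = stream.keyBy(_.domain).sum("traffic")
  .process(new ProcessFunction[Access, Access] {
    override def processElement(value: Access,
                                ctx: ProcessFunction[Access, Access]#Context,
                                out: Collector[Access]): Unit = {
      if (value.traffic > 6000) ctx.output(bigCustomerTag, value) // route to the side output
      else out.collect(value)                                     // keep on the main stream
    }
  })

mainStream.getSideOutput(bigCustomerTag).print("big customer")
mainStream.print("regular customer")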

Merging streams

Two streams can be merged, using either:
union
connect

object TranformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })
    

        val stream1 = env.addSource(new AccessSource)
        val stream2 = env.addSource(new AccessSource)

//stream1 and stream2 have the same element type
        stream1.union(stream2).map(x=>{
          println("received: " + x)
          x
        }).print()
  

    env.execute(this.getClass.getSimpleName)
  }

}

Result (the exact values don't matter here):
received: Access(1588660509252,ruozedata.com,786)
4> Access(1588660509252,ruozedata.com,786)
received: Access(1588660509252,ruozedata.com,807)
4> Access(1588660509252,ruozedata.com,807)
received: Access(1588660509252,dongqiudi.com,883)
4> Access(1588660509252,dongqiudi.com,883)
received: Access(1588660509255,zhibo8.cc,455)
4> Access(1588660509255,zhibo8.cc,455)
received: Access(1588660509255,ruozedata.com,691)
4> Access(1588660509255,ruozedata.com,691)
received: Access(1588660509252,ruozedata.com,59)
/**
   * Creates a new DataStream by merging DataStream outputs of
   * the same type with each other. The DataStreams merged using this operator
   * will be transformed simultaneously.
   *
   */
  def union(dataStreams: DataStream[T]*): DataStream[T] =
    asScalaStream(stream.union(dataStreams.map(_.javaStream): _*))
union requires "the same type with each other": the streams being merged must have the same element type.

In production, however, it is rare that the two streams you want to merge have the same type.

 /**
   * Creates a new ConnectedStreams by connecting
   * DataStream outputs of different type with each other. The
   * DataStreams connected using this operators can be used with CoFunctions.
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
    asScalaStream(stream.connect(dataStream.javaStream))


When the two streams have different element types, use connect.
When the two streams have different types, map on the ConnectedStreams takes two functions:

def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R):


Note:
fun1: IN1 => R, fun2: IN2 => R

fun1: IN1 => R   the operation applied to the first stream
fun2: IN2 => R   the operation applied to the second stream
object TranformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    val stream1 = env.addSource(new AccessSource)
    val stream2 = env.addSource(new AccessSource)
   
   val stream2New = stream2.map(x => ("J哥", x))
   
   stream1.connect(stream2New).map(x=>x,y=>y).print()

    env.execute(this.getClass.getSimpleName)
  }

}

Result:
3> Access(1588660731150,ruozedata.com,308)
3> Access(1588660731150,zhibo8.cc,868)
3> Access(1588660731150,dongqiudi.com,994)
4> Access(1588660731150,ruozedata.com,721)
4> Access(1588660731150,ruozedata.com,640)
4> Access(1588660731150,zhibo8.cc,381)
3> (J哥,Access(1588660731154,dongqiudi.com,639))
3> (J哥,Access(1588660731154,zhibo8.cc,282))
3> (J哥,Access(1588660731154,ruozedata.com,645))
2> Access(1588660731150,dongqiudi.com,379)
2> Access(1588660731150,ruozedata.com,406)
2> (J哥,Access(1588660731154,zhibo8.cc,249))
2> (J哥,Access(1588660731154,dongqiudi.com,973))
2> (J哥,Access(1588660731154,zhibo8.cc,97))
4> (J哥,Access(1588660731154,ruozedata.com,342))
4> (J哥,Access(1588660731154,ruozedata.com,72))
1> Access(1588660731150,dongqiudi.com,248)
1> Access(1588660731150,ruozedata.com,55)
1> (J哥,Access(1588660731154,ruozedata.com,185))
1> (J哥,Access(1588660731154,dongqiudi.com,174))
2> Access(1588660736197,dongqiudi.com,213)
2> Access(1588660736197,ruozedata.com,517)
2> Access(1588660736197,dongqiudi.com,374)
3> Access(1588660736197,zhibo8.cc,156)
1> Access(1588660736197,dongqiudi.com,62)
1> Access(1588660736197,zhibo8.cc,642)
4> Access(1588660736197,zhibo8.cc,552)
1> Access(1588660736197,ruozedata.com,604)
3> Access(1588660736197,zhibo8.cc,318)
connect:
     "Connects two data streams retaining their types"
     the element types may differ
     exactly two data streams  **
union:
     "Union of two or more data streams"
     the element types must be the same
     two or more data streams

Physical partitioning

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#physical-partitioning

class RuozedataPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    println("partitions: " + numPartitions)
    // note: in Scala there is no need for equals; == works on strings
    if(key == "ruozedata.com"){
      0
    } else if(key == "dongqiudi.com"){
      1
    } else {
      2
    }

  }
}


Note:
Partitioner[String]

The type parameter is the type of the key.

Custom partitioning requires (key, value)-shaped records.
/**
   * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
   * This method takes the key position to partition on, and a partitioner that accepts the key
   * type.
   *
   * Note: This method works only on single field keys.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
    asScalaStream(stream.partitionCustom(partitioner, field))

Note the field: Int parameter: it is the position of the key field in the tuple.
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)

    env.addSource(new AccessSource)
      // custom partitioning needs (key, value) records, so map to (domain, record) first
      .map(x=>(x.domain, x))
      .partitionCustom(new RuozedataPartitioner, 0)
      .map(x => {
        println("current thread id is: " + Thread.currentThread().getId + " , value is: " + x)
        x._2
      }).print()


    env.execute(this.getClass.getSimpleName)
  }

}

Result:
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,985))
1> Access(1588661525735,ruozedata.com,985)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,199))
1> Access(1588661525735,ruozedata.com,199)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,826))
3> Access(1588661525735,zhibo8.cc,826)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,421))
3> Access(1588661525735,zhibo8.cc,421)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,900))
1> Access(1588661525735,ruozedata.com,900)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,699))
1> Access(1588661525735,ruozedata.com,699)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,840))
2> Access(1588661525735,dongqiudi.com,840)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,115))
2> Access(1588661525735,dongqiudi.com,115)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,8))
2> Access(1588661525735,dongqiudi.com,8)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,109))
3> Access(1588661525735,zhibo8.cc,109)
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,475))
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,189))
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661530778,dongqiudi.com,408))
3> Access(1588661530778,zhibo8.cc,189)
2> Access(1588661530778,dongqiudi.com,408)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,663))
1> Access(1588661530778,ruozedata.com,475)
3> Access(1588661530778,zhibo8.cc,663)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,766))
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,799))
1> Access(1588661530778,ruozedata.com,799)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661530778,dongqiudi.com,126))
2> Access(1588661530778,dongqiudi.com,126)
3> Access(1588661530778,zhibo8.cc,766)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,110))
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,361))
1> Access(1588661530778,ruozedata.com,110)
3> Access(1588661530778,zhibo8.cc,361)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,783))
1> Access(1588661530778,ruozedata.com,783)
Note the thread-to-domain mapping:
1 -> ruozedata.com
2 -> dongqiudi.com
3 -> zhibo8.cc
The result is as expected.

That is the basic use of a custom partitioner:
it lets each thread (subtask) handle a different slice of the data.
