SparkStreaming简单例子
◆ 构建第一个Streaming程序: (wordCount)
◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。
◆ 准备工作:
1.引入Spark Streaming的jar
2.scala流计算import声明
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
1.初始化StreamingContext对象
//创建一个本地StreamingContext两个工作线程和批间隔1秒。
val conf = new SparkConf()
conf.setMaster(“local[2]")
conf.setAppName(“ NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
2.获取DStream对象
//创建一个连接到主机名的DStream,像localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
3.操作DStream对象
//将每一行接收到的数据通过空格分割成单词
val words = lines.flatMap(_.split(" “))
//导入StreamingContext中的隐式转换
import org.apache.spark.streaming.StreamingContext._
// 对每一批次的单词进行转化求和
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 每个批次中默认打印前十个元素到控制台
wordCounts.print()
4.启动流处理程序
ssc.start// 开始计算
ssc.awaitTermination() // 等待计算终止
ssc.stop() //结束应用
启动网络端口,模拟发送数据
1.借助于nc命令,手动输入数据
Linux/Mac :nc
Windows:cat
nc -lk 9999
2.借助于代码,编写一个模拟数据发生器
package com.briup.streaming import java.io.PrintWriter import java.net.ServerSocket import scala.io.Source object MassageServer { // 定义随机获取整数的方法 def index(length: Int) = { import java.util.Random val rdm = new Random rdm.nextInt(length) } def main(args: Array[String]) { println("模拟数据器启动!!!") // 获取指定文件总的行数 val filename ="Spark/ihaveadream.txt"; val lines = Source.fromFile(filename).getLines.toList val filerow = lines.length // 指定监听某端口,当外部程序请求时建立连接 val serversocket = new ServerSocket(9999); while (true) { //监听9999端口,获取socket对象 val socket = serversocket.accept() // println(socket) new Thread() { override def run = { println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(1000) // 当该端口接受请求时,随机获取某行数据发送给对方 val content = lines(index(filerow)) println (content) out.write(content + '\n') out.flush() } socket.close() } }.start() } } }
注意事项:
◆ 1.启动 Spark Streaming 之前所作的所有步骤只是创建了执行流程, 程序没有真正
连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划
◆ 2.当 ssc.start()启动后程序才真正进行所有预期的操作
◆ 3.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成
◆ 4.一个Streaming context只能启动一次
◆ 5.如果模式是本地模式,那么请务必设置local[n] ,n>=2 1个用于接收,1个用于处理
package com.briup.streaming import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Duration, StreamingContext} object MyTestOldAPI { def main(args: Array[String]): Unit = { //设置日志级别 Logger.getLogger("org").setLevel(Level.WARN) //1 获取DS val conf = new SparkConf().setAppName("MyTestOldAPI").setMaster("local[*]") val dss = new StreamingContext(conf, Duration(1000)) val ds = dss.socketTextStream("localhost", 9999) //2 逻辑处理 //统计 val res = ds.filter(_ != "").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) res.print() //3 开启实时处理任务 dss.start() dss.awaitTermination() dss.stop() } }