当前位置:   article > 正文

Scala编程实战_scala实战

scala实战

 

 

Scala编程实战

  1. 课程目标
    1. 目标:熟练使用Scala编写程序

 

 

  1. 项目概述
    1. 需求

目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常多,比如前我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量分析而设计的,在某些极端的情况下,任务提交的延迟很高,所以Hadoop的RPC显得有些笨重。

 

Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。

    1. Akka简介

Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。

 

Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。

Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:

 

1.提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发

2.提供了异步非阻塞的、高性能的事件驱动编程模型

3.超级轻量级事件处理(每GB堆内存几百万Actor)

 

  1. 项目实现
    1. 架构图

    1. 重要类介绍
      1. ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor

      1. Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

 

  1. preStart()方法:该方法在Actor对象构造器执行后执行,整个Actor生命周期中仅执行一次。
  2. receive()方法:该方法在ActorpreStart方法执行完成后执行,用于接收发送消息,会被反复执行。
    1. Master
  1. package cn.akkaTest.spark
  2. import scala.concurrent.duration._
  3. import akka.actor.{Props, ActorSystem, Actor}
  4. import akka.actor.Actor.Receive
  5. import com.typesafe.config.ConfigFactory
  6. import scala.collection.mutable
  7. /**
  8.   * Master为整个集群中的主节点
  9.   * Master继承了Actor
  10.   */
  11. class Master extends Actor{
  12.   //保存WorkerID和Work信息的map
  13.   val idToWorker = new mutable.HashMap[String, WorkerInfo]
  14.   //保存所有Worker信息的Set
  15.   val workers = new mutable.HashSet[WorkerInfo]
  16.   //Worker超时时间
  17.   val WORKER_TIMEOUT = 10 * 1000
  18.   //重新receive方法
  19.   //导入隐式转换,用于启动定时器
  20.   import context.dispatcher
  21.   //构造方法执行完执行一次
  22.   override def preStart(): Unit = {
  23.     //启动定时器,定时执行
  24.     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  25.   }
  26.   //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  27.   override def receive: Receive = {
  28.     //Worker向Master发送的注册消息
  29.     case RegisterWorker(id, workerHost, memory, cores) => {
  30.       if(!idToWorker.contains(id)) {
  31.         val worker = new WorkerInfo(id, workerHost, memory, cores)
  32.         workers.add(worker)
  33.         idToWorker(id) = worker
  34.         sender ! RegisteredWorker("192.168.10.1")
  35.       }
  36.     }
  37.     //Worker向Master发送的心跳消息
  38.     case HeartBeat(workerId) => {
  39.       val workerInfo = idToWorker(workerId)
  40.       workerInfo.lastHeartbeat = System.currentTimeMillis()
  41.     }
  42.     //Master自己向自己发送的定期检查超时Worker的消息
  43.     case CheckOfTimeOutWorker => {
  44.       val currentTime = System.currentTimeMillis()
  45.       val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
  46.       for(worker <- toRemove){
  47.         workers -= worker
  48.         idToWorker.remove(worker.id)
  49.       }
  50.       println("worker size: " + workers.size)
  51.     }
  52.   }
  53. }
  54. object Master {
  55.   //程序执行入口
  56.   def main(args: Array[String]) {
  57.     val host = "192.168.10.1"
  58.     val port = 8888
  59.     //创建ActorSystem的必要参数
  60.     val configStr =
  61.       s"""
  62.          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  63.          |akka.remote.netty.tcp.hostname = "$host"
  64.          |akka.remote.netty.tcp.port = "$port"
  65.        """.stripMargin
  66.     val config = ConfigFactory.parseString(configStr)
  67.     //ActorSystem是单例的,用来创建Actor
  68.     val actorSystem = ActorSystem.create("MasterActorSystem", config)
  69.     //启动Actor,Master会被实例化,生命周期方法会被调用
  70.     actorSystem.actorOf(Props[Master], "Master")
  71.   }
  72. }
 
    1. Worker
  1. package cn.akkaTest.spark
  2. import java.util.UUID
  3. import scala.concurrent.duration._
  4. import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
  5. import akka.actor.Actor.Receive
  6. import com.typesafe.config.ConfigFactory
  7. /**
  8.   * Worker为整个集群的从节点
  9.   * Worker继承了Actor
  10.   */
  11. class Worker extends Actor{
  12.   //Worker端持有Master端的引用(代理对象)
  13.   var master: ActorSelection = null
  14.   //生成一个UUID,作为Worker的标识
  15.   val id = UUID.randomUUID().toString
  16.   //构造方法执行完执行一次
  17.   override def preStart(): Unit = {
  18.     //Worker向MasterActorSystem发送建立连接请求
  19.     master = context.system.actorSelection("akka.tcp://MasterActorSystem@192.168.10.1:8888/user/Master")
  20.     //Worker向Master发送注册消息
  21.     master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
  22.   }
  23.   //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  24.   override def receive: Receive = {
  25.     //Master向Worker的反馈信息
  26.     case RegisteredWorker(masterUrl) => {
  27.       import context.dispatcher
  28.       //启动定时任务,向Master发送心跳
  29.       context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
  30.     }
  31.     case SendHeartBeat => {
  32.       println("worker send heartbeat")
  33.       master ! HeartBeat(id)
  34.     }
  35.   }
  36. }
  37. object Worker {
  38.   def main(args: Array[String]) {
  39.     val clientPort = 2552
  40.     //创建WorkerActorSystem的必要参数
  41.     val configStr =
  42.       s"""
  43.          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  44.          |akka.remote.netty.tcp.port = $clientPort
  45.        """.stripMargin
  46.     val config = ConfigFactory.parseString(configStr)
  47.     val actorSystem = ActorSystem("WorkerActorSystem", config)
  48.     //启动Actor,Master会被实例化,生命周期方法会被调用
  49.     actorSystem.actorOf(Props[Worker], "Worker")
  50.   }
  51. }
 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/864565
推荐阅读
相关标签
  

闽ICP备14008679号