Akka实现Spark心跳机制


Akka实现Spark心跳机制

上文我们已经具体讲解过Spark的心跳机制

在本文我们利用Akka框架来实现一个简易的Spark心跳机制

在文末,会附上具体源码

关于Akka的大致概述及用法可参考这位大佬的链接:https://juejin.cn/post/6977713688560009246

本文源码主要为 ifIAmADJ 这位大佬的源码,但是可能是Akka版本关系,运行端口会报错,所以在此基础上做了些许改动

具体实现


目录结构

└─heartbeat
    ├─common
    │      HeartBeat.scala
    │      RegisteredWorkerInfo.scala
    │      RegisterWorkerInfo.scala
    │      RemoveTimeOutWorker.scala
    │      SendHeartBeat.scala
    │      StartTimeOutWorker.scala
    │      WorkerInfo.scala
    │
    ├─master
    │      SparkMaster.scala
    │
    └─worker
            SparkWorker.scala

其中

common 中放置一些心跳传递的相关类

master 存放伪Master节点

worker 存放伪Worker节点

依赖

build.sbt 文件

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.8"

resolvers += "Typesafe Repository" at "https://maven.aliyun.com/repository/public/"

lazy val root = (project in file("."))
  .settings(
    name := "Akka"
  )

val AkkaVersion = "2.6.19"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % AkkaVersion

common 包

/**
 * worker 收到 SendHeartBeat 时,向 master 发送对应的心跳信息,并标注自己的 id
 *
 * @param id
 */
case class HeartBeat(id: String) extends Serializable
/**
 * 当注册成功时,返回这个单例对象。
 *
 * case object 和 case class 的区别是:
 * case object 不具备 apply, unapply 方法。
 *
 */
case object RegisteredWorkerInfo extends Serializable
/**
 * worker 注册时发送给服务器的信息。
 *
 * @param id
 * @param cpu
 * @param ram
 */
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int) extends Serializable
/**
 * AKka 上下文通过计时器提醒 master 删除过期的 workers 时,发送此单例对象。
 */
case object RemoveTimeOutWorker extends Serializable
/**
 * Akka 上下文通过计时器提醒 worker 发送心跳信息时,发送此单例对象。
 */
case object SendHeartBeat extends Serializable
/**
 * master 在启动时会向自己发送此单例对象,来触发定期检查机制。
 */
case object StartTimeOutWorker extends Serializable
/**
 * 这个结构体用于 master 将每个注册的 worker 信息保存到 hashMap 当中。
 *
 * 为什么 master 不直接存放 RegisterWorkerInfo 呢?
 * 在之后,WorkerInfo 会进行拓展。(比如增加 worker 上一次的心跳时间)
 *
 * @param id
 * @param cpu
 * @param ram
 */
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) extends Serializable {

  // 拓展:需要记录每个 worker 上次发送心跳消息的信息。
  var lastHeartBeat: Long = System.currentTimeMillis()

}

master 包

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.spark.heartbeat.common._
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps

class SparkMaster extends Actor {


  //定义一个管理 worker 信息的 hashMap , 这个 hashMap 必须是可变的。
  val workers: mutable.Map[String, WorkerInfo] = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {
    case "start" =>
      println(" Master 服务器启动成功!")

      // 自启动 workers 的定时检查机制。
      self ! StartTimeOutWorker

    case RegisterWorkerInfo(id, cpu, ram) =>
      //接受到客户端的注册信息
      if (!workers.contains(id)) {
        //提取该 worker 的基本信息。
        val workerInfo = new WorkerInfo(id, cpu, ram)

        workers += (id -> workerInfo)

        //一切操作完成时,直接返回该单例对象(伴生类)
        sender() ! RegisteredWorkerInfo
      }

    case HeartBeat(id) =>
      // 更新指定 id worker 的信息。
      // 1.取出消息
      val info: WorkerInfo = workers(id)

      // 2.更新时间
      info.lastHeartBeat = System.currentTimeMillis()

      workers += id -> info

      println(s"Worker id : $id 的信息更新了!")

    case StartTimeOutWorker =>
      println("准备定期检测 worker 心跳:")
      import context.dispatcher

      context.system.scheduler.scheduleWithFixedDelay(0 millis, 9000 millis, self, RemoveTimeOutWorker)

    case RemoveTimeOutWorker =>

      val now: Long = System.currentTimeMillis()

      // 检查哪些 worker 心跳超时了,从 hashMap 当中删除。
      // 利用 Scala 的函数式编程来解决问题。
      // 1. 筛选出超时的 workers
      // 2. 将这些 workers 从 hashMap当中移除。
      //
      // 判断超时的逻辑:
      // (now - specified_id_worker's lastHeartBeat) > threshold 。
      // 由于网络传输存在一些少数的延迟(一般集群都在局域网内),这个 threshold 我们选取 6 秒。

      workers.values.filter(worker => now - worker.lastHeartBeat > 6000)
        .foreach(worker => workers.remove(worker.id))

      println("当前有" + workers.size + "个 workers 存活。")

  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {

    //1.绑定本机地址和启动的端口号
    val host = "127.0.0.1" //绑定为本机地址
    val port = 9999 //绑定本机启动的端口号

    //2.绑定配置文件
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.artery.enable="on"
         |akka.remote.artery.canonical.hostname=$host
         |akka.remote.artery.canonical.port=$port
         |akka.actor.allow-java-serialization=on
         |""".stripMargin
    )

    val sparkMasterSystem = ActorSystem("master", config = config)

    val sparkMasterRef: ActorRef = sparkMasterSystem.actorOf(Props[SparkMaster], "SparkMaster-01")

    sparkMasterRef ! "start"

  }
}

worker 包

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.spark.heartbeat.common.{HeartBeat, RegisterWorkerInfo, RegisteredWorkerInfo, SendHeartBeat}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
import scala.language.postfixOps

class SparkWorker(masterHost: String, masterPort: Int) extends Actor {

  // 实际上就是 ActorRef 的另一种叫法。
  var masterProxy: ActorSelection = _

  // 每个 Worker 生成一个随机 id。
  val id: String = java.util.UUID.randomUUID().toString

  // 初始化 Master 的 Proxy。
  override def preStart(): Unit = {
    masterProxy = context.actorSelection(
      s"akka://master@$masterHost:$masterPort/user/SparkMaster-01")
  }

  override def receive: Receive = {
    case "start" =>
      println(" Worker 服务器启动成功!")
      masterProxy ! RegisterWorkerInfo(id, 8, 16 * 1024)
    case RegisteredWorkerInfo =>

      println(s" Worker : $id 注册成功了!")

      //注册成功之后,定义一个计时器,每隔一段时间发送消息。
      import context.dispatcher

      /*
      1. initialDelay:初始延迟。设定当 worker 收到注册成功的消息之后,立刻发送一次心跳检测。
      2. internalDelay:时间间隔。设定每 3000 毫秒发送一次心跳消息。
      3. actorRef:本机的 actor 系统将向哪个 actor 发送消息。
      4. message:发送消息的内容。
      其逻辑是:本机的 Akka 系统通过计时器"提醒"此 worker 发送心跳消息,
      然后 worker 收到此"提醒"之后,再向 master 发送真正的心跳消息。
       */
      context.system.scheduler.scheduleWithFixedDelay(0 millis, 3000 millis, self, SendHeartBeat)

    case SendHeartBeat =>

      println(s" Worker : $id 发送了心跳信息。")
      masterProxy ! HeartBeat(id)

  }


}

object SparkWorker {

  def main(args: Array[String]): Unit = {

    //1.绑定本机地址和启动的端口号
    val workerHost = "127.0.0.1" //绑定为本机地址
    val workerPort = 10001 //绑定本机启动的端口号

    //1.1 配置远端地址和启动的端口号
    val masterHost = "127.0.0.1"
    val masterPort = 9999


    //2.绑定配置文件
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.artery.enable="on"
         |akka.remote.artery.canonical.hostname=$workerHost
         |akka.remote.artery.canonical.port=$workerPort
         |akka.actor.allow-java-serialization=on
         |""".stripMargin
    )


    val workerSystem = ActorSystem("worker", config = config)

    val sparkWorkerActorRef: ActorRef = workerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker")

    sparkWorkerActorRef ! "start"

  }
}

Akka 和 Spark 的纠葛

从上文可以看出,Akka用来实现Spark的心跳机制十分方便

Spark 在 Spark 2.x 以前是支持Akko作为内部通信的,但是从Spark 2.x 开始,就不再支持 Akka 了

那是因为

在 Spark 1.3 年代,为了解决大块数据(如Shuffle)的传输问题,Spark引入了Netty通信框架。但是从 Spark 1.6, Spark 可以配置使用 Akka 或者 Netty 了,这意味着 Netty 可以完全替代 Akka 了。再到 Spark 2, Spark 已经完全抛弃 Akka 了,全部使用 Netty 了。

为什么 Spark 无情地有步骤有预谋地抛弃 Akka 呢?Spark 官方倒是给了一个说法:https://issues.apache.org/jira/browse/SPARK-5293。

A lot of Spark user applications are using (or want to use) Akka. Akka as a whole can contribute great architectural simplicity and uniformity. However, because Spark depends on Akka, it is not possible for users to rely on different versions, and we have received many requests in the past asking for help about this specific issue. For example, Spark Streaming might be used as the receiver of Akka messages - but our dependency on Akka requires the upstream Akka actors to also use the identical version of Akka.

Since our usage of Akka is limited (mainly for RPC and single-threaded event loop), we can replace it with alternative RPC implementations and a common event loop in Spark.

谷歌机翻

许多SPARK用户应用程序都使用(或想要使用)AKKA。Akka从整体上可以贡献出极大的建筑简单性和统一性。但是,由于Spark取决于Akka,因此用户不可能依靠不同的版本,并且过去我们收到了许多请求,请求有关此特定问题的帮助。例如,Spark流可以用作AKKA消息的接收器 - 但是我们对Akka的依赖性要求上游Akka Actors也可以使用相同的Akka版本。由于我们对Akka的使用受到限制(主要用于RPC和单线读取事件循环),因此我们可以将其替换为Spark中的替代RPC实现和一个常见的事件循环。

大意就是很多 Spark 用户在使用 Spark 之后,就必须使用 Spark 依赖的那个版本的 Akka。Spark 主要用了 Akka 的 RPC 和 单线程 event-loop,因此 Spark 没有必要依赖完全的 Akka。最终 Spark 用 netty 实现下简易版本的 Akka。

源码:https://kanaikee.jetbrains.space/p/spark-akka-heartbeat/repositories/spark-akka-heartbeat/files


文章作者: kanaikee
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 kanaikee !
  目录