概述
简介
- 解决高并发的问题,其实主要有两种解决思路,其中一个就是使用锁的概念,这是非常普遍的,但是当面对的业务量不断增多,各种锁就会非常多。导致很难维护,同时最重要的,使用锁其实是一个降低了系统效率的方法,也就是由于上下文切换等等带来的浪费。
- 而另一种方法就是使用ACTOR模型,那么什么是ACTOR模型呢,
请在另一篇blog中查看
当然其中不论是使用锁、CAS,还是使用actor模型,都是各有利弊的。在不同的业务中有不同的优点。
但是在大数据的spark框架中,actor模型很普遍也很适合,有利于有向图逻辑的计算。所以这次我分享一下scala写的一个主从模型框架。
框架模型
首先明确需求:
- Master能够监听slaver的加入
- 同时Master能够监听slaver的故障
- Master拥有所有节点的信息
对一来说:
在Actor模型中,定义一个注册Message,Slaver想要加入集群的时候,发送注册Msg给Master,然后如果符合要求,就加入到一个Maser的全局变量,这个全局变量也就解决了上述说的第三个需求。
对二来说:
既然要确认slaver的存活情况,就需要引入心跳机制,也就是说,每隔一段时间,每一个Slaver就需要向Master发送一个心跳信号,也就是一个Msg。如果在一段时间内Master收到了这个心跳信号,就知道了,这个Slaver是正常运行的,是存活的。那么既然有存活的,就肯定有不能发送心跳信号的“死”的Slaver。
这个就需要Master定时检测有没有收到每一个已经注册过的Slaver的心跳Msg,如果在TIMELIMIT这段时间内没有收到,那么就删除这个节点的注册信息。
这个时候这个时候就有这么一个问题:
如何运行这个定时任务呢,实现的方法其实主要是两种,第一个就是直接编写一个循环的定时,当然可以实现,但是一般比较主流的方法就是给自己一个心跳。
自己 的心跳机制作为定时的任务的触发器。定时给自己发送Msg。
目录结构
运行结果
首先我们启动Master节点,
由图可见,现在有0个Slaver
然后我们启动一个Slaver
一段时间后,发现Master
已经注册成功,然后我们当然可以继续启动Slaver,然后count会继续增加为2 ,
然后我们关掉Slaver,
然后Master会显示count减少。
这样就实现了我们的目的,在实际应用中我们可以实现集群的管理,有向图逻辑任务的管理等等。
代码
slaverInfo
package day0208
/**
* @Author: Braylon
* @Date: 2020/2/8 12:14
* @Version: 1.0
*/
class SlaverInfo(var ID:String, var Host:String, var cores:String, var backup:String) {
var lastHeartBeat:Long = System.currentTimeMillis()
override def toString() = s"SlaverInfo:$ID, $Host, $cores, $backup"
}
Messages
package day0208
/**
* @Author: Braylon
* @Date: 2020/2/8 12:09
* @Version: 1.0
*/
//从节点信息
case class RegisterMsg(var ID:String ,var host:String, var cores:String, var backup:String)
//从节点触发心跳信息
case class SelfHeartBeatMsg()
//从节点---》master 心跳信息
case class ToMasterHeartBeat(var ID:String)
//主节点信息
case class Master2SlaverACK()
//主节点自检测节点存活信息
case class SelfCheckNodesState()
class Messages {
}
Master
package day0208
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable
/**
* @Author: Braylon
* @Date: 2020/2/8 12:09
* @Version: 1.0
*/
class Master extends Actor{
val slaversMap:mutable.HashMap[String, SlaverInfo] = new mutable.HashMap[String, SlaverInfo]()
val slaversSet:mutable.HashSet[SlaverInfo] = new mutable.HashSet[SlaverInfo]()
val TIMELIMIT = 10 * 1000
override def receive() = {
case RegisterMsg(id, host, cores, backup) => {
if (!slaversMap.contains(id)){
val new_slaverInfo = new SlaverInfo(id, host, cores, backup)
slaversSet.add(new_slaverInfo)
slaversMap(id) = new_slaverInfo
sender ! Master2SlaverACK
}
}
case ToMasterHeartBeat(id) => {
val slaverInfo_ = slaversMap(id)
println("get heartBeat from slaver:" + slaverInfo_)
slaverInfo_.lastHeartBeat = System.currentTimeMillis()
}
case SelfCheckNodesState => {
val curTimestamp = System.currentTimeMillis()
val outRecord = slaversSet.filter(x => curTimestamp - x.lastHeartBeat > TIMELIMIT)
for (item <- outRecord){
slaversSet -= item
slaversMap.remove(item.ID)
}
println("slavers count :" + slaversSet.size)
}
}
override def preStart(): Unit = {
context.system.scheduler.schedule(5 millis, TIMELIMIT milli, self, SelfCheckNodesState)
}
}
object Master{
def main(args: Array[String]): Unit = {
val host:String = "localhost"
val port = 9999
val confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(confStr)
val actorSys = ActorSystem.create("MasterNode", config)
actorSys.actorOf(Props[Master],"Master")
}
}
Slaver
package day0208
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
import com.typesafe.config.ConfigFactory
/**
* @Author: Braylon
* @Date: 2020/2/8 12:09
* @Version: 1.0
*/
class Slaver extends Actor{
val ID : String = UUID.randomUUID().toString
var master : ActorSelection = null
override def preStart(): Unit = {
//创建连接
master = context.system.actorSelection("akka.tcp://MasterNode@localhost:9999/user/Master")
//发送注册
master ! RegisterMsg(ID,"localhost","16","无备注")
}
override def receive() = {
case Master2SlaverACK => {
context.system.scheduler.schedule(0 millis, 5000 millis, self, SelfHeartBeatMsg)
}
case SelfHeartBeatMsg => {
println("slaver self heartBeat")
master ! ToMasterHeartBeat(ID)
}
}
}
object Slaver{
def main(args: Array[String]): Unit = {
var slaverPort = 9990
var confStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.port = $slaverPort
""".stripMargin
var config = ConfigFactory.parseString(confStr)
var actorSys = ActorSystem.create("SlaverNode",config)
actorSys.actorOf(Props[Slaver],"Slaver")
}
}
愿身体健康
大家共勉~~
最后
以上就是小巧绿草为你收集整理的大数据学习(十七)scala实现actor模型管理系统的全部内容,希望文章能够帮你解决大数据学习(十七)scala实现actor模型管理系统所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复