Ha elolvasta a Scalachainról szóló korábbi történetemet, valószínűleg észrevette, hogy ez korántsem elosztott rendszer. Hiányzik az összes szolgáltatás, hogy megfelelően működjön más csomópontokkal. Add hozzá, hogy az egyetlen csomópontból álló blokklánc haszontalan. Ezért úgy döntöttem, hogy ideje dolgozni a kérdésen.
Mivel a Scalachain-ot az Akka hajtja, miért ne használná az esélyt az Akka Clusterrel való játékra? Létrehoztam egy egyszerű projektet, hogy egy kicsit bíbelődjek az Akka Clusterrel, és ebben a történetben megosztom a tanulságaimat. Három csomópontból álló klasztert fogunk létrehozni, a Cluster Aware Routers segítségével a terhelés kiegyenlítésére. Minden egy Docker-tárolóban fog futni, és a könnyű telepítéshez a docker-compose funkciót fogjuk használni.
Ok, tekerjünk! ?
Az Akka Cluster gyors bemutatása
Az Akka Cluster nagy támogatást nyújt az elosztott alkalmazások létrehozásához. A legjobb felhasználási eset az, ha van olyan csomópontja, amelyet N-szer szeretne megismételni egy elosztott környezetben. Ez azt jelenti, hogy az összes N csomópont társ, amely ugyanazt a kódot futtatja. Az Akka Cluster az egyben biztosítja a tagok felfedezését ugyanabban a klaszterben. A fürt tudatos útválasztók segítségével ki lehet egyensúlyozni az üzeneteket a különböző csomópontokban lévő szereplők között. Lehetőség van a kiegyensúlyozó politika megválasztására is, így a terhelés-kiegyenlítés egy sütemény!
Valójában kétféle útválasztó közül választhat:
Csoportos útválasztó - Azok a szereplők, akiknek az üzeneteket elküldik - az úgynevezett útválasztóknak - a színész útvonaluk alapján vannak megadva. Az útválasztók megosztják a fürtben létrehozott útvonalakat. Ebben a példában egy csoportos útválasztót fogunk használni.

Pool Router - Az útvonalakat az útválasztó hozza létre és telepíti, így ők a gyermekei a színészi hierarchiában. Az útvonalakat nem osztják meg az útválasztók. Ez ideális egy elsődleges replika szcenárióhoz, ahol minden útválasztó az elsődleges, és az útvonala a replikákat.

Ez csak a jéghegy csúcsa, ezért kérem, hogy olvassa el a hivatalos dokumentációt további betekintés céljából.
Klaszter matematikai számításokhoz
Képzeljünk el egy felhasználási esetet. Tegyük fel, hogy tervezünk egy rendszert matematikai számítások végrehajtására kérésre. A rendszer online telepítve van, ezért REST API-ra van szüksége a számítási kérelmek fogadásához. Egy belső processzor kezeli ezeket a kéréseket, végrehajtja a számítást és visszaadja az eredményt.
A processzor jelenleg csak a Fibonacci számot tudja kiszámítani. Úgy döntünk, hogy egy csomópont-fürtöt használunk a terhelés elosztására a csomópontok között és a teljesítmény javítására. Az Akka Cluster kezeli a fürtdinamikát és a csomópontok közötti terheléselosztást. Oké, jól hangzik!
A színész hierarchiája
Először az első dolgokat: meg kell határoznunk színészi hierarchiánkat. A rendszer három funkcionális részre osztható: az üzleti logikára , a klaszterkezelésre és magára a csomópontra . Van egy szerver is, de nem szereplő, és ezen később dolgozni fogunk.
Üzleti logika
Az alkalmazásnak matematikai számításokat kell végeznie. Meghatározhatunk egy egyszerű Processor
szereplőt az összes számítási feladat kezelésére. Minden számítás, amelyet támogatunk, megvalósítható egy adott szereplőnél, amely ennek a gyermeke lesz Processor
. Ily módon az alkalmazás moduláris, könnyebben bővíthető és karbantartható. Jelenleg az egyetlen gyermek Processor
lesz a ProcessorFibonacci
színész. Gondolom, kitalálhatja, mi a feladata. Ennek elégnek kell lennie az induláshoz.
Klaszterkezelés
A klaszter kezeléséhez szükségünk van a ClusterManager
. Egyszerűen hangzik, igaz? Ez a színész mindent kezel a klaszterrel kapcsolatban, például visszajuttatja tagjait, ha megkérdezik. Hasznos lenne naplózni, hogy mi történik a fürtön belül, ezért meghatározunk egy ClusterListener
szereplőt. Ez a gyermek gyermeke ClusterManager
, és feliratkozik az azokat naplózó fürteseményekre.
Csomópont
A Node
színész a hierarchiánk gyökere. Rendszerünk belépési pontja kommunikál az API-val. A Processor
és a ClusterManager
gyermekei, a ProcessorRouter
színésszel együtt. Ez a rendszer terheléselosztója, elosztva a terhelést Processor
s között . Cluster Aware Router-ként fogjuk konfigurálni, így mindenki ProcessorRouter
üzenetet küldhet Processor
s-nek minden csomóponton.

Színész megvalósítása
Ideje színészeink megvalósítására! Ökölben megvalósítjuk a rendszer üzleti logikájához kapcsolódó szereplőket. Ezután haladunk a klaszter menedzsment szereplőin és végül a root szereplőn ( Node
).
ProcesszorFibonacci
Ez a színész végzi a Fibonacci-szám kiszámítását. Compute
Üzenetet kap, amely tartalmazza a kiszámolandó számot és a válaszadó szereplő hivatkozását. A hivatkozás fontos, mivel különböző kérő szereplők lehetnek. Ne feledje, hogy elosztott környezetben dolgozunk!
Miután Compute
megkapta az üzenetet, a fibonacci
függvény kiszámítja az eredményt. Egy ProcessorResponse
objektumba csomagoljuk , hogy információt nyújtsunk a számítást végrehajtó csomópontról. Ez később hasznos lesz a körmérkőzéses politika működésének megtekintéséhez.
Az eredményt ezután elküldjük annak a színésznek, akinek válaszolnunk kell. Könnyű borsó.
object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }
Processzor
A Processor
színész irányítja a meghatározott részfeldolgozókat, például a Fibonaccit. Példának kell lennie az alfeldolgozók számára, és továbbítaniuk kell a kérelmeket nekik. Most már csak egy al-processzor, így a Processor
kapott egyfajta üzenet: ComputeFibonacci
. Ez az üzenet tartalmazza a kiszámításához szükséges Fibonacci számot. Miután megkapta, a kiszámolandó számot elküldi a FibonacciProcessor
, a hivatkozással együtt sender()
.
object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }
ClusterListener
Hasznos információkat szeretnénk naplózni arról, hogy mi történik a fürtben. Ez segíthet a rendszer hibakeresésében, ha szükséges. Ez a ClusterListener
színész célja . Indulás előtt feliratkozik a fürt eseményüzeneteire. A színész reagál az üzenetekre tetszik MemberUp
, UnreachableMember
vagy MemberRemoved
, a fakitermelés, a megfelelő esemény. Ha ClusterListener
leáll, leiratkozik a fürt eseményeiről.
object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }
ClusterManager
A klaszter irányításáért felelős szereplő az ClusterManager
. Létrehozza a ClusterListener
szereplőt, és kérésre megadja a klaszter tagjainak listáját. Lehetne bővíteni további funkciók hozzáadásával, de most ez elég.
object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }
ProcessorRouter
A processzorok közötti terheléselosztást a ProcessorRouter
. A Node
színész készíti, de ezúttal az összes szükséges információt a rendszer konfigurációjában adják meg.
class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }
Elemezzük a application.conf
fájl megfelelő részét .
akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }
Az első dolog az útválasztó színész elérési útjának megadása /node/processorRouter
. Ezen a tulajdonságon belül konfigurálhatjuk az útválasztó viselkedését:
router
: ez az irányelv az üzenetek terheléselosztásához. Azért választottamround-robin-group
, de sok más van.routees.paths
: ezek az útvonalak azokhoz a szereplőkhöz, amelyek megkapják az útválasztó által kezelt üzeneteket. Azt mondjuk: „Amikor üzenetet kap, keresse meg az ezen utaknak megfelelő szereplőket. Válasszon egyet a házirendnek megfelelően, és továbbítsa neki az üzenetet. ” Mivel Cluster Aware Routereket használunk, az útvonalak a fürt bármelyik csomópontján lehetnek.cluster.enabled
: klaszterben működünk? A válaszon
természetesen az!cluster.allow-local-routees
: Itt engedélyezzük az útválasztónak, hogy a csomópontjában válasszon egy útválasztót.
Ezzel a konfigurációval létrehozhatunk egy útválasztót a processzorok közötti munka egyensúlyának betöltéséhez.
Csomópont
Színészi hierarchiánk gyökere a Node
. Ez létrehozza a gyermek szereplőket - ClusterManager
,, Processor
és ProcessorRouter
-, és továbbítja az üzeneteket a megfelelőnek. Itt semmi bonyolult.
object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }
Szerver és API
Every node of our cluster runs a server able to receive requests. The Server
creates our actor system and is configured through the application.conf
file.
object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }
Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes
trait.
The first one is /health
, to check the health of a node. It responds with a 200 OK
if the node is up and running
lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }
The /status/members
endpoint responds with the current active members of the cluster.
lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }
The last (but not the least) is the /process/fibonacci/n
endpoint, used to request the Fibonacci number of n
.
lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }
It responds with a ProcessorResponse
containing the result, along with the id of the node where the computation took place.
Cluster Configuration
Once we have all our actors, we need to configure the system to run as a cluster! The application.conf
file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.
Let’s start defining some useful variables.
clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }
Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.
akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }
Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster"
. Then we bind cluster.ip
and cluster.port
to the hostname
and port
of the netty
web framework.
The cluster requires some seed nodes as its entry points. We set them in the seed-nodes
array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”
. Right now we have one seed node, but we may add more later.
The auto-down-unreachable-after
property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.
Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!
Dockerization and deployment
To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
to the plugin.sbt
file in the project/
folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt
file.
// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")
Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal
. Run the command and taste the magic… ?
We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose
file that will spawn a seed and a couple of other nodes.
version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552
I won’t spend time going through it, since it is quite simple.
Let’s run it!
Time to test our work! Once we run the docker-compose up
command, we will have a cluster of three nodes up and running. The seed
will respond to requests at port :8000
, while node1
and node2
at port :8001
and :8002
. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?
Conclusion
We are done here! We learned a lot of things in these ten minutes:
- What Akka Cluster is and what can do for us.
- How to create a distributed application with it.
- How to configure a Group Router for load-balancing in the cluster.
- How to Dockerize everything and deploy it using docker-compose.
You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?
See you! ?