Akka入门系列(六):akka cluster中的路由和负载均衡

栏目: Scala · 发布时间: 6年前

内容简介:在使用路由功能之前,我们需要先了解下常规概念:在Akka中,提供了两种做路由的方式:直接使用

在使用路由功能之前,我们需要先了解下常规概念:

Router
Routee

在Akka中,提供了两种做路由的方式:

akka.routing.Router

直接使用Router类

直接使用 akka.routing.Router 类的原理其实与上一章的最简单的例子是一样的,只不过akka的Router类比我们实现的更复杂、更强大。创建Router类时需提供两个参数:

  • 路由规则

    akka为Router类提供了以下几种内置的路由算法类:

    • akka.routing.RoundRobinRoutingLogic
    • akka.routing.RandomRoutingLogic
    • akka.routing.SmallestMailboxRoutingLogic
    • akka.routing.BroadcastRoutingLogic
    • akka.routing.ScatterGatherFirstCompletedRoutingLogic
    • akka.routing.TailChoppingRoutingLogic
    • akka.routing.ConsistentHashingRoutingLogic
      具体算法介绍请参见文章最后的表格
  • 路由目标的序列

    该序列支持通过调用 router.addRouteerouter.removeRoutee 进行动态变化,但需要注意的是, akka.routing.Router 类时一个immutable的线程安全类,即不可改变,这里的改变其实是将原来的router内的的routee队列增加/去掉指定routee后copy一份生成一个新的Router

    def removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))

依赖

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_2.12</artifactId>
</dependency>

配置文件application.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    artery {
      enabled = off
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
    ]
  }
}

实际做事的SlaveActor

public class SlaveActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, word-> log.info("Node {} receives: {}", getSelf().path().toSerializationFormat(), word))
                .build();
    }

    public static void main(String[] args) {
        Config config =
                ConfigFactory.parseString("akka.cluster.roles = [slave]")
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        system.actorOf(Props.create(SlaveActor.class), "slaveActor");
    }
}

包含路由的MasterActor

public class MasterActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private Router router = new Router(new RoundRobinRoutingLogic(), new ArrayList<>());
    private Cluster cluster = Cluster.get(getContext().system());
    boolean isReady = false;
    private static final String SLAVE_PATH = "/user/slaveActor";

    @Override
    public void preStart() throws Exception {
        cluster.subscribe(self(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg->{
                    log.info("Master got: {}", msg);
                    if(!isReady)
                        log.warning("Is not ready yet!");
                    else {
                        log.info("Routee size: {}", router.routees().length());
                        router.route(msg, getSender());
                    }
                })
                .match(ClusterEvent.MemberUp.class, mUp->{
                    if(mUp.member().hasRole("slave")) {
                        Address address = mUp.member().address();
                        String path = address + SLAVE_PATH;
                        ActorSelection selection = getContext().actorSelection(path);
                        router = router.addRoutee(selection);
                        isReady=true;
                        log.info("New routee is added!");
                    }
                })
                .match(ClusterEvent.MemberRemoved.class, mRemoved->{
                    router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address()+SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .match(ClusterEvent.UnreachableMember.class, mRemoved-> {
                    router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address() + SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .build();
    }

    public static void main(String[] args) {
        int port = 2551;

        // Override the configuration of the port
        Config config =
                ConfigFactory.parseString(
                        "akka.remote.netty.tcp.port=" + port + "\n" +
                                "akka.remote.artery.canonical.port=" + port)
                        .withFallback(
                                ConfigFactory.parseString("akka.cluster.roles = [master]"))
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        ClusterHttpManagement.get(system);
        AkkaManagement.get(system).start();
        system.actorOf(Props.create(MasterActor.class), "masterActor");
    }
}

这里将MasterActor监听了集群的MemberUp事件,通过判断事件中包含的role判断是否是SlaveActor加入集群。如果是,则将该SlaveActor加到 Router 中。同时,如果SlaveActor退出或变成Unreachable状态,则从 Router 中删除。

向MasterActor请求的客户端

public class Client
{
    public static void main( String[] args ) throws InterruptedException {
        Config config = ConfigFactory.load();
        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        ActorSelection toFind = system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor");
        int counter = 0;
        while(true){
            toFind.tell("hello "+counter++, ActorRef.noSender());
            System.out.println("Finish telling");
            Thread.sleep(2000);
        }
    }
}

分别启动四个窗口: 一个masterActor节点,两个slaveActor节点,一个Client,可以看到两个slaveActor轮流打印Client传递进去的消息。这时,把其中一个slaveActor关闭,可以看到Client发送的所有消息将被剩下那个slaveActor打印出来。

使用Router Actor

除了我们自己在Actor里调用 akka.routing.Router 类外,Akka还提供了根据配置直接生成一个内置的RouterActor。路由逻辑在remoting和cluster两个模块中都有,如果要启用remoting中的路由,则需要引入remoting的依赖,在cluster环境下并不推荐直接去用remoting中的路由,而是用cluster模块中的cluster aware router。

RouterActor有两种类型:

  • Pool
    Router 自动创建 Routee 作为自己的子Actor,然后部署到远程节点上。 Routee 被终止时,会自动从 Router 的路由表中删除, 除非使用动态路由(指定resizer),否则 Router 不会重新创建新的 Routee ,当所有的 Routee 都停止时, Router 也自动停止。
  • Group
    Routee actor是在Router actor以外单独创建好了, RouterActoSelection 向指定的Actor Path发送消息 ,但默认并不监控 Routee

Router actor可以通过程序配置或文件配置。如果是通过文件配置时,必须要在代码中使用 FromConfigRemoteRouterConfig (将Routee部署到远程节点去)去显式的读取相关配置,否则即便在配置文件中定义了路由相关配置,akka也不会去使用。

Router actor在转发消息时不会更改消息的sender,而routee actor在回复消息时,消息直接返回到原始的发送者,不再经过router actor。

无论哪种类型,有一块是相同配置:

cluster {
  enabled = on
  allow-local-routees = off
  use-roles = [slave]
}

enabled 是否启用cluster aware router
allow-local-routees 能否在本地,即router所在的节点创建和查找routee
use-roles 使用指定的角色来缩小routee的查找范围,如果routee的配置与这里的不同,则router是找不到该routee的。

Pool

我们在上面例子的基础上,把自己new的Router换成akka内置的RouterActor。改动主要有以下几个:

MasterActor
  1. 配置文件中actor部分增加:
    actor {
        provider = "cluster"
        deployment {
          /masterActor/poolRouter {
            router = round-robin-pool
            nr-of-instance = 5
            cluster {
              enabled = on
              allow-local-routees = on
              use-roles = [master]
            }
          }
          default {
            cluster {
              max-nr-of-instances-per-node = 5
            }
          }
        }
      }

由于我们的 Router 是在masterActor下创建的RouterActor,取名为 poolRouter ,所以其路径显然是 akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor/poolRouter ,masterActor启动时读取的是这个配置文件,所以deployment部分对应的就是masterActor及其子Actor,所以这里只需要填入相对路径就好了。注意,由于Routee是由masterActor创建出来的,所以 use-role 必须是与masterActor保持一致,否则会找不到 Routee !

- router 指定预设的路由器

- nr-of-instance routee的个数

注意,有两个参数非常关键:

  • actor.deployment.default.cluster.max-nr-of-instances-per-node 它是配置Router在每个节点上部署的最大Actor数,默认是1。虽然上面我们指定了routee数目为5,但是如果只起一个节点,你会发现永远是
    1个routee在打印结果。
  • max-total-nr-of-instances 定义router所能创建的routee的总数,默认是10000。通常来说足够用了。
  1. 修改 MasterActor 。注释掉的部分是直接使用代码而不用配置文件手动创建 Router 的,有兴趣的可以自己试下。

    public class MasterActor extends AbstractActor {
    
        LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    
        private ActorRef router;
    
        @Override
        public void preStart() throws Exception {
            router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "poolRouter");
            /*int totalInstances = 1000;
            int maxInstancePerNode = 5, routeeNumbers=5;
            boolean allowLocalRoutees = true;
            String role = "master";
            ClusterRouterPoolSettings settings = new ClusterRouterPoolSettings(totalInstances, maxInstancePerNode, allowLocalRoutees, role);
            ClusterRouterPool routerPool = new ClusterRouterPool(new RoundRobinPool(routeeNumbers), settings);
            router = getContext().actorOf(routerPool.props(Props.create(SlaveActor.class)), "poolRouter");*/
        }
    
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, msg->{
                        log.info("Master got: {}", msg);
                        router.tell(msg, getSender());
                    })
                    .build();
        }
    }
  2. 运行

    其他不变,这次只需要启动 ClientMasterActorSlaveActorMasterActor 中会自动创建出来。看到日志

    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor/poolRouter/c1] Node akka://ClusterSystem/user/masterActor/poolRouter/c1#-1154482163 receives: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor/poolRouter/c2] Node akka://ClusterSystem/user/masterActor/poolRouter/c2#-50692619 receives: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor/poolRouter/c3] Node akka://ClusterSystem/user/masterActor/poolRouter/c3#1415650532 receives: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor/poolRouter/c4] Node akka://ClusterSystem/user/masterActor/poolRouter/c4#1345851811 receives: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor/poolRouter/c5] Node akka://ClusterSystem/user/masterActor/poolRouter/c5#-1384624865 receives: hello

从c1到c5轮流打印,round-robin负载均衡起作用了。

Group

这种方式下, Routee 是在 Router 外被创建的,一般要求尽量在 Router 启动前启动好 Routee ,因为 Router 在启动过程中会尝试去联络 Routee 。使用时与 Pool 型的很像,区别是

  • 需要指定 routees.path (remote方式下支持完整协议路径,比如 akka.tcp://ClusterSystem:2551/user/testActor但是Cluster模式下不支持,只支持相对路径 )
  • 不需要指定也没有 nr-of-instance 参数

GroupActor是根据 routees.path 所配置的相对路径,去当前cluster的每一个节点上用 ActorSelection 去查找指定role的Routee(所以use-roles中的配置一定要和slave启动时的role一致),然后直接tell消息过去。由于整个过程是异步的,就意味着GroupActor的消息发送其实根本不关心节点上对应的Routee是否包含Routee或者是否正常启动,只是简单的根据配置去转发而已。

不去检测是否包含Routee,是因为Akka是Peer-to-Peer的设计,天生就要求所有节点对等,在这个约定下,它会认为cluster中所有节点的代码相同,一定会包含Routee。

不去检测是否正常启动,这个则是由于整个通讯都是异步的。

但我个人认为这里还是使用熔断机制来加强的,使用起来会更加方便。

  1. 修改配置文件
    actor {
        provider = "cluster"
        deployment {
          /masterActor/groupRouter {
            router = round-robin-group
            cluster {
              enabled = on
              allow-local-routees = on
              use-roles = [slave]
            }
          }
        }
    }

use-roles中role加不加引号都可以。

  1. 修改 MasterActor 中Router的名字,与配置文件中保持一致。注释掉的部分是直接使用代码而不用配置文件手动创建Router的,有兴趣的可以自己试下。

    @Override
    public void preStart() throws Exception {
        router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "groupRouter");
        /*List<String> routeesPaths = Arrays.asList("akka/user/slaveActor");
        router = getContext().actorOf(new RoundRobinGroup(routeesPaths).props(), "groupRouter");*/
    }
  2. 运行

    分别在几个不同窗口启动 MasterActor 、多个 SlaveActor 后,检查集群是否稳定后,即所有节点均是UP,如果启用了 akka-management-cluster-http ,向监控地址发送查询请求,如

    127.0.0.1:8558/cluster/members

    {
        "selfNode": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldestPerRole": {
            "master": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "dc-default": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "slave": "akka.tcp://ClusterSystem@127.0.0.1:4914"
        },
        "leader": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldest": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "unreachable": [],
        "members": [
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:2551",
                "nodeUid": "-1141014070",
                "status": "Up",
                "roles": [
                    "master",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4914",
                "nodeUid": "344021242",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4936",
                "nodeUid": "678163307",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4957",
                "nodeUid": "-573369962",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            }
        ]
    }

然后,启动Client向masterActor发送消息,可以看到均匀的打印出接受的日志,round-robin负载均衡起作用了。再多起几个SlaveActor,会将消息转发到新的actor中去,这就是 GroupPool 方式好的地方,可以动态变化。

此时,你可以尝试修改下配置,将slaveActor变成和masterActor一样的role,再运行后,你会发现有消息丢失,以及转发失败的日志出来。

这是因为在上面所有的例子中,为了方便理解,都是使用一个master+若干slave的方式来演示。

然而Akka的设计是Peer-to-Peer的,即所有节点对等,那么,RouterActor就会理所应当地认为在相同role的节点上都存在Routee,由于并没有去检查Routee是否能工作,直接进行了消息转发,而按照上面的写法masterAcotr所在的节点上压根就没起过slaveActor,所以就造成了消息丢失。

将配置中 allow-local-routees 改为 off ,这时它就不会把masterActor所在节点加到负载列表中去了。但同样的,你可以去起一个空的ActorSystem,看看有什么后果。

附录:

Akka提供的路由算法:

算法 说明 配置 算法类
RoundRobin 轮询的给路由列表中每个Routee发送消息 round-robin-pool 或 round-robin-group akka.routing.RoundRobin
Random 从路由列表中随机抽取一个Routee发送消息 random-pool 或 random-group akka.routing.Random
SmallestMailbox 优先选取路由表中mailbox内消息数最少的Routee发送消息 smallest-mailbox-pool akka.routing.SmallestMailbox
Broadcast 以广播的形式将消息同时转发给所有的Routee broadcast-pool 或 broadcast-group akka.routing.Broadcast
ScatterGatherFirstCompleted 将消息发送给所有的Routee,并等待第一个返回的结果,将该结果返回给发送者,其他结果被忽略掉 scatter-gather-pool 或 scatter-gather-group akka.routing.ScatterGatherFirstCompleted
TailChopping 先随机选一个Routee发送消息,等待一个短时间的延迟后,再随机选一个Routee发送消息,等待第一个返回的结果并将该结果发送回发送者,其他结果被忽略掉 tail-chopping-pool 或 tail-chopping-group akka.routing.TailChopping
ConsistentHashing 使用 一致性Hash算法 选取Routee转发消息 consistent-hashing-pool 或 consistent-hashing-group akka.routing.ConsistentHashing
Balancing 所有的Routee共享同一个mailbox,它会将繁忙的Routee中的任务重新分配给空闲的Routee,不支持group和广播 balancing-pool akka.routing.Balancing

本章代码地址: https://github.com/EdisonXu/akka-start-demo/tree/master/cluster


以上所述就是小编给大家介绍的《Akka入门系列(六):akka cluster中的路由和负载均衡》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Programming Collective Intelligence

Programming Collective Intelligence

Toby Segaran / O'Reilly Media / 2007-8-26 / USD 39.99

Want to tap the power behind search rankings, product recommendations, social bookmarking, and online matchmaking? This fascinating book demonstrates how you can build Web 2.0 applications to mine the......一起来看看 《Programming Collective Intelligence》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具