首发于前路迢迢

现代化的 Java(二十二)—— Akka Event Bus

在某些异步框架里,Eventbus 是个必不可少的东西,严重依赖这个东西来传递消息。但是在 Akka 里,Eventbus 只是一个消息订阅/发布的可选组件,Akka Actor 的消息传递非常灵活,我们可以仅仅在下面这种情况下考虑 Eventbus:

  • 发布/订阅分离
  • 发布/订阅逻辑可能依赖系统运行时状态和逻辑

所以整个示例项目,我仅仅在一个地方用到了 event bus,这就是行情模块。

现在的行情模块看起来可能有点儿过度设计,我们的系统里现在只有一个交易对,收到消息也只是写日志,似乎没有必要引入这样的一个组件。现在这样的架构是因为:

  • - 这部分代码来自我夏天写的一个服务,原本我想用它订阅老东家的行情,做些量化投资的尝试,不过很快我的注意力转向了机械化计算,就把这部分东西搁置了。
  • - 将来如果继续深化这个项目,我们肯定要向多交易对,多种行情信息订阅发布的场景扩展。
  • - 这个架构的完全体应该是一个 webservice 服务,消息聚合、推送、连接管理各司其职。
  • - 反正也是教学项目,没人拦着的话我还准备继续浪下去,akka-stream GraphDL ,core.async ,java.util.concurrent , Spark,Spring ,JPA JSONB 等等等等,可以玩的很多

所以,我们现在有了一个独立的 Akka 服务,它接收撮合节点发送过来的行情,处理后分发到 Eventbus 的各个 topic ,订阅者从外部连过来之后,挂到topic上去接收消息。

顺便说一下,实用中,这两个节点间的通信非常高频,每次发送的消息体也比较大,在实用场景中要关注这部分通信质量。

所以这个东西的核心,就是一个 Akka Event bus 。Akka 提供了几种不同的 event bus ,我这里用了最偷懒的一个 ManagedActorEventBus ,这个组件的优点在于,订阅它的 actor 停止运行时,它自动回收这个订阅,不需要显式管理。我们先看看这部分实现,代码不多:

package liu.mars.market;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.japi.ManagedActorEventBus;

import java.util.HashMap;
import java.util.Set;
import java.util.stream.Stream;

public class QuotationsBus extends ManagedActorEventBus<QuotationsEvent> {
    static private QuotationsBus instance;

    static void init(ActorSystem system) {
        instance = new QuotationsBus(system);
        Stream.of("market.btcusdt.depth0",
                "market.btcusdt.depth1",
                "market.btcusdt.dpeth2",
                "market.btcusdt.dpeth3",
                "market.btcusdt.dpeth4",
                "market.btcusdt.dpeth5").forEach(topic -> {
            topics.put(topic, system.actorOf(TopicActor.props(topic)));
        });
    }

    static private HashMap<String, ActorRef> topics = new HashMap<>();

    private QuotationsBus(ActorSystem system) {
        super(system);
    }

    static public ActorRef getTopic(String topic) {
        return topics.get(topic);
    }

    static public Set<String> topicNames() {
        return topics.keySet();
    }

    @Override
    public int mapSize() {
        return 128;
    }

    @Override
    public ActorRef classify(QuotationsEvent event) {
        return event.topicActor;
    }

    public boolean subscribe(ActorRef subscriber, String topic) {
        return this.subscribe(subscriber, QuotationsBus.getTopic(topic));
    }

    static public QuotationsBus getInstance() {
        return instance;
    }
}


前面我们说过,我们目前的项目里只有一个交易对,所以这个初始化代码就随手写上了,不能当人,真要哪位朋友拿去用,请记得一定要改一改这部分。 Map size 也是随手一写,只是因为官方例子是128所以就128了。实际交易所维护几千个 topic 的行情推送都是很正常的(当然不一定在一个单个的节点上)。这个模板参数表示事件类型,因为 Akka 要管理订阅生存期,所以需要我们为 Topic 提供 Topic Actor。QuotationsEvent 的代码其实很简单:

package liu.mars.market;

import akka.actor.ActorRef;

public class QuotationsEvent {
    public final ActorRef topicActor;
    public final Object data;

    public QuotationsEvent(ActorRef topic, Object data) {
        this.topicActor = topic;
        this.data = data;
    }
}

RouteActor 负责接收 Dash Status 后,生成不同级别的深度数据,推送到 event bus:

package liu.mars.market;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import clojure.lang.IFn;
import jaskell.util.CR;
import liu.mars.market.dash.Depth;
import liu.mars.market.dash.Level;
import liu.mars.market.dash.Make;
import liu.mars.market.status.DashStatus;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.IntStream;

public class RouteActor extends AbstractActor {
    LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private static final String depth_namespace = "liu.mars.market.depth";
    static {
        CR.require(depth_namespace);
    }

    private IFn merge_depth = CR.var(depth_namespace, "merge-depth").fn();

    static Props props(String symbol) {
        return Props.create(RouteActor.class, RouteActor::new);
    }

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create().match(DashStatus.class, status -> {
            this.status = status;
            genDepth(this.status);
        }).build();
    }

    private List<Level> mergeLevel(int step, List<? extends Make> orders){
        return (List<Level>) merge_depth.invoke(step, orders);
    }

    private DashStatus status;

    private void genDepth(DashStatus status){
        IntStream.of(0, 1, 2, 3, 4, 5).forEach( step -> {
            String channel = String.format("%s.depth.step%d", status.getSymbol(), step);
            Depth result = new Depth();
            result.setChannel(channel);
            result.setTs(LocalDateTime.now());
            result.setAsk(mergeLevel(step, status.getAskList()));
            result.setBid(mergeLevel(step, status.getBidList()));
            result.setVersion(status.getLatestOrderId());
            QuotationsBus.getInstance()
                    .publish(new QuotationsEvent(QuotationsBus.getTopic(channel), result));
        });
    }
}

其它功能因为这里面还没有实现完整,先不讨论了,订阅响应部分要结合后面的web socket一起介绍。有兴趣的朋友可以先访问官方文档 [Akka Documentation ] 深入了解。

下一章,我们介绍一下这部分代码中出现的深度合并逻辑。

发布于 2019-01-18 22:14