现代化的 Java(二十二)—— Akka Event Bus
在某些异步框架里,Eventbus 是个必不可少的东西,严重依赖这个东西来传递消息。但是在 Akka 里,Eventbus 只是一个消息订阅/发布的可选组件,Akka Actor 的消息传递非常灵活,我们可以仅仅在下面这种情况下考虑 Eventbus:
- 发布/订阅分离
- 发布/订阅逻辑可能依赖系统运行时状态和逻辑
所以整个示例项目,我仅仅在一个地方用到了 event bus,这就是行情模块。
现在的行情模块看起来可能有点儿过度设计,我们的系统里现在只有一个交易对,收到消息也只是写日志,似乎没有必要引入这样的一个组件。现在这样的架构是因为:
- - 这部分代码来自我夏天写的一个服务,原本我想用它订阅老东家的行情,做些量化投资的尝试,不过很快我的注意力转向了机械化计算,就把这部分东西搁置了。
- - 将来如果继续深化这个项目,我们肯定要向多交易对,多种行情信息订阅发布的场景扩展。
- - 这个架构的完全体应该是一个 webservice 服务,消息聚合、推送、连接管理各司其职。
- - 反正也是教学项目,没人拦着的话我还准备继续浪下去,akka-stream GraphDL ,core.async ,java.util.concurrent , Spark,Spring ,JPA JSONB 等等等等,可以玩的很多
所以,我们现在有了一个独立的 Akka 服务,它接收撮合节点发送过来的行情,处理后分发到 Eventbus 的各个 topic ,订阅者从外部连过来之后,挂到topic上去接收消息。
顺便说一下,实用中,这两个节点间的通信非常高频,每次发送的消息体也比较大,在实用场景中要关注这部分通信质量。
所以这个东西的核心,就是一个 Akka Event bus 。Akka 提供了几种不同的 event bus ,我这里用了最偷懒的一个 ManagedActorEventBus ,这个组件的优点在于,订阅它的 actor 停止运行时,它自动回收这个订阅,不需要显式管理。我们先看看这部分实现,代码不多:
package liu.mars.market;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.event.japi.ManagedActorEventBus;
import java.util.HashMap;
import java.util.Set;
import java.util.stream.Stream;
public class QuotationsBus extends ManagedActorEventBus<QuotationsEvent> {
static private QuotationsBus instance;
static void init(ActorSystem system) {
instance = new QuotationsBus(system);
Stream.of("market.btcusdt.depth0",
"market.btcusdt.depth1",
"market.btcusdt.dpeth2",
"market.btcusdt.dpeth3",
"market.btcusdt.dpeth4",
"market.btcusdt.dpeth5").forEach(topic -> {
topics.put(topic, system.actorOf(TopicActor.props(topic)));
});
}
static private HashMap<String, ActorRef> topics = new HashMap<>();
private QuotationsBus(ActorSystem system) {
super(system);
}
static public ActorRef getTopic(String topic) {
return topics.get(topic);
}
static public Set<String> topicNames() {
return topics.keySet();
}
@Override
public int mapSize() {
return 128;
}
@Override
public ActorRef classify(QuotationsEvent event) {
return event.topicActor;
}
public boolean subscribe(ActorRef subscriber, String topic) {
return this.subscribe(subscriber, QuotationsBus.getTopic(topic));
}
static public QuotationsBus getInstance() {
return instance;
}
}
前面我们说过,我们目前的项目里只有一个交易对,所以这个初始化代码就随手写上了,不能当人,真要哪位朋友拿去用,请记得一定要改一改这部分。 Map size 也是随手一写,只是因为官方例子是128所以就128了。实际交易所维护几千个 topic 的行情推送都是很正常的(当然不一定在一个单个的节点上)。这个模板参数表示事件类型,因为 Akka 要管理订阅生存期,所以需要我们为 Topic 提供 Topic Actor。QuotationsEvent 的代码其实很简单:
package liu.mars.market;
import akka.actor.ActorRef;
public class QuotationsEvent {
public final ActorRef topicActor;
public final Object data;
public QuotationsEvent(ActorRef topic, Object data) {
this.topicActor = topic;
this.data = data;
}
}
RouteActor 负责接收 Dash Status 后,生成不同级别的深度数据,推送到 event bus:
package liu.mars.market;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import clojure.lang.IFn;
import jaskell.util.CR;
import liu.mars.market.dash.Depth;
import liu.mars.market.dash.Level;
import liu.mars.market.dash.Make;
import liu.mars.market.status.DashStatus;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.IntStream;
public class RouteActor extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private static final String depth_namespace = "liu.mars.market.depth";
static {
CR.require(depth_namespace);
}
private IFn merge_depth = CR.var(depth_namespace, "merge-depth").fn();
static Props props(String symbol) {
return Props.create(RouteActor.class, RouteActor::new);
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create().match(DashStatus.class, status -> {
this.status = status;
genDepth(this.status);
}).build();
}
private List<Level> mergeLevel(int step, List<? extends Make> orders){
return (List<Level>) merge_depth.invoke(step, orders);
}
private DashStatus status;
private void genDepth(DashStatus status){
IntStream.of(0, 1, 2, 3, 4, 5).forEach( step -> {
String channel = String.format("%s.depth.step%d", status.getSymbol(), step);
Depth result = new Depth();
result.setChannel(channel);
result.setTs(LocalDateTime.now());
result.setAsk(mergeLevel(step, status.getAskList()));
result.setBid(mergeLevel(step, status.getBidList()));
result.setVersion(status.getLatestOrderId());
QuotationsBus.getInstance()
.publish(new QuotationsEvent(QuotationsBus.getTopic(channel), result));
});
}
}
其它功能因为这里面还没有实现完整,先不讨论了,订阅响应部分要结合后面的web socket一起介绍。有兴趣的朋友可以先访问官方文档 [Akka Documentation ] 深入了解。
下一章,我们介绍一下这部分代码中出现的深度合并逻辑。