消息队列(MQ)

消息队列MQ是zbus的核心功能,消息队列的模型,与同类产品诸如RabbitMQ概念类似,但zbus消息队列没有采用AMQP相对复杂的协议,而是更加简洁的协议。

消息队列组件包括

规则设计

  1. 队列消息数据单份,分组通道任意多个
  2. 分组通道之内的消费者共享分组读指针,读指针改变影响本分组通道上的所有消费者
  3. 分组通道之间的消费者互不影响
  4. 分组通道的支持消息过滤,基于消息头部Tag

简而言之: 分组之内共享,分组之间独享,分组读指针支持消息标签过滤

分组通道的设计类似同类产品Kafka,但是在消息过滤以及细节上有些差异,相对更加简单。

消息模式

  1. 单播 仅一个分组通道,所有的消费者共享一个分组通道,每条消息只送达其中一个消费者
  2. 广播 每个消费一个分组通道,每条消息抵达所有的消费者
  3. 组播 单播与广播的混合,多个分组通道,每个分组通道上多个消费者负载均衡
  4. 订阅模式 广播的分组上设置消息过滤主题(注意区别队列主题Topic)

基于以上设计,zbus的消息队列能够有效的支持各种业务场景,同时保持底层数据模型的简洁设计。为了便于扩展,消息队列增加MASK标记,可以方便应用在队列上打标签来支持队列个性化,比如支持内存队列与磁盘队列的选择,支持标记队列为RPC作用,支持标记队列使用于代理作用等等。

API示例

生产者

//Broker是对zbus服务器的本地抽象,多地址支持HA
Broker broker = new Broker("localhost:15555"); 

Producer p = new Producer(broker);
p.declareTopic("MyTopic");    //当确定队列不存在需创建

Message msg = new Message();
msg.setTopic("MyTopic");       //设置消息主题
//msg.setTag("oo.account.pp"); //可以设置消息标签
msg.setBody("hello " + System.currentTimeMillis()); 

Message res = p.publish(msg);
System.out.println(res);   

broker.close(); 

消费者

Broker broker = new Broker("localhost:15555");    
ConsumerConfig config = new ConsumerConfig(broker);
config.setTopic("MyTopic");  //指定消息队列主题,同时可以指定分组通道
config.setMessageHandler(new MessageHandler() { 
    @Override
    public void handle(Message msg, MqClient client) throws IOException {
        System.out.println(msg); //消费处理
    }
});

Consumer consumer = new Consumer(config);
consumer.start();