说说MQ之RocketMQ(二)

java admin 1100 0 评论

RocketMQ 的 Java API

RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供的功能包括,

  1. 广播消费,这个在之前已经提到过;
  2. 消息过滤,支持简单的 Message Tag 过滤,也支持按 Message Header、body 过滤;
  3. 顺序消费和乱序消费,之前也提到过,这里的顺序消费应该指的是普通顺序性,这一点与 Kafka 相同;
  4. Pull 模式消费,这个是相对 Push 模式来说的,Kafka 就是 Pull 模式消费;
  5. 事务消息,这个好像没有开源,但是 example 代码中有示例,总之,不推荐用;
  6. Tag,RocketMQ 在 Topic 下面又分了一层 Tag,用于表示消息类别,可以用来过滤,但是顺序性还是以 Topic 来看;

单看功能的话,即使不算事务消息,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个功能。RocketMQ 的代码示例在 rocketmq-example 中,注意,代码是不能直接运行的,因为所有的代码都少了设置 name server 的部分,需要自己手动加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先来看一下生产者的 API,比较简单,只有一种,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.232.23:9876");
producer.start();
for (int i = 0; i < 10; i++)


try {


{


Message msg = new Message("TopicTest1",// topic


"TagA",// tag


"OrderID188",// key


("RocketMQ "+String.format("%05d", i)).getBytes());// body


SendResult sendResult = producer.send(msg, new MessageQueueSelector() {


@Override


public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {


Integer id = (Integer) arg;


int index = id % mqs.size();


return mqs.get(index);


}


}, i));


System.out.println(String.format("%05d", i)+sendResult);


}


}


catch (Exception e) {


e.printStackTrace();


}
producer.shutdown();
}
}

可以发现,相比 Kafka 的 API,只多了 Tag,但实际上行为有很大不同。Kafka 的生产者客户端,有同步和异步两种模式,但都是阻塞模式,send 方法返回发送状态的 Future,可以通过 Future 的 get 方法阻塞获得发送状态。而 RocketMQ 采用的是同步非阻塞模式,发送之后立刻返回发送状态(而不是 Future)。正常情况下,两者使用上差别不大,但是在高可用场景中发生主备切换的时候,Kafka 的同步可以等待切换完成并重连,最后返回;而 RocketMQ 只能立刻报错,由生产者选择是否重发。所以,在生产者的 API 上,其实 Kafka 是要强一些的。

另外,RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将消息发送到哪个分区去,Kafka 是通过指定生产者的 partitioner.class 参数来实现的,灵活性上 RocketMQ 略胜一筹。

再来看消费者的API,由于 RocketMQ 的功能比较多,我们先看 Pull 模式消费的API,如下,

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("192.168.232.23:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {


System.out.println("Consume from the queue: " + mq);


SINGLE_MQ: while (true) {


try {


long offset = consumer.fetchConsumeOffset(mq, true);


PullResult pullResult =


consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);


if (null != pullResult.getMsgFoundList()) {


for (MessageExt messageExt : pullResult.getMsgFoundList()) {


System.out.print(new String(messageExt.getBody()));


System.out.print(pullResult);


System.out.println(messageExt);


}


}


putMessageQueueOffset(mq, pullResult.getNextBeginOffset());


switch (pullResult.getPullStatus()) {


case FOUND:


// TODO


break;


case NO_MATCHED_MSG:


break;


case NO_NEW_MSG:


break SINGLE_MQ;


case OFFSET_ILLEGAL:


break;


default:


break;


}


}


catch (Exception e) {


e.printStackTrace();


}


}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)


return offset;
return 0;
}
}

这部分的 API 其实是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分区,而 Kafka 可以自动管理(当然也可以手动管理),并且不需要指定分区(分区是在 Kafka 订阅的时候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 接口,提供了两种管理方式,本地文件和远程 Broker。这部分感觉两者差不多。

下面再看看 Push 模式顺序消费,代码如下,

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.232.23:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {


AtomicLong consumeTimes = new AtomicLong(0);


@Override


public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {


context.setAutoCommit(false);


System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);


this.consumeTimes.incrementAndGet();


if ((this.consumeTimes.get() % 2) == 0) {


return ConsumeOrderlyStatus.SUCCESS;


}


else if ((this.consumeTimes.get() % 3) == 0) {


return ConsumeOrderlyStatus.ROLLBACK;


}


else if ((this.consumeTimes.get() % 4) == 0) {


return ConsumeOrderlyStatus.COMMIT;


}


else if ((this.consumeTimes.get() % 5) == 0) {


context.setSuspendCurrentQueueTimeMillis(3000);


return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;


}


return ConsumeOrderlyStatus.SUCCESS;


}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,

  1. 接收消息的监听类要使用 MessageListenerOrderly
  2. ConsumeFromWhere 有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;
  3. 可以控制 offset 的提交,应该就是 context.setAutoCommit(false); 的作用;

控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码 ConsumeMessageOrderlyService 的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回 COMMIT 才 commit offset;启用 AutoCommit 的时候,返回 COMMITROLLBACK(这个比较扯)、SUCCESS 的时候,都 commit offset。

后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用 consumer.commitSync

再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
consumer.setNamesrvAddr("192.168.232.23:9876");
consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());
consumer.registerMessageListener(new MessageListenerConcurrently() {


@Override


public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,


ConsumeConcurrentlyContext context) {


System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);


return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;


}
});
consumer.start();
System.out.println("Consumer Started.");
}
}

这个例子与之前顺序消费不同的地方在于,

  1. 接收消息的监听类使用的是 MessageListenerConcurrently
  2. 回调方法中,使用的是自动 offset commit;
  3. 订阅的时候增加了消息过滤类 MessageFilterImpl

消息过滤类 MessageFilterImpl 的代码如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg) {
String property = msg.getUserProperty("SequenceId");
if (property != null) {


int id = Integer.parseInt(property);


if ((id % 3) == 0 && (id > 10)) {


return true;


}
}
return false;
}
}

RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可以节省网络流量,但是增加了 Broker 的负担。可惜我没有实验出来使用过滤的效果,即使是用 github wiki 上的例子8也没成功,不纠结了。RocketMQ 的按 Tag 过滤的功能也是在 Broker 上做的过滤,能用,是个很方便的功能。

还有一种广播消费模式,比较简单,可以去看代码,不再列出。

总之,RocketMQ 提供的功能比较多,比 Kafka 多很多易用的 API。

转载请注明: 飞嗨_分享互联网 » 说说MQ之RocketMQ(二)

赞 (0) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽

高效,专业,符合SEO

联系我们