消息队列apache rocketmq4.2入门(二)
原创 2018-03-09 11:12 阅读(1421)次
上一篇我们讲了apache rocketmq4.2的安装,这是目前最新版本了,我本着要研究他的事务消息,却发现apache官方文档里都没有这个,百度了下好像大家都在说事务消息还没有真开源,好失望,但我下载的4.2的官方示例却有事务消息的示例代码,真不知道官方要干嘛。。好了话不多说,我看看其他功能代码吧,我也是根据官方示例弄下来,这里分享下,直接上代码了,其他不说了:
Maven:
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
MQProducer:package com.cloud.ali.mq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MQProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
producer.start();
for (int i = 0; i < 10000000; i++)
try {
//keys:消息体的唯一标识
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
MQPushConsumer:package com.cloud.ali.mq;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class MQPushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr("rocketmq-nameserver1:9876;rocketmq-nameserver2:9876");
consumer.subscribe("TopicTest", "*");
/**
* ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MAX_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_MIN_OFFSET @Deprecated
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,
* 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
*
* MessageModel.CLUSTERING 集群消费,一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个
Consumer Group 有 3 个实例(可能是 3 个迕程,戒者 3 台机器),那举每个实例只消费其中的 3 条消息
* MessageModel.BROADCASTING 广播
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
/**
* List<MessageExt> msgs 这里虽然是list 但实际上基本都只会是一条,除非消息堆积,
* 且记住一定是先启动消费者,再启动生产者
* 否则极有可能导致消息的重复消费
*/
for(MessageExt mext : msgs) {
try {
System.out.println("消费了一条消息:"+new String(mext.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
//消费失败告诉mq重新发送继续消费 如果多次消费仍不成功可以记录在数据库中,可以通过mext.getReconsumeTimes()获取消费次数
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 告诉mq消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
收发消息还是可以的,但本人对不能实现事务消息感到失望,只能等apache接下来的版本会不会实现了。如何安装rocketMQ请参考上一篇文章:消息队列apache rocketmq4.2入门安装