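This article introduces RocketMQ, which recently became an Apache top-level project. A few simple examples show how to set up RocketMQ and how to send and receive messages.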
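Installing RocketMQ
RocketMQ has to be built from source. The steps below build it (in this article the source is downloaded into the D:\softwares\ directory on a Windows system).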
Download the source
git clone -b develop https://github.com/apache/rocketmq.git
Build
cd rocketmq
mvn -Prelease-all -DskipTests clean install -U
Start RocketMQ
cd distribution\target\apache-rocketmq
set ROCKETMQ_HOME=D:\softwares\rocketmq\distribution\target\apache-rocketmq
bin\mqnamesrv.cmd
Open a second cmd window and change to D:\softwares\rocketmq\distribution\target\apache-rocketmq:
d:
cd D:\softwares\rocketmq\distribution\target\apache-rocketmq
set ROCKETMQ_HOME=D:\softwares\rocketmq\distribution\target\apache-rocketmq
bin\mqbroker.cmd -n localhost:9876
mqnamesrv has started successfully.
mqbroker prints no output after it starts.
PS: do not close either of the two windows.
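Because mqbroker prints nothing on startup, one way to confirm that both processes are up is to try connecting to their ports. The following is a minimal sketch using only the JDK, not part of RocketMQ itself. `PortCheck` and `isListening` are hypothetical names. 9876 is the name server port from the command above, and 10911 is assumed to be the broker's listen port, which holds only if the broker configuration was left at its defaults.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a RocketMQ class): reports whether a TCP port accepts connections.
final class PortCheck {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException if the port is closed or the timeout expires
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876: name server port used in the commands above
        System.out.println("name server listening: " + isListening("127.0.0.1", 9876, 1000));
        // 10911: assumed broker port (default configuration)
        System.out.println("broker listening: " + isListening("127.0.0.1", 10911, 1000));
    }
}
```

A successful connection only shows that something is listening on the port. It does not prove that the broker has registered with the name server.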
Now for the code: sending messages and receiving messages.
Add the Maven dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
</dependency>
Sending a message
package org.xxz.test.mq;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class ProducerTest {

    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        // Address of the name server started earlier
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try {
            Message msg = new Message();
            msg.setTopic("test");
            // Encode explicitly as UTF-8, the charset the consumer decodes with
            msg.setBody("hello rocketmq".getBytes(StandardCharsets.UTF_8));
            // Synchronous send: blocks until the broker responds
            SendResult sendResult = producer.send(msg);
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                System.out.println("send msg ok...");
            }
        } finally {
            // Release client resources even if the send fails
            producer.shutdown();
        }
    }
}
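The producer above prints only when the status is SEND_OK and stays silent otherwise. The sketch below shows one way to report every status. To stay compilable without the client jar it uses a local stand-in enum, `Status`, which mirrors the constants of `org.apache.rocketmq.client.producer.SendStatus`. The class name `SendStatusReport`, the method `describe`, and the wording of the descriptions are all assumptions made for illustration.

```java
// Hypothetical sketch: maps each send status to a human-readable line.
final class SendStatusReport {

    // Local stand-in mirroring org.apache.rocketmq.client.producer.SendStatus
    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    static String describe(Status status) {
        switch (status) {
            case SEND_OK:
                return "send msg ok...";
            case FLUSH_DISK_TIMEOUT:
                return "broker accepted the message, but flushing to disk timed out";
            case FLUSH_SLAVE_TIMEOUT:
                return "broker accepted the message, but syncing to the slave timed out";
            case SLAVE_NOT_AVAILABLE:
                return "broker accepted the message, but no slave was available";
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        for (Status status : Status.values()) {
            System.out.println(status + ": " + describe(status));
        }
    }
}
```

In the real producer the same switch would take `sendResult.getSendStatus()` as its argument.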
Receiving messages
package org.xxz.test.mq;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerTest {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Subscribe to every tag ("*") of the topic the producer writes to
        consumer.subscribe("test", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                } catch (Exception e) {
                    // Ask the broker to redeliver this batch later
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // The consumer keeps running and waits for messages after start() returns
        consumer.start();
    }
}
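The message body travels as raw bytes, and the consumer above decodes it as UTF-8, so the producer side should encode with the same charset rather than rely on the platform default. The following is a minimal JDK-only sketch of that round trip. `BodyCodec`, `encode`, and `decode` are hypothetical names and not part of the RocketMQ client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps the charset choice for message bodies in one place.
final class BodyCodec {

    // Producer side: text to bytes, always UTF-8
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: bytes back to text, always UTF-8
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("hello rocketmq");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

With such a helper the producer would call `msg.setBody(BodyCodec.encode(...))` and the listener would call `BodyCodec.decode(msg.getBody())`.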
It really is that simple. More to come in later posts.
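This article was written by xxzkid | 我叫王也道长 and is licensed under CC BY 3.0 CN.
It may be freely reposted and quoted, provided the author is credited and the source is cited.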
Article title: RocketMQ 入门 (Getting Started with RocketMQ)
Article link: https://xxzkid.github.io/2017/rocketmq-01/