RocketMQ 入门

本文讲解前不久进入apache顶级项目的RocketMQ,几个简单的例子讲解如何搭建RocketMQ,以及发送消息,接受消息。

RocketMQ安装

RocketMQ的安装需要自行编译,接下来编译源码(本文下载源码放在windows系统D:\softwares\目录下)

  1. 下载源码

    1
    git clone -b develop https://github.com/apache/rocketmq.git
  2. 编译

    1
    2
    cd rocketmq
    mvn -Prelease-all -DskipTests clean install -U
  3. 启动rocketmq

    1
    2
    3
    4
    5
    6
    7
    8
    cd distribution\target\apache-rocketmq
    set ROCKETMQ_HOME=D:\softwares\rocketmq\distribution\target\apache-rocketmq
    bin\mqnamesrv.cmd
    # 再开启一个cmd窗口,进入到D:\softwares\rocketmq\distribution\target\apache-rocketmq
    d:
    cd D:\softwares\rocketmq\distribution\target\apache-rocketmq
    set ROCKETMQ_HOME=D:\softwares\rocketmq\distribution\target\apache-rocketmq
    bin\mqbroker.cmd -n localhost:9876

    mqnamesrv启动成功

    image

    mqbroker启动后无输出

    PS:不要关闭两个窗口

    下面开始代码,写发送消息,接受消息。

    引入maven依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
    </dependency>

    发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package org.xxz.test.mq;

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class ProducerTest {

    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    Message msg = new Message();
    msg.setTopic("test");
    msg.setBody("hello rocketmq".getBytes());
    SendResult sendResult = producer.send(msg);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    System.out.println("send msg ok...");
    }
    producer.shutdown();
    }

    }

    image

    接受消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    package org.xxz.test.mq;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    public class ConsumerTest {

    public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("test", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for (MessageExt msg : msgs) {
    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
    }
    } catch (Exception e) {
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    }
    }

    image

    就是这么简单,后续更精彩哦。。。