【SpringBoot实战】整合RocketMq

star2017 1年前 ⋅ 2025 阅读

前言

RocketMQ的介绍可以看这里:传送门
本文主要介绍 Spring Boot 如何集成 RocketMQ,并给出几个简单的代码示例:字符串消息发送、对象消息发送以及事务消息发送。

代码示例

Spring Boot 启动类,其中包含了各类消息的发送示例:

/**
 * Spring Boot entry point that also acts as a demo producer.
 *
 * <p>On startup ({@link #run(String...)}) it sends, in order: a plain string message,
 * a string wrapped in a Spring {@code Message}, a {@code User} object, and finally a
 * batch of ten transactional messages. The nested {@link TransactionListenerImpl}
 * simulates the local transaction outcome for those transactional messages.
 */
@SpringBootApplication
public class RocketMqApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /** Topic for the string messages, read from {@code demo.rocketmq.topic}. */
    @Value("${demo.rocketmq.topic}")
    private String springTopic;
    /** Topic for the {@code User} messages, read from {@code demo.rocketmq.topic.user}. */
    @Value("${demo.rocketmq.topic.user}")
    private String userTopic;
    /** Topic for the transactional messages, read from {@code demo.rocketmq.transTopic}. */
    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;

    public static void main(String[] args) {
        SpringApplication.run(RocketMqApplication.class, args);
    }

    /**
     * Sends the demo messages once the application context is up.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if any of the synchronous sends fails
     */
    @Override
    public void run(String... args) throws Exception {
        // Send string; the destination is given as "topic:tag"
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic + ":tagA", "Hello, Msg!");
        System.out.printf("发送消息 topic %s sendResult=%s %n", springTopic, sendResult);

        // Send string with spring Message
        sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message ").build());
        System.out.printf("发送消息 to topic %s sendResult=%s %n", springTopic, sendResult);

        // Send object
        User user = new User();
        user.setAge(16);
        user.setName("zs");
        sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(user).build());
        System.out.printf("发送消息 to topic %s sendResult=%s %n", userTopic, sendResult);

        //Send transactional messages
        testTransaction();
    }

    /**
     * Sends ten transactional messages, cycling through five tags and tagging each
     * message with a {@code KEY_<i>} transaction id header.
     *
     * <p>A failure while sending one message is printed and does not stop the loop.
     * If the thread is interrupted during the pause between sends, the interrupt flag
     * is restored and the remaining messages are not sent.
     */
    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message<String> msg = MessageBuilder.withPayload("Hello RocketMQ " + i)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(null,
                        springTransTopic + ":" + tags[i % tags.length], msg, null);
                System.out.printf("发送事务消息=%s , sendResult=%s %n",
                        msg.getPayload(), sendResult.getSendStatus());

                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag and stop sending.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Best effort: report the failed send and continue with the next message.
                e.printStackTrace();
            }
        }
    }

    /**
     * Simulated local-transaction listener for the transactional messages.
     *
     * <p>Each call to {@link #executeLocalTransaction} takes the next value of a counter
     * modulo 3 as the simulated outcome (0 = commit, 1 = rollback, 2 = unknown) and records
     * it under the message's transaction id so that {@link #checkLocalTransaction} can
     * answer a later check for the same id.
     */
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        private final AtomicInteger transactionIndex = new AtomicInteger(0);

        /** Simulated outcome (0, 1 or 2) recorded per transaction id. */
        private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        /**
         * Simulates executing the local transaction for {@code msg}.
         *
         * @param msg the message whose local transaction is being executed
         * @param arg the argument passed to {@code sendMessageInTransaction} (unused)
         * @return COMMIT, ROLLBACK or UNKNOWN, rotating with every call
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
                    transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }

            if (status == 1) {
                System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            // The format string has a %s placeholder, so the payload must be supplied;
            // without it printf throws MissingFormatArgumentException before returning.
            System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n", msg.getPayload());
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        /**
         * Answers a transaction status check for {@code msg} from the outcome recorded
         * in {@link #executeLocalTransaction}.
         *
         * <p>The answer agrees with what was decided at execution time: a committed
         * transaction stays committed and a rolled-back one stays rolled back. A
         * transaction that was left undecided (status 2) is committed here, which is
         * this demo's way of resolving it. When no outcome is recorded for the
         * transaction id, COMMIT is returned.
         *
         * @param msg the message whose transaction state is being checked
         * @return the resolved transaction state
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 1:
                        // Must not report COMMIT for a transaction that was rolled back.
                        retState = RocketMQLocalTransactionState.ROLLBACK;
                        break;
                    case 2:
                        // Undecided at execution time; resolved as committed on check.
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    default:
                        break;
                }
            }
            System.out.printf("checkLocalTransaction执行," +
                            " msgTransactionId=%s, TransactionState=%s status=%s %n",
                    transId, retState, status);
            return retState;
        }
    }

}

字符串消息的消费者

/**
 * Consumer for the string topic: receives each message payload as a {@code String}
 * and prints it to standard output.
 */
@Service
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "string_consumer",
        nameServer = "${rocketmq.name-server}")
public class StringConsumer implements RocketMQListener<String> {

    /**
     * Prints the received text.
     *
     * @param message the string payload of the consumed message
     */
    @Override
    public void onMessage(String message) {
        System.out.format("字符串消息: %s ; \n", message);
    }

}

对象消息的消费者

/**
 * Consumer for the user topic: receives each message payload as a {@code User}
 * and prints it to standard output.
 */
@Service
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic.user}",
        consumerGroup = "user_consumer",
        nameServer = "${rocketmq.name-server}")
public class UserConsumer implements RocketMQListener<User> {

    /**
     * Prints the received user, formatted via its {@code toString()}.
     *
     * @param message the {@code User} payload of the consumed message
     */
    @Override
    public void onMessage(User message) {
        System.out.format("对象消息: %s ; \n", message);
    }

}

实体类

/**
 * Simple serializable payload carrying a user's name and age.
 */
public class User implements Serializable {
    private static final long serialVersionUID = 8678741699731601L;

    private String name;
    private Integer age;

    /** @return the user's name, or {@code null} if it has not been set */
    public String getName() {
        return this.name;
    }

    /** @param name the user's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age, or {@code null} if it has not been set */
    public Integer getAge() {
        return this.age;
    }

    /** @param age the user's age */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * Renders the user as {@code User{name='<name>', age=<age>}}; unset fields
     * appear as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }

}

application.properties

# Address of the RocketMQ name server; the consumers also reference it through
# their nameServer = "${rocketmq.name-server}" attribute.
rocketmq.name-server=127.0.0.1:9876
# Producer group for the application's producer.
rocketmq.producer.group=testGroup
# Producer send timeout. Presumably in milliseconds - TODO confirm against the
# rocketmq-spring documentation.
rocketmq.producer.sendMessageTimeout=300000

# properties used in the application
# Topic for string messages: injected into RocketMqApplication and consumed by StringConsumer.
demo.rocketmq.topic=string-topic
# NOTE(review): not referenced by any of the code shown in this article.
demo.rocketmq.orderTopic=order-paid-topic
# Topic for the transactional messages sent by RocketMqApplication.
demo.rocketmq.transTopic=spring-transaction-topic
# Topic for User object messages: injected into RocketMqApplication and consumed by UserConsumer.
demo.rocketmq.topic.user=user-topic

源码地址

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: