Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spring-cloud-starter-bus-rocketmq 分布式事件无法被其他APP监听 #3461

Closed
ArrenQ opened this issue Sep 25, 2023 · 2 comments
Closed

Comments

@ArrenQ
Copy link

ArrenQ commented Sep 25, 2023

Which Component
spring-cloud-starter-bus-rocketmq

Describe what problem you have encountered
我正在使用 spring-cloud-starter-bus-rocketmq, 版本为: 2021.0.4.0, 现在遇到的问题是 A 应用广播的事件无法被B应用监听。简单的代码如下:
公共代码:
I am using spring-cloud-starter-bus-rocketmq, version: 2021.0.4.0. The problem I am encountering now is that the events broadcast by application A cannot be monitored by application B. The simple code is as follows:
common code:

public class MessageManager {
    @Resource private BusProperties busProperties;
    @Value("${spring.application.name}") private String appName;
    @Resource private ApplicationEventPublisher applicationEventPublisher;

    public void emit(String message) {
        applicationEventPublisher.publishEvent(new RemoteMessageEvent(appName + ":" + message, busProperties.getId()));
    }
}
public class RemoteMessageEvent extends RemoteApplicationEvent {

    public RemoteMessageEvent(String message, String id) {
        super(message, id, DEFAULT_DESTINATION_FACTORY.getDestination(null));
    }
}
@Slf4j
@RestController
public class BusController {

    @Resource private MessageManager messageManager;

    @GetMapping("/send/{msg}")
    public void send(@PathVariable String msg) {
        messageManager.emit(msg);
    }

    @EventListener
    public void listen(RemoteMessageEvent event) {
        log.info("node1 event -> id {}, msg: {}, os -> {}, ds -> {}", event.getId(), event.getSource(), event.getOriginService(), event.getDestinationService());
    }
}
@SpringBootApplication
@RemoteApplicationEventScan(basePackages = "com.chuang.test.common")
public class Application {

    @Bean
    public MessageManager messageManager() {
        return new MessageManager();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

APP 1 配置如下:

spring:
  application:
    name: bus-node-01
  cloud:
    bus:
      id: ${spring.application.name}:${server.port}
      trace:
        enabled: true
    stream:
      rocketmq:
        bindings:
          springCloudBusInput: 
            consumer:
              messageModel: broadcasting
        binder:
          name-server: 10.92.21.4:9876
          # producer group
          group: node01
server:
  port: 10001

management:
  endpoints:
    web:
      exposure:
        include: bus-env,httptrace

App2 配置如下:

spring:
  application:
    name: bus-node-02
  cloud:
    bus:
      id: ${spring.application.name}:${server.port}
      trace:
        enabled: true
    stream:
      rocketmq:
        bindings:
          springCloudBusInput: 
            consumer:
              messageModel: broadcasting
        binder:
          name-server: 10.92.21.4:9876
          # producer group
          group: node02
server:
  port: 10002

management:
  endpoints:
    web:
      exposure:
        include: bus-env,httptrace

两个APP的代码完全一样,都是通过访问 /send/{msg} 来发布一个事件,然后通过EventListener来监听事件。我在这个代码中遇到了2个问题。

  1. spring-cloud-starter-bus-rocketmq 中会自动加入 rocketmq.bindings.springCloudBusInput.consumer.broadcasting: true 配置,使得注册的consumer group 模式为 BROADCASTING,这在我使用 2.2.9.RELEASE 版本前是可用的。但是版本升级到 2021.0.4.0 时已经没有效果了,consumer group 模式总是为 CLUSTERING。我在加入bindings.springCloudBusInput.consumer.messageModel: broadcasting 后才生效。

  2. 当第一个问题解决后,consumer group 的模式在 rocketmq admin 中已经显示为 BROADCASTING,当我欣喜的进行测试时,我发现 APP1 的事件仍然只有 APP1 监听到了, APP2 完全没有收到。但是此时,当我在 rocketmq admin中给 springCloudBus topic 发送消息时,APP1 和 APP2 都能收到这个消息。因此,我认为consumer group 是没问题的,事件监听大概也没问题。于是我咨询了chatGPT (搜索引擎和社区相遇这个问题的资料太少了),它告诉我发送消息时需要将消息的flag设置为 2,但我debug时发现 flag 是0,而且我找不到任何办法将 flag 设置为 2.

The codes of the two APPs are exactly the same. They publish an event by accessing /send/{msg}, and then listen to the event through EventListener. I have 2 problems with this code.

  1. The rocketmq.bindings.springCloudBusInput.consumer.broadcasting: true configuration will be automatically added to spring-cloud-starter-bus-rocketmq, so that the registered consumer group mode is BROADCASTING, which is what I used in 2.2.9. RELEASE is available before version. However, it has no effect when the version is upgraded to 2021.0.4.0, and the consumer group mode is always CLUSTERING. It took effect after I added bindings.springCloudBusInput.consumer.messageModel: broadcasting.

  2. After the first problem was solved, the mode of the consumer group was displayed as BROADCASTING in rocketmq admin. When I tested with joy, I found that only APP1 was listening to the event of APP1, and APP2 was not received at all. But at this time, when I send a message to the springCloudBus topic in rocketmq admin, both APP1 and APP2 can receive the message. Therefore, I think the consumer group is no problem, and event monitoring is probably no problem either. So I consulted chatGPT (there is too little information on this issue when search engines and communities meet), and it told me that when sending a message, I need to set the flag of the message to 2, but when I debugged it, I found that the flag was 0, and I couldn't find any The method is to set flag to 2.

spring-cloud-starter-bus-rocketmq是一个非常小巧简单的工具,分布式事件是它的主要工作之一,我使用的版本也是严格按照cloud alibaba版本说明来搭配的,甚至为此改动了许多地方。我无法理解,为什么仅仅从2.2.9.RELEASE升级到 2021.0.4.0 会出现这么明显且难以解决的问题。

spring-cloud-starter-bus-rocketmq is a very small and simple tool. Distributed events are one of its main tasks. The version I use is strictly in accordance with the cloud alibaba version instructions, and has even been changed for this purpose. many places. I can’t understand why such an obvious and difficult-to-solve problem occurs just when upgrading from 2.2.9.RELEASE to 2021.0.4.0.

我想要了解该如何解决这个问题,以及社区后续是否会把问题1 作为bug来解决。毕竟 bus 就是在stream基础上加了一些默认配置,既然默认使用 rocketmq-bus-group 作为消费者组名,那么组的模式就应该默认是 BROADCASTING 才能满足分布式事件的需求,如果还需要使用者去写补充配置就没太大意义了,使用者还要去了解rocketmq-bus 做了哪些工作,更何况其内置代码中还有 rocketmq.bindings.springCloudBusInput.consumer.broadcasting: true 这样的配置,却完全没有效果,更让人觉得费解。

I would like to know how to solve this problem and whether the community will fix issue 1 as a bug in the future. After all, bus adds some default configurations based on stream. Since rocketmq-bus-group is used by default as the consumer group name, the group mode should default to BROADCASTING to meet the needs of distributed events. If it is still It doesn't make much sense to require users to write supplementary configurations. Users also need to understand what rocketmq-bus has done, not to mention that its built-in code also has rocketmq.bindings.springCloudBusInput.consumer.broadcasting: true like this configuration, but it has no effect at all, which is even more puzzling.

Describe what information you have read
我找了git相关的问题,链接如下:
I searched for git related issues, the link is:
eg. I have read the reference doc of Sentinel

关于版本我参考的是:
Regarding the version, my reference is:
https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E

@ArrenQ
Copy link
Author

ArrenQ commented Sep 26, 2023

请忽略问题2的描述,这应该是chatGpt 的错误回答,我通过debug发现其实际上其他APP有收到事件消息,只不过在转换过程中失败了。进而变成了 UnknownRemoteApplicationEvent, 这个问题我还在进一步查。

@ArrenQ
Copy link
Author

ArrenQ commented Sep 26, 2023

参考了#2742 的资料已经解决

    @Bean(RocketMQMessageConverter.DEFAULT_NAME)
    public CompositeMessageConverter rocketMqMessageConvert() {
        Set<MessageConverter> messageConverters = new HashSet();
        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
        byteArrayMessageConverter.setContentTypeResolver((ContentTypeResolver)null);
        messageConverters.add(byteArrayMessageConverter);
        messageConverters.add(new StringMessageConverter());
        return new CompositeMessageConverter(messageConverters);
    }

通过这个配置将消息的序列化和反序列化改成一样。

@ArrenQ ArrenQ closed this as completed Sep 26, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant