rabbitmq异常

1.钉钉报警,查看日志

2020-07-15 14:00:02.660 ERROR [cash-loan-flow, 6e6394c11fbe52f8, 1179b55f3078ce15, true] [          ] --- [AMQP Connection 172.**.**.***:*****] [PublisherCallbackChannelImpl.java:771] org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - No listener for seq:965

发现日志有No listener for seq 错误信息。

查看源码PublisherCallbackChannelImpl.java

else {
        Listener listener = this.listenerForSeq.remove(seq);
        if (listener != null) {
            SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
            PendingConfirm pendingConfirm;
            if (remove) {
                pendingConfirm = confirmsForListener.remove(seq);
            }
            else {
                pendingConfirm = confirmsForListener.get(seq);
            }
            if (pendingConfirm != null) {
                doHandleConfirm(ack, listener, pendingConfirm);
            }
        }
        else {
            logger.error("No listener for seq:" + seq);
        }
    }

2.查看代码分析原因

客户端使用的是java RabbitTemplate ,查看mq配置项有 factory.setPublisherConfirms(true);但是RabbitTemplate方法没有实现 rabbitTemplate.setConfirmCallback();导致mq在成功收到消息后回调生产者时失败

 @Bean("contractTaskMessageConnectionFactory")
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setChannelCacheSize(1024);
    factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    factory.setChannelCacheSize(180 * 1000);
    factory.setConnectionCacheSize(1024);
    factory.setHost(uhost);
    factory.setPort(uport);
    factory.setUsername(uuser);
    factory.setPassword(upassword);
    factory.setVirtualHost(uvirtualHost);
    // 开启了确认机制
    factory.setPublisherConfirms(true);

@Bean("contractTaskMessageRabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("asyncTaskConnectionFactory") CachingConnectionFactory connectionFactory) {

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(exchangeName);
    rabbitTemplate.setRetryTemplate(attemptsRetry());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
      // 此处没有实现确认机制

    return rabbitTemplate;
}

3.解决问题

在RabbitTemplate方法中实现 rabbitTemplate.setConfirmCallback();

 @Bean("contractTaskMessageRabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("asyncTaskConnectionFactory") CachingConnectionFactory connectionFactory) {

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(exchangeName);
    rabbitTemplate.setRetryTemplate(attemptsRetry());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    // 此处实现确认机制
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            log.error("消息确认失败,correlationData={},cause={},重新发送", correlationData, cause);
        }
    });

注:https://zhuanlan.zhihu.com/p/70200202