1. DingTalk alert fired; check the logs
2020-07-15 14:00:02.660 ERROR [cash-loan-flow, 6e6394c11fbe52f8, 1179b55f3078ce15, true] [ ] --- [AMQP Connection 172.**.**.***:*****] [PublisherCallbackChannelImpl.java:771] org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - No listener for seq:965
The log contains a "No listener for seq" error.
Look at the source of PublisherCallbackChannelImpl.java, around the line that logs this message:
else {
    Listener listener = this.listenerForSeq.remove(seq);
    if (listener != null) {
        SortedMap<Long, PendingConfirm> confirmsForListener = this.pendingConfirms.get(listener);
        PendingConfirm pendingConfirm;
        if (remove) {
            pendingConfirm = confirmsForListener.remove(seq);
        }
        else {
            pendingConfirm = confirmsForListener.get(seq);
        }
        if (pendingConfirm != null) {
            doHandleConfirm(ack, listener, pendingConfirm);
        }
    }
    else {
        logger.error("No listener for seq:" + seq);
    }
}
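To make the branch above easier to follow, here is a minimal, self-contained sketch of the same lookup. It is not Spring AMQP code: the class ConfirmLookupSketch and its methods are hypothetical stand-ins, and the listener is reduced to a plain name. It only illustrates that an ack for a sequence number with no registered listener ends up in the error branch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the seq -> listener lookup; not the real class.
public class ConfirmLookupSketch {
    // Stand-in for listenerForSeq: publish sequence number -> listener name.
    private final Map<Long, String> listenerForSeq = new HashMap<>();

    // In this model, a listener is recorded at publish time only if one exists.
    public void registerListener(long seq, String listenerName) {
        listenerForSeq.put(seq, listenerName);
    }

    // Mirrors the excerpt: remove the listener for the acked seq, or report the error.
    public String handleAck(long seq) {
        String listener = listenerForSeq.remove(seq);
        if (listener != null) {
            return "confirm delivered to " + listener;
        }
        return "No listener for seq:" + seq;
    }

    public static void main(String[] args) {
        ConfirmLookupSketch channel = new ConfirmLookupSketch();
        channel.registerListener(964L, "templateWithCallback");
        System.out.println(channel.handleAck(964L)); // listener found
        System.out.println(channel.handleAck(965L)); // nothing registered for 965
    }
}
```

Running main prints one delivered confirm for seq 964 and "No listener for seq:965" for the sequence number that was never registered.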
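2. Read the code and analyze the cause
The client uses the Java RabbitTemplate. The MQ configuration calls factory.setPublisherConfirms(true), but the RabbitTemplate bean never calls rabbitTemplate.setConfirmCallback(). As a result, after the broker successfully receives a message and sends its confirm back to the producer, the channel finds no listener registered for that sequence number and logs the error above.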
@Bean("contractTaskMessageConnectionFactory")
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setChannelCacheSize(1024);
    factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    // Note: this second call overrides the 1024 set above. The value 180 * 1000
    // looks like a millisecond timeout, so setChannelCheckoutTimeout may have been intended.
    factory.setChannelCacheSize(180 * 1000);
    factory.setConnectionCacheSize(1024);
    factory.setHost(uhost);
    factory.setPort(uport);
    factory.setUsername(uuser);
    factory.setPassword(upassword);
    factory.setVirtualHost(uvirtualHost);
    // Publisher confirms are enabled here
    factory.setPublisherConfirms(true);
    return factory;
}
@Bean("contractTaskMessageRabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("asyncTaskConnectionFactory") CachingConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(exchangeName);
    rabbitTemplate.setRetryTemplate(attemptsRetry());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    // No confirm callback is registered here
    return rabbitTemplate;
}
3. Fix
Register a confirm callback in the RabbitTemplate bean with rabbitTemplate.setConfirmCallback():
@Bean("contractTaskMessageRabbitTemplate")
public RabbitTemplate rabbitTemplate(@Qualifier("asyncTaskConnectionFactory") CachingConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(exchangeName);
    rabbitTemplate.setRetryTemplate(attemptsRetry());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    // Register the confirm callback here
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            // Only logs the failure; the actual resend logic is not shown in this snippet
            log.error("Message confirm failed, correlationData={}, cause={}, resending", correlationData, cause);
        }
    });
    return rabbitTemplate;
}
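The callback's log message mentions resending, but the callback itself only logs. One possible way to support a resend is to remember each outgoing message under a correlation id and look it up when a negative confirm arrives. The sketch below is plain Java with no Spring dependency; the class PendingMessageStore and its methods are hypothetical, and it assumes every message is sent with a CorrelationData whose id is used as the key.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for unconfirmed messages; not part of Spring AMQP.
public class PendingMessageStore {
    // correlation id -> serialized payload awaiting a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing the message.
    public void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. Returns the payload to resend on a nack,
    // and an empty Optional on an ack or when the id is unknown or null.
    public Optional<String> onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return Optional.empty();
        }
        String payload = pending.remove(correlationId);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingMessageStore store = new PendingMessageStore();
        store.track("order-1", "{\"id\":1}");
        store.track("order-2", "{\"id\":2}");
        System.out.println(store.onConfirm("order-1", true).isPresent());  // acked, nothing to resend
        System.out.println(store.onConfirm("order-2", false).isPresent()); // nacked, payload returned
    }
}
```

Inside the confirm callback, the id could be taken from correlationData.getId() (guarding against a null correlationData) and passed to onConfirm; whatever payload comes back would then be published again. A real implementation would also need a retry limit and a way to expire entries that never receive a confirm.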