Implementing Max‑Effort Notification with RabbitMQ in Spring Boot
This guide explains how to design a max‑effort notification mechanism for a recharge scenario using Spring Boot 2.4.12 and RabbitMQ 3.7.4, detailing the interaction flow, repeat‑notification strategy, message verification, and provides complete code for both pay‑manager and users‑manager modules, including configuration, entities, services, listeners, and controllers.
Environment: Spring Boot 2.4.12 + RabbitMQ 3.7.4
What is Max‑Effort Notification
This is a recharge case.
Interaction flow:
1. Account system calls the recharge system interface.
2. After payment, the recharge system notifies the account system of the recharge result; if the notification fails, the recharge system retries according to a strategy.
3. Account system receives the notification and updates the recharge status.
4. If the account system does not receive the notification, it actively queries the recharge system for the result.
The goal of a max‑effort notification scheme is to make the notifying party try as hard as possible to deliver the business result to the receiver, including:
A mechanism for repeated notifications when the receiver may have missed the message.
A verification mechanism allowing the receiver to query the notifier for the message if it was not received or needs to be re‑consumed.
Difference from reliable message consistency
Solution philosophy: Reliable messaging requires the sender to guarantee delivery to the receiver; max‑effort notification only tries its best, and the receiver may need to query the sender for the result.
Application scenarios: Reliable messaging focuses on transactional consistency during the transaction; max‑effort notification focuses on post‑transaction notification of the result.
Technical direction: Reliable messaging ensures end‑to‑end consistency; max‑effort notification provides reliability mechanisms for the receiver to query when delivery fails.
Implementing Max‑Effort Notification with RabbitMQ
Related RabbitMQ articles: "SpringBoot RabbitMQ Message Reliable Send and Receive" and "RabbitMQ Message Confirmation Mechanism (confirm)".
Project structure:
Submodule pay‑manager
Configuration file:
<code>server:
port: 8080
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
</code>Entity class (records recharge amount and account info):
<code>@Entity
@Table(name = "t_pay_info")
public class PayInfo implements Serializable {
@Id
private Long id;
private BigDecimal money;
private Long accountId;
}
</code>DAO and Service:
<code>public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
PayInfo findByOrderId(String orderId);
}
@Service
public class PayInfoService {
@Resource
private PayInfoRepository payInfoRepository;
@Resource
private RabbitTemplate rabbitTemplate;
@Transactional
public PayInfo savePayInfo(PayInfo payInfo) {
payInfo.setId(System.currentTimeMillis());
PayInfo result = payInfoRepository.save(payInfo);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", ""));
try {
rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData);
} catch (AmqpException | JsonProcessingException e) {
e.printStackTrace();
}
return result;
}
public PayInfo queryByOrderId(String orderId) {
return payInfoRepository.findByOrderId(orderId);
}
}
</code>Controller interface:
<code>@RestController
@RequestMapping("/payInfos")
public class PayInfoController {
@Resource
private PayInfoService payInfoService;
@PostMapping("/pay")
public Object pay(@RequestBody PayInfo payInfo) {
payInfoService.savePayInfo(payInfo);
return "支付已提交,等待结果";
}
@GetMapping("/queryPay")
public Object queryPay(String orderId) {
return payInfoService.queryByOrderId(orderId);
}
}
</code>Submodule users‑manager
Application configuration:
<code>server:
port: 8081
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
</code>Entity classes:
<code>@Entity
@Table(name = "t_users")
public class Users {
@Id
private Long id;
private String name;
private BigDecimal money;
}
@Entity
@Table(name = "t_users_log")
public class UsersLog {
@Id
private Long id;
private String orderId;
@Column(columnDefinition = "int default 0")
private Integer status = 0; // 0: paying, 1: paid, 2: cancelled
private BigDecimal money;
private Date createTime;
}
</code>DAO interfaces:
<code>public interface UsersRepository extends JpaRepository<Users, Long> {}
public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {
UsersLog findByOrderId(String orderId);
}
</code>Service class:
<code>@Service
public class UsersService {
@Resource
private UsersRepository usersRepository;
@Resource
private UsersLogRepository usersLogRepository;
@Transactional
public boolean updateMoneyAndLogStatus(Long id, String orderId) {
UsersLog usersLog = usersLogRepository.findByOrderId(orderId);
if (usersLog != null && usersLog.getStatus() == 1) {
throw new RuntimeException("已支付");
}
Users users = usersRepository.findById(id).orElse(null);
if (users == null) {
throw new RuntimeException("账户不存在");
}
users.setMoney(users.getMoney().add(usersLog.getMoney()));
usersRepository.save(users);
usersLog.setStatus(1);
usersLogRepository.save(usersLog);
return true;
}
@Transactional
public boolean saveLog(UsersLog usersLog) {
usersLog.setId(System.currentTimeMillis());
usersLogRepository.save(usersLog);
return true;
}
}
</code>Message listener:
<code>@Component
public class PayMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class);
@Resource
private UsersService usersService;
@SuppressWarnings("unchecked")
@RabbitListener(queues = {"pay-queue"})
@RabbitHandler
public void receive(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] buf = null;
try {
buf = message.getBody();
logger.info("接受到消息:{}", new String(buf, "UTF-8"));
Map<String, Object> result = new JsonMapper().readValue(buf, Map.class);
Long id = ((Integer) result.get("accountId")) + 0L;
String orderId = (String) result.get("orderId");
usersService.updateMoneyAndLogStatus(id, orderId);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
logger.error("消息接受出现异常:{}, 异常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8")));
e.printStackTrace();
try {
// Put such messages into a dead‑letter queue for manual investigation.
channel.basicReject(deliveryTag, false);
} catch (IOException e1) {
logger.error("拒绝消息重入队列异常:{}", e1.getMessage());
e1.printStackTrace();
}
}
}
}
</code>Controller interface:
<code>@RestController
@RequestMapping("/users")
public class UsersController {
@Resource
private RestTemplate restTemplate;
@Resource
private UsersService usersService;
@PostMapping("/pay")
public Object pay(Long id, BigDecimal money) throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
String orderId = UUID.randomUUID().toString().replaceAll("-", "");
Map<String, String> params = new HashMap<>();
params.put("accountId", String.valueOf(id));
params.put("orderId", orderId);
params.put("money", money.toString());
UsersLog usersLog = new UsersLog();
usersLog.setCreateTime(new Date());
usersLog.setOrderId(orderId);
usersLog.setMoney(money);
usersLog.setStatus(0);
usersService.saveLog(usersLog);
HttpEntity<String> requestEntity = new HttpEntity<>(new ObjectMapper().writeValueAsString(params), headers);
return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class);
}
}
</code>Testing
Initial data:
Account submodule console:
Payment submodule console:
Database table data:
Done!
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.