
Implementing Distributed WebSocket Messaging with Redis and Kafka in Spring

This article explains how to enable cross‑node WebSocket communication in a distributed Spring application by publishing messages to a Redis or Kafka topic, tracking user connections, and routing messages to the appropriate server instance, complete with full code examples and configuration details.

Code Ape Tech Column

The author answers a common question: in a distributed environment, how can a user whose WebSocket session lives on one server node send a message to a user connected to a different node? Two common approaches are presented:

(1) Publish each message, containing the target userId and the content, to a message queue topic such as Redis or Kafka. Every application instance subscribes to the topic and forwards the message to its local WebSocket session only if the target user is connected to that instance.

(2) Store in Redis a mapping from each user to the node that holds its WebSocket session, then push the message through the queue directly to that node. Because only the owning node receives the message, rather than every instance, this reduces network traffic.

The implementation in this article follows approach (1), using Redis publish/subscribe as the message queue.
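
For comparison, the user-to-node mapping behind approach (2) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the author's code: an in-memory map stands in for the shared Redis structure, and the names `UserNodeRegistry`, `bind`, `unbind`, `topicFor`, and the `ws-node:` topic prefix are all hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: tracks which node holds each user's WebSocket session.
// A ConcurrentHashMap stands in for the shared Redis structure of a real deployment.
class UserNodeRegistry {
    private final Map<String, String> userToNode = new ConcurrentHashMap<>();

    // Called when a user's WebSocket session is established on a node.
    void bind(String userId, String nodeId) {
        userToNode.put(userId, nodeId);
    }

    // Called when a session closes; removes the entry only if this node still owns the user.
    void unbind(String userId, String nodeId) {
        userToNode.remove(userId, nodeId);
    }

    // Returns the node-specific topic to publish to, or empty if the user is offline.
    Optional<String> topicFor(String userId) {
        return Optional.ofNullable(userToNode.get(userId))
                .map(nodeId -> "ws-node:" + nodeId);
    }
}
```

In such a design each node would subscribe only to its own topic, so a message reaches one instance instead of being broadcast to all of them.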

Implementation – Step 1: WebSocket channel enum

public enum WebSocketChannelEnum {
    // Simple point‑to‑point chat example
    CHAT("CHAT", "Simple point-to-point chat for testing", "/topic/reply");

    private String code;
    private String description;
    private String subscribeUrl;

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    public String getCode() { return code; }
    public String getDescription() { return description; }
    public String getSubscribeUrl() { return subscribeUrl; }

    public static WebSocketChannelEnum fromCode(String code) {
        if (StringUtils.isNoneBlank(code)) {
            for (WebSocketChannelEnum e : values()) {
                if (e.code.equals(code)) {
                    return e;
                }
            }
        }
        return null;
    }
}

Step 2: Redis‑based message queue configuration

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
    @Value("${spring.redis.timeout}") private String timeOut;
    @Value("${spring.redis.cluster.nodes}") private String nodes;
    @Value("${spring.redis.cluster.max-redirects}") private int maxRedirects;
    @Value("${spring.redis.jedis.pool.max-active}") private int maxActive;
    @Value("${spring.redis.jedis.pool.max-wait}") private int maxWait;
    @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle;
    @Value("${spring.redis.jedis.pool.min-idle}") private int minIdle;
    @Value("${spring.redis.message.topic-name}") private String topicName;

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
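        // nodes is a comma-separated list of host:port entries, so split it into individual nodes
        RedisClusterConfiguration cfg = new RedisClusterConfiguration(Arrays.asList(nodes.split("\\s*,\\s*")));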
        cfg.setMaxRedirects(maxRedirects);
        return cfg;
    }

    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration cfg, JedisPoolConfig pool) {
        return new JedisConnectionFactory(cfg, pool);
    }

    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer<Object> ser = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        ser.setObjectMapper(mapper);
        return ser;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory,
                                                       Jackson2JsonRedisSerializer<Object> ser) {
        RedisTemplate<String, Object> tmpl = new RedisTemplate<>();
        tmpl.setConnectionFactory(factory);
        StringRedisSerializer strSer = new StringRedisSerializer();
        tmpl.setKeySerializer(strSer);
        tmpl.setHashKeySerializer(strSer);
        tmpl.setValueSerializer(ser);
        tmpl.setHashValueSerializer(ser);
        tmpl.afterPropertiesSet();
        return tmpl;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(MessageReceiver receiver,
                                                         Jackson2JsonRedisSerializer<Object> ser) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        adapter.setSerializer(ser);
        return adapter;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory cf,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer c = new RedisMessageListenerContainer();
        c.setConnectionFactory(cf);
        c.addMessageListener(adapter, new PatternTopic(topicName));
        return c;
    }
}

Step 3: Message receiver handling the WebSocket payload

@Component
public class MessageReceiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired private SimpMessagingTemplate messagingTemplate;
    @Autowired private SimpUserRegistry userRegistry;

    // Invoked by MessageListenerAdapter for every message published to the topic
    public void receiveMessage(RedisWebsocketMsg<?> msg) {
        logger.info("Received Message: {}", msg);
        SimpUser user = userRegistry.getUser(msg.getReceiver());
        if (user != null && StringUtils.isNoneBlank(user.getName())) {
            WebSocketChannelEnum channel = WebSocketChannelEnum.fromCode(msg.getChannelCode());
            if (channel != null) {
                messagingTemplate.convertAndSendToUser(msg.getReceiver(),
                        channel.getSubscribeUrl(), msg.getContent());
            }
        }
    }
}
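
The `RedisWebsocketMsg` class is not shown in the source. Judging from the constructor call in the controller and the getters used in the receiver, it might look like the sketch below. The field names, the setters, and the no-argument constructor (which JSON deserializers such as Jackson generally expect) are assumptions.

```java
import java.io.Serializable;

// Hypothetical reconstruction of the message envelope published to the Redis topic.
// In a real project this would be a public class in its own file.
class RedisWebsocketMsg<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private String receiver;     // user name of the target user
    private String channelCode;  // matches WebSocketChannelEnum.getCode()
    private T content;           // payload forwarded to the WebSocket client

    // No-argument constructor, assumed to be needed for JSON deserialization.
    public RedisWebsocketMsg() { }

    public RedisWebsocketMsg(String receiver, String channelCode, T content) {
        this.receiver = receiver;
        this.channelCode = channelCode;
        this.content = content;
    }

    public String getReceiver() { return receiver; }
    public void setReceiver(String receiver) { this.receiver = receiver; }

    public String getChannelCode() { return channelCode; }
    public void setChannelCode(String channelCode) { this.channelCode = channelCode; }

    public T getContent() { return content; }
    public void setContent(T content) { this.content = content; }

    @Override
    public String toString() {
        return "RedisWebsocketMsg{receiver='" + receiver + "', channelCode='"
                + channelCode + "', content=" + content + "}";
    }
}
```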

Step 4: Controller that sends messages and pulls unread messages

@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
    @Value("${spring.redis.message.topic-name}") private String topicName;
    @Autowired private SimpMessagingTemplate messagingTemplate;
    @Autowired private SimpUserRegistry userRegistry;
    @Resource(name = "redisServiceImpl") private RedisService redisService;

    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        String receiver = request.getParameter("receiver");
        String msg = request.getParameter("msg");
        User loginUser = (User) SpringContextUtils.getSession().getAttribute(Constants.SESSION_USER);
        HelloMessage payload = new HelloMessage(String.format("%s say: %s", loginUser.getUsername(), msg));
        sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(),
                JsonUtils.toJson(payload));
        return "ok";
    }

    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser user = userRegistry.getUser(receiver);
        if (user != null && StringUtils.isNoneBlank(user.getName())) {
            // Receiver is connected to this node: deliver directly over the local session
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        } else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            // Receiver is online on another node: publish to the Redis topic so that node can deliver
            RedisWebsocketMsg<String> msg =
                    new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
            redisService.convertAndSend(topicName, msg);
        } else {
            // Receiver is offline: keep the message in a Redis list until it is pulled
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        User loginUser = (User) SpringContextUtils.getSession().getAttribute(Constants.SESSION_USER);
        String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
        List<Object> msgs = redisService.rangeList(listKey, 0, -1);
        result.put("code", "200");
        if (msgs != null && !msgs.isEmpty()) {
            redisService.delete(listKey);
            result.put("result", msgs);
        }
        return result;
    }
}
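
The three-way decision in `sendToUser` can be isolated as a pure function, which makes it easy to unit test without Spring or Redis. This is only a sketch under assumptions: `DeliveryRoute`, `DeliveryPlanner`, and the `unread:` prefix used in the usage line are hypothetical names, and the two boolean inputs stand in for the `SimpUserRegistry` lookup and the Redis set membership check.

```java
// Hypothetical names throughout; the branch order mirrors sendToUser.
enum DeliveryRoute { LOCAL_SESSION, PUBLISH_TO_TOPIC, STORE_UNREAD }

class DeliveryPlanner {
    // Decides how a message should reach the receiver.
    static DeliveryRoute plan(boolean connectedToThisNode, boolean onlineOnAnyNode) {
        if (connectedToThisNode) {
            return DeliveryRoute.LOCAL_SESSION;     // send over the local WebSocket session
        }
        if (onlineOnAnyNode) {
            return DeliveryRoute.PUBLISH_TO_TOPIC;  // let the owning node deliver it
        }
        return DeliveryRoute.STORE_UNREAD;          // receiver is offline, keep for later
    }

    // Builds the unread-list key the same way the controller does: prefix + receiver + ":" + destination.
    static String unreadKey(String prefix, String receiver, String destination) {
        return prefix + receiver + ":" + destination;
    }
}
```

For example, `DeliveryPlanner.unreadKey("unread:", "bob", "/topic/reply")` yields `unread:bob:/topic/reply`, which is the shape of key that `pullUnreadMessage` later reads and deletes.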

Step 5: WebSocket broker configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired private AuthHandshakeInterceptor authHandshakeInterceptor;
    @Autowired private MyHandshakeHandler myHandshakeHandler;
    @Autowired private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/message");
        registry.enableSimpleBroker("/topic");
        registry.setUserDestinationPrefix("/user/");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}
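
`convertAndSendToUser` and `SimpUserRegistry.getUser` identify a user by the name of the `java.security.Principal` attached to the WebSocket session. The source does not show `MyHandshakeHandler`, but a custom handshake handler is a common place to supply that principal. A minimal principal, under the hypothetical name `WebSocketUserPrincipal`, could look like this:

```java
import java.security.Principal;
import java.util.Objects;

// Hypothetical sketch: a Principal whose name is the application user name.
// The name returned here is what the receiver argument of convertAndSendToUser must match.
class WebSocketUserPrincipal implements Principal {
    private final String name;

    WebSocketUserPrincipal(String name) {
        this.name = Objects.requireNonNull(name, "name");
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof WebSocketUserPrincipal
                && name.equals(((WebSocketUserPrincipal) other).name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    @Override
    public String toString() {
        return "WebSocketUserPrincipal{" + name + "}";
    }
}
```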

Step 6: Minimal HTML page for testing

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8"/>
  <title>Chat With STOMP Message</title>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
  <div id="connect-container">
    <input id="target" type="text" value="/chat-websocket"/>
    <button id="connect" onclick="connect();">Connect</button>
    <button id="disconnect" onclick="disconnect();" disabled>Disconnect</button>
    <input id="receiver" placeholder="Receiver name"/>
    <input id="message" placeholder="Message content"/>
    <button id="echo" onclick="sendMessage();" disabled>Send Message</button>
  </div>
  <script>
    var stompClient = null;
    function setConnected(connected) {
      $('#connect').prop('disabled', connected);
      $('#disconnect').prop('disabled', !connected);
      $('#echo').prop('disabled', !connected);
    }
    function connect() {
      var target = $('#target').val();
      var socket = new SockJS(target);
      stompClient = Stomp.over(socket);
      stompClient.connect({}, function () {
        setConnected(true);
        stompClient.subscribe('/user/topic/reply', function (resp) {
          console.log(JSON.parse(resp.body).content);
        });
        // pull unread messages after connection
        $.post('/wsTemplate/pullUnreadMessage', {destination:'/topic/reply'});
      }, function () { setConnected(false); });
    }
    function disconnect() { if (stompClient) { stompClient.disconnect(); } setConnected(false); }
    function sendMessage() {
      var receiver = $('#receiver').val();
      var msg = $('#message').val();
      $.post('/wsTemplate/sendToUser', {receiver:receiver, msg:msg});
    }
  </script>
</body>
</html>

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
