
Implementing Distributed WebSocket Messaging with Spring Cloud Alibaba, Redis, and Kafka

This article explains how to solve cross‑server WebSocket communication in a distributed Java application by using a message queue (Redis or Kafka) to broadcast messages, storing user‑node mappings, and providing complete Spring configuration, code examples, and a demo page for end‑to‑end testing.

Code Ape Tech Column

Hello everyone, I am Chen. My video tutorial series "Spring Cloud Alibaba Practical Project" is now complete, covering Alibaba middleware, OAuth2 microservice authentication, gray release, and distributed transactions.

Last week a friend in the Knowledge Sphere asked about a WebSocket scenario: in a distributed deployment behind Nginx, a user connected to one server needs to send a message to another user whose WebSocket connection resides on a different server.

To address this, we need a distributed WebSocket solution, which can be achieved by either of two approaches:

Approach 1: Publish each message (receiver user ID, content) to a message-queue topic (e.g., Redis or Kafka). Every node subscribes to the topic, extracts the receiver's user ID, checks whether that user's connection exists locally, and pushes the message if it does.

Approach 2: After a user establishes a WebSocket connection, record which node holds the connection in Redis. When sending a message, look up the target node and push the message to that node only, via the queue, which reduces network traffic.
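The article implements only the first approach, so here is a minimal plain-Java sketch of the second one for comparison. `NodeRegistry` stands in for the user-to-node mapping kept in Redis and `NodeQueues` for one queue or topic per node; both names, and the `receiver|content` message format, are assumptions made for illustration and do not come from the original project.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for the shared Redis mapping userId -> nodeId. */
class NodeRegistry {
    private final Map<String, String> userToNode = new ConcurrentHashMap<>();

    void register(String userId, String nodeId) { userToNode.put(userId, nodeId); }
    void unregister(String userId) { userToNode.remove(userId); }
    String lookup(String userId) { return userToNode.get(userId); }
}

/** Hypothetical stand-in for per-node queues (one topic or channel per node). */
class NodeQueues {
    private final Map<String, List<String>> queues = new ConcurrentHashMap<>();

    void publish(String nodeId, String message) {
        queues.computeIfAbsent(nodeId, k -> new CopyOnWriteArrayList<>()).add(message);
    }

    List<String> messagesFor(String nodeId) {
        return queues.getOrDefault(nodeId, new CopyOnWriteArrayList<>());
    }
}

/** Routes a message to the single node that holds the receiver's connection. */
class DirectRouter {
    private final NodeRegistry registry;
    private final NodeQueues queues;

    DirectRouter(NodeRegistry registry, NodeQueues queues) {
        this.registry = registry;
        this.queues = queues;
    }

    /** Returns true if the message was routed, false if the receiver has no registered node. */
    boolean send(String receiver, String content) {
        String nodeId = registry.lookup(receiver);
        if (nodeId == null) {
            return false; // receiver is offline; the caller decides what to do
        }
        queues.publish(nodeId, receiver + "|" + content);
        return true;
    }
}
```

Only the target node receives the message, at the cost of keeping the registry accurate when connections drop or nodes restart.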

Implementation (Approach 1)

The following sections detail the concrete implementation.
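Before the Spring-specific code, the flow of Approach 1 can be sketched in plain Java. This is a simplified in-memory model, not the project's code: `BroadcastTopic` stands in for the Redis channel or Kafka topic, `ServerNode` for one application instance, and a list per user for that user's WebSocket session. All three names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Message published to the shared topic. */
class TopicMessage {
    final String receiver;
    final String content;

    TopicMessage(String receiver, String content) {
        this.receiver = receiver;
        this.content = content;
    }
}

/** One application instance holding the sessions of its own users. */
class ServerNode {
    final String name;
    // userId -> messages pushed to that user's local connection
    private final Map<String, List<String>> localSessions = new ConcurrentHashMap<>();

    ServerNode(String name) { this.name = name; }

    void connect(String userId) {
        localSessions.put(userId, new CopyOnWriteArrayList<>());
    }

    /** Called for every message on the topic; pushes only if the receiver is connected here. */
    void onMessage(TopicMessage message) {
        List<String> session = localSessions.get(message.receiver);
        if (session != null) {
            session.add(message.content);
        }
    }

    List<String> delivered(String userId) {
        return localSessions.getOrDefault(userId, new CopyOnWriteArrayList<>());
    }
}

/** Stand-in for the shared topic: every subscribed node sees every message. */
class BroadcastTopic {
    private final List<ServerNode> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(ServerNode node) { subscribers.add(node); }

    void publish(TopicMessage message) {
        for (ServerNode node : subscribers) {
            node.onMessage(message);
        }
    }
}
```

Every node does a small amount of work per message, which is the traffic cost that Approach 2 tries to avoid.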

Source code is uploaded to GitHub. Follow the public account "Java Backend Interviewer" and reply with keyword "8529" to obtain it.

1. Define a WebSocketChannelEnum

import org.apache.commons.lang3.StringUtils;

public enum WebSocketChannelEnum {
    // Simple point‑to‑point chat for testing
    CHAT("CHAT", "Simple chat for testing", "/topic/reply");

    WebSocketChannelEnum(String code, String description, String subscribeUrl) {
        this.code = code;
        this.description = description;
        this.subscribeUrl = subscribeUrl;
    }

    // Channel code carried inside the queued message
    private final String code;
    private final String description;
    // Destination the client subscribes to (below the user prefix)
    private final String subscribeUrl;

    public String getCode() { return code; }
    public String getDescription() { return description; }
    public String getSubscribeUrl() { return subscribeUrl; }

    /** Returns the channel with the given code, or null if the code is blank or unknown. */
    public static WebSocketChannelEnum fromCode(String code) {
        if (StringUtils.isNotBlank(code)) {
            for (WebSocketChannelEnum e : values()) {
                if (e.code.equals(code)) {
                    return e;
                }
            }
        }
        return null;
    }
}
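Because `fromCode` returns `null` for an unknown code, every caller has to remember the null check. As an optional variation, and not what the original project does, the lookup could return an `Optional` backed by a precomputed map. The enum name `ChannelSketch` is hypothetical and the description field is omitted for brevity.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

enum ChannelSketch {
    CHAT("CHAT", "/topic/reply");

    // Built once, after the constants exist, so lookups do not scan values() each time
    private static final Map<String, ChannelSketch> BY_CODE =
            Stream.of(values()).collect(Collectors.toMap(ChannelSketch::getCode, Function.identity()));

    private final String code;
    private final String subscribeUrl;

    ChannelSketch(String code, String subscribeUrl) {
        this.code = code;
        this.subscribeUrl = subscribeUrl;
    }

    String getCode() { return code; }
    String getSubscribeUrl() { return subscribeUrl; }

    /** Empty when the code is null or does not match any channel. */
    static Optional<ChannelSketch> fromCode(String code) {
        return Optional.ofNullable(code).map(BY_CODE::get);
    }
}
```

A caller can then write `ChannelSketch.fromCode(code).ifPresent(channel -> ...)` instead of an explicit null check.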

2. Configure Redis as a Message Queue

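Redis Pub/Sub delivers each message at most once: nothing is persisted or acknowledged, so a message published while a node is disconnected is lost. For medium‑to‑large production projects that need reliable delivery, consider Kafka or RabbitMQ instead.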

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
    @Value("${spring.redis.timeout}")
    private String timeOut;
    @Value("${spring.redis.cluster.nodes}")
    private String nodes;
    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;
    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;
    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;
    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;
    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);
        return config;
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
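        // The property is a single comma-separated string; split it into individual host:port entries
        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes.split(",")));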
        configuration.setMaxRedirects(maxRedirects);
        return configuration;
    }

    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration, JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(configuration, jedisPoolConfig);
    }

    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer() {
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);
        return serializer;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> serializer) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setValueSerializer(serializer);
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(MessageReceiver receiver, Jackson2JsonRedisSerializer<Object> serializer) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        adapter.setSerializer(serializer);
        return adapter;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic(topicName));
        return container;
    }
}

Configuration snippet (application.yml):

spring:
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
    message:
      topic-name: topic-test
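When `spring.redis.cluster.nodes` is injected into a `String` field, as in `RedisConfig`, it arrives as one comma-separated value, whereas `RedisClusterConfiguration` expects one `host:port` entry per element. A small helper along these lines could do the split; `ClusterNodesParser` is a hypothetical name, and the trimming of whitespace is an extra convenience rather than something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

class ClusterNodesParser {

    /** Splits "host1:6379,host2:6379" into one entry per node, ignoring blanks. */
    static List<String> parse(String nodes) {
        List<String> result = new ArrayList<>();
        if (nodes == null) {
            return result;
        }
        for (String part : nodes.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }
}
```

With the configuration above, `ClusterNodesParser.parse(nodes)` yields three entries, one per cluster node.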

3. Define a Redis Message Receiver

@Component
public class MessageReceiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /** Process WebSocket messages */
    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
        logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            WebSocketChannelEnum channel = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
            if (channel != null) {
                messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channel.getSubscribeUrl(), redisWebsocketMsg.getContent());
            }
        }
    }
}
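The `RedisWebsocketMsg` class used by the receiver is not shown in the article. Judging only from the getters called here and the three-argument constructor used in the controller, it might look roughly like the sketch below; the field names, the generic parameter, and the no-argument constructor are assumptions.

```java
/**
 * Sketch of the envelope published to the Redis topic (assumed shape).
 * In a real project this would be a public class in its own file.
 */
class RedisWebsocketMsg<T> {
    private String receiver;     // user name of the target user
    private String channelCode;  // matches WebSocketChannelEnum.getCode()
    private T content;           // payload pushed to the client

    // JSON serializers such as Jackson typically need a no-argument constructor
    RedisWebsocketMsg() {
    }

    RedisWebsocketMsg(String receiver, String channelCode, T content) {
        this.receiver = receiver;
        this.channelCode = channelCode;
        this.content = content;
    }

    String getReceiver() { return receiver; }
    void setReceiver(String receiver) { this.receiver = receiver; }

    String getChannelCode() { return channelCode; }
    void setChannelCode(String channelCode) { this.channelCode = channelCode; }

    T getContent() { return content; }
    void setContent(T content) { this.content = content; }

    @Override
    public String toString() {
        return "RedisWebsocketMsg{receiver=" + receiver
                + ", channelCode=" + channelCode
                + ", content=" + content + "}";
    }
}
```

Carrying the channel code rather than the destination keeps the message independent of how each node maps channels to subscribe URLs.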

4. Controller for Sending WebSocket Messages

@Controller
@RequestMapping("/wsTemplate")
public class RedisMessageController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${spring.redis.message.topic-name}")
    private String topicName;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** Send a message to a specific user */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        String receiver = request.getParameter("receiver");
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
        HelloMessage data = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(data));
        return "ok";
    }

    /** Core logic for sending a message and handling offline users */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);
        if (simpUser != null && StringUtils.isNoneBlank(simpUser.getName())) {
            messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        } else if (redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)) {
            RedisWebsocketMsg<String> msg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);
            redisService.convertAndSend(topicName, msg);
        } else {
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("User {0} is offline, storing message {1} into Redis list {2}", receiver, payload, listKey));
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /** Pull unread messages for a specific destination */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            List<Object> msgs = redisService.rangeList(listKey, 0, -1);
            result.put("code", "200");
            if (msgs != null && !msgs.isEmpty()) {
                redisService.delete(listKey);
                result.put("result", msgs);
            }
        } catch (Exception e) {
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }
        return result;
    }
}
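The three branches of `sendToUser` and the unread-message pull can be modelled without Spring or Redis. In the sketch below, plain sets stand in for `SimpUserRegistry` and the Redis set of online users, and a map of lists stands in for the Redis unread lists. `MessageRouter`, `Route`, and the `unread:` key prefix are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

enum Route { LOCAL, REMOTE, OFFLINE }

class MessageRouter {
    private final Set<String> localUsers = new HashSet<>();   // connected to this node
    private final Set<String> onlineUsers = new HashSet<>();  // connected to any node
    private final Map<String, List<String>> unread = new ConcurrentHashMap<>();

    void connectLocal(String user) {
        localUsers.add(user);
        onlineUsers.add(user);
    }

    void connectRemote(String user) {
        onlineUsers.add(user);
    }

    /** Decides how a message is delivered; offline messages are stored for later. */
    Route route(String receiver, String destination, String payload) {
        if (localUsers.contains(receiver)) {
            return Route.LOCAL;   // push directly over the local WebSocket session
        }
        if (onlineUsers.contains(receiver)) {
            return Route.REMOTE;  // publish to the topic so the owning node pushes it
        }
        unread.computeIfAbsent(key(receiver, destination), k -> new ArrayList<>()).add(payload);
        return Route.OFFLINE;
    }

    /** Returns the stored messages in arrival order and clears them in one step. */
    List<String> pullUnread(String receiver, String destination) {
        List<String> msgs = unread.remove(key(receiver, destination));
        return msgs == null ? new ArrayList<>() : msgs;
    }

    private static String key(String receiver, String destination) {
        return "unread:" + receiver + ":" + destination;
    }
}
```

The sketch removes the list in a single operation. In the controller, the read (`rangeList`) and the `delete` are separate calls, so a message stored between them could be deleted without being returned; whether that matters depends on how `RedisService`, which is not shown, implements them.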

5. WebSocket Configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;
    @Autowired
    private MyHandshakeHandler myHandshakeHandler;
    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/message");
        registry.enableSimpleBroker("/topic");
        registry.setUserDestinationPrefix("/user/");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}
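`AuthHandshakeInterceptor` and `MyHandshakeHandler` are not shown in the article. User destinations and `SimpUserRegistry` identify a session by the name of its `Principal`, so one common arrangement is for the interceptor to copy the logged-in user name into the handshake attributes and for the handler's `determineUser` method to turn it into a `Principal`. The plain-Java sketch below shows only that last step; the `username` attribute key and both class names are assumptions.

```java
import java.security.Principal;
import java.util.Map;

/** Minimal Principal whose name is the application's user name. */
final class UserPrincipal implements Principal {
    private final String name;

    UserPrincipal(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

class PrincipalResolver {
    // Hypothetical attribute key written during the handshake
    static final String ATTR_USERNAME = "username";

    /** Returns a Principal for the stored user name, or null if none is present. */
    static Principal resolve(Map<String, Object> attributes) {
        Object username = attributes.get(ATTR_USERNAME);
        if (username instanceof String && !((String) username).trim().isEmpty()) {
            return new UserPrincipal((String) username);
        }
        return null;
    }
}
```

The name returned here has to match the `receiver` value passed to `convertAndSendToUser`, otherwise the message will not reach the session.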

6. Demo Page

<head>
    ... (HTML head with meta tags, CSS/JS includes) ...
    <script type="text/javascript">
        var stompClient = null;
        $(function(){
            var target = $("#target");
            if (window.location.protocol === 'http:') {
                target.val('http://' + window.location.host + target.val());
            } else {
                target.val('https://' + window.location.host + target.val());
            }
        });
        function setConnected(connected){
            var connect = $("#connect");
            var disconnect = $("#disconnect");
            var echo = $("#echo");
            if (connected){
                connect.addClass("layui-btn-disabled");
                disconnect.removeClass("layui-btn-disabled");
                echo.removeClass("layui-btn-disabled");
            } else {
                connect.removeClass("layui-btn-disabled");
                disconnect.addClass("layui-btn-disabled");
                echo.addClass("layui-btn-disabled");
            }
            connect.attr("disabled", connected);
            disconnect.attr("disabled", !connected);
            echo.attr("disabled", !connected);
        }
        function connect(){
            var target = $("#target").val();
            var ws = new SockJS(target);
            stompClient = Stomp.over(ws);
            stompClient.connect({}, function(){
                setConnected(true);
                log('Info: STOMP connection opened.');
                pullUnreadMessage("/topic/reply");
                stompClient.subscribe("/user/topic/reply", function(response){
                    log(JSON.parse(response.body).content);
                });
            }, function(){
                setConnected(false);
                log('Info: STOMP connection closed.');
            });
        }
        function disconnect(){
            if (stompClient != null){
                stompClient.disconnect();
                stompClient = null;
            }
            setConnected(false);
            log('Info: STOMP connection closed.');
        }
        function sendMessage(){
            if (stompClient != null){
                var receiver = $("#receiver").val();
                var msg = $("#message").val();
                log('Sent: ' + JSON.stringify({receiver:receiver, msg:msg}));
                $.ajax({
                    url: "/wsTemplate/sendToUser",
                    type: "POST",
                    dataType: "json",
                    async: true,
                    data: {receiver:receiver, msg:msg}
                });
            } else {
                layer.msg('STOMP connection not established, please connect.', {offset:'auto', icon:2});
            }
        }
        function pullUnreadMessage(destination){
            $.ajax({
                url: "/wsTemplate/pullUnreadMessage",
                type: "POST",
                dataType: "json",
                async: true,
                data: {destination:destination},
                success: function(data){
                    if (data.result != null){
                        $.each(data.result, function(i,item){
                            log(JSON.parse(item).content);
                        });
                    } else if (data.code != null && data.code == "500") {
                        layer.msg(data.msg, {offset:'auto', icon:2});
                    }
                }
            });
        }
        function log(message){
            console.debug(message);
        }
    </script>
</head>
<body>
    ... (HTML body with input fields for target, receiver, message and buttons for connect, disconnect, send) ...
</body>


Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
