
承接上文:【如何构建商业级别聊天系统】 Netty 构建 WebSocket 服务 上篇
上两张效果图吧:
在构建 WebSocket 服务的上篇,我们介绍了几个核心的由 Netty 提供的处理 Http 以及升级 WebScoket 协议的 Handler
那么在建立好连接后后续的数据传输和处理就需要我们来编写自己的 Handler 来进行处理了,无论你是想讲消息入库还是分发都可以通过定制 Handler 并将其加入到 pipline 中来实现。
这里贴上上篇的代码方便大家查看
@Slf4j @Component public class WebsocketServer implements ApplicationListener自定义 Handler 处理连接和分发{ private static final int WEBSOCKET_IDLE_TIMEOUT = SystemPropertyUtil.getInt("star.websocket.idle_timeout", 180); private static final int WEBSOCKET_BACKLOG = SystemPropertyUtil.getInt("star.websocket.backlog", 200); private static final int WEBSOCKET_PORT = SystemPropertyUtil.getInt("star.websocket.port", 18680); private ServerBootstrap serverBootstrap; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel serverChannel; @Resource private CommonConnectHandler commonConnectHandler; @Resource private DiscardServerHandler discardServerHandler; @Override public void onApplicationEvent(ContextRefreshedEvent event) { log.info("Starting WebSocketServer on port {} ", WEBSOCKET_PORT); long startMillis = System.currentTimeMillis(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("WebsocketBoss")); workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("WebsocketWorker")); serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) //使用NioServerSocketChannel作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 // 如果同一时刻超出队列上限则会拒绝连接 .option(ChannelOption.SO_BACKLOG, WEBSOCKET_BACKLOG) // 禁用客户端 TCP Nagle 算法拥塞控制,这里需要实时性,不等待阻塞低载荷数据包的发送 .childOption(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.TRACE)) .childHandler(new ChannelInitializer () { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new WebSocketServerProtocolHandler("/chatws")); pipeline.addLast(new TextWebSocketframeHandler()); pipeline.addLast(new MqttWebSocketCodec()); pipeline.addLast(commonConnectHandler); pipeline.addLast(discardServerHandler); } }); ChannelFuture channelFuture = serverBootstrap.bind(WEBSOCKET_PORT); channelFuture.syncUninterruptibly(); serverChannel = channelFuture.channel(); log.info("WebSocketServer started in {} ms", System.currentTimeMillis() - startMillis); } @PreDestroy public void close() { log.info("Closing WebSocketServer"); long startMillis = System.currentTimeMillis(); try { if (serverChannel != null) { serverChannel.close().syncUninterruptibly(); } } catch (Throwable e) { log.warn("serverChannel close error!", e); } try { if (serverBootstrap != null) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } catch (Throwable e) { log.warn("close eventLoopGroup error!", e); } log.info("WebSocketServer closed in {} ms", System.currentTimeMillis() - startMillis); } }
pipeline.addLast(commonConnectHandler); pipeline.addLast(discardServerHandler);
本篇的重点在于上面这两个 Handler 的处理逻辑。
此时我们先实现一个简单的需求:在多个客户端连接上的 WebSocket 服务后可以进行消息的传递,将发送的消息发送给所有在线的客户端。相当于一个简单的聊天室
顺着这个需求来分析一下如何实现:
这个时候职责就很清晰了
commonConnectHandler 负责连接管理
discardServerHandler 负责数据分发 (当然这个名称可能起的不是很对,目前还是 Demo 就先这样吧哈哈)
CommonConnectHandler 连接管理import com.star.im.websocket.mgr.ConnectionManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@ChannelHandler.Sharable
public class CommonConnectHandler extends ChannelInboundHandlerAdapter {
@Resource
private ConnectionManager connectionManager;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 注册所有连接
connectionManager.registerConnection(ctx.channel());
ctx.fireChannelRead(msg);
}
}
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class ConnectionManager {
private final ConcurrentHashMap channelMap = new ConcurrentHashMap<>(10000);
private final Set connectionPool = Collections.synchronizedSet(new HashSet<>(10000));
private final static AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
public boolean registerConnection(Channel conn) {
if (channelMap.containsKey(conn)) {
return true;
}
int num = ATOMIC_INTEGER.incrementAndGet();
NettyConnection nettyConnection = NettyConnection.builder().channel(conn).userName(num + " 号用户").cid(num).build();
connectionPool.add(nettyConnection);
channelMap.put(conn, nettyConnection);
log.info("registerConnection {}", nettyConnection);
return true;
}
public void removeConnection(NettyConnection channel) {
connectionPool.remove(channel);
channelMap.remove(channel.getChannel());
destroyConnection(channel);
}
private void destroyConnection(NettyConnection conn) {
log.info("destroyConnection {}", conn);
conn.getChannel().close();
}
public NettyConnection getConnection(Channel channel) {
return channelMap.get(channel);
}
public Set getConnectionPool() {
return connectionPool;
}
}
import io.netty.channel.Channel;
import lombok.*;
import java.io.Serializable;
@ToString(callSuper = true)
@EqualsAndHashCode()
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NettyConnection implements Serializable {
private long cid;
private String userName;
private transient Channel channel;
}
对于连接管理,核心类主要为以上几个
NettyConnection 为存储连接的实体类。
ConnectionManager 为连接管理类用于管理生成 NettyConnection 。其中最重要的就是 Channel 这个类,最终的数据发送都是通过改类完成,同时这里简单地生成了一下 cid 来标识用户。
CommonConnectHandler 的作用就是处理每个进来的连接,并将它们注册到 ConnectionManager 中。
DiscardServerHandler 数据分发import com.star.im.websocket.mgr.ConnectionManager;
import com.star.im.websocket.mgr.NettyConnection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.Set;
@Slf4j
@Component
@ChannelHandler.Sharable
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Resource
private ConnectionManager connectionManager;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String) {
log.info("channelRead String, msg={}", msg);
System.out.println(msg);
String msgStr = String.valueOf(msg);
Set channelList = connectionManager.getConnectionPool();
Channel currentChannel = ctx.channel();
NettyConnection currentConnection = connectionManager.getConnection(currentChannel);
String userName = currentConnection.getUserName();
for (NettyConnection connection : channelList) {
String finalMsg = Objects.equals(connection, currentConnection) ? "你说:" + msgStr : userName + ":" + msgStr;
ByteBuf buf = Unpooled.copiedBuffer(finalMsg, CharsetUtil.UTF_8);
ChannelFuture channelFuture = connection.getChannel().writeAndFlush(buf);
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
log.error(">>> String failure! ch=" + connection + ", finalMsg=" + finalMsg,
future.cause());
connectionManager.removeConnection(connection);
}
});
}
} else {
log.info("channelRead ctx={}, msg={}", ctx, msg);
ByteBuf in = (ByteBuf) msg;
StringBuilder stringBuilder = new StringBuilder();
while (in.isReadable()) {
char readByte = (char) in.readByte();
stringBuilder.append(readByte);
}
System.out.println(stringBuilder.toString());
ByteBuf buf = Unpooled.copiedBuffer(stringBuilder.toString() + " return ", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
这里主要的处理逻辑在第一块 if 当中,在本案例中我们主要处理 String 类型的消息,通过 ConnectionManager 获取所有在线的连接,在分发的时候文案区分自己和他人。
H5 页面以上就是服务端的内容了,这里给出一个测试用的 H5 页面,方便测试效果
最后Hello, world! 最后一条消息:{{newMessage}}
说点什么{{item}}
本篇向大家介绍了自己定制的 Handler 的处理机制,以及通常的连接管理和分发逻辑,整体具备一定的参考意义,后续将以此为基础进行优化迭代。