栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

netty 实现消息推送

Java 更新时间:发布时间: 百科书网 趣学号

netty 实现消息推送

           

               

                              

基本介绍

         

                      

            

registry:注册中心,存储receiver的host、port

receiver:消息消费,将host、port数据推送到注册中心

sender:消息发送者,从注册中心获取receiver元数据,然后发送消息

         

Message:发送的消息

/**
 * Envelope for a message pushed from the sender to the receiver.
 *
 * <p>No accessors are written out here; they are expected to be generated by
 * the {@code @Data} annotation (presumably Lombok — confirm against the build).
 * The class is {@link Serializable} because instances are written to the
 * channel as Java objects.
 */
@Data
public class Message implements Serializable {

    /** Topic label of the message; the sender's demo uses "person" and "product". */
    private String topic;
    /** Payload of the message. NOTE(review): presumably must itself be serializable to be sent — confirm. */
    private Object content;
}

        

RegistryProtocol:receiver向注册中心注册的数据

/**
 * Registration payload that a receiver sends to the registry.
 *
 * <p>Accessors are expected to be generated by the {@code @Data} annotation
 * (presumably Lombok — confirm against the build).
 */
@Data
public class RegistryProtocol implements Serializable {

    /** Host on which the receiver (consumer) accepts messages. */
    private String consumerHost;
    /** Port on which the receiver (consumer) accepts messages. */
    private Integer consumerPort;
}

           

RegistryInfo:注册中心向sender返回的数据

/**
 * Reply that the registry returns to a sender asking where the receiver is.
 *
 * <p>Accessors are expected to be generated by the {@code @Data} annotation
 * (presumably Lombok — confirm against the build).
 */
@Data
public class RegistryInfo implements Serializable {

    /** Receiver host as currently known to the registry. */
    private String host;
    /** Receiver port as currently known to the registry. */
    private Integer port;
}

                

Person:消息内容

/**
 * Sample message payload describing a person.
 *
 * <p>Accessors are expected to be generated by the {@code @Data} annotation
 * (presumably Lombok — confirm against the build).
 */
@Data
public class Person implements Serializable {

    /** Person's name. */
    private String name;
    /** Person's age. */
    private Integer age;
}

       

Product:消息内容

/**
 * Sample message payload describing a product.
 *
 * <p>Accessors are expected to be generated by the {@code @Data} annotation
 * (presumably Lombok — confirm against the build).
 */
@Data
public class Product implements Serializable {

    /** Product name. */
    private String name;
    /** Product price. */
    private Float price;
    /** Product count. NOTE(review): presumably a quantity — confirm. */
    private Integer num;
}

          

         

                              

注册中心

    

RegistryCache:存储receiver的host、port

/**
 * In-memory holder for the single receiver address known to the registry.
 *
 * <p>The fields are written when a receiver registers and read when a sender
 * asks for the receiver's address. Those two connections may be served by
 * different event-loop threads, so the fields are {@code volatile} to make a
 * registration visible to subsequent lookups.
 */
public class RegistryCache {

    /** Host of the registered receiver; {@code null} until a receiver has registered. */
    public static volatile String consumerHost;
    /** Port of the registered receiver; {@code null} until a receiver has registered. */
    public static volatile Integer consumerPort;
}

         

RegistryRegisterInfosHandler:处理receiver元数据注册

/**
 * Registry-side inbound handler that records the address a receiver registers.
 *
 * <p>The handler is parameterized with {@link RegistryProtocol} so that only
 * registration messages reach {@link #channelRead0}; other inbound message
 * types are left for later handlers in the pipeline.
 */
public class RegistryRegisterInfosHandler extends SimpleChannelInboundHandler<RegistryProtocol> {

    /**
     * Copies the receiver's host and port into {@link RegistryCache} and logs
     * the address that is now registered.
     *
     * @param channelHandlerContext context of the channel the registration arrived on
     * @param registryProtocol      registration payload sent by the receiver
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RegistryProtocol registryProtocol) throws Exception {
        if (registryProtocol != null){
            RegistryCache.consumerHost = registryProtocol.getConsumerHost();
            RegistryCache.consumerPort = registryProtocol.getConsumerPort();
        }

        // Log host ==> port (the host was previously missing: the port was printed twice).
        System.out.println("注册中心注册信息:"+ RegistryCache.consumerHost + " ==> " + RegistryCache.consumerPort);
    }
}

            

RegistryFetchInfosHandler:处理sender元数据拉取

/**
 * Registry-side inbound handler that answers a sender's lookup request.
 *
 * <p>The handler is parameterized with {@link String} so that only text
 * commands reach {@link #channelRead0}.
 */
public class RegistryFetchInfosHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Replies with the cached receiver address when the command is
     * "fetch registry infos" (compared case-insensitively); any other text is ignored.
     *
     * @param channelHandlerContext context of the channel the request arrived on
     * @param s                     text command sent by the sender
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        if ("fetch registry infos".equalsIgnoreCase(s)){
            // Snapshot whatever is currently cached; both values are null if no receiver has registered yet.
            RegistryInfo registryInfo = new RegistryInfo();
            registryInfo.setHost(RegistryCache.consumerHost);
            registryInfo.setPort(RegistryCache.consumerPort);
            channelHandlerContext.channel().writeAndFlush(registryInfo);

            System.out.println("sender到注册中心获取注册信息:"+registryInfo);
        }
    }
}

          

RegistryServer:注册中心服务器,接收receiver元数据注册、sender元数据拉取

/**
 * Registry server: accepts receiver registrations and sender lookups over
 * Java-object-encoded channels.
 */
public class RegistryServer {

    /**
     * Binds the registry to {@code port} and blocks until the server channel is
     * closed, then shuts both event-loop groups down.
     *
     * @param port TCP port to listen on
     */
    public static void startServer(int port){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // The type argument is required for initChannel(SocketChannel) to override.
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(RegistryServer.class.getClassLoader())));
                            channelPipeline.addLast(new ObjectEncoder());
                            // Each handler only consumes its own message type, so both can share one pipeline.
                            channelPipeline.addLast(new RegistryRegisterInfosHandler());
                            channelPipeline.addLast(new RegistryFetchInfosHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Starts the registry on port 8000. */
    public static void main(String[] args) {
        startServer(8000);
    }
}

          

              

                              

消息消费端

   

ReceiverRegistryHandler:消费端元数据注册

/**
 * Receiver-side handler that announces the receiver's address to the registry
 * as soon as the connection becomes active.
 */
public class ReceiverRegistryHandler extends ChannelInboundHandlerAdapter {

    /**
     * Builds the registration payload (host "localhost", port 8001) and writes
     * it to the channel.
     *
     * @param ctx context of the freshly connected channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final RegistryProtocol registration = new RegistryProtocol();
        registration.setConsumerPort(8001);
        registration.setConsumerHost("localhost");

        ctx.channel()
                .writeAndFlush(registration);
    }
}

          

ReceiverRegistryClient:消费端元数据注册客户端

/**
 * Client used by the receiver to register its address with the registry.
 */
public class ReceiverRegistryClient {

    /**
     * Connects to the registry at {@code host:port}, lets
     * {@link ReceiverRegistryHandler} send the registration, and blocks until
     * the channel is closed.
     *
     * @param host registry host
     * @param port registry port
     */
    public static void register(String host, int port){
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // The type argument is required for initChannel(SocketChannel) to override.
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
                            channelPipeline.addLast(new ObjectEncoder());
                            channelPipeline.addLast(new ReceiverRegistryHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /** Registers with the registry at localhost:8000. */
    public static void main(String[] args) {
        register("localhost",8000);
    }
}

          

ReceiverMessageHandler:消费端消费消息

/**
 * Receiver-side inbound handler that consumes pushed messages.
 *
 * <p>The handler is parameterized with {@link Message} so that
 * {@link #channelRead0} overrides the superclass method and only receives
 * decoded {@code Message} objects.
 */
public class ReceiverMessageHandler extends SimpleChannelInboundHandler<Message> {

    /**
     * Prints the received message to standard output.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param message               decoded message from the sender
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
        System.out.println("消费端获取到消息:"+message);
    }
}

          

ReceiverMessageServer:消费端消息消费服务器

/**
 * Receiver-side server that accepts connections from senders and consumes the
 * messages they push.
 */
public class ReceiverMessageServer {

    /**
     * Binds to port 8001 and blocks until the server channel is closed, then
     * shuts the event-loop group down.
     */
    public static void consumeMessage(){
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    // SO_KEEPALIVE applies to accepted connections, so it is a child option;
                    // set via option(...) it targets the listening channel, where it has no effect.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // The type argument is required for initChannel(SocketChannel) to override.
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(ReceiverMessageServer.class.getClassLoader())));
                            channelPipeline.addLast(new ObjectEncoder());
                            channelPipeline.addLast(new ReceiverMessageHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(8001).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /** Starts the receiver's message server. */
    public static void main(String[] args) {
        consumeMessage();
    }
}

         

             

                              

消息发送端

  

FetchRegistryInfosHandler:从注册中心拉取数据

/**
 * Sender-side handler that asks the registry for the receiver's address and
 * stores the answer on {@link SenderClient}.
 *
 * <p>The handler is parameterized with {@link RegistryInfo} so that
 * {@link #channelRead0} overrides the superclass method and only receives the
 * registry's reply.
 */
public class FetchRegistryInfosHandler extends SimpleChannelInboundHandler<RegistryInfo> {

    /**
     * Sends the lookup command as soon as the connection to the registry is active.
     *
     * @param ctx context of the freshly connected channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("fetch registry infos");
    }

    /**
     * Records the receiver's host and port on {@link SenderClient} and closes
     * the channel, since the lookup is complete.
     *
     * @param channelHandlerContext context of the channel the reply arrived on
     * @param registryInfo          receiver address returned by the registry
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RegistryInfo registryInfo) throws Exception {
        System.out.println("sender读取到注册信息:"+registryInfo);
        SenderClient.consumerHost = registryInfo.getHost();
        SenderClient.consumerPort = registryInfo.getPort();

        channelHandlerContext.channel().close();
    }
}

             

SenderMessageHandler:消息发送端消息发送

/**
 * Sender-side handler that pushes exactly one message once the connection to
 * the receiver is active, then closes the channel.
 */
public class SenderMessageHandler extends ChannelInboundHandlerAdapter {

    /** The message to push when the channel becomes active. */
    private final Message outgoing;

    /**
     * @param message the message this handler will send
     */
    public SenderMessageHandler(Message message){
        outgoing = message;
    }

    /**
     * Logs the message, writes it to the channel and closes the channel when
     * the write completes.
     *
     * @param ctx context of the freshly connected channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("sender发送消息:" + outgoing);
        ctx.channel()
                .writeAndFlush(outgoing)
                .addListener(ChannelFutureListener.CLOSE);
    }
}

          

SenderClient:消息发送客户端

public class SenderClient {

    public static String consumerHost;
    public static Integer consumerPort;

    private static void fetchInfos(){
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(SenderClient.class.getClassLoader())));
                            channelPipeline.addLast(new ObjectEncoder());
                            channelPipeline.addLast(new FetchRegistryInfosHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("localhost",8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void sendMessage(Message message){
        if (consumerHost == null || consumerPort ==null){
            fetchInfos();
        }

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(SenderClient.class.getClassLoader())));
                            channelPipeline.addLast(new ObjectEncoder());
                            channelPipeline.addLast(new SenderMessageHandler(message));
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect(consumerHost,consumerPort).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        Message message = new Message();
        message.setTopic("person");

        Person person = new Person();
        person.setName("瓜田李下");
        person.setAge(20);
        message.setContent(person);
        sendMessage(message);


        Message message2 = new Message();
        message.setTopic("product");

        Product product = new Product();
        product.setName("苹果");
        product.setPrice(10f);
        product.setNum(10);

        message2.setContent(product);
        sendMessage(message2);
    }
}

        

             

                              

使用测试

        

启动注册中心、receiver注册、消费、sender客户端,receiver控制台输出:

17:10:12.827 [nioEventLoopGroup-2-3] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
17:10:12.827 [nioEventLoopGroup-2-3] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
17:10:12.827 [nioEventLoopGroup-2-3] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
17:10:12.840 [nioEventLoopGroup-2-3] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
17:10:12.840 [nioEventLoopGroup-2-3] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
17:10:12.842 [nioEventLoopGroup-2-3] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3e899371
消费端获取到消息:Message(topic=person, content=Person(name=瓜田李下, age=20))
消费端获取到消息:Message(topic=null, content=Product(name=苹果, price=10.0, num=10))

        

                  

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/959682.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号