
How should I use EventExecutorGroup to run time-consuming operations in Netty? Please provide an example if possible.

I ran into a problem while writing a Netty server. After some research I learned that time-consuming tasks in Netty should run in an EventExecutorGroup. I am combining ChannelInboundHandlerAdapter with EventExecutorGroup, that is, I register the ChannelInboundHandler with an EventExecutorGroup so that the time-consuming work runs there, mainly inside the handler's channelReadComplete. Errors still occur, though, so I would like to know the correct way to use EventExecutorGroup for time-consuming tasks.

Answer
假灑脫

Please refer to this answer of mine. [link description]

Here is some supplementary code. It is not complete, but it can serve as a reference.

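import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Imports for the JSON library and the project's own classes are omitted.

// @Sharable is required when one handler instance is added to every channel's pipeline,
// as the bootstrap code does with chanHandler; the handler keeps no per-channel state.
@ChannelHandler.Sharable
public class ChanHandler extends ChannelInboundHandlerAdapter {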

    private static final Logger logger = LoggerFactory.getLogger(ChanHandler.class);

    @Resource
    private BusinessService businessService;
    @Resource
    private ChannelManager channelManager;


    /**
     * When the connection is established
     * @param ctx
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {

        //...
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {

        //...
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
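        try {
            String requestStr = (String)msg;
            // ctx.executor() is the executor this handler was registered with
            // (one taken from logicGroup), not the channel's I/O event loop.
            if(ctx.executor().inEventLoop()){
                // Already on that executor's thread: run the business logic directly.
                businessLogic(ctx,requestStr);
            }else {
                // Otherwise hand the work over to that executor.
                ctx.executor().execute(() -> businessLogic(ctx,requestStr));
            }

        }finally {
            // Safe to release here: requestStr is a plain String and does not depend on msg.
            ReferenceCountUtil.release(msg);
        }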
    }

    /**
     * Handle the business logic
     * @param ctx
     * @param requestStr
     */
    private void businessLogic(ChannelHandlerContext ctx,String requestStr){
        Response res = null;
        try {
            JSONObject request = JSON.parseObject(requestStr);
            switch (request.getString("action")){
                case "dev_login":
                    res = businessService.dev_login(request,ctx);
                    break;
                case "ping":
                    res = new Response("pong",null);
                    break;
                case "msg":
                    res = businessService.processing_msg(request,ctx);
                    break;
                case "quit":
                    String key = ConnUtils.getKey(ctx);
                    channelManager.removeConnection(key);
                    break;
            }
        }catch (Exception e){
            res = new Response("error",400,"Unparseable characters","");
        }finally {
            // res is still null for "quit" and for unknown actions.
            IOUtil.writeAndFlush(ctx,res);
        }
    }
    
    // ...
}
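
To make the thread check in channelRead easier to follow, here is a small sketch that uses only java.util.concurrent. MiniLoop and runOnLoop are hypothetical names made up for this illustration, not Netty classes; the sketch only models the idea that an executor remembers its own thread, runs a task inline when the caller is already on that thread, and queues it otherwise.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InEventLoopSketch {

    /** Hypothetical single-threaded executor that remembers its own thread (not a Netty class). */
    static final class MiniLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            loopThread = t;
            return t;
        });

        /** True only when the calling thread is this executor's own thread. */
        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        /** Same shape as the check in channelRead: run inline or hand over. */
        void runOnLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Submits one task from the caller and one nested task from the loop thread. */
    static List<String> demo() {
        MiniLoop loop = new MiniLoop();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.runOnLoop(() -> {
            seen.add("outer:" + Thread.currentThread().getName());
            // Already on the loop thread, so this nested task runs inline.
            loop.runOnLoop(() -> seen.add("inner:" + Thread.currentThread().getName()));
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both entries are recorded on the mini-loop thread: the outer task is handed over from the calling thread, and the nested one runs inline because it is already on the loop thread. In the handler above the same reasoning applies to ctx.executor(), so when the handler is registered with its own group the first branch is normally the one taken.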

// Server bootstrap: ChanHandler is registered on a separate EventExecutorGroup (logicGroup).
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

EventExecutorGroup logicGroup = new DefaultEventExecutorGroup(16);

try {

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup,workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));

            ByteBuf byteBuf = Unpooled.copiedBuffer(Const.DELIMITER.getBytes());
            pipeline.addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));

            pipeline.addLast(new IdleStateHandler(readWaitSeconds, 0, 0));

            pipeline.addLast(new StringDecoder());
            pipeline.addLast(new StringEncoder());

            pipeline.addLast(logicGroup,chanHandler);
        }
    });

    bootstrap.option(ChannelOption.SO_BACKLOG, 128);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

    ChannelFuture future = bootstrap.bind(port).sync();

    future.channel().closeFuture().sync();

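} catch (InterruptedException e) {
    // Pass the exception as the last argument, without a {} placeholder, so the stack trace is logged.
    logger.error("Server error", e);
    // Restore the interrupt flag for code further up the stack.
    Thread.currentThread().interrupt();
} finally {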
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    logicGroup.shutdownGracefully();
}
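
One thing to keep in mind when sizing logicGroup: as far as I know, with default settings Netty pins each channel to a single executor chosen from the group, so messages from one channel are processed in order on one thread, and a slow task delays the other channels that share that thread. The sketch below models this with plain java.util.concurrent. MiniGroup and executorFor are hypothetical names for illustration only, and the round-robin choice is an assumption of this model, not a statement about Netty internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PinnedGroupSketch {

    /** Hypothetical stand-in for an executor group: n single-threaded executors. */
    static final class MiniGroup {
        private final ExecutorService[] executors;
        private final Map<String, ExecutorService> pinned = new ConcurrentHashMap<>();
        private final AtomicInteger next = new AtomicInteger();

        MiniGroup(int n) {
            executors = new ExecutorService[n];
            for (int i = 0; i < n; i++) {
                String name = "logic-" + i;
                executors[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
            }
        }

        /** The first call for a channel picks an executor round-robin; later calls reuse it. */
        ExecutorService executorFor(String channelId) {
            return pinned.computeIfAbsent(channelId,
                    id -> executors[Math.floorMod(next.getAndIncrement(), executors.length)]);
        }

        /** Stops accepting tasks and waits for the queued ones to finish. */
        void shutdownAndWait() {
            for (ExecutorService e : executors) {
                e.shutdown();
            }
            try {
                for (ExecutorService e : executors) {
                    if (!e.awaitTermination(5, TimeUnit.SECONDS)) {
                        throw new IllegalStateException("timed out");
                    }
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }
    }

    /** Sends three messages on each of two channels and records where each one ran. */
    static List<String> demo() {
        MiniGroup group = new MiniGroup(2);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            for (String channel : new String[] {"A", "B"}) {
                String msg = channel + i;
                group.executorFor(channel).execute(
                        () -> log.add(msg + "@" + Thread.currentThread().getName()));
            }
        }
        group.shutdownAndWait();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the messages of channel A all run on logic-0 and those of channel B on logic-1, each in the order they were sent, while the interleaving between the two channels can differ from run to run. If the business logic blocks for a long time, a larger group, or handing the blocking call to a separate thread pool, reduces how many channels wait behind one slow task.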
June 24, 2018, 15:55