鍍金池/ 問(wèn)答/Java  網(wǎng)絡(luò)安全/ netty服務(wù)端EventExecutorGroup業(yè)務(wù)線程組中多個(gè)不同的線程重

netty服務(wù)端EventExecutorGroup業(yè)務(wù)線程組中多個(gè)不同的線程重復(fù)處理同一個(gè)數(shù)據(jù)

最近我在使用java的netty框架(4.1.6)來(lái)編寫服務(wù)端的程序的時(shí)候遇到了一個(gè)麻煩的問(wèn)題,這個(gè)問(wèn)題一直困擾了我很久

我這邊netty服務(wù)端在接受到客戶端連接之后,會(huì)接受客戶端傳來(lái)的數(shù)據(jù),并將數(shù)據(jù)在 EventExecutorGroup提供的線程中進(jìn)行耗時(shí)業(yè)務(wù)處理,但是問(wèn)題來(lái)了,當(dāng)客戶端向我發(fā)送一個(gè)數(shù)據(jù)時(shí),比如發(fā)送的數(shù)據(jù)為A,有的時(shí)候EventExecutorGroup中會(huì)有多個(gè)線程(2~3)在不同的時(shí)間內(nèi)接受這個(gè)A數(shù)據(jù),并且進(jìn)行處理,這使得我的數(shù)據(jù)庫(kù)中總是會(huì)插入相同的數(shù)據(jù)

請(qǐng)問(wèn)各位大神我該如何解決這個(gè)問(wèn)題呢?

引導(dǎo)代碼

public class ChiconyServerBootstrap 
{    
    private int port;
    private Conf MyConf;
    public static String Path;
    
    public ChiconyServerBootstrap(String Path)
    {        
        try
        {
            //自定義配置文件類
            MyConf = new Conf(Path);    
            //得到監(jiān)聽(tīng)端口
            port = MyConf.getPort();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }
    
    public void start()
    {
        ServerBootstrap serverbootstrap = new ServerBootstrap();
        NioEventLoopGroup connect_group = new NioEventLoopGroup();
        NioEventLoopGroup io_group = new NioEventLoopGroup();
        final EventExecutorGroup work_group = new DefaultEventExecutorGroup(10);
        
        try
        {
            serverbootstrap.group(connect_group, io_group);
            serverbootstrap.channel(NioServerSocketChannel.class);
            serverbootstrap.childHandler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                public void initChannel(SocketChannel ch) throws Exception
                {
                    ch.pipeline().addLast(work_group, new WorkHandler());
                }
            });
            
            serverbootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60);
            serverbootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);        
            erbootstrap.bind(new InetSocketAddress(port)).sync();
            future.channel().closeFuture().sync();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            connect_group.shutdownGracefully();
            io_group.shutdownGracefully();
            work_group.shutdownGracefully();
        }
    }
    
    public static void main(String[] args)
    {
        ChiconyServerBootstrap server;
        //得到配置文件
        Path = "./Conf/config.properties";
        server = new ChiconyServerBootstrap(Path);
        server.start();
    }
}

業(yè)務(wù)線程的handler

public class WorkHandler extends ChannelInboundHandlerAdapter 
{
    //一下代碼中省略了相關(guān)的參數(shù),log為一個(gè)自定義的日志類
    
    //遠(yuǎn)程主機(jī)的IP
    private String HostIP;
    //遠(yuǎn)程主句的端口
    private int HostPort;
    //保存主機(jī)返回的數(shù)據(jù)
    private byte[] HostRetData;
    //定義一個(gè)參數(shù)配置類
    private Conf myConf;
    //保存客戶端請(qǐng)求數(shù)據(jù)
    private byte[] PosRequestData;
    //一個(gè)buffer來(lái)保存POS傳遞到channelHandler的數(shù)據(jù)
    private ByteBuffer PosSendData_Buffer;
    //保存接受的全部的數(shù)據(jù)的長(zhǎng)度
    private int PosSendData_number;
    //定義一個(gè)參數(shù)用來(lái)保存數(shù)據(jù)庫(kù)的表明
    private String TableName;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {        
        //得到參數(shù)配置類 
        myConf = new Conf(ChiconyServerBootstrap.Path);
        HostIP = myConf.getHostIP();
        HostPort = myConf.getHostPort();
        TableName = myConf.getTableName();
        RequestAH = myConf.getRequestAH();
        
        PosSendData_Buffer = ByteBuffer.allocate(2048);
        PosSendData_number = 0;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        byte[] tmp = (byte[])msg;
        
        //重新整合已經(jīng)取得的數(shù)據(jù),防止channelRead多次調(diào)用
        PosSendData_number += tmp.length;
        for(int i = 0; i < tmp.length; i++)
        {
            PosSendData_Buffer.put(tmp[i]);
        }
        
        ReferenceCountUtil.release(msg);
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {                
        //將相關(guān)的數(shù)據(jù)進(jìn)行業(yè)務(wù)處理,并且發(fā)向三方服務(wù)器之后將返回?cái)?shù)據(jù)使用HostRetData保存
        
        //將返回的數(shù)據(jù)返回給客戶端
        if(HostRetData != null)
        {
            //將數(shù)據(jù)傳輸?shù)较乱粋€(gè)ChannelOutboundHandler的緩存中
            ByteBuf encoded = ctx.alloc().buffer(HostRetData.length);
            encoded.writeBytes(HostRetData);
            ctx.writeAndFlush(encoded);
            ctx.close();
        }
        else
        {
            ctx.close();
        }
        
        //對(duì)返回的數(shù)據(jù)進(jìn)行處理并且進(jìn)行數(shù)據(jù)庫(kù)插入操作,插入使用的原生的JDBC,數(shù)據(jù)庫(kù)使用mysql
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    {    
        cause.printStackTrace(); 
    }
}
回答
編輯回答
情已空

請(qǐng)問(wèn)最后解決了嗎?

2018年1月9日 07:19