Skip to content

java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf #15

@testwen00

Description

@testwen00

2台pc测试,一台运行KcpRttExampleServer

    KcpRttExampleServer kcpRttExampleServer = new KcpRttExampleServer();
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setAckNoDelay(true);
    channelConfig.setTimeoutMillis(5000);
    channelConfig.setUseConvChannel(true);
    channelConfig.setCrc32Check(false);
    KcpServer kcpServer = new KcpServer();
    kcpServer.init(kcpRttExampleServer,channelConfig,20003);

一台运行KcpRttExampleClient

   public KcpRttExampleClient() {
    data = Unpooled.buffer(220000);
    for (int i = 0; i < data.capacity(); i++) {
        data.writeByte((byte) i);
    }

    rtts = new int[30000];
    for (int i = 0; i < rtts.length; i++) {
        rtts[i] = -1;
    }
    startTime = System.currentTimeMillis();
    scheduleSrv = new ScheduledThreadPoolExecutor(1);
}

public static void main(String[] args) {
    ChannelConfig channelConfig = new ChannelConfig();
    channelConfig.nodelay(true,30,2,true);
    channelConfig.setSndwnd(1024);
    channelConfig.setRcvwnd(1024);
    channelConfig.setMtu(1400);
    channelConfig.setAckNoDelay(true);
    channelConfig.setConv(55);
    channelConfig.setFecDataShardCount(10);
    channelConfig.setFecParityShardCount(3);
    channelConfig.setCrc32Check(false);
    KcpClient kcpClient = new KcpClient();
    kcpClient.init(channelConfig);

    KcpRttExampleClient kcpClientRttExample = new KcpRttExampleClient();
    kcpClient.connect(new InetSocketAddress("192.168.2.180",20003),channelConfig,kcpClientRttExample);

}

@Override
public void onConnected(Ukcp ukcp) {
    future = scheduleSrv.scheduleWithFixedDelay(() -> {
        ByteBuf byteBuf = rttMsg(++count);
        ukcp.write(byteBuf);
        byteBuf.release();
        if (count >= rtts.length) {
            // finish
            future.cancel(true);
            byteBuf = rttMsg(-1);
            ukcp.write(byteBuf);
            byteBuf.release();

        }
    }, 20, 20, TimeUnit.MILLISECONDS);
}

@Override
public void handleReceive(ByteBuf byteBuf, Ukcp ukcp) {
    int curCount = byteBuf.readShort();

    if (curCount == -1) {
        scheduleSrv.schedule(new Runnable() {
            @Override
            public void run() {
                int sum = 0;
                for (int rtt : rtts) {
                    sum += rtt;
                }
                System.out.println("average: "+ (sum / rtts.length));
                System.out.println(Snmp.snmp.toString());
                ukcp.close();
                //ukcp.setTimeoutMillis(System.currentTimeMillis());
                System.exit(0);
            }
        }, 3, TimeUnit.SECONDS);
    } else {
        int idx = curCount - 1;
        long time = byteBuf.readInt();
        if (rtts[idx] != -1) {
            System.out.println("???");
        }
        //log.info("rcv count {} {}", curCount, System.currentTimeMillis());
        rtts[idx] = (int) (System.currentTimeMillis() - startTime - time);
        System.out.println("rtt : "+ curCount+"  "+ rtts[idx]);
    }
}

@Override
public void handleException(Throwable ex, Ukcp kcp)
{
    ex.printStackTrace();
}

@Override
public void handleClose(Ukcp kcp) {
    scheduleSrv.shutdown();
    try {
        scheduleSrv.awaitTermination(3, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int sum = 0;
    int max = 0;
    for (int rtt : rtts) {
        if(rtt>max){
            max = rtt;
        }
        sum += rtt;
    }
    System.out.println("average: "+ (sum / rtts.length)+" max:"+max);
    System.out.println(Snmp.snmp.toString());
    System.out.println("lost percent: "+(Snmp.snmp.RetransSegs.doubleValue()/Snmp.snmp.OutPkts.doubleValue()));


}


/**
 * count+timestamp+dataLen+data
 *
 * @param count
 * @return
 */
public ByteBuf rttMsg(int count) {
    ByteBuf buf = Unpooled.buffer(220000);
    buf.writeShort(count);
    buf.writeInt((int) (System.currentTimeMillis() - startTime));

    //int dataLen = new Random().nextInt(200);
    //buf.writeBytes(new byte[dataLen]);

    int dataLen = data.readableBytes();
    buf.writeShort(dataLen);
    buf.writeBytes(data, data.readerIndex(), dataLen);

    return buf;
}

KcpRttExampleServer 报错:

bytebuf长度: 1394 读出长度26846
[104] [-34] [56] [0] [0] [0] [-127] [75] [0] [4] [20]。。。。。
Snmp{BytesSent=0, BytesReceived=0, MaxConn=0, ActiveOpens=0, PassiveOpens=0, CurrEstab=0, InErrs=0, InCsumErrors=0, KCPInErrors=0, 收到包=12071, 发送包=15022, InSegs=9058, OutSegs=11557, 收到字节=7091032, 发送字节=14605712, 总共重发数=6269, 快速重发数=0, 空闲快速重发数=0, 超时重发数=6269, 收到重复包数量=1687, fec恢复数=16, fec恢复错误数=1, 收到fecData数=9050, 收到fecParity数=3021, fec缓存冗余淘汰data包数=2085, fec收到重复的数据包=0}

java.lang.IndexOutOfBoundsException: PooledUnsafeDirectByteBuf(ridx: 2, widx: 1394, cap: 1394).slice(2, 26846)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.checkSliceOutOfBounds(AbstractUnpooledSlicedByteBuf.java:474)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.<init>(AbstractUnpooledSlicedByteBuf.java:38)
at io.netty.buffer.UnpooledSlicedByteBuf.<init>(UnpooledSlicedByteBuf.java:24)
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1223)
at com.backblaze.erasure.fec.FecDecode.decode(FecDecode.java:211)
at kcp.Ukcp.input(Ukcp.java:135)
at kcp.ReadTask.execute(ReadTask.java:60)
at threadPool.netty.NettyMessageExecutor.lambda$execute$0(NettyMessageExecutor.java:32)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions