diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 16ae226613..9d73d164fd 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -613,50 +613,68 @@ class EventPumper implements Runnable { */ private void processRead(SelectionKey key) { NTCPConnection con = (NTCPConnection)key.attachment(); - ByteBuffer buf = acquireBuf(); + ByteBuffer buf = null; try { - int read = con.getChannel().read(buf); - if (read < 0) { - if (con.isInbound() && con.getMessagesReceived() <= 0) { - InetAddress addr = con.getChannel().socket().getInetAddress(); - int count; - if (addr != null) { - byte[] ip = addr.getAddress(); - ByteArray ba = new ByteArray(ip); - count = _blockedIPs.increment(ba); - if (_log.shouldLog(Log.WARN)) - _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con); + while (true) { + buf = acquireBuf(); + int read = 0; + int readThisTime; + int readCount = 0; + while ((readThisTime = con.getChannel().read(buf)) > 0) { + read += readThisTime; + readCount++; + } + if (readThisTime < 0 && read == 0) + read = readThisTime; + if (_log.shouldDebug()) + _log.debug("Read " + read + " bytes total in " + readCount + " times from " + con); + if (read < 0) { + if (con.isInbound() && con.getMessagesReceived() <= 0) { + InetAddress addr = con.getChannel().socket().getInetAddress(); + int count; + if (addr != null) { + byte[] ip = addr.getAddress(); + ByteArray ba = new ByteArray(ip); + count = _blockedIPs.increment(ba); + if (_log.shouldLog(Log.WARN)) + _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con); + } else { + count = 1; + if (_log.shouldLog(Log.WARN)) + _log.warn("EOF on inbound before receiving any: " + con); + } + _context.statManager().addRateData("ntcp.dropInboundNoMessage", count); } else { - count = 1; - if (_log.shouldLog(Log.WARN)) - _log.warn("EOF on inbound before receiving any: " + con); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("EOF on " + con); } - _context.statManager().addRateData("ntcp.dropInboundNoMessage", count); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("EOF on " + con); - } - con.close(); - releaseBuf(buf); - } else if (read == 0) { - // stay interested - //key.interestOps(key.interestOps() | SelectionKey.OP_READ); - releaseBuf(buf); - // workaround for channel stuck returning 0 all the time, causing 100% CPU - int consec = con.gotZeroRead(); - if (consec >= 5) { - _context.statManager().addRateData("ntcp.zeroReadDrop", 1); - if (_log.shouldLog(Log.WARN)) - _log.warn("Fail safe zero read close " + con); con.close(); - } else { - _context.statManager().addRateData("ntcp.zeroRead", consec); - if (_log.shouldLog(Log.INFO)) - _log.info("nothing to read for " + con + ", but stay interested"); + releaseBuf(buf); + break; } - } else { + if (read == 0) { + // stay interested + //key.interestOps(key.interestOps() | SelectionKey.OP_READ); + releaseBuf(buf); + // workaround for channel stuck returning 0 all the time, causing 100% CPU + int consec = con.gotZeroRead(); + if (consec >= 5) { + _context.statManager().addRateData("ntcp.zeroReadDrop", 1); + if (_log.shouldLog(Log.WARN)) + _log.warn("Fail safe zero read close " + con); + con.close(); + } else { + _context.statManager().addRateData("ntcp.zeroRead", consec); + if (_log.shouldLog(Log.INFO)) + _log.info("nothing to read for " + con + ", but stay interested"); + } + break; + } + // Process the data received // clear counter for workaround above con.clearZeroRead(); + // go around again if we filled the buffer (so we can read more) + boolean keepReading = !buf.hasRemaining(); // ZERO COPY. The buffer will be returned in Reader.processRead() buf.flip(); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); @@ -665,21 +683,31 @@ class EventPumper implements Runnable { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); _context.statManager().addRateData("ntcp.queuedRecv", read); con.queuedRecv(buf, req); + break; } else { // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_READ); con.recv(buf); _context.statManager().addRateData("ntcp.read", read); + if (readThisTime < 0) { + // EOF, we're done + con.close(); + break; + } + if (!keepReading) + break; } - } + } // while true } catch (CancelledKeyException cke) { - releaseBuf(buf); + if (buf != null) + releaseBuf(buf); if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke); con.close(); _context.statManager().addRateData("ntcp.readError", 1); } catch (IOException ioe) { // common, esp. at outbound connect time - releaseBuf(buf); + if (buf != null) + releaseBuf(buf); if (con.isInbound() && con.getMessagesReceived() <= 0) { InetAddress addr = con.getChannel().socket().getInetAddress(); int count; @@ -712,7 +740,8 @@ class EventPumper implements Runnable { } con.close(); } catch (NotYetConnectedException nyce) { - releaseBuf(buf); + if (buf != null) + releaseBuf(buf); // ??? key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); if (_log.shouldLog(Log.WARN))