NTCP: Read all available data when able (ticket #2243)

This commit is contained in:
zzz
2018-07-06 13:31:46 +00:00
parent 1460bec8cf
commit a916c1a22a

View File

@ -613,50 +613,68 @@ class EventPumper implements Runnable {
*/
private void processRead(SelectionKey key) {
NTCPConnection con = (NTCPConnection)key.attachment();
ByteBuffer buf = acquireBuf();
ByteBuffer buf = null;
try {
int read = con.getChannel().read(buf);
if (read < 0) {
if (con.isInbound() && con.getMessagesReceived() <= 0) {
InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
if (addr != null) {
byte[] ip = addr.getAddress();
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
while (true) {
buf = acquireBuf();
int read = 0;
int readThisTime;
int readCount = 0;
while ((readThisTime = con.getChannel().read(buf)) > 0) {
read += readThisTime;
readCount++;
}
if (readThisTime < 0 && read == 0)
read = readThisTime;
if (_log.shouldDebug())
_log.debug("Read " + read + " bytes total in " + readCount + " times from " + con);
if (read < 0) {
if (con.isInbound() && con.getMessagesReceived() <= 0) {
InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
if (addr != null) {
byte[] ip = addr.getAddress();
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any: " + con);
}
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any: " + con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("EOF on " + con);
}
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("EOF on " + con);
}
con.close();
releaseBuf(buf);
} else if (read == 0) {
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
// workaround for channel stuck returning 0 all the time, causing 100% CPU
int consec = con.gotZeroRead();
if (consec >= 5) {
_context.statManager().addRateData("ntcp.zeroReadDrop", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Fail safe zero read close " + con);
con.close();
} else {
_context.statManager().addRateData("ntcp.zeroRead", consec);
if (_log.shouldLog(Log.INFO))
_log.info("nothing to read for " + con + ", but stay interested");
releaseBuf(buf);
break;
}
} else {
if (read == 0) {
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
// workaround for channel stuck returning 0 all the time, causing 100% CPU
int consec = con.gotZeroRead();
if (consec >= 5) {
_context.statManager().addRateData("ntcp.zeroReadDrop", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Fail safe zero read close " + con);
con.close();
} else {
_context.statManager().addRateData("ntcp.zeroRead", consec);
if (_log.shouldLog(Log.INFO))
_log.info("nothing to read for " + con + ", but stay interested");
}
break;
}
// Process the data received
// clear counter for workaround above
con.clearZeroRead();
// go around again if we filled the buffer (so we can read more)
boolean keepReading = !buf.hasRemaining();
// ZERO COPY. The buffer will be returned in Reader.processRead()
buf.flip();
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
@ -665,21 +683,31 @@ class EventPumper implements Runnable {
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
_context.statManager().addRateData("ntcp.queuedRecv", read);
con.queuedRecv(buf, req);
break;
} else {
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
con.recv(buf);
_context.statManager().addRateData("ntcp.read", read);
if (readThisTime < 0) {
// EOF, we're done
con.close();
break;
}
if (!keepReading)
break;
}
}
} // while true
} catch (CancelledKeyException cke) {
releaseBuf(buf);
if (buf != null)
releaseBuf(buf);
if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke);
con.close();
_context.statManager().addRateData("ntcp.readError", 1);
} catch (IOException ioe) {
// common, esp. at outbound connect time
releaseBuf(buf);
if (buf != null)
releaseBuf(buf);
if (con.isInbound() && con.getMessagesReceived() <= 0) {
InetAddress addr = con.getChannel().socket().getInetAddress();
int count;
@ -712,7 +740,8 @@ class EventPumper implements Runnable {
}
con.close();
} catch (NotYetConnectedException nyce) {
releaseBuf(buf);
if (buf != null)
releaseBuf(buf);
// ???
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
if (_log.shouldLog(Log.WARN))