This commit is contained in:
zzz
2011-01-06 18:14:48 +00:00
parent 3867e6144a
commit a087c82db9

View File

@ -54,8 +54,8 @@ import net.i2p.util.Log;
*
*/
public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
private SocketChannel _chan;
private SelectionKey _conKey;
/** list of ByteBuffer containing data we have read and are ready to process, oldest first */
@ -71,7 +71,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private long _establishedOn;
private EstablishState _establishState;
private NTCPTransport _transport;
private boolean _isInbound;
private final boolean _isInbound;
private boolean _closed;
private NTCPAddress _remAddr;
private RouterIdentity _remotePeer;
@ -87,17 +87,17 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private byte _curReadBlock[];
/** next byte to which data should be placed in the _curReadBlock */
private int _curReadBlockIndex;
private byte _decryptBlockBuf[];
private final byte _decryptBlockBuf[];
/** last AES block of the encrypted I2NP message (to serve as the next block's IV) */
private byte _prevReadBlock[];
private byte _prevWriteEnd[];
/** current partially read I2NP message */
private ReadState _curReadState;
private final ReadState _curReadState;
private long _messagesRead;
private long _messagesWritten;
private long _lastSendTime;
private long _lastReceiveTime;
private long _created;
private final long _created;
private long _nextMetaTime;
/** unencrypted outbound metadata buffer */
private final byte _meta[] = new byte[16];
@ -127,28 +127,21 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_created = System.currentTimeMillis();
_lastSendTime = _created;
_lastReceiveTime = _created;
_transport = transport;
_chan = chan;
_readBufs = new LinkedBlockingQueue();
_writeBufs = new LinkedBlockingQueue();
_bwRequests = new ConcurrentHashSet(2);
_outbound = new LinkedBlockingQueue();
_established = false;
_isInbound = true;
_closed = false;
_decryptBlockBuf = new byte[16];
_curReadBlock = new byte[16];
_prevReadBlock = new byte[16];
_curReadState = new ReadState();
_establishState = new EstablishState(ctx, transport, this);
_conKey = key;
_conKey.attach(this);
_sendingMeta = false;
_consecutiveBacklog = 0;
transport.establishing(this);
initialize();
}
/**
* Create an outbound unconnected NTCP connection
*
@ -157,26 +150,25 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_created = System.currentTimeMillis();
_lastSendTime = _created;
_lastReceiveTime = _created;
_transport = transport;
_remotePeer = remotePeer;
_remAddr = remAddr;
_readBufs = new LinkedBlockingQueue();
_writeBufs = new LinkedBlockingQueue();
_bwRequests = new ConcurrentHashSet(2);
_outbound = new LinkedBlockingQueue();
_established = false;
_isInbound = false;
_closed = false;
_decryptBlockBuf = new byte[16];
_curReadState = new ReadState();
initialize();
}
private void initialize() {
_lastSendTime = _created;
_lastReceiveTime = _created;
_curReadBlock = new byte[16];
_prevReadBlock = new byte[16];
_curReadState = new ReadState();
_remotePeer = remotePeer;
_sendingMeta = false;
_consecutiveBacklog = 0;
//_establishState = new EstablishState(ctx, transport, this);
transport.establishing(this);
_transport.establishing(this);
}
public SocketChannel getChannel() { return _chan; }
@ -207,13 +199,18 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
}
/** @return seconds */
public long getClockSkew() { return _clockSkew; }
/** @return milliseconds */
public long getUptime() {
if (!_established)
return getTimeSinceCreated();
else
return System.currentTimeMillis()-_establishedOn;
}
public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; }
public long getOutboundQueueSize() {
@ -222,9 +219,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
queued++;
return queued;
}
/** @return milliseconds */
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
/** @return milliseconds */
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
/** @return milliseconds */
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
public int getConsecutiveBacklog() { return _consecutiveBacklog; }
public boolean isClosed() { return _closed; }
@ -624,26 +628,26 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
msg.beginTransmission();
long begin = System.currentTimeMillis();
//long begin = System.currentTimeMillis();
PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer();
if (buf == null)
throw new RuntimeException("buf is null for " + msg);
_context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
long encryptedTime = System.currentTimeMillis();
//long encryptedTime = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Encrypting " + msg + " [" + System.identityHashCode(msg) + "] crc=" + crc.getValue() + "\nas: "
// + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: "
// + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16));
_transport.getPumper().wantsWrite(this, buf.encrypted);
long wantsTime = System.currentTimeMillis();
//long wantsTime = System.currentTimeMillis();
releaseBuf(buf);
long releaseTime = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
_log.debug("prepared outbound " + System.identityHashCode(msg)
+ " encrypted=" + (encryptedTime-begin)
+ " wantsWrite=" + (wantsTime-encryptedTime)
+ " releaseBuf=" + (releaseTime-wantsTime));
//long releaseTime = System.currentTimeMillis();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepared outbound " + System.identityHashCode(msg)
// + " encrypted=" + (encryptedTime-begin)
// + " wantsWrite=" + (wantsTime-encryptedTime)
// + " releaseBuf=" + (releaseTime-wantsTime));
// for every 6-12 hours that we are connected to a peer, send them
// our updated netDb info (they may not accept it and instead query
@ -666,9 +670,9 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
//if (!_isInbound && !_established)
// return;
msg.beginPrepare();
long begin = System.currentTimeMillis();
//long begin = System.currentTimeMillis();
PrepBuffer buf = acquireBuf();
long alloc = System.currentTimeMillis();
//long alloc = System.currentTimeMillis();
I2NPMessage m = msg.getMessage();
buf.baseLength = m.toByteArray(buf.base);
@ -688,7 +692,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_context.random().nextBytes(buf.pad); // maybe more than necessary, but its only the prng
System.arraycopy(buf.pad, 0, buf.unencrypted, 2+sz, buf.padLength);
long serialized = System.currentTimeMillis();
//long serialized = System.currentTimeMillis();
buf.crc.update(buf.unencrypted, 0, buf.unencryptedLength-4);
long val = buf.crc.getValue();
@ -699,11 +703,11 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
DataHelper.toLong(buf.unencrypted, buf.unencryptedLength-4, 4, val);
buf.encrypted = new byte[buf.unencryptedLength];
long crced = System.currentTimeMillis();
//long crced = System.currentTimeMillis();
msg.prepared(buf);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin)
+ " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin)
// + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
}
private static final int MIN_PREP_BUFS = 5;
@ -827,14 +831,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* contents have been fully allocated
*/
public void queuedRecv(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
addRequest(req);
req.attach(buf);
req.setCompleteListener(this);
addRequest(req);
}
/** ditto for writes */
public void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
addRequest(req);
req.attach(buf);
req.setCompleteListener(this);
addRequest(req);
}
/**