forked from I2P_Developers/i2p.i2p
NTCP minor cleanups, javadocs, atomics
This commit is contained in:
@ -12,6 +12,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.zip.Adler32;
|
||||
|
||||
import net.i2p.data.Base64;
|
||||
@ -113,8 +114,8 @@ class NTCPConnection {
|
||||
private byte _prevWriteEnd[];
|
||||
/** current partially read I2NP message */
|
||||
private final ReadState _curReadState;
|
||||
private long _messagesRead;
|
||||
private long _messagesWritten;
|
||||
private final AtomicLong _messagesRead = new AtomicLong();
|
||||
private final AtomicLong _messagesWritten = new AtomicLong();
|
||||
private long _lastSendTime;
|
||||
private long _lastReceiveTime;
|
||||
private long _lastRateUpdated;
|
||||
@ -161,6 +162,8 @@ class NTCPConnection {
|
||||
|
||||
private static final int INFO_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
|
||||
private static final String FIXED_RI_VERSION = "0.9.12";
|
||||
private static final AtomicLong __connID = new AtomicLong();
|
||||
private final long _connID = __connID.incrementAndGet();
|
||||
|
||||
/**
|
||||
* Create an inbound connected (though not established) NTCP connection
|
||||
@ -285,6 +288,7 @@ class NTCPConnection {
|
||||
* be under 1 minute)
|
||||
* @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
|
||||
* @param prevReadEnd 16 or more bytes, last 16 bytes copied
|
||||
* @return old conn to be closed by caller, or null
|
||||
*/
|
||||
private synchronized NTCPConnection locked_finishInboundEstablishment(
|
||||
SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
|
||||
@ -313,9 +317,9 @@ class NTCPConnection {
|
||||
return System.currentTimeMillis()-_establishedOn;
|
||||
}
|
||||
|
||||
public long getMessagesSent() { return _messagesWritten; }
|
||||
public long getMessagesSent() { return _messagesWritten.get(); }
|
||||
|
||||
public long getMessagesReceived() { return _messagesRead; }
|
||||
public long getMessagesReceived() { return _messagesRead.get(); }
|
||||
|
||||
public long getOutboundQueueSize() {
|
||||
int queued;
|
||||
@ -390,6 +394,9 @@ class NTCPConnection {
|
||||
es.close(cause, e);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a second connection with the same peer...
|
||||
*/
|
||||
private synchronized NTCPConnection locked_close(boolean allowRequeue) {
|
||||
if (_chan != null) try { _chan.close(); } catch (IOException ioe) { }
|
||||
if (_conKey != null) _conKey.cancel();
|
||||
@ -1070,7 +1077,7 @@ class NTCPConnection {
|
||||
+ msg.getLifetime()
|
||||
+ " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
|
||||
}
|
||||
_messagesWritten++;
|
||||
_messagesWritten.incrementAndGet();
|
||||
_transport.sendComplete(msg);
|
||||
}
|
||||
} else {
|
||||
@ -1517,7 +1524,7 @@ class NTCPConnection {
|
||||
if (read != null) {
|
||||
_transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
|
||||
_lastReceiveTime = System.currentTimeMillis();
|
||||
_messagesRead++;
|
||||
_messagesRead.incrementAndGet();
|
||||
}
|
||||
} catch (I2NPMessageException ime) {
|
||||
if (_log.shouldLog(Log.WARN)) {
|
||||
@ -1550,7 +1557,8 @@ class NTCPConnection {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NTCP conn " +
|
||||
(_isInbound ? "from " : "to ") +
|
||||
_connID +
|
||||
(_isInbound ? " from " : " to ") +
|
||||
(_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) +
|
||||
(isEstablished() ? "" : " not established") +
|
||||
" created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago," +
|
||||
|
@ -199,15 +199,16 @@ public class NTCPTransport extends TransportImpl {
|
||||
|
||||
/**
|
||||
* @param con that is established
|
||||
* @return the previous connection to the same peer, null if no such.
|
||||
* @return the previous connection to the same peer, must be closed by caller, null if no such.
|
||||
*/
|
||||
NTCPConnection inboundEstablished(NTCPConnection con) {
|
||||
_context.statManager().addRateData("ntcp.inboundEstablished", 1);
|
||||
markReachable(con.getRemotePeer().calculateHash(), true);
|
||||
Hash peer = con.getRemotePeer().calculateHash();
|
||||
markReachable(peer, true);
|
||||
//_context.banlist().unbanlistRouter(con.getRemotePeer().calculateHash());
|
||||
NTCPConnection old;
|
||||
synchronized (_conLock) {
|
||||
old = _conByIdent.put(con.getRemotePeer().calculateHash(), con);
|
||||
old = _conByIdent.put(peer, con);
|
||||
}
|
||||
return old;
|
||||
}
|
||||
@ -477,6 +478,9 @@ public class NTCPTransport extends TransportImpl {
|
||||
return (con != null) && con.isEstablished() && con.tooBacklogged();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return usually the con passed in, but possibly a second connection with the same peer...
|
||||
*/
|
||||
NTCPConnection removeCon(NTCPConnection con) {
|
||||
NTCPConnection removed = null;
|
||||
RouterIdentity ident = con.getRemotePeer();
|
||||
|
Reference in New Issue
Block a user