2006-07-14 jrandom

    * Improve the multitransport shitlisting (thanks Complication!)
    * Allow routers with a capacity of 16-32KBps to be used in tunnels under
      the default configuration (thanks for the stats Complication!)
    * Properly allow older router references to load on startup
      (thanks bar, Complication, et al!)
    * Add a new "i2p.alwaysAllowReseed" advanced config property, though
      hopefully today's changes should make this unnecessary (thanks void!)
    * Improved NTCP buffering
    * Close NTCP connections if we are too backlogged when writing to them
jrandom, 2006-07-14 18:08:44 +00:00, committed by zzz
parent 900d8a2026, commit f6320696dd
19 changed files with 190 additions and 172 deletions

View File

@ -107,7 +107,8 @@ public class SummaryHelper {
}
public boolean allowReseed() {
return (_context.netDb().getKnownRouters() < 30);
return (_context.netDb().getKnownRouters() < 30) ||
Boolean.valueOf(_context.getProperty("i2p.alwaysAllowReseed", "false")).booleanValue();
}
public int getAllPeers() { return _context.netDb().getKnownRouters(); }
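
The property is read with a default of "false", so reseeding stays limited to routers that know fewer than 30 peers unless it is explicitly enabled. Assuming the usual advanced-configuration key=value form (for example a line in router.config), turning it on would look like:

    i2p.alwaysAllowReseed=true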

View File

@ -822,7 +822,7 @@ public class DataHelper {
return (ms / (60 * 1000)) + "m";
} else if (ms < 3 * 24 * 60 * 60 * 1000) {
return (ms / (60 * 60 * 1000)) + "h";
} else if (ms > 365 * 24 * 60 * 60 * 1000) {
} else if (ms > 365l * 24l * 60l * 60l * 1000l) {
return "n/a";
} else {
return (ms / (24 * 60 * 60 * 1000)) + "d";
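
The added long suffixes matter because the old constant 365 * 24 * 60 * 60 * 1000 was evaluated entirely in 32-bit int arithmetic before the comparison: the true value, 31,536,000,000, does not fit in an int and silently wraps to 1,471,228,928 (roughly 17 days in milliseconds), so the "n/a" branch fired far too early. A self-contained sketch of the difference, purely for illustration and not part of the patch:

    public class DurationOverflowDemo {
        public static void main(String[] args) {
            // int math wraps: 31,536,000,000 mod 2^32 = 1,471,228,928 (~17 days of ms)
            long wrapped = 365 * 24 * 60 * 60 * 1000;
            // long math keeps the intended one-year threshold
            long correct = 365l * 24l * 60l * 60l * 1000l;
            System.out.println(wrapped + " vs " + correct);
        }
    }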

View File

@ -1,4 +1,15 @@
$Id: history.txt,v 1.491 2006-07-01 17:44:34 complication Exp $
$Id: history.txt,v 1.492 2006-07-04 16:17:44 jrandom Exp $
2006-07-14 jrandom
* Improve the multitransport shitlisting (thanks Complication!)
* Allow routers with a capacity of 16-32KBps to be used in tunnels under
the default configuration (thanks for the stats Complication!)
* Properly allow older router references to load on startup
(thanks bar, Complication, et al!)
* Add a new "i2p.alwaysAllowReseed" advanced config property, though
hopefully today's changes should make this unnecessary (thanks void!)
* Improved NTCP buffering
* Close NTCP connections if we are too backlogged when writing to them
2006-07-04 jrandom
* New NIO-based tcp transport (NTCP), enabled by default for outbound

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.430 $ $Date: 2006-07-01 17:44:36 $";
public final static String ID = "$Revision: 1.431 $ $Date: 2006-07-04 16:18:17 $";
public final static String VERSION = "0.6.1.21";
public final static long BUILD = 2;
public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -101,7 +101,10 @@ public class Shitlist {
}
}
_context.netDb().fail(peer);
if (transport == null) {
// we hate the peer on *any* transport
_context.netDb().fail(peer);
}
//_context.tunnelManager().peerFailed(peer);
//_context.messageRegistry().peerFailed(peer);
if (!wasAlready)
@ -192,6 +195,9 @@ public class Shitlist {
buf.append(" <a href=\"netdb.jsp#").append(key.toBase64().substring(0, 6)).append("\">(?)</a>");
buf.append(" expiring in ");
buf.append(DataHelper.formatDuration(entry.expireOn-_context.clock().now()));
Set transports = entry.transports;
if ( (transports != null) && (transports.size() > 0) )
buf.append(" on the following transports: ").append(transports);
if (entry.cause != null) {
buf.append("<br />\n");
buf.append(entry.cause);
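
The transport changes later in this commit switch to the transport-scoped overload, so a connect failure on one transport no longer bans the peer everywhere; presumably the two-argument form still means "all transports" (a null transport in the code above). A rough sketch of the two call shapes as they appear in the hunks below, with the peer hash and reason strings as placeholders:

    // scoped: only NTCP avoids this peer for now; other transports may still try,
    // and the netDb reference is left intact
    _context.shitlist().shitlistRouter(peerHash, "unable to connect", NTCPTransport.STYLE);

    // unscoped: the peer is shitlisted on every transport, and only in this case
    // is the netDb entry failed as well (per the change above)
    _context.shitlist().shitlistRouter(peerHash, "We share no common transports with them");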

View File

@ -117,7 +117,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
* don't accept any dbStore of a router over 24 hours old (unless we don't
* know anyone or just started up)
*/
private final static long ROUTER_INFO_EXPIRATION = 24*60*60*1000l;
private final static long ROUTER_INFO_EXPIRATION = 3*24*60*60*1000l;
public KademliaNetworkDatabaseFacade(RouterContext context) {
_context = context;
@ -278,8 +278,14 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_context.jobQueue().addJob(new DataPublisherJob(_context, this));
// expire old leases
_context.jobQueue().addJob(new ExpireLeasesJob(_context, this));
// expire some routers in overly full kbuckets
_context.jobQueue().addJob(new ExpireRoutersJob(_context, this));
// the ExpireRoutersJob never fired since the tunnel pool manager lied
// and said all peers are in use (for no good reason), but this expire
// thing was a bit overzealous anyway, since the kbuckets are only
// relevant when the network is huuuuuuuuge.
//// expire some routers in overly full kbuckets
////_context.jobQueue().addJob(new ExpireRoutersJob(_context, this));
if (!_quiet) {
// fill the passive queue periodically
_context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
@ -643,7 +649,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid routerInfo signature! forged router structure! router = " + routerInfo);
return "Invalid routerInfo signature on " + key.toBase64();
} else if (!routerInfo.isCurrent(ROUTER_INFO_EXPIRATION)) {
} else if (!routerInfo.isCurrent(ROUTER_INFO_EXPIRATION) && (_context.router().getUptime() > 60*60*1000) ) {
if (routerInfo.getNetworkId() != Router.NETWORK_ID) {
_context.shitlist().shitlistRouter(key, "Peer is not in our network");
return "Peer is not in our network (" + routerInfo.getNetworkId() + ", wants "
@ -661,7 +667,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
+ " peers left (curPeer: " + key.toBase64() + " published on "
+ new Date(routerInfo.getPublished()));
}
} else if (routerInfo.getPublished() > now + Router.CLOCK_FUDGE_FACTOR) {
} else if (routerInfo.getPublished() > now + 2*Router.CLOCK_FUDGE_FACTOR) {
long age = routerInfo.getPublished() - _context.clock().now();
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + key.toBase64() + " published their routerInfo in the future?! ["

View File

@ -350,9 +350,10 @@ class PersistentDataStore extends TransientDataStore {
ri.readBytes(fis);
if (ri.getNetworkId() != Router.NETWORK_ID) {
corrupt = true;
if (_log.shouldLog(Log.WARN))
_log.warn("The router is from a different network: "
+ ri.getIdentity().calculateHash().toBase64());
if (_log.shouldLog(Log.ERROR))
_log.error("The router "
+ ri.getIdentity().calculateHash().toBase64()
+ " is from a different network");
} else {
try {
_facade.store(ri.getIdentity().getHash(), ri);
@ -362,14 +363,16 @@ class PersistentDataStore extends TransientDataStore {
}
}
} catch (DataFormatException dfe) {
_log.warn("Error reading the routerInfo from " + _routerFile.getAbsolutePath(), dfe);
if (_log.shouldLog(Log.ERROR))
_log.error("Error reading the routerInfo from " + _routerFile.getName(), dfe);
corrupt = true;
} finally {
if (fis != null) try { fis.close(); } catch (IOException ioe) {}
}
if (corrupt) _routerFile.delete();
} catch (IOException ioe) {
_log.warn("Error reading the RouterInfo from " + _routerFile.getAbsolutePath(), ioe);
if (_log.shouldLog(Log.ERROR))
_log.error("Unable to read the router reference in " + _routerFile.getName(), ioe);
}
}
}

View File

@ -67,6 +67,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public TransportBid getNextBid(OutNetMessage msg) {
return _manager.getNextBid(msg);
}
int getTransportCount() { return _manager.getTransportCount(); }
public void processMessage(OutNetMessage msg) {
//GetBidsJob j = new GetBidsJob(_context, this, msg);

View File

@ -62,8 +62,11 @@ public class GetBidsJob extends JobImpl {
TransportBid bid = facade.getNextBid(msg);
if (bid == null) {
if (msg.getFailedTransports().size() == 0) {
int failedCount = msg.getFailedTransports().size();
if (failedCount == 0) {
context.shitlist().shitlistRouter(to, "We share no common transports with them");
} else if (failedCount >= facade.getTransportCount()) {
// fail after all transports were unsuccessful
context.netDb().fail(to);
}
fail(context, msg);
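
getTransportCount() is the small accessor added in the next two hunks; it just reports how many transports the router is running, so the netDb reference is dropped only after every one of them has failed for this message. The same branch as above, restated with comments:

    int failedCount = msg.getFailedTransports().size();
    if (failedCount == 0) {
        // no transport even tried: the peer publishes no address we can use
        context.shitlist().shitlistRouter(to, "We share no common transports with them");
    } else if (failedCount >= facade.getTransportCount()) {
        // every enabled transport has already tried and failed; forget the reference
        context.netDb().fail(to);
    }
    fail(context, msg);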

View File

@ -117,6 +117,8 @@ public class TransportManager implements TransportEventListener {
_transports.clear();
}
int getTransportCount() { return _transports.size(); }
private boolean isSupported(Set addresses, Transport t) {
for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();

View File

@ -561,11 +561,14 @@ public class EstablishState {
/** anything left over in the byte buffer after verification is extra */
private void prepareExtra(ByteBuffer buf) {
_extra = new byte[buf.remaining()];
buf.get(_extra);
_received += _extra.length;
int remaining = buf.remaining();
if (remaining > 0) {
_extra = new byte[remaining];
buf.get(_extra);
_received += remaining;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "prepare extra " + _extra.length + " (total received: " + _received + ")");
_log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
}
/**

View File

@ -467,7 +467,7 @@ public class EventPumper implements Runnable {
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
con.close(false);
} else {
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), NTCPTransport.STYLE, "unable to connect: " + ioe.getMessage());
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
con.close(true);
}
}

View File

@ -94,6 +94,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/** unencrypted outbound metadata buffer */
private byte _meta[] = new byte[16];
private boolean _sendingMeta;
/** how many consecutive sends have failed due to (estimated) send queue time */
private int _consecutiveBacklog;
private static final int META_FREQUENCY = 10*60*1000;
@ -122,6 +124,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_conKey = key;
_conKey.attach(this);
_sendingMeta = false;
_consecutiveBacklog = 0;
transport.establishing(this);
}
/**
@ -147,6 +150,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_curReadState = new ReadState();
_remotePeer = remotePeer;
_sendingMeta = false;
_consecutiveBacklog = 0;
//_establishState = new EstablishState(ctx, transport, this);
transport.establishing(this);
}
@ -187,10 +191,18 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; }
public long getOutboundQueueSize() { synchronized (_outbound) { return _outbound.size(); } }
public long getOutboundQueueSize() {
synchronized (_outbound) {
int queued = _outbound.size();
if (_currentOutbound != null)
queued++;
return queued;
}
}
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
public int getConsecutiveBacklog() { return _consecutiveBacklog; }
public boolean isClosed() { return _closed; }
public void close() { close(false); }
@ -232,9 +244,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (tooBacklogged()) {
boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
boolean successful = false;
_consecutiveBacklog++;
_transport.afterSend(msg, successful, allowRequeue, msg.getLifetime());
if (_consecutiveBacklog > 50) { // waaay too backlogged
if (_log.shouldLog(Log.ERROR))
_log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ") sending to " + _remotePeer.calculateHash().toBase64());
close();
}
return;
}
_consecutiveBacklog = 0;
int enqueued = 0;
if (FAST_LARGE)
bufferedPrepare(msg);
@ -247,8 +266,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_established && _currentOutbound == null)
_transport.getWriter().wantsWrite(this);
}
private boolean tooBacklogged() {
private long queueTime() {
long queueTime = 0;
int size = 0;
synchronized (_outbound) {
@ -257,9 +276,18 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if ( (msg == null) && (size > 0) )
msg = (OutNetMessage)_outbound.get(0);
if (msg == null)
return false;
return 0;
queueTime = msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
return queueTime;
}
private boolean tooBacklogged() {
long queueTime = queueTime();
if (queueTime <= 0) return false;
int size = 0;
synchronized (_outbound) {
size = _outbound.size();
}
// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
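
Together with the send() hunk above, this implements the "close NTCP connections if we are too backlogged" item: queueTime() reports how long the oldest outbound message has been in the send pipeline, tooBacklogged() turns that into a yes/no answer, and each backlogged send bumps a counter that eventually gives up on the connection. A condensed paraphrase of the policy, with the threshold and names taken from the diff:

    if (tooBacklogged()) {
        _consecutiveBacklog++;
        // report the send as failed, and don't requeue it onto SSU
        _transport.afterSend(msg, false, false, msg.getLifetime());
        if (_consecutiveBacklog > 50)   // waaay too backlogged: drop the connection
            close();
        return;
    }
    _consecutiveBacklog = 0;            // any healthy send resets the streak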
@ -665,7 +693,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_writeBufs) {
return _writeBufs.size();
}
}
}
/**
* We have read the data in the buffer, but we can't process it locally yet,
@ -946,18 +974,51 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
public long getReadTime() { return _curReadState.getReadTime(); }
private static class DataBuf {
byte data[];
ByteArrayInputStream bais;
public DataBuf() {
data = new byte[16*1024];
bais = new ByteArrayInputStream(data);
}
}
private static int MAX_DATA_READ_BUFS = 16;
private static List _dataReadBufs = new ArrayList(16);
private static DataBuf acquireReadBuf() {
synchronized (_dataReadBufs) {
if (_dataReadBufs.size() > 0)
return (DataBuf)_dataReadBufs.remove(0);
}
return new DataBuf();
}
private static void releaseReadBuf(DataBuf buf) {
buf.bais.reset();
synchronized (_dataReadBufs) {
if (_dataReadBufs.size() < MAX_DATA_READ_BUFS)
_dataReadBufs.add(buf);
}
}
/**
* sizeof(data)+data+pad+crc.
*
* perhaps to reduce the per-con memory footprint, we can acquire/release
* the ReadState._data and ._bais when _size is > 0, so there are only
* J 16KB buffers for the cons actually transmitting, instead of one per
* con (including idle ones)
*/
private class ReadState {
private int _size;
private byte _data[];
private ByteArrayInputStream _bais;
private DataBuf _dataBuf;
private int _nextWrite;
private long _expectedCrc;
private Adler32 _crc;
private long _stateBegin;
private int _blocks;
public ReadState() {
_data = new byte[16*1024];
_bais = new ByteArrayInputStream(_data);
_crc = new Adler32();
init();
}
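
The DataBuf pool above replaces the 16KB read buffer (and its ByteArrayInputStream) that every ReadState used to allocate in its constructor, so idle connections no longer pin 16KB each; at most MAX_DATA_READ_BUFS buffers are kept around, and only connections actually mid-message hold one. A sketch of the intended acquire/release discipline (the real code acquires in receiveInitial() and releases in init()):

    DataBuf buf = acquireReadBuf();     // reuse a pooled 16KB buffer when one is free
    try {
        // copy incoming blocks into buf.data, then parse the message from buf.bais
    } finally {
        releaseReadBuf(buf);            // reset it and return it; dropped if the pool is full
    }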
@ -968,7 +1029,9 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_stateBegin = -1;
_blocks = -1;
_crc.reset();
_bais.reset();
if (_dataBuf != null)
releaseReadBuf(_dataBuf);
_dataBuf = null;
}
public int getSize() { return _size; }
public void receiveBlock(byte buf[]) {
@ -978,15 +1041,24 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
receiveSubsequent(buf);
}
}
public long getReadTime() {
long now = System.currentTimeMillis();
long readTime = now - _stateBegin;
if (readTime >= now)
return -1;
else
return readTime;
}
private void receiveInitial(byte buf[]) {
_stateBegin = System.currentTimeMillis();
_size = (int)DataHelper.fromLong(buf, 0, 2);
if (_size == 0) {
readMeta(buf);
init();
return;
return;
} else {
System.arraycopy(buf, 2, _data, 0, buf.length-2);
_dataBuf = acquireReadBuf();
System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2);
_nextWrite += buf.length-2;
_crc.update(buf);
_blocks++;
@ -999,7 +1071,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
int remaining = _size - _nextWrite;
int blockUsed = Math.min(buf.length, remaining);
if (remaining > 0) {
System.arraycopy(buf, 0, _data, _nextWrite, blockUsed);
System.arraycopy(buf, 0, _dataBuf.data, _nextWrite, blockUsed);
_nextWrite += blockUsed;
remaining -= blockUsed;
}
@ -1037,7 +1109,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// depend upon EOF to stop reading, so it's ok that the _bais could
// in theory return more data than _size bytes, since h.readMessage
// stops when it should.
I2NPMessage read = h.readMessage(_bais);
I2NPMessage read = h.readMessage(_dataBuf.bais);
long timeToRecv = System.currentTimeMillis() - _stateBegin;
releaseHandler(h);
if (_log.shouldLog(Log.DEBUG))
@ -1073,118 +1145,4 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
}
/**
* sizeof(data)+data+pad+crc
*/
private class ReadState2 {
private int _size;
private byte _dataBegin[];
private byte _dataRemaining[];
private int _dataRemainingIndex;
private long _expectedCrc;
private Adler32 _crc;
private long _stateBegin;
private int _blocks;
private boolean _wasMeta;
public ReadState2(byte buf[]) {
_stateBegin = System.currentTimeMillis();
_size = (int)DataHelper.fromLong(buf, 0, 2);
if (_size == 0) {
readMeta(buf);
_wasMeta = true;
return;
} else {
_wasMeta = false;
}
_dataBegin = new byte[buf.length-2];
System.arraycopy(buf, 2, _dataBegin, 0, _dataBegin.length);
_dataRemaining = new byte[_size-_dataBegin.length];
_dataRemainingIndex = 0;
_crc = new Adler32();
_crc.update(buf);
_blocks++;
if (_log.shouldLog(Log.DEBUG))
_log.debug("new read state with size: " + _size + " remaining: " + _dataRemaining.length + " for message " + _messagesRead);
}
public int size() { return _size; }
public int block() { return _blocks; }
public boolean wasMeta() { return _wasMeta; }
public void recv(byte buf[]) {
_blocks++;
int remaining = _dataRemaining.length-_dataRemainingIndex;
int blockUsed = Math.min(buf.length, remaining);
if (remaining > 0) {
System.arraycopy(buf, 0, _dataRemaining, _dataRemainingIndex, blockUsed);
_dataRemainingIndex += blockUsed;
remaining -= blockUsed;
}
if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead);
_crc.update(buf);
return;
} else if (remaining <= 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("block remaining in the last block: " + (buf.length-blockUsed));
// on the last block
_expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4);
_crc.update(buf, 0, buf.length-4);
long val = _crc.getValue();
if (_log.shouldLog(Log.DEBUG))
_log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size
+ " remaining=" + remaining);
if (val == _expectedCrc) {
try {
I2NPMessageHandler h = acquireHandler(_context);
I2NPMessage read = null;
if (false) {
byte msg[] = new byte[_size];
System.arraycopy(_dataBegin, 0, msg, 0, _dataBegin.length);
System.arraycopy(_dataRemaining, 0, msg, _dataBegin.length, _dataRemaining.length);
read = h.readMessage(msg);
} else {
read = h.readMessage(new SequenceInputStream(new ByteArrayInputStream(_dataBegin),
new ByteArrayInputStream(_dataRemaining)));
}
long timeToRecv = System.currentTimeMillis() - _stateBegin;
releaseHandler(h);
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
+ " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes");
_context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv);
_context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv);
if (read != null) {
_transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
if (_messagesRead <= 0)
enqueueInfoMessage();
_lastReceiveTime = System.currentTimeMillis();
_messagesRead++;
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ioe);
close();
return;
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error parsing I2NP message", ime);
close();
return;
}
} else {
if (_log.shouldLog(Log.ERROR))
_log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " remaining=" + remaining + " blocks " + _blocks);
close();
return;
}
_curReadState = null;
} else {
_crc.update(buf);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("update read state with another block (remaining: " + remaining + ")");
}
}
}
}

View File

@ -379,7 +379,7 @@ public class NTCPTransport extends TransportImpl {
cons = new HashMap(_conByIdent);
_conByIdent.clear();
}
for (Iterator iter = cons.keySet().iterator(); iter.hasNext(); ) {
for (Iterator iter = cons.values().iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
con.close();
}
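
This is a real bug fix, not a cleanup: _conByIdent presumably maps peer identity hashes to connections, so iterating keySet() and casting each key to NTCPConnection threw a ClassCastException as soon as this shutdown path hit a live connection; iterating values() walks the connections themselves. The corrected shutdown loop, annotated:

    // broken: for (Iterator iter = cons.keySet().iterator(); ...) casts a Hash
    // to NTCPConnection and blows up; iterate the mapped values instead
    for (Iterator iter = cons.values().iterator(); iter.hasNext(); ) {
        NTCPConnection con = (NTCPConnection) iter.next();
        con.close();
    }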
@ -400,18 +400,22 @@ public class NTCPTransport extends TransportImpl {
long sendTotal = 0;
long recvTotal = 0;
int numPeers = 0;
int readingPeers = 0;
int writingPeers = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size()).append("</b><br />\n");
buf.append("<table border=\"1\">\n");
buf.append(" <tr><td><b><a href=\"#def.peer\">peer</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">uptime</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">idle</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">sent</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">received</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">out/in</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">out queue</a></b></td>");
buf.append(" <td><b><a href=\"#def.peer\">skew</a></b></td>");
buf.append(" <tr><td><b>peer</b></td>");
buf.append(" <td><b>uptime</b></td>");
buf.append(" <td><b>idle</b></td>");
buf.append(" <td><b>sent</b></td>");
buf.append(" <td><b>received</b></td>");
buf.append(" <td><b>out/in</b></td>");
buf.append(" <td><b>out queue</b></td>");
buf.append(" <td><b>backlogged?</b></td>");
buf.append(" <td><b>reading?</b></td>");
buf.append(" <td><b>skew</b></td>");
buf.append(" </tr>\n");
out.write(buf.toString());
buf.setLength(0);
@ -425,7 +429,23 @@ public class NTCPTransport extends TransportImpl {
buf.append("</td><td>").append(con.getMessagesReceived());
buf.append("</td><td>").append(formatRate(con.getSendRate()/1024));
buf.append("/").append(formatRate(con.getRecvRate()/1024)).append("KBps");
buf.append("</td><td>").append(con.getOutboundQueueSize());
long outQueue = con.getOutboundQueueSize();
if (outQueue <= 0) {
buf.append("</td><td>No messages");
} else {
buf.append("</td><td>").append(outQueue).append(" message");
if (outQueue > 1)
buf.append("s");
writingPeers++;
}
buf.append("</td><td>").append(con.getConsecutiveBacklog() > 0 ? "true" : "false");
long readTime = con.getReadTime();
if (readTime <= 0) {
buf.append("</td><td>No");
} else {
buf.append("</td><td>For ").append(DataHelper.formatDuration(readTime));
readingPeers++;
}
buf.append("</td><td>").append(DataHelper.formatDuration(con.getClockSkew()));
buf.append("</td></tr>\n");
out.write(buf.toString());
@ -433,6 +453,8 @@ public class NTCPTransport extends TransportImpl {
}
buf.append("</table>\n");
buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("<br />\n");
buf.append("Peers currently writing I2NP messages: ").append(writingPeers).append("<br />\n");
out.write(buf.toString());
buf.setLength(0);
}

View File

@ -153,6 +153,8 @@ class Reader {
} else if (buf.remaining() <= 0) {
con.removeReadBuf(buf);
}
if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
}
while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
// decrypt the data and push it into an i2np message
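
A small but important companion to the EstablishState change earlier in this commit: any bytes that arrived in the same read as the tail of the handshake are saved as "extra" bytes, and these new lines feed them to the connection as the start of the first encrypted I2NP frame instead of dropping them. Restated with a comment (names as in the diff):

    if (est.isComplete() && est.getExtraBytes() != null) {
        // leftover post-handshake bytes are already encrypted I2NP data
        con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
    }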

View File

@ -275,7 +275,7 @@ public class TCPTransport extends TransportImpl {
}
if (changedIdents) {
_context.shitlist().shitlistRouter(con.getAttemptedPeer(), "Changed identities");
_context.shitlist().shitlistRouter(con.getAttemptedPeer(), "Changed identities", STYLE);
if (changedMsgs != null) {
for (int i = 0; i < changedMsgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)changedMsgs.get(i);
@ -676,7 +676,7 @@ public class TCPTransport extends TransportImpl {
iter.remove();
_context.shitlist().shitlistRouter(peer, "Peer "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ " has no addresses");
+ " has no addresses", STYLE);
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
@ -690,7 +690,7 @@ public class TCPTransport extends TransportImpl {
iter.remove();
_context.shitlist().shitlistRouter(peer, "Peer "
+ msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6)
+ " has only invalid addresses");
+ " has only invalid addresses", STYLE);
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
@ -716,7 +716,7 @@ public class TCPTransport extends TransportImpl {
if ( (_myAddress != null) && (_myAddress.equals(tcpAddr)) ) {
_log.error("Message points at our old TCP addresses! " + msg.getTarget());
iter.remove();
_context.shitlist().shitlistRouter(peer, "This is our old address...");
_context.shitlist().shitlistRouter(peer, "This is our old address...", STYLE);
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);
@ -731,7 +731,7 @@ public class TCPTransport extends TransportImpl {
+ " address " + tcpAddr.toString());
iter.remove();
_context.shitlist().shitlistRouter(peer, "Invalid TCP address...");
_context.shitlist().shitlistRouter(peer, "Invalid TCP address...", STYLE);
_context.netDb().fail(peer);
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage cur = (OutNetMessage)msgs.get(i);

View File

@ -146,7 +146,7 @@ public class EstablishmentManager {
if (!_transport.isValid(to.getIP())) {
_transport.failed(msg, "Remote peer's IP isn't valid");
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address");
_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
return;
}
@ -942,7 +942,7 @@ public class EstablishmentManager {
}
Hash peer = outboundState.getRemoteIdentity().calculateHash();
_context.shitlist().shitlistRouter(peer, err);
_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
_transport.dropPeer(peer, false, err);
//_context.profileManager().commErrorOccurred(peer);
} else {

View File

@ -327,7 +327,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.ERROR))
_log.error("The router " + from.toBase64() + " told us we have an invalid IP - "
+ RemoteHostId.toString(ourIP) + ". Lets throw tomatoes at them");
_context.shitlist().shitlistRouter(from, "They said we had an invalid IP");
_context.shitlist().shitlistRouter(from, "They said we had an invalid IP", STYLE);
return;
} else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) {
// use OS clock since its an ordering thing, not a time thing
@ -601,7 +601,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
SimpleTimer.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD);
}
}
_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network");
_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network", STYLE);
dropPeer(peerHash, false, "wrong network");
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping the peer " + peerHash.toBase64() + " because they are in the wrong net");
@ -702,7 +702,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
dropPeerCapacities(peer);
if (shouldShitlist)
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries", STYLE);
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
synchronized (_peersByIdent) {

View File

@ -546,10 +546,10 @@ public class TunnelPoolManager implements TunnelManagerFacade {
} else if (caps.indexOf(Router.CAPABILITY_BW256) >= 0) {
return "<b>[&gt;128]</b>";
} else {
return "[&nbsp;&nbsp;&nbsp;&nbsp;]";
return "[old&nbsp;]";
}
} else {
return "[&nbsp;&nbsp;&nbsp;&nbsp;]";
return "[unkn]";
}
}
}