2006-02-19 jrandom

* Moved the current net's reseed URL to a different location than where
      the old net looks (dev.i2p.net/i2pdb2/ vs .../i2pdb/)
    * More aggressively expire inbound messages (on receive, not just on send)
    * Add in a hook for breaking backwards compatibility in the SSU wire
      protocol directly by including a version as part of the handshake.  The
      version is currently set to 0, however, so the wire protocol from this
      build is compatible with all earlier SSU implementations.
    * Increased the number of complete message readers, cutting down
      substantially on the delay processing inbound messages.
    * Delete the message history file on startup
    * Reworked the restart/shutdown display on the console (thanks bd_!)
This commit is contained in:
jrandom
2006-02-19 12:29:57 +00:00
committed by zzz
parent c94de2fbb5
commit 65975df1be
16 changed files with 211 additions and 91 deletions

View File

@ -1,5 +1,6 @@
package net.i2p.router;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
@ -33,6 +34,7 @@ public class MessageHistory {
private ReinitializeJob _reinitializeJob;
private WriteJob _writeJob;
private SubmitMessageHistoryJob _submitMessageHistoryJob;
private volatile boolean _firstPass;
private final static byte[] NL = System.getProperty("line.separator").getBytes();
private final static int FLUSH_SIZE = 1000; // write out at least once every 1000 entries
@ -53,6 +55,7 @@ public class MessageHistory {
_fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
_reinitializeJob = new ReinitializeJob();
_writeJob = new WriteJob();
_firstPass = true;
//_submitMessageHistoryJob = new SubmitMessageHistoryJob(_context);
initialize(true);
}
@ -103,6 +106,12 @@ public class MessageHistory {
_localIdent = getName(_context.routerHash());
_unwrittenEntries = new ArrayList(64);
updateSettings();
// clear the history file on startup
if (_firstPass) {
File f = new File(_historyFile);
f.delete();
}
_firstPass = false;
addEntry(getPrefix() + "** Router initialized (started up or changed identities)");
_context.jobQueue().addJob(_writeJob);
//_submitMessageHistoryJob.getTiming().setStartAfter(_context.clock().now() + 2*60*1000);

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.348 $ $Date: 2006/02/18 01:38:30 $";
public final static String ID = "$Revision: 1.349 $ $Date: 2006/02/18 22:22:32 $";
public final static String VERSION = "0.6.1.10";
public final static long BUILD = 4;
public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -20,7 +20,7 @@ import net.i2p.util.Log;
* parse 'em into I2NPMessages, and stick them on the
* {@link net.i2p.router.InNetMessagePool} by way of the {@link UDPTransport}.
*/
public class MessageReceiver implements Runnable {
public class MessageReceiver {
private RouterContext _context;
private Log _log;
private UDPTransport _transport;
@ -28,7 +28,6 @@ public class MessageReceiver implements Runnable {
private List _completeMessages;
private boolean _alive;
private ByteCache _cache;
private I2NPMessageHandler _handler;
public MessageReceiver(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@ -36,16 +35,31 @@ public class MessageReceiver implements Runnable {
_transport = transport;
_completeMessages = new ArrayList(16);
_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_handler = new I2NPMessageHandler(ctx);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", new long[] { 60*1000l, 10*60*1000l });
_context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", new long[] { 60*1000l, 10*60*1000l });
_alive = true;
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP message receiver");
t.setDaemon(true);
t.start();
for (int i = 0; i < 5; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i);
t.setDaemon(true);
t.start();
}
}
private class Runner implements Runnable {
private I2NPMessageHandler _handler;
public Runner() { _handler = new I2NPMessageHandler(_context); }
public void run() { loop(_handler); }
}
public void shutdown() {
_alive = false;
synchronized (_completeMessages) {
@ -55,34 +69,65 @@ public class MessageReceiver implements Runnable {
}
public void receiveMessage(InboundMessageState state) {
int total = 0;
long lag = -1;
synchronized (_completeMessages) {
_completeMessages.add(state);
total = _completeMessages.size();
if (total > 1)
lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
_completeMessages.notifyAll();
}
if (total > 1)
_context.statManager().addRateData("udp.inboundReady", total, 0);
if (lag > 1000)
_context.statManager().addRateData("udp.inboundLag", lag, total);
}
public void run() {
public void loop(I2NPMessageHandler handler) {
InboundMessageState message = null;
ByteArray buf = _cache.acquire();
while (_alive) {
int expired = 0;
long expiredLifetime = 0;
int remaining = 0;
try {
synchronized (_completeMessages) {
if (_completeMessages.size() > 0)
message = (InboundMessageState)_completeMessages.remove(0);
else
_completeMessages.wait();
while (message == null) {
if (_completeMessages.size() > 0) // grab the tail for lowest latency
message = (InboundMessageState)_completeMessages.remove(_completeMessages.size()-1);
else
_completeMessages.wait(5000);
if ( (message != null) && (message.isExpired()) ) {
expiredLifetime += message.getLifetime();
message = null;
expired++;
}
remaining = _completeMessages.size();
}
}
} catch (InterruptedException ie) {}
if (expired > 0)
_context.statManager().addRateData("udp.inboundExpired", expired, expiredLifetime);
if (message != null) {
long before = System.currentTimeMillis();
if (remaining > 0)
_context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
int size = message.getCompleteSize();
if (_log.shouldLog(Log.INFO))
_log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
I2NPMessage msg = readMessage(buf, message);
I2NPMessage msg = readMessage(buf, message, handler);
long afterRead = System.currentTimeMillis();
if (msg != null)
_transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
message = null;
long after = System.currentTimeMillis();
if (afterRead - before > 100)
_context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
if (after - afterRead > 100)
_context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
}
}
@ -90,7 +135,7 @@ public class MessageReceiver implements Runnable {
_cache.release(buf, false);
}
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state) {
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
try {
//byte buf[] = new byte[state.getCompleteSize()];
ByteArray fragments[] = state.getFragments();
@ -109,7 +154,7 @@ public class MessageReceiver implements Runnable {
_log.error("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), _handler);
I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
m.setUniqueId(state.getMessageId());
return m;
} catch (I2NPMessageException ime) {

View File

@ -88,6 +88,7 @@ public class OutboundMessageFragments {
peer.dropOutbound();
synchronized (_activePeers) {
_activePeers.remove(peer);
_activePeers.notifyAll();
}
}
@ -159,7 +160,7 @@ public class OutboundMessageFragments {
if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg);
}
finishMessages();
//finishMessages();
}
/**
@ -211,6 +212,7 @@ public class OutboundMessageFragments {
i--;
}
}
_activePeers.notifyAll();
}
for (int i = 0; i < peers.size(); i++) {
PeerState state = (PeerState)peers.get(i);
@ -238,7 +240,7 @@ public class OutboundMessageFragments {
while (_alive && (state == null) ) {
long now = _context.clock().now();
int nextSendDelay = -1;
//finishMessages();
finishMessages();
try {
synchronized (_activePeers) {
for (int i = 0; i < _activePeers.size(); i++) {

View File

@ -30,6 +30,9 @@ public class PacketBuilder {
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
private static final ByteCache _blockCache = ByteCache.getInstance(64, 16);
/** we only talk to people of the right version */
static final int PROTOCOL_VERSION = 0;
public PacketBuilder(I2PAppContext ctx, UDPTransport transport) {
_context = ctx;
@ -160,10 +163,11 @@ public class PacketBuilder {
off += 2;
int sizeWritten = state.writeFragment(data, off, fragment);
if (sizeWritten != size)
if (sizeWritten != size) {
_log.error("Size written: " + sizeWritten + " but size: " + size
+ " for fragment " + fragment + " of " + state.getMessageId());
else if (_log.shouldLog(Log.DEBUG))
return null;
} else if (_log.shouldLog(Log.DEBUG))
_log.debug("Size written: " + sizeWritten + " for fragment " + fragment
+ " of " + state.getMessageId());
size = sizeWritten;
@ -1004,7 +1008,7 @@ public class PacketBuilder {
* Encrypt the packet with the cipher key and the given IV, generate a
* MAC for that encrypted data and IV, and store the result in the packet.
* The MAC used is:
* HMAC-SHA256(payload || IV || payloadLength, macKey)[0:15]
* HMAC-SHA256(payload || IV || (payloadLength ^ protocolVersion), macKey)[0:15]
*
* @param packet prepared packet with the first 32 bytes empty and a length
* whose size is mod 16
@ -1024,7 +1028,7 @@ public class PacketBuilder {
off += encryptSize;
System.arraycopy(iv.getData(), 0, data, off, UDPPacket.IV_SIZE);
off += UDPPacket.IV_SIZE;
DataHelper.toLong(data, off, 2, encryptSize);
DataHelper.toLong(data, off, 2, encryptSize ^ PROTOCOL_VERSION);
int hmacOff = packet.getPacket().getOffset();
int hmacLen = encryptSize + UDPPacket.IV_SIZE + 2;

View File

@ -145,7 +145,7 @@ public class UDPPacket {
ByteArray buf = _validateCache.acquire();
// validate by comparing _data[0:15] and
// HMAC(payload + IV + payloadLength, macKey)
// HMAC(payload + IV + (payloadLength ^ protocolVersion), macKey)
int payloadLength = _packet.getLength() - MAC_SIZE - IV_SIZE;
if (payloadLength > 0) {
@ -154,7 +154,7 @@ public class UDPPacket {
off += payloadLength;
System.arraycopy(_data, _packet.getOffset() + MAC_SIZE, buf.getData(), off, IV_SIZE);
off += IV_SIZE;
DataHelper.toLong(buf.getData(), off, 2, payloadLength);
DataHelper.toLong(buf.getData(), off, 2, payloadLength ^ PacketBuilder.PROTOCOL_VERSION);
off += 2;
eq = _context.hmac().verify(macKey, buf.getData(), 0, off, _data, _packet.getOffset(), MAC_SIZE);

View File

@ -570,7 +570,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.INFO))
_log.info("Received another message: " + inMsg.getClass().getName());
}
PeerState peer = getPeerState(remoteIdentHash);
super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived);
if (peer != null)
peer.expireInboundMessages();
}
@ -1354,8 +1357,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
"<b id=\"def.rate\">in/out</b>: the rates show a smoothed inbound and outbound transfer rate (KBytes per second)<br />\n" +
"<b id=\"def.up\">up</b>: the uptime is how long ago this session was established<br />\n" +
"<b id=\"def.skew\">skew</b>: the skew says how far off the other user's clock is, relative to your own<br />\n" +
"<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send without an acknowledgement / <br />\n" +
"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; the number of currently active messages being sent /<br />\n the maximum number of concurrent messages to send /<br />\n"+
"<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send w/out an acknowledgement / <br />\n" +
"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; the number of currently active messages being sent /<br />\n" +
"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; the maximum number of concurrent messages to send /<br />\n"+
"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; the number of consecutive sends which were blocked due to throws message window size<br />\n" +
"<b id=\"def.ssthresh\">ssthresh</b>: the slow start threshold help make sure the cwnd doesn't grow too fast<br />\n" +
"<b id=\"def.rtt\">rtt</b>: the round trip time is how long it takes to get an acknowledgement of a packet<br />\n" +