2005-07-31 jrandom

    * Adjust the netDb search and store per peer timeouts to match the average
      measured per peer success times, rather than huge fixed values (see the
      sketch below).
    * Optimized and reverified the netDb peer selection / retrieval process
      within the kbuckets.
    * Drop TCP connections that don't have any useful activity in 10 minutes.
    * If i2np.udp.fixedPort=true, never change the externally published port,
      even if we are autodetecting the IP address.
    (also includes most of the new peer/NAT testing, but that's not used atm)
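
For illustration (editorial sketch, not part of the commit): the first item amounts to letting each peer take up to twice the lifetime average of the measured per-peer success time, falling back to a small fixed cap when there is no data yet or the average exceeds the cap. SearchJob.getPerPeerTimeoutMs() and StoreJob.getPerPeerTimeoutMs() in the diff below implement this; a minimal standalone version of the same idea (adaptiveTimeoutMs is a hypothetical helper name) looks like:

import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;

// Hypothetical helper mirroring the adaptive timeout in the diff below:
// twice the lifetime average of a measured per-peer time, clamped to a cap.
static int adaptiveTimeoutMs(RateStat measured, int fixedCapMs) {
    int rv = -1;
    if (measured != null) {
        Rate r = measured.getRate(measured.getPeriods()[0]);
        rv = (int)r.getLifetimeAverageValue();
    }
    rv <<= 1; // double it to give some leeway
    if ( (rv <= 0) || (rv > fixedCapMs) )
        return fixedCapMs; // no data yet, or slower than the cap
    return rv;
}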
Author: jrandom
Date: 2005-07-31 21:35:26 +00:00
Committed by: zzz
Parent: 593253e6a3
Commit: def24e34ad
30 changed files with 1043 additions and 137 deletions

View File

@ -1,4 +1,13 @@
$Id: history.txt,v 1.217 2005/07/22 19:15:59 jrandom Exp $
$Id: history.txt,v 1.218 2005/07/27 14:06:11 jrandom Exp $
2005-07-31 jrandom
* Adjust the netDb search and store per peer timeouts to match the average
measured per peer success times, rather than huge fixed values.
* Optimized and reverified the netDb peer selection / retrieval process
within the kbuckets.
* Drop TCP connections that don't have any useful activity in 10 minutes.
* If i2np.udp.fixedPort=true, never change the externally published port,
even if we are autodetecting the IP address.
* 2005-07-27 0.6 released

View File

@ -221,10 +221,10 @@ public class DeliveryInstructions extends DataStructureImpl {
return val;
}
private byte[] getAdditionalInfo() throws DataFormatException {
private int getAdditionalInfoSize() {
int additionalSize = 0;
if (getEncrypted()) {
if (_encryptionKey == null) throw new DataFormatException("Encryption key is not set");
if (_encryptionKey == null) throw new IllegalStateException("Encryption key is not set");
additionalSize += SessionKey.KEYSIZE_BYTES;
}
switch (getDeliveryMode()) {
@ -233,15 +233,15 @@ public class DeliveryInstructions extends DataStructureImpl {
_log.debug("mode = local");
break;
case FLAG_MODE_DESTINATION:
if (_destinationHash == null) throw new DataFormatException("Destination hash is not set");
if (_destinationHash == null) throw new IllegalStateException("Destination hash is not set");
additionalSize += Hash.HASH_LENGTH;
break;
case FLAG_MODE_ROUTER:
if (_routerHash == null) throw new DataFormatException("Router hash is not set");
if (_routerHash == null) throw new IllegalStateException("Router hash is not set");
additionalSize += Hash.HASH_LENGTH;
break;
case FLAG_MODE_TUNNEL:
if ( (_routerHash == null) || (_tunnelId == null) ) throw new DataFormatException("Router hash or tunnel ID is not set");
if ( (_routerHash == null) || (_tunnelId == null) ) throw new IllegalStateException("Router hash or tunnel ID is not set");
additionalSize += Hash.HASH_LENGTH;
additionalSize += 4; // tunnelId
break;
@ -249,13 +249,23 @@ public class DeliveryInstructions extends DataStructureImpl {
if (getDelayRequested()) {
additionalSize += 4;
}
}
return additionalSize;
}
private byte[] getAdditionalInfo() {
int additionalSize = getAdditionalInfoSize();
byte rv[] = new byte[additionalSize];
int offset = 0;
offset += getAdditionalInfo(rv, offset);
if (offset != additionalSize)
_log.log(Log.CRIT, "wtf, additionalSize = " + additionalSize + ", offset = " + offset);
return rv;
}
private int getAdditionalInfo(byte rv[], int offset) {
int origOffset = offset;
if (getEncrypted()) {
if (_encryptionKey == null) throw new DataFormatException("Encryption key is not set");
if (_encryptionKey == null) throw new IllegalStateException("Encryption key is not set");
System.arraycopy(_encryptionKey.getData(), 0, rv, offset, SessionKey.KEYSIZE_BYTES);
offset += SessionKey.KEYSIZE_BYTES;
if (_log.shouldLog(Log.DEBUG))
@ -270,21 +280,21 @@ public class DeliveryInstructions extends DataStructureImpl {
_log.debug("mode = local");
break;
case FLAG_MODE_DESTINATION:
if (_destinationHash == null) throw new DataFormatException("Destination hash is not set");
if (_destinationHash == null) throw new IllegalStateException("Destination hash is not set");
System.arraycopy(_destinationHash.getData(), 0, rv, offset, Hash.HASH_LENGTH);
offset += Hash.HASH_LENGTH;
if (_log.shouldLog(Log.DEBUG))
_log.debug("mode = destination, hash = " + _destinationHash);
break;
case FLAG_MODE_ROUTER:
if (_routerHash == null) throw new DataFormatException("Router hash is not set");
if (_routerHash == null) throw new IllegalStateException("Router hash is not set");
System.arraycopy(_routerHash.getData(), 0, rv, offset, Hash.HASH_LENGTH);
offset += Hash.HASH_LENGTH;
if (_log.shouldLog(Log.DEBUG))
_log.debug("mode = router, routerHash = " + _routerHash);
break;
case FLAG_MODE_TUNNEL:
if ( (_routerHash == null) || (_tunnelId == null) ) throw new DataFormatException("Router hash or tunnel ID is not set");
if ( (_routerHash == null) || (_tunnelId == null) ) throw new IllegalStateException("Router hash or tunnel ID is not set");
System.arraycopy(_routerHash.getData(), 0, rv, offset, Hash.HASH_LENGTH);
offset += Hash.HASH_LENGTH;
DataHelper.toLong(rv, offset, 4, _tunnelId.getTunnelId());
@ -303,7 +313,7 @@ public class DeliveryInstructions extends DataStructureImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("delay NOT requested");
}
return rv;
return offset - origOffset;
}
public void writeBytes(OutputStream out) throws DataFormatException, IOException {
@ -320,6 +330,27 @@ public class DeliveryInstructions extends DataStructureImpl {
}
}
/**
* return the number of bytes written to the target
*/
public int writeBytes(byte target[], int offset) {
if ( (_deliveryMode < 0) || (_deliveryMode > FLAG_MODE_TUNNEL) ) throw new IllegalStateException("Invalid data: mode = " + _deliveryMode);
long flags = getFlags();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Write flags: " + flags + " mode: " + getDeliveryMode()
+ " =?= " + flagMode(flags));
int origOffset = offset;
DataHelper.toLong(target, offset, 1, flags);
offset++;
offset += getAdditionalInfo(target, offset);
return offset - origOffset;
}
public int getSize() {
return 1 // flags
+ getAdditionalInfoSize();
}
public boolean equals(Object obj) {
if ( (obj == null) || !(obj instanceof DeliveryInstructions))
return false;

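(Editorial sketch, not part of the diff.) The new getSize() / writeBytes(byte target[], int offset) pair above is meant to be used together: size a buffer up front, then serialize in place, avoiding the stream-based writeBytes() path. GarlicClove.toByteArray() in the next file is the real caller; a minimal hypothetical caller would be:

// Hypothetical caller: size the buffer first, then serialize in place.
static byte[] serialize(DeliveryInstructions instructions) {
    byte buf[] = new byte[instructions.getSize()];
    int written = instructions.writeBytes(buf, 0);
    // written equals buf.length once the fields required by the delivery
    // mode (destination/router hash, tunnel id, encryption key, delay) are set
    return buf;
}
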
View File

@ -155,13 +155,35 @@ public class GarlicClove extends DataStructureImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Written cert: " + _certificate);
}
public byte[] toByteArray() {
byte rv[] = new byte[estimateSize()];
int offset = 0;
offset += _instructions.writeBytes(rv, offset);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Wrote instructions: " + _instructions);
//offset += _msg.toByteArray(rv);
try {
byte m[] = _msg.toByteArray();
System.arraycopy(m, 0, rv, offset, m.length);
offset += m.length;
} catch (Exception e) { throw new RuntimeException("Unable to write: " + _msg + ": " + e.getMessage()); }
DataHelper.toLong(rv, offset, 4, _cloveId);
offset += 4;
DataHelper.toDate(rv, offset, _expiration.getTime());
offset += DataHelper.DATE_LENGTH;
offset += _certificate.writeBytes(rv, offset);
if (offset != rv.length)
_log.log(Log.CRIT, "Clove offset: " + offset + " but estimated length: " + rv.length);
return rv;
}
public int estimateSize() {
return 64 // instructions (high estimate)
return _instructions.getSize()
+ _msg.getMessageSize()
+ 4 // cloveId
+ DataHelper.DATE_LENGTH
+ 4; // certificate
+ _certificate.size(); // certificate
}
public boolean equals(Object obj) {

View File

@ -10,6 +10,7 @@ package net.i2p.data.i2np;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
@ -133,6 +134,13 @@ public class I2NPMessageReader {
_listener.readError(I2NPMessageReader.this, ime);
_listener.disconnected(I2NPMessageReader.this);
cancelRunner();
} catch (InterruptedIOException iioe) {
// not all I2NPMessageReaders support this, but some run off sockets which throw
// SocketTimeoutExceptions or InterruptedIOExceptions
if (_log.shouldLog(Log.INFO))
_log.info("Disconnecting due to inactivity", iioe);
_listener.disconnected(I2NPMessageReader.this);
cancelRunner();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("IO Error handling message", ioe);

View File

@ -51,7 +51,7 @@ public class MultiRouterBuilder {
String dir = args[i];
try {
int basePortNum = Integer.parseInt(args[i+1]);
buildConfig(dir, basePortNum);
buildConfig(dir, basePortNum, i == 0);
} catch (NumberFormatException nfe) {
nfe.printStackTrace();
}
@ -74,42 +74,24 @@ public class MultiRouterBuilder {
}
private static void buildStartupScript(String args[]) {
buildStartupScriptWin(args);
buildStartupScriptNix(args);
}
private static void buildStartupScriptWin(String args[]) {
StringBuffer buf = new StringBuffer(4096);
buf.append("@echo off\ntitle I2P Router Sim\n");
buf.append("echo After all of the routers have started up, you should cross seed them\n");
buf.append("echo Simply copy */netDb/routerInfo-* to all of the various */netDb/ directories\n");
buf.append("java -cp lib\\i2p.jar;lib\\router.jar;lib\\mstreaming.jar;");
buf.append("lib\\heartbeat.jar;lib\\i2ptunnel.jar;lib\\netmonitor.jar;");
buf.append("lib\\sam.jar ");
buf.append("-Djava.library.path=. ");
buf.append("-DloggerFilenameOverride=logs\\log-sim-#.txt ");
buf.append("net.i2p.router.MultiRouter baseEnv.txt ");
for (int i = 0; i < args.length; i += 2)
buf.append(args[i]).append("\\routerEnv.txt ");
buf.append("\npause\n");
try {
FileOutputStream fos = new FileOutputStream("runNetSim.bat");
fos.write(buf.toString().getBytes());
fos.close();
} catch (IOException ioe) { ioe.printStackTrace(); }
}
private static void buildStartupScriptNix(String args[]) {
StringBuffer buf = new StringBuffer(4096);
buf.append("#!/bin/sh\n");
buf.append("nohup java -cp lib/i2p.jar:lib/router.jar:lib/mstreaming.jar:");
buf.append("lib/heartbeat.jar:lib/i2ptunnel.jar:lib/netmonitor.jar:");
buf.append("lib/sam.jar ");
buf.append("-Djava.library.path=. ");
buf.append("-DloggerFilenameOverride=logs/log-sim-#.txt ");
buf.append("net.i2p.router.MultiRouter baseEnv.txt ");
for (int i = 1; i < args.length; i += 2)
buf.append("export CP=.; for LIB in lib/* ; do export CP=$CP:$LIB ; done\n");
buf.append("nohup java -cp $CP ");
buf.append(" -Xmx1024m");
buf.append(" -Djava.library.path=.");
buf.append(" -DloggerFilenameOverride=logs/log-sim-#.txt");
buf.append(" -Dorg.mortbay.http.Version.paranoid=true");
buf.append(" -Dorg.mortbay.util.FileResource.checkAliases=false");
buf.append(" -verbose:gc");
buf.append(" net.i2p.router.MultiRouter baseEnv.txt ");
for (int i = 0; i < args.length; i += 2)
buf.append(args[i]).append("/routerEnv.txt ");
buf.append(" > sim.txt &\n");
buf.append(" > sim.out 2>&1 &\n");
buf.append("echo $! > sim.pid\n");
buf.append("echo \"After all of the routers have started up, you should cross seed them\"\n");
buf.append("echo \"Simply copy */netDb/routerInfo-* to all of the various */netDb/ directories\"\n");
@ -120,32 +102,11 @@ public class MultiRouterBuilder {
} catch (IOException ioe) { ioe.printStackTrace(); }
}
private static void buildConfig(String dir, int basePort) {
private static void buildConfig(String dir, int basePort, boolean isFirst) {
File baseDir = new File(dir);
baseDir.mkdirs();
File cfgFile = new File(baseDir, "router.config");
StringBuffer buf = new StringBuffer(8*1024);
buf.append("i2np.bandwidth.inboundKBytesPerSecond=8\n");
buf.append("i2np.bandwidth.outboundKBytesPerSecond=8\n");
buf.append("i2np.bandwidth.inboundBurstKBytes=80\n");
buf.append("i2np.bandwidth.outboundBurstKBytes=80\n");
buf.append("i2np.bandwidth.replenishFrequencyMs=1000\n");
buf.append("router.publishPeerRankings=true\n");
buf.append("router.keepHistory=false\n");
buf.append("router.submitHistory=false\n");
buf.append("router.maxJobRunners=1\n");
buf.append("router.jobLagWarning=10000\n");
buf.append("router.jobLagFatal=30000\n");
buf.append("router.jobRunWarning=10000\n");
buf.append("router.jobRunFatal=30000\n");
buf.append("router.jobWarmupTime=600000\n");
buf.append("router.targetClients=1\n");
buf.append("tunnels.numInbound=6\n");
buf.append("tunnels.numOutbound=6\n");
buf.append("tunnels.depthInbound=2\n");
buf.append("tunnels.depthOutbound=2\n");
buf.append("tunnels.tunnelDuration=600000\n");
buf.append("router.maxWaitingJobs=30\n");
buf.append("router.profileDir=").append(baseDir.getPath()).append("/peerProfiles\n");
buf.append("router.historyFilename=").append(baseDir.getPath()).append("/messageHistory.txt\n");
buf.append("router.sessionKeys.location=").append(baseDir.getPath()).append("/sessionKeys.dat\n");
@ -154,46 +115,53 @@ public class MultiRouterBuilder {
buf.append("router.networkDatabase.dbDir=").append(baseDir.getPath()).append("/netDb\n");
buf.append("router.tunnelPoolFile=").append(baseDir.getPath()).append("/tunnelPool.dat\n");
buf.append("router.keyBackupDir=").append(baseDir.getPath()).append("/keyBackup\n");
buf.append("router.clientConfigFile=").append(baseDir.getPath()).append("/clients.config\n");
buf.append("i2np.tcp.port=").append(basePort).append('\n');
buf.append("i2np.tcp.hostname=localhost\n");
buf.append("i2np.tcp.allowLocal=true\n");
buf.append("i2np.tcp.disable=true\n");
buf.append("i2np.tcp.enable=false\n");
buf.append("i2np.udp.host=127.0.0.1\n");
buf.append("i2np.udp.port=").append(basePort).append('\n');
buf.append("i2np.udp.internalPort=").append(basePort).append('\n');
buf.append("i2cp.port=").append(basePort+1).append('\n');
buf.append("router.adminPort=").append(basePort+2).append('\n');
buf.append("#clientApp.0.main=net.i2p.sam.SAMBridge\n");
buf.append("#clientApp.0.name=SAM\n");
buf.append("#clientApp.0.args=localhost ").append(basePort+3).append(" i2cp.tcp.host=localhost i2cp.tcp.port=").append(basePort+1).append("\n");
buf.append("#clientApp.1.main=net.i2p.i2ptunnel.I2PTunnel\n");
buf.append("#clientApp.1.name=EepProxy\n");
buf.append("#clientApp.1.args=-nogui -e \"config localhost ").append(basePort+1).append("\" -e \"httpclient ").append(basePort+4).append("\"\n");
buf.append("#clientApp.2.main=net.i2p.heartbeat.Heartbeat\n");
buf.append("#clientApp.2.name=Heartbeat\n");
buf.append("#clientApp.2.args=").append(baseDir.getPath()).append("/heartbeat.config\n");
buf.append("stat.logFile=").append(baseDir.getPath()).append("/stats.log\n");
buf.append("stat.logFilters=*\n");
try {
FileOutputStream fos = new FileOutputStream(cfgFile);
fos.write(buf.toString().getBytes());
fos.close();
fos = new FileOutputStream(new File(baseDir, "heartbeat.config"));
StringBuffer tbuf = new StringBuffer(1024);
tbuf.append("i2cpHost=localhost\n");
tbuf.append("i2cpPort=").append(basePort+1).append('\n');
tbuf.append("numHops=2\n");
tbuf.append("privateDestinationFile=").append(baseDir.getPath()).append("/heartbeat.keys\n");
tbuf.append("publicDestinationFile=").append(baseDir.getPath()).append("/heartbeat.txt\n");
fos.write(tbuf.toString().getBytes());
fos.close();
File envFile = new File(baseDir, "routerEnv.txt");
fos = new FileOutputStream(envFile);
fos.write(("loggerFilenameOverride="+baseDir+ "/logs/log-router-#.txt\n").getBytes());
fos.write(("router.configLocation="+baseDir+"/router.config\n").getBytes());
fos.write(("router.pingFile="+baseDir+"/router.ping\n").getBytes());
//fos.write(("i2p.vmCommSystem=true\n").getBytes());
fos.write(("i2p.encryption=off\n").getBytes());
//fos.write(("i2p.encryption=off\n").getBytes());
fos.close();
File f = new File(baseDir, "logs");
f.mkdirs();
if (isFirst) {
fos = new FileOutputStream(baseDir.getPath() + "/clients.config");
fos.write(("clientApp.0.args=" + (basePort-1) + " 127.0.0.1 ./webapps\n").getBytes());
fos.write(("clientApp.0.main=net.i2p.router.web.RouterConsoleRunner\n").getBytes());
fos.write(("clientApp.0.name=webconsole\n").getBytes());
fos.write(("clientApp.0.onBoot=true\n").getBytes());
fos.write(("clientApp.1.args=\n").getBytes());
fos.write(("clientApp.1.main=net.i2p.router.Counter\n").getBytes());
fos.write(("clientApp.1.name=counter\n").getBytes());
fos.write(("clientApp.1.onBoot=true\n").getBytes());
fos.write(("clientApp.2.args=i2ptunnel.config\n").getBytes());
fos.write(("clientApp.2.main=net.i2p.i2ptunnel.TunnelControllerGroup\n").getBytes());
fos.write(("clientApp.2.name=tunnels\n").getBytes());
fos.write(("clientApp.2.delay=60\n").getBytes());
fos.close();
}
} catch (IOException ioe) {
ioe.printStackTrace();
}

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.208 $ $Date: 2005/07/22 19:15:58 $";
public final static String ID = "$Revision: 1.209 $ $Date: 2005/07/27 14:03:43 $";
public final static String VERSION = "0.6";
public final static long BUILD = 0;
public final static long BUILD = 1;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);

View File

@ -201,9 +201,12 @@ public class GarlicMessageBuilder {
clove.setCloveId(config.getId());
clove.setExpiration(new Date(config.getExpiration()));
clove.setInstructions(config.getDeliveryInstructions());
return clove.toByteArray();
/*
int size = clove.estimateSize();
ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
clove.writeBytes(baos);
return baos.toByteArray();
*/
}
}

View File

@ -53,6 +53,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
getContext().statManager().createRateStat("netDb.lookupsMatchedReceivedPublished", "How many netDb lookups did we have the data for that were published to us?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.lookupsMatchedLocalClosest", "How many netDb lookups for local data were received where we are the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.lookupsMatchedLocalNotClosest", "How many netDb lookups for local data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.lookupsMatchedRemoteNotClosest", "How many netDb lookups for remote data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_message = receivedMessage;
_from = from;
_fromHash = fromHash;
@ -81,12 +82,16 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
Set routerInfoSet = getContext().netDb().findNearestRouters(_message.getSearchKey(),
CLOSENESS_THRESHOLD,
_message.getDontIncludePeers());
if (getContext().clientManager().isLocal(ls.getDestination()) &&
weAreClosest(routerInfoSet)) {
getContext().statManager().addRateData("netDb.lookupsMatchedLocalClosest", 1, 0);
sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel());
if (getContext().clientManager().isLocal(ls.getDestination())) {
if (weAreClosest(routerInfoSet)) {
getContext().statManager().addRateData("netDb.lookupsMatchedLocalClosest", 1, 0);
sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel());
} else {
getContext().statManager().addRateData("netDb.lookupsMatchedLocalNotClosest", 1, 0);
sendClosest(_message.getSearchKey(), routerInfoSet, fromKey, _message.getReplyTunnel());
}
} else {
getContext().statManager().addRateData("netDb.lookupsMatchedLocalNotClosest", 1, 0);
getContext().statManager().addRateData("netDb.lookupsMatchedRemoteNotClosest", 1, 0);
sendClosest(_message.getSearchKey(), routerInfoSet, fromKey, _message.getReplyTunnel());
}
}

View File

@ -36,6 +36,8 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
super(ctx);
_log = ctx.logManager().getLog(HandleDatabaseStoreMessageJob.class);
ctx.statManager().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("netDb.storeLeaseSetHandled", "How many leaseSet store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
ctx.statManager().createRateStat("netDb.storeRouterInfoHandled", "How many routerInfo store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_message = receivedMessage;
_from = from;
_fromHash = fromHash;
@ -48,6 +50,8 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
String invalidMessage = null;
boolean wasNew = false;
if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) {
getContext().statManager().addRateData("netDb.storeLeaseSetHandled", 1, 0);
try {
LeaseSet ls = _message.getLeaseSet();
// mark it as something we received, so we'll answer queries
@ -65,6 +69,7 @@ public class HandleDatabaseStoreMessageJob extends JobImpl {
invalidMessage = iae.getMessage();
}
} else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
getContext().statManager().addRateData("netDb.storeRouterInfoHandled", 1, 0);
if (_log.shouldLog(Log.INFO))
_log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of "
+ new Date(_message.getRouterInfo().getPublished()));

View File

@ -62,6 +62,8 @@ interface KBucket {
* @return set of Hash structures
*/
public Set getEntries(Set toIgnoreHashes);
public void getEntries(SelectionCollector collector);
/**
* Fill the bucket with entries
* @param entries set of Hash structures

View File

@ -10,6 +10,7 @@ package net.i2p.router.networkdb.kademlia;
import java.math.BigInteger;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import net.i2p.I2PAppContext;
@ -205,6 +206,14 @@ class KBucketImpl implements KBucket {
return entries;
}
public void getEntries(SelectionCollector collector) {
synchronized (_entries) {
for (Iterator iter = _entries.iterator(); iter.hasNext(); ) {
collector.add((Hash)iter.next());
}
}
}
public void setEntries(Set entries) {
synchronized (_entries) {
_entries.clear();

View File

@ -85,6 +85,11 @@ class KBucketSet {
return all;
}
public void getAll(SelectionCollector collector) {
for (int i = 0; i < _buckets.length; i++)
_buckets[i].getEntries(collector);
}
public int pickBucket(Hash key) {
for (int i = 0; i < NUM_BUCKETS; i++) {
if (_buckets[i].shouldContain(key))
@ -134,8 +139,7 @@ class KBucketSet {
return buf.toString();
}
final static String toString(byte b[]) {
byte val[] = new byte[Hash.HASH_LENGTH];
if (b.length < 32)

View File

@ -116,10 +116,10 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
private final static int MIN_REMAINING_ROUTERS = 20;
/**
* dont accept any dbDtore of a router over 6 hours old (unless we dont
* dont accept any dbDtore of a router over 24 hours old (unless we dont
* know anyone or just started up)
*/
private final static long ROUTER_INFO_EXPIRATION = 6*60*60*1000l;
private final static long ROUTER_INFO_EXPIRATION = 24*60*60*1000l;
public KademliaNetworkDatabaseFacade(RouterContext context) {
_context = context;
@ -274,7 +274,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_started = System.currentTimeMillis();
// read the queues and publish appropriately
_context.jobQueue().addJob(new DataPublisherJob(_context, this));
if (false)
_context.jobQueue().addJob(new DataPublisherJob(_context, this));
// expire old leases
_context.jobQueue().addJob(new ExpireLeasesJob(_context, this));
// expire some routers in overly full kbuckets

View File

@ -55,6 +55,9 @@ class PeerSelector {
* @return List of Hash for the peers selected, ordered by bucket (but intra bucket order is not defined)
*/
public List selectNearestExplicit(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) {
if (true)
return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets);
if (peersToIgnore == null)
peersToIgnore = new HashSet(1);
peersToIgnore.add(_context.router().getRouterInfo().getIdentity().getHash());
@ -80,6 +83,63 @@ class PeerSelector {
return peerHashes;
}
/**
* Ignore KBucket ordering and do the XOR explicitly per key. Runs in O(n*log(n))
* time (n=routing table size with c ~ 32 xor ops). This gets strict ordering
* on closest
*
* @return List of Hash for the peers selected, ordered by bucket (but intra bucket order is not defined)
*/
public List selectNearestExplicitThin(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) {
if (peersToIgnore == null)
peersToIgnore = new HashSet(1);
peersToIgnore.add(_context.router().getRouterInfo().getIdentity().getHash());
MatchSelectionCollector matches = new MatchSelectionCollector(key, peersToIgnore);
kbuckets.getAll(matches);
List rv = matches.get(maxNumRouters);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Searching for " + maxNumRouters + " peers close to " + key + ": "
+ rv + " (not including " + peersToIgnore + ") [allHashes.size = "
+ matches.size() + "]");
return rv;
}
private class MatchSelectionCollector implements SelectionCollector {
private TreeMap _sorted;
private Hash _key;
private Set _toIgnore;
private int _matches;
public MatchSelectionCollector(Hash key, Set toIgnore) {
_key = key;
_sorted = new TreeMap();
_toIgnore = toIgnore;
_matches = 0;
}
public void add(Hash entry) {
if (_context.profileOrganizer().isFailing(entry))
return;
if (_toIgnore.contains(entry))
return;
if (_context.netDb().lookupRouterInfoLocally(entry) == null)
return;
BigInteger diff = getDistance(_key, entry);
_sorted.put(diff, entry);
_matches++;
}
/** get the first $howMany entries matching */
public List get(int howMany) {
List rv = new ArrayList(howMany);
for (int i = 0; i < howMany; i++) {
if (_sorted.size() <= 0)
break;
rv.add(_sorted.remove(_sorted.firstKey()));
}
return rv;
}
public int size() { return _matches; }
}
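(Editorial sketch, not part of the diff.) The TreeMap above is keyed by getDistance(_key, entry); per the javadoc this is the plain per-key XOR metric. Assuming it simply XORs the two hashes and reads the result as an unsigned value, it is roughly:

// Hypothetical sketch of the XOR distance used to key the TreeMap above.
static BigInteger xorDistance(Hash key, Hash entry) {
    byte k[] = key.getData();
    byte e[] = entry.getData();
    byte diff[] = new byte[k.length];
    for (int i = 0; i < k.length; i++)
        diff[i] = (byte)(k[i] ^ e[i]);
    return new BigInteger(1, diff); // signum 1 => non-negative magnitude
}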
/**
* strip out all of the peers that are failing
*

View File

@ -33,6 +33,7 @@ public class RepublishLeaseSetJob extends JobImpl {
_facade = facade;
_dest = destHash;
//getTiming().setStartAfter(ctx.clock().now()+REPUBLISH_LEASESET_DELAY);
getContext().statManager().createRateStat("netDb.republishLeaseSetCount", "How often we republish a leaseSet?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
}
public String getName() { return "Republish a local leaseSet"; }
public void runJob() {
@ -46,6 +47,7 @@ public class RepublishLeaseSetJob extends JobImpl {
if (_log.shouldLog(Log.WARN))
_log.warn("Not publishing a LOCAL lease that isn't current - " + _dest, new Exception("Publish expired LOCAL lease?"));
} else {
getContext().statManager().addRateData("netDb.republishLeaseSetCount", 1, 0);
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT));
}
} else {
@ -82,7 +84,7 @@ public class RepublishLeaseSetJob extends JobImpl {
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64());
RepublishLeaseSetJob.this.requeue(30*1000);
RepublishLeaseSetJob.this.requeue(5*1000);
}
}
}

View File

@ -27,6 +27,8 @@ import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
@ -59,7 +61,7 @@ class SearchJob extends JobImpl {
* How long will we give each peer to reply to our search?
*
*/
private static final int PER_PEER_TIMEOUT = 10*1000;
private static final int PER_PEER_TIMEOUT = 2*1000;
/**
* give ourselves 30 seconds to send out the value found to the closest
@ -69,6 +71,13 @@ class SearchJob extends JobImpl {
*/
private static final long RESEND_TIMEOUT = 30*1000;
/**
* When we're just waiting for something to change, requeue the search status test
* every second.
*
*/
private static final long REQUEUE_DELAY = 1000;
/**
* Create a new search for the routingKey specified
*
@ -100,15 +109,17 @@ class SearchJob extends JobImpl {
getContext().statManager().createRateStat("netDb.searchReplyNotValidated", "How many search replies we get that we are NOT able to validate (fetch)", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.searchReplyValidationSkipped", "How many search replies we get from unreliable peers that we skip?", "NetworkDatabase", new long[] { 5*60*1000l, 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.republishQuantity", "How many peers do we need to send a found leaseSet to?", "NetworkDatabase", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
getContext().statManager().addRateData("netDb.searchCount", 1, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
}
public void runJob() {
_startedOn = getContext().clock().now();
if (_startedOn <= 0)
_startedOn = getContext().clock().now();
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Searching for " + _state.getTarget()); // , getAddedBy());
getContext().statManager().addRateData("netDb.searchCount", 1, 0);
searchNext();
}
@ -117,6 +128,25 @@ class SearchJob extends JobImpl {
protected long getExpiration() { return _expiration; }
protected long getTimeoutMs() { return _timeoutMs; }
/**
* Let each peer take up to the average successful search RTT
*
*/
protected int getPerPeerTimeoutMs() {
int rv = -1;
RateStat rs = getContext().statManager().getRate("netDb.successTime");
if (rs != null) {
Rate r = rs.getRate(rs.getPeriods()[0]);
rv = (int)r.getLifetimeAverageValue();
}
rv <<= 1; // double it to give some leeway. (bah, too lazy to record stdev)
if ( (rv <= 0) || (rv > PER_PEER_TIMEOUT) )
return PER_PEER_TIMEOUT;
else
return rv + 1025; // tunnel delay
}
/**
* Send the next search, or stop if its completed
*/
@ -228,7 +258,12 @@ class SearchJob extends JobImpl {
}
private void requeuePending() {
requeuePending(5*1000);
// timeout/2 to average things out (midway through)
long perPeerTimeout = getPerPeerTimeoutMs()/2;
if (perPeerTimeout < REQUEUE_DELAY)
requeuePending(perPeerTimeout);
else
requeuePending(REQUEUE_DELAY);
}
private void requeuePending(long ms) {
if (_pendingRequeueJob == null)
@ -306,7 +341,7 @@ class SearchJob extends JobImpl {
return;
}
long expiration = getContext().clock().now() + PER_PEER_TIMEOUT; // getTimeoutMs();
long expiration = getContext().clock().now() + getPerPeerTimeoutMs();
DatabaseLookupMessage msg = buildMessage(inTunnelId, inGateway, expiration);
@ -328,13 +363,13 @@ class SearchJob extends JobImpl {
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), PER_PEER_TIMEOUT);
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), getPerPeerTimeoutMs());
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash());
}
/** we're searching for a router, so we can just send direct */
protected void sendRouterSearch(RouterInfo router) {
long expiration = getContext().clock().now() + PER_PEER_TIMEOUT; // getTimeoutMs();
long expiration = getContext().clock().now() + getPerPeerTimeoutMs();
DatabaseLookupMessage msg = buildMessage(expiration);
@ -345,7 +380,7 @@ class SearchJob extends JobImpl {
SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state);
SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(),
reply, new FailedJob(getContext(), router), sel, PER_PEER_TIMEOUT, SEARCH_PRIORITY);
reply, new FailedJob(getContext(), router), sel, getPerPeerTimeoutMs(), SEARCH_PRIORITY);
getContext().jobQueue().addJob(j);
}
@ -598,6 +633,13 @@ class SearchJob extends JobImpl {
*/
private static final int MAX_LEASE_RESEND = 10;
/**
* Should we republish a routerInfo received? Probably not worthwhile, since
* routerInfo entries should be very easy to find.
*
*/
private static final boolean SHOULD_RESEND_ROUTERINFO = false;
/**
* After we get the data we were searching for, rebroadcast it to the peers
* we would query first if we were to search for it again (healing the network).
@ -606,11 +648,13 @@ class SearchJob extends JobImpl {
private void resend() {
DataStructure ds = _facade.lookupLeaseSetLocally(_state.getTarget());
if (ds == null) {
ds = _facade.lookupRouterInfoLocally(_state.getTarget());
if (ds != null)
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _state.getTarget(),
ds, null, null, RESEND_TIMEOUT,
_state.getSuccessful()));
if (SHOULD_RESEND_ROUTERINFO) {
ds = _facade.lookupRouterInfoLocally(_state.getTarget());
if (ds != null)
getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _state.getTarget(),
ds, null, null, RESEND_TIMEOUT,
_state.getSuccessful()));
}
} else {
Set sendTo = _state.getFailed();
sendTo.addAll(_state.getPending());

View File

@ -0,0 +1,10 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
/**
* Visit kbuckets, gathering matches
*/
interface SelectionCollector {
public void add(Hash entry);
}

View File

@ -24,6 +24,8 @@ import net.i2p.router.JobImpl;
import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
class StoreJob extends JobImpl {
@ -36,8 +38,8 @@ class StoreJob extends JobImpl {
private long _expiration;
private PeerSelector _peerSelector;
private final static int PARALLELIZATION = 2; // how many sent at a time
private final static int REDUNDANCY = 6; // we want the data sent to 6 peers
private final static int PARALLELIZATION = 3; // how many sent at a time
private final static int REDUNDANCY = 3; // we want the data sent to 6 peers
/**
* additionally send to 1 outlier(s), in case all of the routers chosen in our
* REDUNDANCY set are attacking us by accepting DbStore messages but dropping
@ -50,8 +52,10 @@ class StoreJob extends JobImpl {
private final static int EXPLORATORY_REDUNDANCY = 1;
private final static int STORE_PRIORITY = 100;
/** how long we allow for an ACK to take after a store */
private final static int STORE_TIMEOUT_MS = 10*1000;
/** default period we allow for an ACK to take after a store */
private final static int PER_PEER_TIMEOUT = 5*1000;
/** smallest allowed period */
private static final int MIN_PER_PEER_TIMEOUT = 1*1000;
/**
* Create a new search for the routingKey specified
@ -70,8 +74,10 @@ class StoreJob extends JobImpl {
DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set toSkip) {
super(context);
_log = context.logManager().getLog(StoreJob.class);
getContext().statManager().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.storeRouterInfoSent", "How many routerInfo store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.storeLeaseSetSent", "How many leaseSet store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.storePeers", "How many peers each netDb must be sent to before success?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.storeFailedPeers", "How many peers each netDb must be sent to before failing completely?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
getContext().statManager().createRateStat("netDb.ackTime", "How long does it take for a peer to ack a netDb store?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
_facade = facade;
_state = new StoreState(getContext(), key, data, toSkip);
@ -198,11 +204,14 @@ class StoreJob extends JobImpl {
// _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
}
sendStore(msg, router, getContext().clock().now() + STORE_TIMEOUT_MS);
sendStore(msg, router, getContext().clock().now() + getPerPeerTimeoutMs());
}
private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
getContext().statManager().addRateData("netDb.storeSent", 1, 0);
if (msg.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET)
getContext().statManager().addRateData("netDb.storeLeaseSetSent", 1, 0);
else
getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
sendStoreThroughGarlic(msg, peer, expiration);
}
@ -242,7 +251,7 @@ class StoreJob extends JobImpl {
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
getContext().messageRegistry().registerPending(selector, onReply, onFail, STORE_TIMEOUT_MS);
getContext().messageRegistry().registerPending(selector, onReply, onFail, getPerPeerTimeoutMs());
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, peer.getIdentity().getHash());
} else {
if (_log.shouldLog(Log.ERROR))
@ -328,6 +337,7 @@ class StoreJob extends JobImpl {
if (_onSuccess != null)
getContext().jobQueue().addJob(_onSuccess);
_facade.noteKeySent(_state.getTarget());
_state.complete(true);
getContext().statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
}
@ -341,5 +351,30 @@ class StoreJob extends JobImpl {
_log.debug(getJobId() + ": State of failed send: " + _state, new Exception("Who failed me?"));
if (_onFailure != null)
getContext().jobQueue().addJob(_onFailure);
_state.complete(true);
getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
}
/**
* Let each peer take up to the average successful search RTT
*
*/
private int getPerPeerTimeoutMs() {
int rv = -1;
RateStat rs = getContext().statManager().getRate("netDb.ackTime");
if (rs != null) {
Rate r = rs.getRate(rs.getPeriods()[0]);
rv = (int)r.getLifetimeAverageValue();
}
rv <<= 1; // double it to give some leeway. (bah, too lazy to record stdev)
if (rv <= 0)
return PER_PEER_TIMEOUT;
else if (rv < MIN_PER_PEER_TIMEOUT)
return MIN_PER_PEER_TIMEOUT;
else if (rv > PER_PEER_TIMEOUT)
return PER_PEER_TIMEOUT;
else
return rv;
}
}

View File

@ -75,7 +75,7 @@ class StoreState {
}
public boolean completed() { return _completed != -1; }
public void complete(boolean completed) {
if (completed)
if (completed && _completed <= 0)
_completed = _context.clock().now();
}
public int getCompleteCount() { return _completeCount; }

View File

@ -38,6 +38,9 @@ public class PeerTestJob extends JobImpl {
super(context);
_log = context.logManager().getLog(PeerTestJob.class);
_keepTesting = false;
getContext().statManager().createRateStat("peer.testOK", "How long a successful test takes", "Peers", new long[] { 60*1000, 10*60*1000 });
getContext().statManager().createRateStat("peer.testTooSlow", "How long a too-slow (yet successful) test takes", "Peers", new long[] { 60*1000, 10*60*1000 });
getContext().statManager().createRateStat("peer.testTimeout", "How often a test times out without a reply", "Peers", new long[] { 60*1000, 10*60*1000 });
}
/** how long should we wait before firing off new tests? */
@ -202,9 +205,14 @@ public class PeerTestJob extends JobImpl {
DeliveryStatusMessage msg = (DeliveryStatusMessage)message;
if (_nonce == msg.getMessageId()) {
long timeLeft = _expiration - getContext().clock().now();
if (timeLeft < 0)
_log.warn("Took too long to get a reply from peer " + _peer.toBase64()
+ ": " + (0-timeLeft) + "ms too slow");
if (timeLeft < 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("Took too long to get a reply from peer " + _peer.toBase64()
+ ": " + (0-timeLeft) + "ms too slow");
getContext().statManager().addRateData("peer.testTooSlow", 0-timeLeft, 0);
} else {
getContext().statManager().addRateData("peer.testOK", getTestTimeout() - timeLeft, 0);
}
return true;
}
}
@ -280,6 +288,7 @@ public class PeerTestJob extends JobImpl {
// don't fail the tunnels, as the peer might just plain be down, or
// otherwise overloaded
getContext().statManager().addRateData("peer.testTimeout", 1, 0);
}
}
}

View File

@ -152,7 +152,10 @@ public class ConnectionHandler {
if (ok && (_error == null) ) {
establishComplete();
try { _socket.setSoTimeout(0); } catch (SocketException se) {}
if (true)
try { _socket.setSoTimeout(ConnectionRunner.DISCONNECT_INACTIVITY_PERIOD); } catch (SocketException se) {}
else
try { _socket.setSoTimeout(0); } catch (SocketException se) {}
if (_log.shouldLog(Log.INFO))
_log.info("Establishment ok... building the con");

View File

@ -25,8 +25,11 @@ class ConnectionRunner implements Runnable {
private long _lastTimeSend;
private long _lastWriteEnd;
private long _lastWriteBegin;
private long _lastRealActivity;
private static final long TIME_SEND_FREQUENCY = 60*1000;
/** if we don't send them any real data in a 10 minute period, drop 'em */
static final int DISCONNECT_INACTIVITY_PERIOD = 10*60*1000;
public ConnectionRunner(RouterContext ctx, TCPConnection con) {
_context = ctx;
@ -35,6 +38,7 @@ class ConnectionRunner implements Runnable {
_keepRunning = false;
_lastWriteBegin = ctx.clock().now();
_lastWriteEnd = _lastWriteBegin;
_lastRealActivity = _lastWriteBegin;
}
public void startRunning() {
@ -104,6 +108,9 @@ class ConnectionRunner implements Runnable {
OutputStream out = _con.getOutputStream();
boolean ok = false;
if (!DateMessage.class.getName().equals(msg.getMessageType()))
_lastRealActivity = _context.clock().now();
try {
synchronized (out) {
_lastWriteBegin = _context.clock().now();
@ -154,6 +161,7 @@ class ConnectionRunner implements Runnable {
long now = _context.clock().now();
long timeSinceWrite = now - _lastWriteEnd;
long timeSinceWriteBegin = now - _lastWriteBegin;
long timeSinceWriteReal = now - _lastRealActivity;
if (timeSinceWrite > 5*TIME_SEND_FREQUENCY) {
TCPTransport t = _con.getTransport();
String msg = "Connection closed with "
@ -170,6 +178,23 @@ class ConnectionRunner implements Runnable {
_con.closeConnection(false);
return;
}
if (timeSinceWriteReal > DISCONNECT_INACTIVITY_PERIOD) {
TCPTransport t = _con.getTransport();
String msg = "Connection closed with "
+ _con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)
+ " due to " + DataHelper.formatDuration(timeSinceWriteReal)
+ " of inactivity after "
+ DataHelper.formatDuration(_con.getLifetime());
if (_lastWriteBegin > _lastWriteEnd)
msg = msg + " with a message being written for " +
DataHelper.formatDuration(timeSinceWriteBegin);
t.addConnectionErrorMessage(msg);
if (_log.shouldLog(Log.INFO))
_log.info(msg);
_con.closeConnection(false);
return;
}
if (_lastTimeSend < _context.clock().now() - 2*TIME_SEND_FREQUENCY)
enqueueTimeMessage();
long delay = 2*TIME_SEND_FREQUENCY + _context.random().nextInt((int)TIME_SEND_FREQUENCY);

View File

@ -442,6 +442,145 @@ public class PacketBuilder {
return packet;
}
/**
* full flag info for a peerTest message. this can be fixed,
* since we never rekey on test, and don't need any extended options
*/
private static final byte PEER_TEST_FLAG_BYTE = (UDPPacket.PAYLOAD_TYPE_TEST << 4);
/**
* Build a packet as if we are Alice and we either want Bob to begin a
* peer test or Charlie to finish a peer test.
*
* @return ready to send packet, or null if there was a problem
*/
public UDPPacket buildPeerTestFromAlice(InetAddress toIP, int toPort, SessionKey toIntroKey, long nonce, SessionKey aliceIntroKey) {
UDPPacket packet = UDPPacket.acquire(_context);
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
// header
data[off] = PEER_TEST_FLAG_BYTE;
off++;
long now = _context.clock().now() / 1000;
DataHelper.toLong(data, off, 4, now);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending peer test " + nonce + " to Bob with time = " + new Date(now*1000));
off += 4;
// now for the body
DataHelper.toLong(data, off, 4, nonce);
off += 4;
DataHelper.toLong(data, off, 1, 0); // neither Bob nor Charlie need Alice's IP from her
off++;
DataHelper.toLong(data, off, 2, 0); // neither Bob nor Charlie need Alice's port from her
off += 2;
System.arraycopy(aliceIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES);
off += SessionKey.KEYSIZE_BYTES;
// we can pad here if we want, maybe randomized?
// pad up so we're on the encryption boundary
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
authenticate(packet, toIntroKey, toIntroKey);
setTo(packet, toIP, toPort);
return packet;
}
/**
* Build a packet as if we are either Bob or Charlie and we are helping test Alice.
*
* @return ready to send packet, or null if there was a problem
*/
public UDPPacket buildPeerTestToAlice(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, SessionKey charlieIntroKey, long nonce) {
UDPPacket packet = UDPPacket.acquire(_context);
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
// header
data[off] = PEER_TEST_FLAG_BYTE;
off++;
long now = _context.clock().now() / 1000;
DataHelper.toLong(data, off, 4, now);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending peer test " + nonce + " to Alice with time = " + new Date(now*1000));
off += 4;
// now for the body
DataHelper.toLong(data, off, 4, nonce);
off += 4;
byte ip[] = aliceIP.getAddress();
DataHelper.toLong(data, off, 1, ip.length);
off++;
System.arraycopy(ip, 0, data, off, ip.length);
off += ip.length;
DataHelper.toLong(data, off, 2, alicePort);
off += 2;
System.arraycopy(charlieIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES);
off += SessionKey.KEYSIZE_BYTES;
// we can pad here if we want, maybe randomized?
// pad up so we're on the encryption boundary
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
authenticate(packet, aliceIntroKey, aliceIntroKey);
setTo(packet, aliceIP, alicePort);
return packet;
}
/**
* Build a packet as if we are Bob sending Charlie a packet to help test Alice.
*
* @return ready to send packet, or null if there was a problem
*/
public UDPPacket buildPeerTestToCharlie(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce,
InetAddress charlieIP, int charliePort,
SessionKey charlieCipherKey, SessionKey charlieMACKey) {
UDPPacket packet = UDPPacket.acquire(_context);
byte data[] = packet.getPacket().getData();
Arrays.fill(data, 0, data.length, (byte)0x0);
int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
// header
data[off] = PEER_TEST_FLAG_BYTE;
off++;
long now = _context.clock().now() / 1000;
DataHelper.toLong(data, off, 4, now);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending peer test " + nonce + " to Charlie with time = " + new Date(now*1000));
off += 4;
// now for the body
DataHelper.toLong(data, off, 4, nonce);
off += 4;
byte ip[] = aliceIP.getAddress();
DataHelper.toLong(data, off, 1, ip.length);
off++;
System.arraycopy(ip, 0, data, off, ip.length);
off += ip.length;
DataHelper.toLong(data, off, 2, alicePort);
off += 2;
System.arraycopy(aliceIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES);
off += SessionKey.KEYSIZE_BYTES;
// we can pad here if we want, maybe randomized?
// pad up so we're on the encryption boundary
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
authenticate(packet, charlieCipherKey, charlieMACKey);
setTo(packet, charlieIP, charliePort);
return packet;
}
private void setTo(UDPPacket packet, InetAddress ip, int port) {
packet.getPacket().setAddress(ip);
packet.getPacket().setPort(port);

View File

@ -27,17 +27,19 @@ public class PacketHandler {
private UDPEndpoint _endpoint;
private EstablishmentManager _establisher;
private InboundMessageFragments _inbound;
private PeerTestManager _testManager;
private boolean _keepReading;
private static final int NUM_HANDLERS = 3;
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) {
public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager) {
_context = ctx;
_log = ctx.logManager().getLog(PacketHandler.class);
_transport = transport;
_endpoint = endpoint;
_establisher = establisher;
_inbound = inbound;
_testManager = testManager;
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 });
@ -323,6 +325,9 @@ public class PacketHandler {
_log.info("Received new DATA packet from " + state + ": " + packet);
_inbound.receiveData(state, reader.getDataReader());
break;
case UDPPacket.PAYLOAD_TYPE_TEST:
_testManager.receiveTest(from, reader);
break;
default:
if (_log.shouldLog(Log.WARN))
_log.warn("Unknown payload type: " + reader.readPayloadType());

View File

@ -0,0 +1,360 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import net.i2p.router.RouterContext;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.util.SimpleTimer;
import net.i2p.util.Log;
/**
*
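* Editorial summary (added for clarity, not part of the original commit): this
* class drives the new peer/NAT test. As Alice we ask Bob to coordinate a test,
* Bob relays our address to a Charlie, Charlie probes us directly, and we
* compare the ports Bob and Charlie observed to judge our reachability; the
* class also handles the Bob and Charlie sides when helping test someone else.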
*/
class PeerTestManager {
private RouterContext _context;
private Log _log;
private UDPTransport _transport;
private PacketBuilder _packetBuilder;
/**
* circular list of nonces which we have received as if we were 'Charlie'
* (meaning if we see it again, we aren't Bob and shouldn't find our own Charlie).
* Synchronize against this when updating it
*/
private long _receiveAsCharlie[];
/** index into _receiveAsCharlie which we should next write to */
private int _receiveAsCharlieIndex;
/** nonce we are currently running our own test as, or -1 */
private long _currentTestNonce;
private InetAddress _bobIP;
private int _bobPort;
private SessionKey _bobIntroKey;
private long _testBeginTime;
private long _lastSendTime;
private long _receiveBobReplyTime;
private long _receiveCharlieReplyTime;
private InetAddress _charlieIP;
private int _charliePort;
private SessionKey _charlieIntroKey;
private int _receiveBobReplyPort;
private int _receiveCharlieReplyPort;
/** longest we will keep track of a Charlie nonce for */
private static final int MAX_CHARLIE_LIFETIME = 10*1000;
public PeerTestManager(RouterContext context, UDPTransport transport) {
_context = context;
_transport = transport;
_log = context.logManager().getLog(PeerTestManager.class);
_receiveAsCharlie = new long[64];
_packetBuilder = new PacketBuilder(context);
_currentTestNonce = -1;
}
private static final int RESEND_TIMEOUT = 5*1000;
private static final int MAX_TEST_TIME = 30*1000;
private static final long MAX_NONCE = (1l << 32) - 1l;
public void runTest(InetAddress bobIP, int bobPort, SessionKey bobIntroKey) {
_currentTestNonce = _context.random().nextLong(MAX_NONCE);
_bobIP = bobIP;
_bobPort = bobPort;
_bobIntroKey = bobIntroKey;
_charlieIP = null;
_charliePort = -1;
_charlieIntroKey = null;
_testBeginTime = _context.clock().now();
_lastSendTime = _testBeginTime;
_receiveBobReplyTime = -1;
_receiveCharlieReplyTime = -1;
_receiveBobReplyPort = -1;
_receiveCharlieReplyPort = -1;
sendTestToBob();
SimpleTimer.getInstance().addEvent(new ContinueTest(), RESEND_TIMEOUT);
}
private class ContinueTest implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_currentTestNonce < 0) {
// already completed
return;
} else if (expired()) {
testComplete();
} else {
if (_receiveBobReplyTime < 0) {
// no message from Bob yet, send it again
sendTestToBob();
} else if (_receiveCharlieReplyTime < 0) {
// received from Bob, but no reply from Charlie. send it to
// Bob again so he pokes Charlie
sendTestToBob();
} else {
// received from both Bob and Charlie, but we haven't received a
// second message from Charlie yet
sendTestToCharlie();
}
SimpleTimer.getInstance().addEvent(ContinueTest.this, RESEND_TIMEOUT);
}
}
private boolean expired() { return _testBeginTime + MAX_TEST_TIME < _context.clock().now(); }
}
private void sendTestToBob() {
_transport.send(_packetBuilder.buildPeerTestFromAlice(_bobIP, _bobPort, _bobIntroKey,
_currentTestNonce, _transport.getIntroKey()));
}
private void sendTestToCharlie() {
_transport.send(_packetBuilder.buildPeerTestFromAlice(_charlieIP, _charliePort, _charlieIntroKey,
_currentTestNonce, _transport.getIntroKey()));
}
/**
* Receive a PeerTest message which contains the correct nonce for our current
* test
*/
private void receiveTestReply(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo) {
if (DataHelper.eq(from.getIP(), _bobIP.getAddress())) {
_receiveBobReplyTime = _context.clock().now();
_receiveBobReplyPort = testInfo.readPort();
} else {
if (_receiveCharlieReplyTime > 0) {
// this is our second charlie, yay!
_receiveCharlieReplyPort = testInfo.readPort();
testComplete();
} else {
// ok, first charlie. send 'em a packet
_receiveCharlieReplyTime = _context.clock().now();
_charliePort = from.getPort();
try {
_charlieIP = InetAddress.getByAddress(from.getIP());
sendTestToCharlie();
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Charlie's IP is b0rked: " + from + ": " + testInfo);
}
}
}
}
private static final short STATUS_REACHABLE_OK = 0;
private static final short STATUS_REACHABLE_DIFFERENT = 1;
private static final short STATUS_CHARLIE_DIED = 2;
private static final short STATUS_REJECT_UNSOLICITED = 3;
private static final short STATUS_BOB_SUCKS = 4;
/**
* Evaluate the info we have and act accordingly, since the test has either timed out or
* we have successfully received the second PeerTest from a Charlie.
*
*/
private void testComplete() {
short status = -1;
if (_receiveCharlieReplyPort > 0) {
// we received a second message from charlie
if (_receiveBobReplyPort == _receiveCharlieReplyPort) {
status = STATUS_REACHABLE_OK;
} else {
status = STATUS_REACHABLE_DIFFERENT;
}
} else if (_receiveCharlieReplyTime > 0) {
// we received only one message from charlie
status = STATUS_CHARLIE_DIED;
} else if (_receiveBobReplyTime > 0) {
// we received a message from bob but no messages from charlie
status = STATUS_REJECT_UNSOLICITED;
} else {
// we never received anything from bob - he is either down or ignoring us
status = STATUS_BOB_SUCKS;
}
honorStatus(status);
// now zero everything out
_currentTestNonce = -1;
_bobIP = null;
_bobPort = -1;
_bobIntroKey = null;
_charlieIP = null;
_charliePort = -1;
_charlieIntroKey = null;
_testBeginTime = -1;
_lastSendTime = -1;
_receiveBobReplyTime = -1;
_receiveCharlieReplyTime = -1;
_receiveBobReplyPort = -1;
_receiveCharlieReplyPort = -1;
}
/**
* Depending upon the status, fire off different events (using received port/ip/etc as
* necessary).
*
*/
private void honorStatus(short status) {
switch (status) {
case STATUS_REACHABLE_OK:
case STATUS_REACHABLE_DIFFERENT:
case STATUS_CHARLIE_DIED:
case STATUS_REJECT_UNSOLICITED:
case STATUS_BOB_SUCKS:
if (_log.shouldLog(Log.INFO))
_log.info("Test results: status = " + status);
}
}
/**
* Receive a test message of some sort from the given peer, queueing up any packet
* that should be sent in response, or if its a reply to our own current testing,
* adjusting our test state.
*
*/
public void receiveTest(RemoteHostId from, UDPPacketReader reader) {
UDPPacketReader.PeerTestReader testInfo = reader.getPeerTestReader();
byte fromIP[] = null;
int fromPort = testInfo.readPort();
long nonce = testInfo.readNonce();
if (nonce == _currentTestNonce) {
receiveTestReply(from, testInfo);
return;
}
if ( (testInfo.readIPSize() > 0) && (fromPort > 0) ) {
fromIP = new byte[testInfo.readIPSize()];
testInfo.readIP(fromIP, 0);
}
if ( ( (fromIP == null) && (fromPort <= 0) ) || // info is unknown or...
(DataHelper.eq(fromIP, from.getIP()) && (fromPort == from.getPort())) ) { // info matches sender
boolean weAreCharlie = false;
synchronized (_receiveAsCharlie) {
    // linear scan: the nonce buffer is an unsorted circular buffer, so
    // Arrays.binarySearch would give bogus results here
    for (int i = 0; !weAreCharlie && i < _receiveAsCharlie.length; i++)
        weAreCharlie = (_receiveAsCharlie[i] == nonce);
}
if (weAreCharlie) {
receiveFromAliceAsCharlie(from, testInfo, nonce);
} else {
receiveFromAliceAsBob(from, testInfo, nonce);
}
} else {
receiveFromBobAsCharlie(from, fromIP, fromPort, nonce, testInfo);
}
}
/**
* The packet's IP/port does not match the IP/port included in the message,
* so we must be Charlie receiving a PeerTest from Bob.
*
*/
private void receiveFromBobAsCharlie(RemoteHostId from, byte fromIP[], int fromPort, long nonce, UDPPacketReader.PeerTestReader testInfo) {
if (fromIP == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("From address received from Bob (we are Charlie) is invalid: " + from + ": " + testInfo);
return;
}
if (fromPort <= 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("From port received from Bob (we are Charlie) is invalid: " + fromPort + ": " + testInfo);
return;
}
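// remember the nonce in our small circular buffer so Alice's follow-up packet
// (carrying the same nonce) is recognized as a test we already agreed to help with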
int index = -1;
synchronized (_receiveAsCharlie) {
index = _receiveAsCharlieIndex;
_receiveAsCharlie[index] = nonce;
_receiveAsCharlieIndex = (index + 1) % _receiveAsCharlie.length;
}
SimpleTimer.getInstance().addEvent(new RemoveCharlie(nonce, index), MAX_CHARLIE_LIFETIME);
try {
InetAddress aliceIP = InetAddress.getByAddress(fromIP);
// allocate the backing array so readIntroKey() has somewhere to copy the key into
SessionKey aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, fromPort, aliceIntroKey, _transport.getIntroKey(), nonce);
_transport.send(packet);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to build the aliceIP from " + from, uhe);
}
}
/**
* The PeerTest message came from the peer referenced in the message (or there wasn't
* any info in the message), plus we are not acting as Charlie (so we've got to be Bob).
*
*/
private void receiveFromAliceAsBob(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce) {
// we are Bob, so send Alice her PeerTest, pick a Charlie, and
// send Charlie Alice's info
PeerState charlie = _transport.getPeerState(UDPAddress.CAPACITY_TESTING);
if (charlie == null) {
    if (_log.shouldLog(Log.WARN))
        _log.warn("No peer with the testing capacity available to help test " + from);
    return;
}
InetAddress aliceIP = null;
SessionKey aliceIntroKey = null;
try {
aliceIP = InetAddress.getByAddress(from.getIP());
aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
RouterInfo info = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
if (info == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No info for charlie: " + charlie);
return;
}
UDPAddress addr = new UDPAddress(info.getTargetAddress(UDPTransport.STYLE));
SessionKey charlieIntroKey = new SessionKey(addr.getIntroKey());
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, charlieIntroKey, nonce);
_transport.send(packet);
packet = _packetBuilder.buildPeerTestToCharlie(aliceIP, from.getPort(), aliceIntroKey, nonce,
charlie.getRemoteIPAddress(),
charlie.getRemotePort(),
charlie.getCurrentCipherKey(),
charlie.getCurrentMACKey());
_transport.send(packet);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to build the aliceIP from " + from, uhe);
}
}
/**
* We are charlie, so send Alice her PeerTest message
*
*/
private void receiveFromAliceAsCharlie(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce) {
try {
InetAddress aliceIP = InetAddress.getByAddress(from.getIP());
SessionKey aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, _transport.getIntroKey(), nonce);
_transport.send(packet);
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to build the aliceIP from " + from, uhe);
}
}
/**
* forget about charlie's nonce after 60s.
*/
private class RemoveCharlie implements SimpleTimer.TimedEvent {
private long _nonce;
private int _index;
public RemoveCharlie(long nonce, int index) {
_nonce = nonce;
_index = index;
}
public void timeReached() {
// only forget about an entry if we haven't already moved on
synchronized (_receiveAsCharlie) {
if (_receiveAsCharlie[_index] == _nonce)
_receiveAsCharlie[_index] = -1;
}
}
}
}

View File

@ -19,6 +19,10 @@ public class UDPAddress {
public static final String PROP_PORT = "port";
public static final String PROP_HOST = "host";
public static final String PROP_INTRO_KEY = "key";
public static final String PROP_CAPACITY = "opts";
public static final char CAPACITY_TESTING = 'A';
public static final char CAPACITY_INTRODUCER = 'B';
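/** capacities a peer offers are advertised as a string of these flag characters (e.g. "AB") under the PROP_CAPACITY ("opts") option */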
public UDPAddress(RouterAddress addr) {
parse(addr);

View File

@ -56,6 +56,7 @@ public class UDPPacket {
public static final int PAYLOAD_TYPE_RELAY_RESPONSE = 4;
public static final int PAYLOAD_TYPE_RELAY_INTRO = 5;
public static final int PAYLOAD_TYPE_DATA = 6;
public static final int PAYLOAD_TYPE_TEST = 7;
// various flag fields for use in the data packets
public static final byte DATA_FLAG_EXPLICIT_ACK = (byte)(1 << 7);

View File

@ -23,6 +23,7 @@ public class UDPPacketReader {
private SessionCreatedReader _sessionCreatedReader;
private SessionConfirmedReader _sessionConfirmedReader;
private DataReader _dataReader;
private PeerTestReader _peerTestReader;
private static final int KEYING_MATERIAL_LENGTH = 64;
@ -33,6 +34,7 @@ public class UDPPacketReader {
_sessionCreatedReader = new SessionCreatedReader();
_sessionConfirmedReader = new SessionConfirmedReader();
_dataReader = new DataReader();
_peerTestReader = new PeerTestReader();
}
public void initialize(UDPPacket packet) {
@ -51,7 +53,7 @@ public class UDPPacketReader {
/** what type of payload is in here? */
public int readPayloadType() {
// 3 highest order bits == payload type
// 4 highest order bits == payload type
return (_message[_payloadBeginOffset] & 0xFF) >>> 4;
}
@ -90,6 +92,7 @@ public class UDPPacketReader {
public SessionCreatedReader getSessionCreatedReader() { return _sessionCreatedReader; }
public SessionConfirmedReader getSessionConfirmedReader() { return _sessionConfirmedReader; }
public DataReader getDataReader() { return _dataReader; }
public PeerTestReader getPeerTestReader() { return _peerTestReader; }
public String toString() {
switch (readPayloadType()) {
@ -491,4 +494,46 @@ public class UDPPacketReader {
return buf.toString();
}
}
/** Help read the PeerTest payload */
public class PeerTestReader {
private static final int NONCE_LENGTH = 4;
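// payload layout, as the readers below assume:
//   nonce (4 bytes) | IP size (1 byte) | IP (size bytes) | port (2 bytes) | intro key (32 bytes)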
public long readNonce() {
int readOffset = readBodyOffset();
return DataHelper.fromLong(_message, readOffset, NONCE_LENGTH);
}
public int readIPSize() {
int offset = readBodyOffset() + NONCE_LENGTH;
return (int)DataHelper.fromLong(_message, offset, 1);
}
/** what IP Alice is reachable on */
public void readIP(byte target[], int targetOffset) {
int offset = readBodyOffset() + NONCE_LENGTH;
int size = (int)DataHelper.fromLong(_message, offset, 1);
offset++;
System.arraycopy(_message, offset, target, targetOffset, size);
}
/** what port Alice is reachable on */
public int readPort() {
int offset = readBodyOffset() + NONCE_LENGTH;
int size = (int)DataHelper.fromLong(_message, offset, 1);
offset++;
offset += size; // skip the IP
return (int)DataHelper.fromLong(_message, offset, 2);
}
/** what Alice's intro key is (if known - if unknown, the key is INVALID_KEY) */
public void readIntroKey(byte target[], int targetOffset) {
int offset = readBodyOffset() + NONCE_LENGTH;
int size = (int)DataHelper.fromLong(_message, offset, 1);
offset++;
offset += size; // skip the IP
offset += 2; // skip the port
System.arraycopy(_message, offset, target, targetOffset, SessionKey.KEYSIZE_BYTES);
}
}
}

View File

@ -40,6 +40,7 @@ public class UDPSender {
_context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendACKTime", "How long an ACK packet is blocked for (duration == lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
_context.statManager().createRateStat("udp.sendException", "How frequently we fail to send a packet (likely due to a windows exception)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
}
public void startup() {
@ -176,8 +177,9 @@ public class UDPSender {
_context.statManager().addRateData("udp.pushTime", packet.getLifetime(), packet.getLifetime());
_context.statManager().addRateData("udp.sendPacketSize", size, packet.getLifetime());
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error sending", ioe);
if (_log.shouldLog(Log.WARN))
_log.warn("Error sending", ioe);
_context.statManager().addRateData("udp.sendException", 1, packet.getLifetime());
}
// back to the cache

View File

@ -22,8 +22,10 @@ import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.Transport;
@ -45,6 +47,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private Map _peersByRemoteHost;
/** Relay tag (base64 String) to PeerState */
private Map _peersByRelayTag;
/**
 * Array of lists of PeerState instances, where the list at index i contains
 * peers advertising the capacity character ('A' + i), for i from 0 through 25.
 */
private List _peersByCapacity[];
private PacketHandler _handler;
private EstablishmentManager _establisher;
private MessageQueue _outboundMessages;
@ -54,6 +61,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private PacketPusher _pusher;
private InboundMessageFragments _inboundFragments;
private UDPFlooder _flooder;
private PeerTestManager _testManager;
private ExpirePeerEvent _expireEvent;
/** list of RelayPeer objects for people who will relay to us */
@ -92,6 +100,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public static final String PROP_ALWAYS_PREFER_UDP = "i2np.udp.alwaysPreferred";
private static final String DEFAULT_ALWAYS_PREFER_UDP = "true";
public static final String PROP_FIXED_PORT = "i2np.udp.fixedPort";
private static final String DEFAULT_FIXED_PORT = "true";
/** how many relays offered to us will we use at a time? */
public static final int PUBLIC_RELAY_COUNT = 3;
@ -112,6 +123,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByIdent = new HashMap(128);
_peersByRemoteHost = new HashMap(128);
_peersByRelayTag = new HashMap(128);
_peersByCapacity = new ArrayList['Z'-'A'+1];
for (int i = 0; i < _peersByCapacity.length; i++)
_peersByCapacity[i] = new ArrayList(16);
_endpoint = null;
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
@ -198,8 +212,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_establisher == null)
_establisher = new EstablishmentManager(_context, this);
if (_testManager == null)
_testManager = new PeerTestManager(_context, this);
if (_handler == null)
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments);
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager);
if (_refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
@ -261,13 +278,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (explicitAddressSpecified())
return;
boolean fixedPort = getIsPortFixed();
boolean updated = false;
synchronized (this) {
if ( (_externalListenHost == null) ||
(!eq(_externalListenHost.getAddress(), _externalListenPort, ourIP, ourPort)) ) {
try {
_externalListenHost = InetAddress.getByAddress(ourIP);
_externalListenPort = ourPort;
if (!fixedPort)
_externalListenPort = ourPort;
rebuildExternalAddress();
replaceAddress(_externalAddress);
updated = true;
@ -277,7 +296,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
_context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+"");
if (!fixedPort)
_context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+"");
_context.router().saveConfig();
if (updated)
@ -288,6 +308,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return (rport == lport) && DataHelper.eq(laddr, raddr);
}
private boolean getIsPortFixed() {
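// defaults to true, so the published port only changes if i2np.udp.fixedPort is explicitly set to something other than "true"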
return DEFAULT_FIXED_PORT.equals(_context.getProperty(PROP_FIXED_PORT, DEFAULT_FIXED_PORT));
}
/**
* get the state for the peer at the given remote host/port, or null
* if no state exists
@ -318,6 +341,56 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
}
/** pick a random peer with the given capacity */
public PeerState getPeerState(char capacity) {
int index = _context.random().nextInt(1024);
List peers = _peersByCapacity[capacity-'A'];
synchronized (peers) {
int size = peers.size();
if (size <= 0) return null;
index = index % size;
return (PeerState)peers.get(index);
}
}
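/** cap on how many peers we track for any single capacity */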
private static final int MAX_PEERS_PER_CAPACITY = 16;
/**
* Intercept RouterInfo entries received directly from a peer to inject them into
* the PeersByCapacity listing.
*
*/
public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
if (inMsg instanceof DatabaseStoreMessage) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage)inMsg;
if (dsm.getType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) {
Hash from = remoteIdentHash;
if (from == null)
from = remoteIdent.getHash();
if (from.equals(dsm.getKey())) {
// db info received directly from the peer - inject it into the peersByCapacity
RouterInfo info = dsm.getRouterInfo();
Properties opts = info.getOptions();
if ( (opts != null) && (info.isValid()) ) {
String capacities = opts.getProperty(UDPAddress.PROP_CAPACITY);
if (capacities != null) {
PeerState peer = getPeerState(from);
for (int i = 0; i < capacities.length(); i++) {
char capacity = capacities.charAt(i);
List peers = _peersByCapacity[capacity-'A'];
synchronized (peers) {
if ( (peer != null) && (peers.size() < MAX_PEERS_PER_CAPACITY) && (!peers.contains(peer)) )
peers.add(peer);
}
}
}
}
}
}
}
super.messageReceived(inMsg, remoteIdent, remoteIdentHash, msToReceive, bytesReceived);
}
/**
* add the peer info, returning true if it went in properly, false if
* it was rejected (causes include peer ident already connected, or no
@ -380,6 +453,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.INFO))
_log.info("Dropping remote peer: " + peer + " shitlist? " + shouldShitlist, new Exception("Dropped by"));
if (peer.getRemotePeer() != null) {
dropPeerCapacities(peer);
if (shouldShitlist) {
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
@ -408,6 +483,26 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireEvent.remove(peer);
}
/**
* Make sure we don't think this dropped peer is capable of doing anything anymore...
*
*/
private void dropPeerCapacities(PeerState peer) {
RouterInfo info = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
if (info != null) {
String capacities = info.getOptions().getProperty(UDPAddress.PROP_CAPACITY);
if (capacities != null) {
for (int i = 0; i < capacities.length(); i++) {
char capacity = capacities.charAt(i);
List peers = _peersByCapacity[capacity-'A'];
synchronized (peers) {
peers.remove(peer);
}
}
}
}
}
int send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet);