propagate from branch 'i2p.i2p' (head b7a8a00272124eec0d149224af58bd144358c009)

to branch 'i2p.i2p.zzz.test' (head a4d67a357c36f4e94718bf237a7af96b8617a4a7)
This commit is contained in:
zzz
2010-03-08 20:04:55 +00:00
270 changed files with 19651 additions and 10467 deletions

View File

@ -11,8 +11,8 @@ package net.i2p.data.i2np;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
@ -39,10 +39,11 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
private static final boolean RAW_FULL_SIZE = false;
/** unsynchronized as its pretty much read only (except at startup) */
private static final Map _builders = new HashMap(8);
/** unused */
private static final Map<Integer, Builder> _builders = new ConcurrentHashMap(1);
/** @deprecated unused */
public static final void registerBuilder(Builder builder, int type) { _builders.put(Integer.valueOf(type), builder); }
/** interface for extending the types of messages handled */
/** interface for extending the types of messages handled - unused */
public interface Builder {
/** instantiate a new I2NPMessage to be populated shortly */
public I2NPMessage build(I2PAppContext ctx);
@ -385,20 +386,22 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM
case DataMessage.MESSAGE_TYPE:
return new DataMessage(context);
// unused since 0.6.1.10
//case TunnelCreateMessage.MESSAGE_TYPE:
// return new TunnelCreateMessage(context);
//case TunnelCreateStatusMessage.MESSAGE_TYPE:
// return new TunnelCreateStatusMessage(context);
case TunnelBuildMessage.MESSAGE_TYPE:
return new TunnelBuildMessage(context);
case TunnelBuildReplyMessage.MESSAGE_TYPE:
return new TunnelBuildReplyMessage(context);
// since 0.7.10
case VariableTunnelBuildMessage.MESSAGE_TYPE:
return new VariableTunnelBuildMessage(context);
// since 0.7.10
case VariableTunnelBuildReplyMessage.MESSAGE_TYPE:
return new VariableTunnelBuildReplyMessage(context);
default:
Builder builder = (Builder)_builders.get(Integer.valueOf(type));
if (builder == null)
return null;
else
// unused
Builder builder = _builders.get(Integer.valueOf(type));
if (builder != null)
return builder.build(context);
return new UnknownI2NPMessage(context, type);
}
}
}

View File

@ -9,18 +9,30 @@ import net.i2p.data.ByteArray;
*
*/
public class TunnelBuildMessage extends I2NPMessageImpl {
private ByteArray _records[];
protected ByteArray _records[];
protected int RECORD_COUNT;
public static final int MAX_RECORD_COUNT = 8;
public static final int MESSAGE_TYPE = 21;
public static final int RECORD_COUNT = 8;
public TunnelBuildMessage(I2PAppContext context) {
this(context, MAX_RECORD_COUNT);
}
/** @since 0.7.10 */
protected TunnelBuildMessage(I2PAppContext context, int records) {
super(context);
_records = new ByteArray[RECORD_COUNT];
if (records > 0) {
RECORD_COUNT = records;
_records = new ByteArray[records];
}
// else will be initialized by readMessage() in VTBM
}
public void setRecord(int index, ByteArray record) { _records[index] = record; }
public ByteArray getRecord(int index) { return _records[index]; }
/** @since 0.7.10 */
public int getRecordCount() { return RECORD_COUNT; }
public static final int RECORD_SIZE = 512+16;
@ -50,4 +62,9 @@ public class TunnelBuildMessage extends I2NPMessageImpl {
}
return curIndex;
}
@Override
public String toString() {
return "[TunnelBuildMessage]";
}
}

View File

@ -10,18 +10,30 @@ import net.i2p.data.ByteArray;
* reply tunnel
*/
public class TunnelBuildReplyMessage extends I2NPMessageImpl {
private ByteArray _records[];
protected ByteArray _records[];
protected int RECORD_COUNT;
public static final int MAX_RECORD_COUNT = TunnelBuildMessage.MAX_RECORD_COUNT;
public static final int MESSAGE_TYPE = 22;
public static final int RECORD_COUNT = TunnelBuildMessage.RECORD_COUNT;
public TunnelBuildReplyMessage(I2PAppContext context) {
this(context, MAX_RECORD_COUNT);
}
/** @since 0.7.10 */
protected TunnelBuildReplyMessage(I2PAppContext context, int records) {
super(context);
_records = new ByteArray[RECORD_COUNT];
if (records > 0) {
RECORD_COUNT = records;
_records = new ByteArray[records];
}
// else will be initialized by readMessage() in VTBRM
}
public void setRecord(int index, ByteArray record) { _records[index] = record; }
public ByteArray getRecord(int index) { return _records[index]; }
/** @since 0.7.10 */
public int getRecordCount() { return RECORD_COUNT; }
public static final int RECORD_SIZE = TunnelBuildMessage.RECORD_SIZE;
@ -53,4 +65,9 @@ public class TunnelBuildReplyMessage extends I2NPMessageImpl {
}
return curIndex;
}
@Override
public String toString() {
return "[TunnelBuildReplyMessage]";
}
}

View File

@ -0,0 +1,114 @@
package net.i2p.data.i2np;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
/**
* This is the same as DataMessage but with a variable message type.
* This is defined so routers can route messages they don't know about.
* We don't extend DataMessage so that any code that does (instanceof DataMessage)
* won't return true for this type. Load tests use DataMessage, for example.
* See InboundMessageDistributor.
*
* There is no setData() method, the only way to create one of these is to
* read it with readMessage() (i.e., it came from some other router)
*
* @since 0.7.12
*/
public class UnknownI2NPMessage extends I2NPMessageImpl {
private byte _data[];
private int _type;
/** @param type 0-255 */
public UnknownI2NPMessage(I2PAppContext context, int type) {
super(context);
_type = type;
}
/** warning - only public for equals() */
public byte[] getData() {
verifyUnwritten();
return _data;
}
public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException {
if (type != _type) throw new I2NPMessageException("Message type is incorrect for this message");
int curIndex = offset;
long size = DataHelper.fromLong(data, curIndex, 4);
curIndex += 4;
if (size > MAX_SIZE)
throw new I2NPMessageException("wtf, size=" + size);
_data = new byte[(int)size];
System.arraycopy(data, curIndex, _data, 0, (int)size);
}
/** calculate the message body's length (not including the header and footer */
protected int calculateWrittenLength() {
if (_data == null)
return 4;
else
return 4 + _data.length;
}
/** write the message body to the output array, starting at the given index */
protected int writeMessageBody(byte out[], int curIndex) {
verifyUnwritten();
if (_data == null) {
out[curIndex++] = 0x0;
out[curIndex++] = 0x0;
out[curIndex++] = 0x0;
out[curIndex++] = 0x0;
} else {
byte len[] = DataHelper.toLong(4, _data.length);
System.arraycopy(len, 0, out, curIndex, 4);
curIndex += 4;
System.arraycopy(_data, 0, out, curIndex, _data.length);
curIndex += _data.length;
}
return curIndex;
}
@Override
protected void written() {
super.written();
_data = null;
}
/** @return 0-255 */
public int getType() { return _type; }
@Override
public int hashCode() {
return _type + DataHelper.hashCode(getData());
}
@Override
public boolean equals(Object object) {
if ( (object != null) && (object instanceof UnknownI2NPMessage) ) {
UnknownI2NPMessage msg = (UnknownI2NPMessage)object;
return _type == msg.getType() && DataHelper.eq(getData(), msg.getData());
} else {
return false;
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("[UnknownI2NPMessage: ");
buf.append("\n\tType: ").append(_type);
buf.append("\n\tLength: ").append(calculateWrittenLength() - 4);
buf.append("]");
return buf.toString();
}
}

View File

@ -0,0 +1,69 @@
package net.i2p.data.i2np;
import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
/**
* @since 0.7.12
*/
public class VariableTunnelBuildMessage extends TunnelBuildMessage {
public static final int MESSAGE_TYPE = 23;
/** zero record count, will be set with readMessage() */
public VariableTunnelBuildMessage(I2PAppContext context) {
super(context, 0);
}
public VariableTunnelBuildMessage(I2PAppContext context, int records) {
super(context, records);
}
@Override
protected int calculateWrittenLength() { return 1 + super.calculateWrittenLength(); }
@Override
public int getType() { return MESSAGE_TYPE; }
@Override
public void readMessage(byte[] data, int offset, int dataSize, int type) throws I2NPMessageException, IOException {
if (type != MESSAGE_TYPE)
throw new I2NPMessageException("Message type is incorrect for this message");
int r = (int)DataHelper.fromLong(data, offset, 1);
if (r <= 0 || r > MAX_RECORD_COUNT)
throw new I2NPMessageException("Bad record count " + r);
RECORD_COUNT = r;
if (dataSize != calculateWrittenLength())
throw new I2NPMessageException("Wrong length (expects " + calculateWrittenLength() + ", recv " + dataSize + ")");
_records = new ByteArray[RECORD_COUNT];
super.readMessage(data, offset + 1, dataSize, TunnelBuildMessage.MESSAGE_TYPE);
}
@Override
protected int writeMessageBody(byte[] out, int curIndex) throws I2NPMessageException {
int remaining = out.length - (curIndex + calculateWrittenLength());
if (remaining < 0)
throw new I2NPMessageException("Not large enough (too short by " + remaining + ")");
if (RECORD_COUNT <= 0 || RECORD_COUNT > MAX_RECORD_COUNT)
throw new I2NPMessageException("Bad record count " + RECORD_COUNT);
DataHelper.toLong(out, curIndex++, 1, RECORD_COUNT);
// can't call super, written length check will fail
//return super.writeMessageBody(out, curIndex + 1);
for (int i = 0; i < RECORD_COUNT; i++) {
System.arraycopy(_records[i].getData(), _records[i].getOffset(), out, curIndex, RECORD_SIZE);
curIndex += RECORD_SIZE;
}
return curIndex;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("[VariableTunnelBuildMessage: " +
"\n\tRecords: ").append(getRecordCount())
.append(']');
return buf.toString();
}
}

View File

@ -0,0 +1,70 @@
package net.i2p.data.i2np;
import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
/**
* Transmitted from the new outbound endpoint to the creator through a
* reply tunnel
*
* @since 0.7.12
*/
public class VariableTunnelBuildReplyMessage extends TunnelBuildReplyMessage {
public static final int MESSAGE_TYPE = 24;
/** zero record count, will be set with readMessage() */
public VariableTunnelBuildReplyMessage(I2PAppContext context) {
super(context, 0);
}
public VariableTunnelBuildReplyMessage(I2PAppContext context, int records) {
super(context, records);
}
@Override
protected int calculateWrittenLength() { return 1 + super.calculateWrittenLength(); }
@Override
public int getType() { return MESSAGE_TYPE; }
public void readMessage(byte[] data, int offset, int dataSize, int type) throws I2NPMessageException, IOException {
if (type != MESSAGE_TYPE)
throw new I2NPMessageException("Message type is incorrect for this message");
int r = (int)DataHelper.fromLong(data, offset, 1);
if (r <= 0 || r > MAX_RECORD_COUNT)
throw new I2NPMessageException("Bad record count " + r);
RECORD_COUNT = r;
if (dataSize != calculateWrittenLength())
throw new I2NPMessageException("Wrong length (expects " + calculateWrittenLength() + ", recv " + dataSize + ")");
_records = new ByteArray[RECORD_COUNT];
super.readMessage(data, offset + 1, dataSize, TunnelBuildReplyMessage.MESSAGE_TYPE);
}
protected int writeMessageBody(byte[] out, int curIndex) throws I2NPMessageException {
int remaining = out.length - (curIndex + calculateWrittenLength());
if (remaining < 0)
throw new I2NPMessageException("Not large enough (too short by " + remaining + ")");
if (RECORD_COUNT <= 0 || RECORD_COUNT > MAX_RECORD_COUNT)
throw new I2NPMessageException("Bad record count " + RECORD_COUNT);
DataHelper.toLong(out, curIndex++, 1, RECORD_COUNT);
// can't call super, written length check will fail
//return super.writeMessageBody(out, curIndex + 1);
for (int i = 0; i < RECORD_COUNT; i++) {
System.arraycopy(_records[i].getData(), _records[i].getOffset(), out, curIndex, RECORD_SIZE);
curIndex += RECORD_SIZE;
}
return curIndex;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("[VariableTunnelBuildReplyMessage: " +
"\n\tRecords: ").append(getRecordCount())
.append(']');
return buf.toString();
}
}

View File

@ -42,6 +42,9 @@ public class InNetMessagePool implements Service {
private boolean _alive;
private boolean _dispatchThreaded;
/** Make this >= the max I2NP message type number (currently 24) */
private static final int MAX_I2NP_MESSAGE_TYPE = 31;
/**
* If set to true, we will have two additional threads - one for dispatching
* tunnel data messages, and another for dispatching tunnel gateway messages.
@ -62,8 +65,7 @@ public class InNetMessagePool implements Service {
public InNetMessagePool(RouterContext context) {
_context = context;
// 32 is greater than the max I2NP message type number (currently 22) + 1
_handlerJobBuilders = new HandlerJobBuilder[32];
_handlerJobBuilders = new HandlerJobBuilder[MAX_I2NP_MESSAGE_TYPE + 1];
if (DISPATCH_DIRECT) {
// keep the compiler happy since they are final
_pendingDataMessages = null;
@ -160,6 +162,7 @@ public class InNetMessagePool implements Service {
shortCircuitTunnelData(messageBody, fromRouterHash);
allowMatches = false;
} else {
// why don't we allow type 0? There used to be a message of type 0 long ago...
if ( (type > 0) && (type < _handlerJobBuilders.length) ) {
HandlerJobBuilder builder = _handlerJobBuilders[type];

View File

@ -176,7 +176,6 @@ public class RouterClock extends Clock {
if (desiredOffset > offset) {
// slew forward
_offset = ++offset;
_lastSlewed = systemNow;
} else if (desiredOffset < offset) {
// slew backward, but don't let the clock go backward
// this should be the first call since systemNow
@ -184,8 +183,8 @@ public class RouterClock extends Clock {
// from the last systemNow, thus we won't let the clock go backward,
// no need to track when we were last called.
_offset = --offset;
_lastSlewed = systemNow;
}
_lastSlewed = systemNow;
}
return offset + systemNow;
}

View File

@ -62,7 +62,7 @@ public class RouterContext extends I2PAppContext {
private Calculator _capacityCalc;
private static List _contexts = new ArrayList(1);
private static List<RouterContext> _contexts = new ArrayList(1);
public RouterContext(Router router) { this(router, null); }
public RouterContext(Router router, Properties envProps) {
@ -148,7 +148,7 @@ public class RouterContext extends I2PAppContext {
* context is created or a router is shut down.
*
*/
public static List listContexts() { return _contexts; }
public static List<RouterContext> listContexts() { return _contexts; }
/** what router is this context working for? */
public Router router() { return _router; }

View File

@ -18,10 +18,10 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8;
public final static long BUILD = 10;
/** for example "-test" */
public final static String EXTRA = "";
public final static String EXTRA = "-rc";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;
public static void main(String args[]) {
System.out.println("I2P Router version: " + FULL_VERSION);

View File

@ -26,8 +26,8 @@ class FloodfillMonitorJob extends JobImpl {
private static final int REQUEUE_DELAY = 60*60*1000;
private static final long MIN_UPTIME = 2*60*60*1000;
private static final long MIN_CHANGE_DELAY = 6*60*60*1000;
private static final int MIN_FF = 10;
private static final int MAX_FF = 15;
private static final int MIN_FF = 45;
private static final int MAX_FF = 60;
private static final String PROP_FLOODFILL_PARTICIPANT = "router.floodfillParticipant";
public FloodfillMonitorJob(RouterContext context, FloodfillNetworkDatabaseFacade facade) {

View File

@ -245,7 +245,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
Set keys = ds.getKeys();
if (keys != null) {
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
Object o = getDataStore().get((Hash)iter.next());
Object o = ds.get((Hash)iter.next());
if (o instanceof RouterInfo)
rv.add((RouterInfo)o);
}

View File

@ -18,6 +18,7 @@ import net.i2p.util.EepGet;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import net.i2p.util.SSLEepGet;
import net.i2p.util.Translate;
/**
* Moved from ReseedHandler in routerconsole. See ReseedChecker for additional comments.
@ -33,14 +34,18 @@ public class Reseeder {
private Log _log;
// Reject unreasonably big files, because we download into a ByteArrayOutputStream.
private static final long MAX_RESEED_RESPONSE_SIZE = 8 * 1024 * 1024;
private static final long MAX_RESEED_RESPONSE_SIZE = 1024 * 1024;
private static final String DEFAULT_SEED_URL = "http://a.netdb.i2p2.de/,http://b.netdb.i2p2.de/,http://reseed.i2p-projekt.de/";
private static final String DEFAULT_SEED_URL = "http://a.netdb.i2p2.de/,http://b.netdb.i2p2.de/,http://reseed.i2p-projekt.de/,http://i2pbote.net/netDb/";
private static final String PROP_INPROGRESS = "net.i2p.router.web.ReseedHandler.reseedInProgress";
private static final String PROP_ERROR = "net.i2p.router.web.ReseedHandler.errorMessage";
private static final String PROP_STATUS = "net.i2p.router.web.ReseedHandler.statusMessage";
public static final String PROP_PROXY_HOST = "router.reseedProxyHost";
public static final String PROP_PROXY_PORT = "router.reseedProxyPort";
private static final String RESEED_TIPS =
_x("Ensure that nothing blocks outbound HTTP, check <a href=logs.jsp>logs</a> " +
"and if nothing helps, read FAQ about reseeding manually.");
public Reseeder(RouterContext ctx) {
_context = ctx;
@ -63,7 +68,6 @@ public class Reseeder {
}
/** Todo: translate the messages sent via PROP_STATUS */
public class ReseedRunner implements Runnable, EepGet.StatusListener {
private boolean _isRunning;
private String _proxyHost;
@ -71,7 +75,7 @@ public class Reseeder {
public ReseedRunner() {
_isRunning = false;
System.setProperty(PROP_STATUS, "Reseeding.");
System.setProperty(PROP_STATUS, _("Reseeding"));
}
public boolean isRunning() { return _isRunning; }
public void run() {
@ -105,10 +109,6 @@ public class Reseeder {
* save them into this router's netDb dir.
*
*/
private static final String RESEED_TIPS =
"Ensure that nothing blocks outbound HTTP, check <a href=logs.jsp>logs</a> " +
"and if nothing helps, read FAQ about reseeding manually.";
private void reseed(boolean echoStatus) {
List URLList = new ArrayList();
String URLs = _context.getProperty("i2p.reseedURL", DEFAULT_SEED_URL);
@ -139,14 +139,14 @@ public class Reseeder {
try {
System.setProperty(PROP_ERROR, "");
System.setProperty(PROP_STATUS, "Reseeding: fetching seed URL.");
System.setProperty(PROP_STATUS, _("Reseeding: fetching seed URL."));
System.err.println("Reseed from " + seedURL);
URL dir = new URL(seedURL);
byte contentRaw[] = readURL(dir);
if (contentRaw == null) {
System.setProperty(PROP_ERROR,
"Last reseed failed fully (failed reading seed URL). " +
RESEED_TIPS);
_("Last reseed failed fully (failed reading seed URL).") + ' ' +
_(RESEED_TIPS));
// Logging deprecated here since attemptFailed() provides better info
_log.debug("Failed reading seed URL: " + seedURL);
return;
@ -171,8 +171,8 @@ public class Reseeder {
if (total <= 0) {
_log.error("Read " + contentRaw.length + " bytes from seed " + seedURL + ", but found no routerInfo URLs.");
System.setProperty(PROP_ERROR,
"Last reseed failed fully (no routerInfo URLs at seed URL). " +
RESEED_TIPS);
_("Last reseed failed fully (no routerInfo URLs at seed URL).") + ' ' +
_(RESEED_TIPS));
return;
}
@ -184,8 +184,7 @@ public class Reseeder {
for (Iterator iter = urlList.iterator(); iter.hasNext() && fetched < 200; ) {
try {
System.setProperty(PROP_STATUS,
"Reseeding: fetching router info from seed URL (" +
fetched + " successful, " + errors + " errors, " + total + " total).");
_("Reseeding: fetching router info from seed URL ({0} successful, {1} errors).", fetched, errors));
fetchSeed(seedURL, (String)iter.next());
fetched++;
@ -206,13 +205,13 @@ public class Reseeder {
// because some routerInfos will always fail.
if ((failPercent >= 10) && (failPercent < 90)) {
System.setProperty(PROP_ERROR,
"Last reseed failed partly (" + failPercent + "% of " + total + "). " +
RESEED_TIPS);
_("Last reseed failed partly ({0}% of {1}).", failPercent, total) + ' ' +
_(RESEED_TIPS));
}
if (failPercent >= 90) {
System.setProperty(PROP_ERROR,
"Last reseed failed (" + failPercent + "% of " + total + "). " +
RESEED_TIPS);
_("Last reseed failed ({0}% of {1}).", failPercent, total) + ' ' +
_(RESEED_TIPS));
}
if (fetched > 0)
_context.netDb().rescan();
@ -221,8 +220,8 @@ public class Reseeder {
_isRunning = false;
} catch (Throwable t) {
System.setProperty(PROP_ERROR,
"Last reseed failed fully (exception caught). " +
RESEED_TIPS);
_("Last reseed failed fully (exception caught).") + ' ' +
_(RESEED_TIPS));
_log.error("Error reseeding", t);
}
}
@ -270,6 +269,28 @@ public class Reseeder {
}
/**
* Mark a string for extraction by xgettext and translation.
* Use this only in static initializers.
* It does not translate!
* @return s
*/
private static final String _x(String s) {
return s;
}
private static final String BUNDLE_NAME = "net.i2p.router.web.messages";
/** translate */
public String _(String key) {
return Translate.getString(key, _context, BUNDLE_NAME);
}
/** translate */
public String _(String s, Object o, Object o2) {
return Translate.getString(s, o, o2, _context, BUNDLE_NAME);
}
/******
public static void main(String args[]) {
if ( (args != null) && (args.length == 1) && (!Boolean.valueOf(args[0]).booleanValue()) ) {

View File

@ -4,6 +4,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@ -32,6 +33,13 @@ public class ClientAppConfig {
public String args;
public long delay;
public boolean disabled;
/** @since 0.7.12 */
public String classpath;
/** @since 0.7.12 */
public String stopargs;
/** @since 0.7.12 */
public String uninstallargs;
public ClientAppConfig(String cl, String client, String a, long d, boolean dis) {
className = cl;
clientName = client;
@ -40,6 +48,14 @@ public class ClientAppConfig {
disabled = dis;
}
/** @since 0.7.12 */
public ClientAppConfig(String cl, String client, String a, long d, boolean dis, String cp, String sa, String ua) {
this(cl, client, a, d, dis);
classpath = cp;
stopargs = sa;
uninstallargs = ua;
}
public static File configFile(I2PAppContext ctx) {
String clientConfigFile = ctx.getProperty(PROP_CLIENT_CONFIG_FILENAME, DEFAULT_CLIENT_CONFIG_FILENAME);
File cfgFile = new File(clientConfigFile);
@ -72,6 +88,26 @@ public class ClientAppConfig {
*/
public static List<ClientAppConfig> getClientApps(RouterContext ctx) {
Properties clientApps = getClientAppProps(ctx);
return getClientApps(clientApps);
}
/*
* Go through the properties, and return a List of ClientAppConfig structures
*/
public static List<ClientAppConfig> getClientApps(File cfgFile) {
Properties clientApps = new Properties();
try {
DataHelper.loadProps(clientApps, cfgFile);
} catch (IOException ioe) {
return Collections.EMPTY_LIST;
}
return getClientApps(clientApps);
}
/*
* Go through the properties, and return a List of ClientAppConfig structures
*/
private static List<ClientAppConfig> getClientApps(Properties clientApps) {
List<ClientAppConfig> rv = new ArrayList(8);
int i = 0;
while (true) {
@ -83,6 +119,9 @@ public class ClientAppConfig {
String delayStr = clientApps.getProperty(PREFIX + i + ".delay");
String onBoot = clientApps.getProperty(PREFIX + i + ".onBoot");
String disabled = clientApps.getProperty(PREFIX + i + ".startOnLoad");
String classpath = clientApps.getProperty(PREFIX + i + ".classpath");
String stopargs = clientApps.getProperty(PREFIX + i + ".stopargs");
String uninstallargs = clientApps.getProperty(PREFIX + i + ".uninstallargs");
i++;
boolean dis = disabled != null && "false".equals(disabled);
@ -94,11 +133,13 @@ public class ClientAppConfig {
if (delayStr != null && !onStartup)
try { delay = 1000*Integer.parseInt(delayStr); } catch (NumberFormatException nfe) {}
rv.add(new ClientAppConfig(className, clientName, args, delay, dis));
rv.add(new ClientAppConfig(className, clientName, args, delay, dis,
classpath, stopargs, uninstallargs));
}
return rv;
}
/** classpath and stopargs not supported */
public static void writeClientAppConfig(RouterContext ctx, List apps) {
File cfgFile = configFile(ctx);
FileOutputStream fos = null;

View File

@ -1,6 +1,7 @@
package net.i2p.router.startup;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
@ -48,16 +49,20 @@ public class LoadClientAppsJob extends JobImpl {
}
}
}
private class DelayedRunClient extends JobImpl {
public static class DelayedRunClient extends JobImpl {
private String _className;
private String _clientName;
private String _args[];
private Log _log;
public DelayedRunClient(RouterContext enclosingContext, String className, String clientName, String args[], long delay) {
super(enclosingContext);
_className = className;
_clientName = clientName;
_args = args;
getTiming().setStartAfter(LoadClientAppsJob.this.getContext().clock().now() + delay);
_log = enclosingContext.logManager().getLog(LoadClientAppsJob.class);
getTiming().setStartAfter(getContext().clock().now() + delay);
}
public String getName() { return "Delayed client job"; }
public void runJob() {
@ -80,9 +85,8 @@ public class LoadClientAppsJob extends JobImpl {
if (str.length() > 0)
argList.add(str);
buf = new StringBuilder(32);
} else {
isQuoted = true;
}
isQuoted = !isQuoted;
break;
case ' ':
case '\t':
@ -115,7 +119,8 @@ public class LoadClientAppsJob extends JobImpl {
}
public static void runClient(String className, String clientName, String args[], Log log) {
log.info("Loading up the client application " + clientName + ": " + className + " " + args);
if (log.shouldLog(Log.INFO))
log.info("Loading up the client application " + clientName + ": " + className + " " + Arrays.toString(args));
I2PThread t = new I2PThread(new RunApp(className, clientName, args, log));
if (clientName == null)
clientName = className + " client";
@ -146,7 +151,8 @@ public class LoadClientAppsJob extends JobImpl {
} catch (Throwable t) {
_log.log(Log.CRIT, "Error starting up the client class " + _className, t);
}
_log.info("Done running client application " + _appName);
if (_log.shouldLog(Log.INFO))
_log.info("Done running client application " + _appName);
}
}

View File

@ -33,6 +33,7 @@ import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.Translate;
public class CommSystemFacadeImpl extends CommSystemFacade {
private Log _log;
@ -475,6 +476,8 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
return n;
}
private static final String BUNDLE_NAME = "net.i2p.router.web.messages";
/** Provide a consistent "look" for displaying router IDs in the console */
@Override
public String renderPeerHTML(Hash peer) {
@ -482,8 +485,11 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
StringBuilder buf = new StringBuilder(128);
String c = getCountry(peer);
if (c != null) {
String countryName = getCountryName(c);
if (countryName.length() > 2)
countryName = Translate.getString(countryName, _context, BUNDLE_NAME);
buf.append("<img height=\"11\" width=\"16\" alt=\"").append(c.toUpperCase()).append("\" title=\"");
buf.append(getCountryName(c));
buf.append(countryName);
buf.append("\" src=\"/flags.jsp?c=").append(c).append("\"> ");
}
buf.append("<tt>");

View File

@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
@ -33,11 +34,11 @@ public class EventPumper implements Runnable {
private Log _log;
private volatile boolean _alive;
private Selector _selector;
private final List _bufCache;
private final List _wantsRead = new ArrayList(16);
private final List _wantsWrite = new ArrayList(4);
private final List _wantsRegister = new ArrayList(1);
private final List _wantsConRegister = new ArrayList(4);
private final LinkedBlockingQueue<ByteBuffer> _bufCache;
private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue();
private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue();
private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue();
private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue();
private NTCPTransport _transport;
private long _expireIdleWriteTime;
@ -54,23 +55,19 @@ public class EventPumper implements Runnable {
/** tunnel test is every 30-60s, so this should be longer than, say, 3*45s to allow for drops */
private static final long MIN_EXPIRE_IDLE_TIME = 3*60*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_transport = transport;
_alive = false;
_bufCache = new ArrayList(MAX_CACHE_SIZE);
_bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE);
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
}
public synchronized void startPumping() {
if (_log.shouldLog(Log.INFO))
_log.info("Starting pumper");
// _wantsRead = new ArrayList(16);
// _wantsWrite = new ArrayList(4);
// _wantsRegister = new ArrayList(1);
// _wantsConRegister = new ArrayList(4);
try {
_selector = Selector.open();
_alive = true;
@ -98,13 +95,13 @@ public class EventPumper implements Runnable {
public void register(ServerSocketChannel chan) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel");
synchronized (_wantsRegister) { _wantsRegister.add(chan); }
_wantsRegister.offer(chan);
_selector.wakeup();
}
public void registerConnect(NTCPConnection con) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection");
_context.statManager().addRateData("ntcp.registerConnect", 1, 0);
synchronized (_wantsConRegister) { _wantsConRegister.add(con); }
_wantsConRegister.offer(con);
_selector.wakeup();
}
@ -254,10 +251,10 @@ public class EventPumper implements Runnable {
} catch (Exception e) {
_log.error("Error closing keys on pumper shutdown", e);
}
synchronized (_wantsConRegister) { _wantsConRegister.clear(); }
synchronized (_wantsRead) { _wantsRead.clear(); }
synchronized (_wantsRegister) { _wantsRegister.clear(); }
synchronized (_wantsWrite) { _wantsWrite.clear(); }
_wantsConRegister.clear();
_wantsRead.clear();
_wantsRegister.clear();
_wantsWrite.clear();
}
private void processKeys(Set selected) {
@ -322,10 +319,8 @@ public class EventPumper implements Runnable {
public void wantsWrite(NTCPConnection con) {
if (_log.shouldLog(Log.INFO))
_log.info("Before adding wants to write on " + con);
synchronized (_wantsWrite) {
if (!_wantsWrite.contains(con))
_wantsWrite.add(con);
}
if (!_wantsWrite.contains(con))
_wantsWrite.offer(con);
if (_log.shouldLog(Log.INFO))
_log.info("Wants to write on " + con);
_selector.wakeup();
@ -333,10 +328,8 @@ public class EventPumper implements Runnable {
_log.debug("selector awoken for write");
}
public void wantsRead(NTCPConnection con) {
synchronized (_wantsRead) {
if (!_wantsRead.contains(con))
_wantsRead.add(con);
}
if (!_wantsRead.contains(con))
_wantsRead.offer(con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("wants to read on " + con);
_selector.wakeup();
@ -345,16 +338,16 @@ public class EventPumper implements Runnable {
}
private static final int MIN_BUFS = 5;
/**
* There's only one pumper, so static is fine, unless multi router
* Is there a better way to do this?
*/
private static int NUM_BUFS = 5;
private static int __liveBufs = 0;
private static int __consecutiveExtra;
ByteBuffer acquireBuf() {
if (false) return ByteBuffer.allocate(BUF_SIZE);
ByteBuffer rv = null;
synchronized (_bufCache) {
if (_bufCache.size() > 0)
rv = (ByteBuffer)_bufCache.remove(0);
}
//if (false) return ByteBuffer.allocate(BUF_SIZE);
ByteBuffer rv = (ByteBuffer)_bufCache.poll();
if (rv == null) {
rv = ByteBuffer.allocate(BUF_SIZE);
NUM_BUFS = ++__liveBufs;
@ -369,27 +362,24 @@ public class EventPumper implements Runnable {
}
void releaseBuf(ByteBuffer buf) {
if (false) return;
//if (false) return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
buf.clear();
int extra = 0;
boolean cached = false;
synchronized (_bufCache) {
if (_bufCache.size() < NUM_BUFS) {
extra = _bufCache.size();
_bufCache.add(buf);
cached = true;
if (extra > 5) {
__consecutiveExtra++;
if (__consecutiveExtra >= 20) {
NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS);
__consecutiveExtra = 0;
}
int extra = _bufCache.size();
boolean cached = extra < NUM_BUFS;
if (cached) {
_bufCache.offer(buf);
if (extra > 5) {
__consecutiveExtra++;
if (__consecutiveExtra >= 20) {
NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS);
__consecutiveExtra = 0;
}
} else {
__liveBufs--;
}
} else {
__liveBufs--;
}
if (cached && _log.shouldLog(Log.DEBUG))
_log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
@ -578,14 +568,8 @@ public class EventPumper implements Runnable {
}
private void runDelayedEvents(List buf) {
synchronized (_wantsRead) {
if (_wantsRead.size() > 0) {
buf.addAll(_wantsRead);
_wantsRead.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
NTCPConnection con;
while ((con = _wantsRead.poll()) != null) {
SelectionKey key = con.getKey();
try {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
@ -594,14 +578,7 @@ public class EventPumper implements Runnable {
}
}
synchronized (_wantsWrite) {
if (_wantsWrite.size() > 0) {
buf.addAll(_wantsWrite);
_wantsWrite.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
while ((con = _wantsWrite.poll()) != null) {
SelectionKey key = con.getKey();
try {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
@ -610,14 +587,8 @@ public class EventPumper implements Runnable {
}
}
synchronized (_wantsRegister) {
if (_wantsRegister.size() > 0) {
buf.addAll(_wantsRegister);
_wantsRegister.clear();
}
}
while (buf.size() > 0) {
ServerSocketChannel chan = (ServerSocketChannel)buf.remove(0);
ServerSocketChannel chan;
while ((chan = _wantsRegister.poll()) != null) {
try {
SelectionKey key = chan.register(_selector, SelectionKey.OP_ACCEPT);
key.attach(chan);
@ -626,14 +597,7 @@ public class EventPumper implements Runnable {
}
}
synchronized (_wantsConRegister) {
if (_wantsConRegister.size() > 0) {
buf.addAll(_wantsConRegister);
_wantsConRegister.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
while ((con = _wantsConRegister.poll()) != null) {
try {
SelectionKey key = con.getChannel().register(_selector, SelectionKey.OP_CONNECT);
key.attach(con);

View File

@ -8,6 +8,8 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;
import net.i2p.data.Base64;
@ -23,6 +25,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
/**
@ -53,13 +56,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private SocketChannel _chan;
private SelectionKey _conKey;
/** list of ByteBuffer containing data we have read and are ready to process, oldest first */
private final List _readBufs;
private final LinkedBlockingQueue<ByteBuffer> _readBufs;
/**
* list of ByteBuffers containing fully populated and encrypted data, ready to write,
* and already cleared through the bandwidth limiter.
*/
private final List _writeBufs;
private final List _bwRequests;
private final LinkedBlockingQueue<ByteBuffer> _writeBufs;
/** Todo: This is only so we can abort() them when we close() ??? */
private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
private boolean _established;
private long _establishedOn;
private EstablishState _establishState;
@ -72,8 +76,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/**
* pending unprepared OutNetMessage instances
*/
private final List _outbound;
/** current prepared OutNetMessage, or null */
private final LinkedBlockingQueue<OutNetMessage> _outbound;
/** current prepared OutNetMessage, or null - synchronize on _outbound to modify */
private OutNetMessage _currentOutbound;
private SessionKey _sessionKey;
/** encrypted block of the current I2NP message being read */
@ -124,10 +128,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_lastReceiveTime = _created;
_transport = transport;
_chan = chan;
_readBufs = new ArrayList(4);
_writeBufs = new ArrayList(4);
_bwRequests = new ArrayList(2);
_outbound = new ArrayList(4);
_readBufs = new LinkedBlockingQueue();
_writeBufs = new LinkedBlockingQueue();
_bwRequests = new ConcurrentHashSet(2);
_outbound = new LinkedBlockingQueue();
_established = false;
_isInbound = true;
_closed = false;
@ -154,10 +158,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_lastReceiveTime = _created;
_transport = transport;
_remAddr = remAddr;
_readBufs = new ArrayList(4);
_writeBufs = new ArrayList(4);
_bwRequests = new ArrayList(2);
_outbound = new ArrayList(4);
_readBufs = new LinkedBlockingQueue();
_writeBufs = new LinkedBlockingQueue();
_bwRequests = new ConcurrentHashSet(2);
_outbound = new LinkedBlockingQueue();
_established = false;
_isInbound = false;
_closed = false;
@ -210,12 +214,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; }
public long getOutboundQueueSize() {
synchronized (_outbound) {
int queued = _outbound.size();
if (_currentOutbound != null)
queued++;
return queued;
}
}
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
@ -234,29 +236,21 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.removeCon(this);
_transport.getReader().connectionClosed(this);
_transport.getWriter().connectionClosed(this);
List reqs = null;
synchronized (_bwRequests) {
if (_bwRequests.size() > 0) {
reqs = new ArrayList(_bwRequests);
_bwRequests.clear();
}
for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) {
iter.next().abort();
}
if (reqs != null)
for (Iterator iter = reqs.iterator(); iter.hasNext(); )
((FIFOBandwidthLimiter.Request)iter.next()).abort();
List msgs = null;
synchronized (_outbound) {
msgs = new ArrayList(_outbound);
_outbound.clear();
}
for (int i = 0; i < msgs.size(); i++) {
OutNetMessage msg = (OutNetMessage)msgs.get(i);
_bwRequests.clear();
OutNetMessage msg;
while ((msg = _outbound.poll()) != null) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
OutNetMessage msg = _currentOutbound;
msg = _currentOutbound;
if (msg != null) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
@ -277,10 +271,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_consecutiveBacklog > 10) { // waaay too backlogged
boolean wantsWrite = false;
try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {}
int blocks = 0;
synchronized (_writeBufs) { blocks = _writeBufs.size(); }
if (_log.shouldLog(Log.WARN))
if (_log.shouldLog(Log.WARN)) {
int blocks = _writeBufs.size();
_log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
}
_context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime());
close();
}
@ -292,40 +286,29 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (FAST_LARGE)
bufferedPrepare(msg);
boolean noOutbound = false;
synchronized (_outbound) {
_outbound.add(msg);
enqueued = _outbound.size();
msg.setQueueSize(enqueued);
noOutbound = (_currentOutbound == null);
}
_outbound.offer(msg);
enqueued = _outbound.size();
msg.setQueueSize(enqueued);
noOutbound = (_currentOutbound == null);
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}
private long queueTime() {
long queueTime = 0;
int size = 0;
synchronized (_outbound) {
OutNetMessage msg = _currentOutbound;
size = _outbound.size();
if ( (msg == null) && (size > 0) )
msg = (OutNetMessage)_outbound.get(0);
OutNetMessage msg = _currentOutbound;
if (msg == null) {
msg = _outbound.peek();
if (msg == null)
return 0;
queueTime = msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
return queueTime;
return msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
public boolean tooBacklogged() {
long queueTime = queueTime();
if (queueTime <= 0) return false;
int size = 0;
boolean currentOutboundSet = false;
synchronized (_outbound) {
size = _outbound.size();
currentOutboundSet = (_currentOutbound != null);
}
boolean currentOutboundSet = _currentOutbound != null;
// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
@ -333,15 +316,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (getUptime() < 10*1000) // allow some slack just after establishment
return false;
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
int writeBufs = 0;
synchronized (_writeBufs) { writeBufs = _writeBufs.size(); }
if (_log.shouldLog(Log.WARN))
int size = _outbound.size();
if (_log.shouldLog(Log.WARN)) {
int writeBufs = _writeBufs.size();
try {
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet
+ ", writeBufs: " + writeBufs + " on " + toString());
} catch (Exception e) {} // java.nio.channels.CancelledKeyException
}
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size);
return true;
//} else if (size > 32) { // another arbitrary limit.
@ -440,10 +424,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_establishState = null;
_transport.markReachable(getRemotePeer().calculateHash(), false);
//_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = false;
synchronized (_outbound) {
msgs = (_outbound.size() > 0);
}
boolean msgs = !_outbound.isEmpty();
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
if (msgs)
@ -469,6 +450,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
*
* Todo: remove synchronization?
*
*/
synchronized void prepareNextWrite() {
//if (FAST_LARGE)
@ -579,6 +562,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
*
* Todo: remove synchronization?
*
*/
synchronized void prepareNextWriteFast() {
if (_log.shouldLog(Log.DEBUG))
@ -600,6 +585,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
OutNetMessage msg = null;
// this is synchronized only for _currentOutbound
// Todo: figure out how to remove the synchronization
synchronized (_outbound) {
if (_currentOutbound != null) {
if (_log.shouldLog(Log.INFO))
@ -608,21 +595,28 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (queueTime() > 3*1000) { // don't stall low-priority messages
msg = (OutNetMessage)_outbound.remove(0);
msg = _outbound.poll();
if (msg == null)
return;
} else {
Iterator it = _outbound.iterator();
int slot = 0; // only for logging
Iterator<OutNetMessage> it = _outbound.iterator();
for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound
OutNetMessage mmsg = (OutNetMessage) it.next();
if (msg == null || mmsg.getPriority() > msg.getPriority())
OutNetMessage mmsg = it.next();
if (msg == null || mmsg.getPriority() > msg.getPriority()) {
msg = mmsg;
slot = i;
}
}
if (msg == null)
return;
// if (_outbound.indexOf(msg) > 0)
// _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size());
if (_log.shouldLog(Log.INFO))
_log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + _outbound.indexOf(msg));
_outbound.remove(msg);
_log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + slot);
boolean removed = _outbound.remove(msg);
if ((!removed) && _log.shouldLog(Log.ERROR))
_log.info("Already removed??? " + msg.getMessage().getType());
}
_currentOutbound = msg;
}
@ -815,17 +809,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
write(buf);
}
}
private void removeRequest(FIFOBandwidthLimiter.Request req) {
synchronized (_bwRequests) { _bwRequests.remove(req); }
_bwRequests.remove(req);
}
private void addRequest(FIFOBandwidthLimiter.Request req) {
synchronized (_bwRequests) { _bwRequests.add(req); }
}
public int outboundQueueSize() {
synchronized (_writeBufs) {
return _writeBufs.size();
}
_bwRequests.add(req);
}
/**
@ -852,10 +842,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/
public void recv(ByteBuffer buf) {
_bytesReceived += buf.remaining();
synchronized (_readBufs) {
//buf.flip();
_readBufs.add(buf);
}
_readBufs.offer(buf);
_transport.getReader().wantsRead(this);
updateStats();
}
@ -865,34 +853,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/
public void write(ByteBuffer buf) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)");
synchronized (_writeBufs) {
_writeBufs.add(buf);
}
_writeBufs.offer(buf);
if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
_transport.getPumper().wantsWrite(this);
}
/** @return null if none available */
public ByteBuffer getNextReadBuf() {
synchronized (_readBufs) {
if (_readBufs.size() > 0)
return (ByteBuffer)_readBufs.get(0);
}
return null;
return _readBufs.poll();
}
/** since getNextReadBuf() removes, this should not be necessary */
public void removeReadBuf(ByteBuffer buf) {
synchronized (_readBufs) {
_readBufs.remove(buf);
}
_readBufs.remove(buf);
//_transport.getPumper().releaseBuf(buf);
}
public int getWriteBufCount() { synchronized (_writeBufs) { return _writeBufs.size(); } }
public int getWriteBufCount() { return _writeBufs.size(); }
/** @return null if none available */
public ByteBuffer getNextWriteBuf() {
synchronized (_writeBufs) {
if (_writeBufs.size() > 0)
return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards
}
return null;
return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards
}
public void removeWriteBuf(ByteBuffer buf) {
@ -900,16 +881,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
OutNetMessage msg = null;
boolean bufsRemain = false;
boolean clearMessage = false;
synchronized (_writeBufs) {
if (_sendingMeta && (buf.capacity() == _meta.length)) {
_sendingMeta = false;
} else {
clearMessage = true;
}
_writeBufs.remove(buf);
bufsRemain = _writeBufs.size() > 0;
if (_sendingMeta && (buf.capacity() == _meta.length)) {
_sendingMeta = false;
} else {
clearMessage = true;
}
_writeBufs.remove(buf);
bufsRemain = !_writeBufs.isEmpty();
if (clearMessage) {
// see synchronization comments in prepareNextWriteFast()
synchronized (_outbound) {
if (_currentOutbound != null)
msg = _currentOutbound;
@ -935,10 +915,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_log.info("I2NP meta message sent completely");
}
boolean msgs = false;
synchronized (_outbound) {
msgs = ((_outbound.size() > 0) || (_currentOutbound != null));
}
boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null));
if (msgs) // push through the bw limiter to reach _writeBufs
_transport.getWriter().wantsWrite(this, "write completed");
if (bufsRemain) // send asap
@ -1113,22 +1090,17 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
private static final int MAX_HANDLERS = 4;
private final static List _i2npHandlers = new ArrayList(MAX_HANDLERS);
private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS);
private final static I2NPMessageHandler acquireHandler(RouterContext ctx) {
I2NPMessageHandler rv = null;
synchronized (_i2npHandlers) {
if (_i2npHandlers.size() > 0)
rv = (I2NPMessageHandler)_i2npHandlers.remove(0);
}
I2NPMessageHandler rv = _i2npHandlers.poll();
if (rv == null)
rv = new I2NPMessageHandler(ctx);
return rv;
}
private static void releaseHandler(I2NPMessageHandler handler) {
synchronized (_i2npHandlers) {
if (_i2npHandlers.size() < MAX_HANDLERS)
_i2npHandlers.add(handler);
}
_i2npHandlers.offer(handler);
}
@ -1144,21 +1116,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
private static final int MAX_DATA_READ_BUFS = 16;
private final static List _dataReadBufs = new ArrayList(MAX_DATA_READ_BUFS);
private final static LinkedBlockingQueue<DataBuf> _dataReadBufs = new LinkedBlockingQueue(MAX_DATA_READ_BUFS);
private static DataBuf acquireReadBuf() {
synchronized (_dataReadBufs) {
if (_dataReadBufs.size() > 0)
return (DataBuf)_dataReadBufs.remove(0);
}
DataBuf rv = _dataReadBufs.poll();
if (rv != null)
return rv;
return new DataBuf();
}
private static void releaseReadBuf(DataBuf buf) {
buf.bais.reset();
synchronized (_dataReadBufs) {
if (_dataReadBufs.size() < MAX_DATA_READ_BUFS)
_dataReadBufs.add(buf);
}
_dataReadBufs.offer(buf);
}
/**
* sizeof(data)+data+pad+crc.
*

View File

@ -430,7 +430,18 @@ public class NTCPTransport extends TransportImpl {
private static final int NUM_CONCURRENT_READERS = 3;
private static final int NUM_CONCURRENT_WRITERS = 3;
/**
* Called by TransportManager.
* Caller should stop the transport first, then
* verify stopped with isAlive()
* Unfortunately TransportManager doesn't do that, so we
* check here to prevent two pumpers.
* @return appears to be ignored by caller
*/
public synchronized RouterAddress startListening() {
// try once again to prevent two pumpers which is fatal
if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Starting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
@ -442,7 +453,17 @@ public class NTCPTransport extends TransportImpl {
return bindAddress();
}
/**
* Only called by CSFI.
* Caller should stop the transport first, then
* verify stopped with isAlive()
* @return appears to be ignored by caller
*/
public synchronized RouterAddress restartListening(RouterAddress addr) {
// try once again to prevent two pumpers which is fatal
// we could just return null since the return value is ignored
if (_pumper.isAlive())
return _myAddress != null ? _myAddress.toRouterAddress() : null;
if (_log.shouldLog(Log.WARN)) _log.warn("Restarting ntcp transport listening");
_finisher.start();
_pumper.startPumping();
@ -461,6 +482,7 @@ public class NTCPTransport extends TransportImpl {
return _pumper.isAlive();
}
/** call from synchronized method */
private RouterAddress bindAddress() {
if (_myAddress != null) {
try {

View File

@ -17,10 +17,10 @@ import net.i2p.util.Log;
class Reader {
private RouterContext _context;
private Log _log;
private final List _pendingConnections;
private List _liveReads;
private List _readAfterLive;
private List _runners;
private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveReads;
private List<NTCPConnection> _readAfterLive;
private List<Runner> _runners;
public Reader(RouterContext ctx) {
_context = ctx;
@ -41,7 +41,7 @@ class Reader {
}
public void stopReading() {
while (_runners.size() > 0) {
Runner r = (Runner)_runners.remove(0);
Runner r = _runners.remove(0);
r.stop();
}
synchronized (_pendingConnections) {
@ -93,7 +93,7 @@ class Reader {
if (_pendingConnections.size() <= 0) {
_pendingConnections.wait();
} else {
con = (NTCPConnection)_pendingConnections.remove(0);
con = _pendingConnections.remove(0);
_liveReads.add(con);
}
}
@ -155,7 +155,8 @@ class Reader {
con.close();
return;
} else if (buf.remaining() <= 0) {
con.removeReadBuf(buf);
// not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
}
if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
@ -165,7 +166,8 @@ class Reader {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
con.recvEncryptedI2NP(buf);
con.removeReadBuf(buf);
// not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
}
}
}

View File

@ -16,10 +16,10 @@ import net.i2p.util.Log;
class Writer {
private RouterContext _context;
private Log _log;
private final List _pendingConnections;
private List _liveWrites;
private List _writeAfterLive;
private List _runners;
private final List<NTCPConnection> _pendingConnections;
private List<NTCPConnection> _liveWrites;
private List<NTCPConnection> _writeAfterLive;
private List<Runner> _runners;
public Writer(RouterContext ctx) {
_context = ctx;
@ -40,7 +40,7 @@ class Writer {
}
public void stopWriting() {
while (_runners.size() > 0) {
Runner r = (Runner)_runners.remove(0);
Runner r = _runners.remove(0);
r.stop();
}
synchronized (_pendingConnections) {
@ -100,7 +100,7 @@ class Writer {
_log.debug("Done writing, but nothing pending, so wait");
_pendingConnections.wait();
} else {
con = (NTCPConnection)_pendingConnections.remove(0);
con = _pendingConnections.remove(0);
_liveWrites.add(con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Switch to writing on: " + con);

View File

@ -0,0 +1,32 @@
package net.i2p.router.transport.udp;
import net.i2p.data.Hash;
import net.i2p.router.OutNetMessage;
/**
* Since the TimedWeightedPriorityMessageQueue.add()
* was disabled by jrandom in UDPTransport.java
* on 2006-02-19, and the choke/unchoke was disabled at the same time,
* all of TWPMQ is pointless, so just do this for now.
*
* It appears from his comments that it was a lock contention issue,
* so perhaps TWPMQ can be converted to concurrent and re-enabled.
*
* @since 0.7.12
*/
public class DummyThrottle implements OutboundMessageFragments.ActiveThrottle {
public DummyThrottle() {
}
public void choke(Hash peer) {
}
public void unchoke(Hash peer) {
}
public boolean isChoked(Hash peer) {
return false;
}
}

View File

@ -9,6 +9,8 @@ import net.i2p.util.Log;
* Blocking thread to grab new messages off the outbound queue and
* plopping them into our active pool.
*
* WARNING - UNUSED since 0.6.1.11
*
*/
public class OutboundRefiller implements Runnable {
private RouterContext _context;

View File

@ -16,6 +16,9 @@ import net.i2p.util.Log;
* Weighted priority queue implementation for the outbound messages, coupled
* with code to fail messages that expire.
*
* WARNING - UNUSED since 0.6.1.11
* See comments in DQAT.java and mtn history ca. 2006-02-19
*
*/
public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle {
private RouterContext _context;

View File

@ -12,7 +12,8 @@ import net.i2p.util.I2PThread;
// import net.i2p.util.Log;
/**
*
* This sends random data to all UDP peers at a specified rate.
* It is for testing only!
*/
class UDPFlooder implements Runnable {
private RouterContext _context;

View File

@ -80,6 +80,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private int _expireTimeout;
/** last report from a peer of our IP */
private Hash _lastFrom;
private byte[] _lastOurIP;
private int _lastOurPort;
private static final int DROPLIST_PERIOD = 10*60*1000;
private static final int MAX_DROPLIST_SIZE = 256;
@ -130,12 +135,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** how many relays offered to us will we use at a time? */
public static final int PUBLIC_RELAY_COUNT = 3;
private static final boolean USE_PRIORITY = false;
/** configure the priority queue with the given split points */
private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 };
/** configure the priority queue with the given weighting per priority group */
private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 };
/** should we flood all UDP peers with the configured rate? */
/** should we flood all UDP peers with the configured rate? This is for testing only! */
private static final boolean SHOULD_FLOOD_PEERS = false;
private static final int MAX_CONSECUTIVE_FAILED = 5;
@ -165,9 +172,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_dropList = new ArrayList(256);
_endpoint = null;
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq;
_activeThrottle = mq;
// See comments in DQAT.java
if (USE_PRIORITY) {
TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
_outboundMessages = mq;
_activeThrottle = mq;
} else {
DummyThrottle mq = new DummyThrottle();
_outboundMessages = null;
_activeThrottle = mq;
}
_cachedBid = new SharedBid[BID_VALUES.length];
for (int i = 0; i < BID_VALUES.length; i++) {
@ -176,7 +190,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this);
if (SHOULD_FLOOD_PEERS)
_flooder = new UDPFlooder(_context, this);
_expireTimeout = EXPIRE_TIMEOUT;
_expireEvent = new ExpirePeerEvent();
_testEvent = new PeerTestEvent();
@ -271,10 +286,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_handler == null)
_handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager);
if (_refiller == null)
// See comments in DQAT.java
if (USE_PRIORITY && _refiller == null)
_refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
if (_flooder == null)
if (SHOULD_FLOOD_PEERS && _flooder == null)
_flooder = new UDPFlooder(_context, this);
// Startup the endpoint with the requested port, check the actual port, and
@ -301,8 +317,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_inboundFragments.startup();
_pusher = new PacketPusher(_context, _fragments, _endpoint.getSender());
_pusher.startup();
_refiller.startup();
_flooder.startup();
if (USE_PRIORITY)
_refiller.startup();
if (SHOULD_FLOOD_PEERS)
_flooder.startup();
_expireEvent.setIsAlive(true);
_testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future...
SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon
@ -407,9 +425,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Right now, we just blindly trust them, changing our IP and port on a
* whim. this is not good ;)
*
* Slight enhancement - require two different peers in a row to agree
*
* Todo:
* - Much better tracking of troublemakers
* - Disable if we have good local address or UPnP
* - This gets harder if and when we publish multiple addresses, or IPv6
*
* @param from Hash of inbound destination
* @param ourIP publicly routable IPv4 only
@ -441,9 +462,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return;
} else if (inboundRecent && _externalListenPort > 0 && _externalListenHost != null) {
// use OS clock since its an ordering thing, not a time thing
// Note that this fails us if we switch from one IP to a second, then back to the first,
// as some routers still have the first IP and will successfully connect,
// leaving us thinking the second IP is still good.
if (_log.shouldLog(Log.INFO))
_log.info("Ignoring IP address suggestion, since we have received an inbound con recently");
} else if (from.equals(_lastFrom) || !eq(_lastOurIP, _lastOurPort, ourIP, ourPort)) {
_lastFrom = from;
_lastOurIP = ourIP;
_lastOurPort = ourPort;
if (_log.shouldLog(Log.WARN))
_log.warn("The router " + from.toBase64() + " told us we have a new IP - "
+ RemoteHostId.toString(ourIP) + " port " + ourPort + ". Wait until somebody else tells us the same thing.");
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(from.toBase64() + " and " + _lastFrom.toBase64() + " agree we have a new IP - "
+ RemoteHostId.toString(ourIP) + " port " + ourPort + ". Changing address.");
_lastFrom = from;
_lastOurIP = ourIP;
_lastOurPort = ourPort;
changeAddress(ourIP, ourPort);
}
@ -462,14 +499,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_log.warn("Change address? status = " + _reachabilityStatus +
" diff = " + (_context.clock().now() - _reachabilityStatusLastUpdated) +
" old = " + _externalListenHost + ':' + _externalListenPort +
" new = " + DataHelper.toString(ourIP) + ':' + ourPort);
" new = " + RemoteHostId.toString(ourIP) + ':' + ourPort);
synchronized (this) {
if ( (_externalListenHost == null) ||
(!eq(_externalListenHost.getAddress(), _externalListenPort, ourIP, ourPort)) ) {
if ( (_reachabilityStatus != CommSystemFacade.STATUS_OK) ||
(_externalListenHost == null) || (_externalListenPort <= 0) ||
(_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) {
// This prevents us from changing our IP when we are not firewalled
//if ( (_reachabilityStatus != CommSystemFacade.STATUS_OK) ||
// (_externalListenHost == null) || (_externalListenPort <= 0) ||
// (_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) {
// they told us something different and our tests are either old or failing
if (_log.shouldLog(Log.WARN))
_log.warn("Trying to change our external address...");
@ -488,13 +526,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.WARN))
_log.warn("Error trying to change our external address", uhe);
}
} else {
// they told us something different, but our tests are recent and positive,
// so lets test again
fireTest = true;
if (_log.shouldLog(Log.WARN))
_log.warn("Different address, but we're fine.. (" + _reachabilityStatus + ")");
}
//} else {
// // they told us something different, but our tests are recent and positive,
// // so lets test again
// fireTest = true;
// if (_log.shouldLog(Log.WARN))
// _log.warn("Different address, but we're fine.. (" + _reachabilityStatus + ")");
//}
} else {
// matched what we expect
if (_log.shouldLog(Log.INFO))
@ -1125,10 +1163,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
msg.timestamp("enqueueing for an already established peer");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Add to fragments for " + to.toBase64());
if (true) // skip the priority queue and go straight to the active pool
_fragments.add(msg);
else
// See comments in DQAT.java
if (USE_PRIORITY)
_outboundMessages.add(msg);
else // skip the priority queue and go straight to the active pool
_fragments.add(msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Establish new connection to " + to.toBase64());
@ -1224,7 +1264,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
options.setProperty(UDPAddress.PROP_CAPACITY, ""+UDPAddress.CAPACITY_TESTING + UDPAddress.CAPACITY_INTRODUCER);
if (directIncluded || introducersIncluded) {
options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64());
// This is called via TransportManager.configTransports() before startup(), prevent NPE
if (_introKey != null)
options.setProperty(UDPAddress.PROP_INTRO_KEY, _introKey.toBase64());
RouterAddress addr = new RouterAddress();
if (ADJUST_COST && !haveCapacity())

View File

@ -22,18 +22,24 @@ import net.i2p.util.Log;
*/
public class BuildMessageGenerator {
// cached, rather than creating lots of temporary Integer objects whenever we build a tunnel
public static final Integer ORDER[] = new Integer[TunnelBuildMessage.RECORD_COUNT];
public static final Integer ORDER[] = new Integer[TunnelBuildMessage.MAX_RECORD_COUNT];
static { for (int i = 0; i < ORDER.length; i++) ORDER[i] = Integer.valueOf(i); }
/** return null if it is unable to find a router's public key (etc) */
/****
public TunnelBuildMessage createInbound(RouterContext ctx, TunnelCreatorConfig cfg) {
return create(ctx, cfg, null, -1);
}
****/
/** return null if it is unable to find a router's public key (etc) */
/****
public TunnelBuildMessage createOutbound(RouterContext ctx, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel) {
return create(ctx, cfg, replyRouter, replyTunnel);
}
****/
/****
private TunnelBuildMessage create(RouterContext ctx, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel) {
TunnelBuildMessage msg = new TunnelBuildMessage(ctx);
List order = new ArrayList(ORDER.length);
@ -50,14 +56,15 @@ public class BuildMessageGenerator {
layeredEncrypt(ctx, msg, cfg, order);
return msg;
}
****/
/**
* Place the asymmetrically encrypted record in the specified record slot,
* containing the hop's configuration (as well as the reply info, if it is an outbound endpoint)
*/
public void createRecord(int recordNum, int hop, TunnelBuildMessage msg, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel, I2PAppContext ctx, PublicKey peerKey) {
public static void createRecord(int recordNum, int hop, TunnelBuildMessage msg, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel, I2PAppContext ctx, PublicKey peerKey) {
byte encrypted[] = new byte[TunnelBuildMessage.RECORD_SIZE];
Log log = ctx.logManager().getLog(getClass());
Log log = ctx.logManager().getLog(BuildMessageGenerator.class);
if (peerKey != null) {
BuildRequestRecord req = null;
if ( (!cfg.isInbound()) && (hop + 1 == cfg.getLength()) ) //outbound endpoint
@ -79,7 +86,7 @@ public class BuildMessageGenerator {
msg.setRecord(recordNum, new ByteArray(encrypted));
}
private BuildRequestRecord createUnencryptedRecord(I2PAppContext ctx, TunnelCreatorConfig cfg, int hop, Hash replyRouter, long replyTunnel) {
private static BuildRequestRecord createUnencryptedRecord(I2PAppContext ctx, TunnelCreatorConfig cfg, int hop, Hash replyRouter, long replyTunnel) {
Log log = ctx.logManager().getLog(BuildMessageGenerator.class);
if (hop < cfg.getLength()) {
// ok, now lets fill in some data
@ -143,10 +150,10 @@ public class BuildMessageGenerator {
* Encrypt the records so their hop ident is visible at the appropriate times
* @param order list of hop #s as Integers. For instance, if (order.get(1) is 4), it is peer cfg.getPeer(4)
*/
public void layeredEncrypt(I2PAppContext ctx, TunnelBuildMessage msg, TunnelCreatorConfig cfg, List order) {
public static void layeredEncrypt(I2PAppContext ctx, TunnelBuildMessage msg, TunnelCreatorConfig cfg, List order) {
Log log = ctx.logManager().getLog(BuildMessageGenerator.class);
// encrypt the records so that the right elements will be visible at the right time
for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) {
for (int i = 0; i < msg.getRecordCount(); i++) {
ByteArray rec = msg.getRecord(i);
Integer hopNum = (Integer)order.get(i);
int hop = hopNum.intValue();

View File

@ -43,7 +43,7 @@ public class BuildMessageProcessor {
long totalEq = 0;
long totalDup = 0;
long beforeLoop = System.currentTimeMillis();
for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) {
for (int i = 0; i < msg.getRecordCount(); i++) {
ByteArray rec = msg.getRecord(i);
int off = rec.getOffset();
int len = BuildRequestRecord.PEER_SIZE;
@ -87,7 +87,7 @@ public class BuildMessageProcessor {
SessionKey replyKey = rv.readReplyKey();
byte iv[] = rv.readReplyIV();
int ivOff = 0;
for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) {
for (int i = 0; i < msg.getRecordCount(); i++) {
if (i != ourHop) {
ByteArray data = msg.getRecord(i);
if (log.shouldLog(Log.DEBUG))

View File

@ -17,7 +17,6 @@ import net.i2p.util.Log;
*
*/
public class BuildReplyHandler {
public BuildReplyHandler() {}
/**
* Decrypt the tunnel build reply records. This overwrites the contents of the reply
@ -25,11 +24,16 @@ public class BuildReplyHandler {
* @return status for the records (in record order), or null if the replies were not valid. Fake records
* always have 0 as their value
*/
public int[] decrypt(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List recordOrder) {
Log log = ctx.logManager().getLog(getClass());
int rv[] = new int[TunnelBuildReplyMessage.RECORD_COUNT];
public static int[] decrypt(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List<Integer> recordOrder) {
Log log = ctx.logManager().getLog(BuildReplyHandler.class);
if (reply.getRecordCount() != recordOrder.size()) {
// somebody messed with us
log.error("Corrupted build reply, expected " + recordOrder.size() + " records, got " + reply.getRecordCount());
return null;
}
int rv[] = new int[reply.getRecordCount()];
for (int i = 0; i < rv.length; i++) {
int hop = ((Integer)recordOrder.get(i)).intValue();
int hop = recordOrder.get(i).intValue();
if (BuildMessageGenerator.isBlank(cfg, hop)) {
// self...
if (log.shouldLog(Log.DEBUG))
@ -56,8 +60,8 @@ public class BuildReplyHandler {
*
* @return -1 on decrypt failure
*/
private int decryptRecord(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) {
Log log = ctx.logManager().getLog(getClass());
private static int decryptRecord(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) {
Log log = ctx.logManager().getLog(BuildReplyHandler.class);
if (BuildMessageGenerator.isBlank(cfg, hop)) {
if (log.shouldLog(Log.DEBUG))
log.debug(reply.getUniqueId() + ": Record " + recordNum + "/" + hop + " is fake, so consider it valid...");

View File

@ -11,6 +11,7 @@ import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.GarlicMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelBuildReplyMessage;
import net.i2p.data.i2np.VariableTunnelBuildReplyMessage;
import net.i2p.router.ClientMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
@ -83,7 +84,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
// as long as there's no reply token (FVSJ will never set a reply token but an attacker might)
((msg.getType() != DatabaseStoreMessage.MESSAGE_TYPE) || (!_client.equals(((DatabaseStoreMessage)msg).getKey())) ||
(((DatabaseStoreMessage)msg).getReplyToken() != 0)) &&
(msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE)) {
(msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE) &&
(msg.getType() != VariableTunnelBuildReplyMessage.MESSAGE_TYPE)) {
// drop it, since we should only get tunnel test messages and garlic messages down
// client tunnels
_context.statManager().addRateData("tunnel.dropDangerousClientTunnelMessage", 1, msg.getType());
@ -204,6 +206,7 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec
return;
}
case DeliveryInstructions.DELIVERY_MODE_DESTINATION:
// Can we route UnknownI2NPMessages to a destination too?
if (!(data instanceof DataMessage)) {
if (_log.shouldLog(Log.ERROR))
_log.error("cant send a " + data.getClass().getName() + " to a destination");

View File

@ -26,7 +26,7 @@ public class TunnelCreatorConfig implements TunnelInfo {
/** gateway first */
private Hash _peers[];
private long _expiration;
private List _order;
private List<Integer> _order;
private long _replyMessageId;
private boolean _isInbound;
private long _messagesProcessed;
@ -54,7 +54,11 @@ public class TunnelCreatorConfig implements TunnelInfo {
_failures = 0;
}
/** how many hops are there in the tunnel? */
/**
* How many hops are there in the tunnel?
* INCLUDING US.
* i.e. one more than the TunnelCreatorConfig length.
*/
public int getLength() { return _config.length; }
public Properties getOptions() { return null; }
@ -91,8 +95,8 @@ public class TunnelCreatorConfig implements TunnelInfo {
public void setExpiration(long when) { _expiration = when; }
/** component ordering in the new style request */
public List getReplyOrder() { return _order; }
public void setReplyOrder(List order) { _order = order; }
public List<Integer> getReplyOrder() { return _order; }
public void setReplyOrder(List<Integer> order) { _order = order; }
/** new style reply message id */
public long getReplyMessageId() { return _replyMessageId; }
public void setReplyMessageId(long id) { _replyMessageId = id; }

View File

@ -3,9 +3,12 @@ package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Hash;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelManagerFacade;
@ -28,17 +31,25 @@ class BuildExecutor implements Runnable {
private Log _log;
private TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
private final List<PooledTunnelCreatorConfig> _currentlyBuilding;
private final Object _currentlyBuilding;
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _currentlyBuildingMap;
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
private boolean _isRunning;
private BuildHandler _handler;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
/** accept replies up to a minute after we gave up on them */
private static final long GRACE_PERIOD = 60*1000;
public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_manager = mgr;
_currentlyBuilding = new ArrayList(MAX_CONCURRENT_BUILDS);
_currentlyBuilding = new Object();
_currentlyBuildingMap = new ConcurrentHashMap(MAX_CONCURRENT_BUILDS);
_recentlyBuildingMap = new ConcurrentHashMap(4 * MAX_CONCURRENT_BUILDS);
_context.statManager().createRateStat("tunnel.concurrentBuilds", "How many builds are going at once", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("tunnel.concurrentBuildsLagged", "How many builds are going at once when we reject further builds, due to job lag (period is lag)", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
_context.statManager().createRateStat("tunnel.buildExploratoryExpire", "How often an exploratory tunnel times out during creation", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
@ -51,6 +62,7 @@ class BuildExecutor implements Runnable {
_context.statManager().createRateStat("tunnel.buildRequestZeroHopTime", "How long it takes to build a zero hop tunnel", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildFailFirstHop", "How often we fail to build a OB tunnel because we can't contact the first hop", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.buildReplySlow", "Build reply late, but not too late", "Tunnels", new long[] { 10*60*1000 });
// Get stat manager, get recognized bandwidth tiers
StatManager statMgr = _context.statManager();
@ -78,25 +90,33 @@ class BuildExecutor implements Runnable {
if (allowed > MAX_CONCURRENT_BUILDS) allowed = MAX_CONCURRENT_BUILDS; // Never go beyond 10, that is uncharted territory (old limit was 5)
allowed = _context.getProperty("router.tunnelConcurrentBuilds", allowed);
// expire any REALLY old requests
long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT - GRACE_PERIOD;
for (Iterator<PooledTunnelCreatorConfig> iter = _recentlyBuildingMap.values().iterator(); iter.hasNext(); ) {
PooledTunnelCreatorConfig cfg = iter.next();
if (cfg.getExpiration() <= expireBefore) {
iter.remove();
}
}
// expire any old requests
List<PooledTunnelCreatorConfig> expired = null;
int concurrent = 0;
// Todo: Make expiration variable
long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
synchronized (_currentlyBuilding) {
// expire any old requests
for (int i = 0; i < _currentlyBuilding.size(); i++) {
PooledTunnelCreatorConfig cfg = _currentlyBuilding.get(i);
if (cfg.getExpiration() <= expireBefore) {
_currentlyBuilding.remove(i);
i--;
if (expired == null)
expired = new ArrayList();
expired.add(cfg);
}
expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
for (Iterator<PooledTunnelCreatorConfig> iter = _currentlyBuildingMap.values().iterator(); iter.hasNext(); ) {
PooledTunnelCreatorConfig cfg = iter.next();
if (cfg.getExpiration() <= expireBefore) {
// save them for another minute
_recentlyBuildingMap.putIfAbsent(Long.valueOf(cfg.getReplyMessageId()), cfg);
iter.remove();
if (expired == null)
expired = new ArrayList();
expired.add(cfg);
}
concurrent = _currentlyBuilding.size();
allowed -= concurrent;
}
concurrent = _currentlyBuildingMap.size();
allowed -= concurrent;
if (expired != null) {
for (int i = 0; i < expired.size(); i++) {
@ -111,7 +131,7 @@ class BuildExecutor implements Runnable {
// Look up peer
Hash peer = cfg.getPeer(iPeer);
// Avoid recording ourselves
if (peer.toBase64().equals(_context.routerHash().toBase64()))
if (peer.equals(_context.routerHash()))
continue;
// Look up routerInfo
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
@ -303,9 +323,6 @@ class BuildExecutor implements Runnable {
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
synchronized (_currentlyBuilding) {
_currentlyBuilding.add(cfg);
}
buildTunnel(pool, cfg);
realBuilt++;
@ -400,9 +417,6 @@ class BuildExecutor implements Runnable {
if (cfg != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Configuring short tunnel " + i + " for " + pool + ": " + cfg);
synchronized (_currentlyBuilding) {
_currentlyBuilding.add(cfg);
}
buildTunnel(pool, cfg);
if (cfg.getLength() > 1) {
allowed--; // oops... shouldn't have done that, but hey, its not that bad...
@ -422,6 +436,15 @@ class BuildExecutor implements Runnable {
void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) {
long beforeBuild = System.currentTimeMillis();
if (cfg.getLength() > 1) {
// should we allow an ID of 0?
cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
if (addToBuilding(cfg)) {
_log.error("Dup reply ID: " + cfg.getReplyMessageId());
// fail
return;
}
}
BuildRequestor.request(_context, pool, cfg, this);
long buildTime = System.currentTimeMillis() - beforeBuild;
if (cfg.getLength() <= 1)
@ -445,8 +468,9 @@ class BuildExecutor implements Runnable {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Build complete for " + cfg);
pool.buildComplete(cfg);
if (cfg.getLength() > 1)
removeFromBuilding(cfg.getReplyMessageId());
synchronized (_currentlyBuilding) {
_currentlyBuilding.remove(cfg);
_currentlyBuilding.notifyAll();
}
@ -479,6 +503,41 @@ class BuildExecutor implements Runnable {
_log.info(tunnel + ": Peer " + peer.toBase64() + " did not reply to the tunnel join request");
}
List locked_getCurrentlyBuilding() { return _currentlyBuilding; }
/**
* Only do this for non-fallback tunnels.
* @return true if refused because of a duplicate key
*/
private boolean addToBuilding(PooledTunnelCreatorConfig cfg) {
//_log.error("Adding ID: " + cfg.getReplyMessageId() + "; size was: " + _currentlyBuildingMap.size());
return _currentlyBuildingMap.putIfAbsent(Long.valueOf(cfg.getReplyMessageId()), cfg) != null;
}
/**
* This returns the PTCC up to a minute after it 'expired', thus allowing us to
* still use a tunnel if it was accepted, and to update peer stats.
* This means that manager.buildComplete() could be called more than once, and
* a build can be failed or successful after it was timed out,
* which will affect the stats and profiles.
* But that's ok. A peer that rejects slowly gets penalized twice, for example.
*
* @return ptcc or null
*/
PooledTunnelCreatorConfig removeFromBuilding(long id) {
//_log.error("Removing ID: " + id + "; size was: " + _currentlyBuildingMap.size());
Long key = Long.valueOf(id);
PooledTunnelCreatorConfig rv = _currentlyBuildingMap.remove(key);
if (rv != null)
return rv;
rv = _recentlyBuildingMap.remove(key);
if (rv != null) {
long requestedOn = rv.getExpiration() - 10*60*1000;
long rtt = _context.clock().now() - requestedOn;
_context.statManager().addRateData("tunnel.buildReplySlow", rtt, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Got reply late (rtt = " + rtt + ") for: " + rv);
}
return rv;
}
public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); }
}

View File

@ -16,6 +16,8 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelBuildMessage;
import net.i2p.data.i2np.TunnelBuildReplyMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.data.i2np.VariableTunnelBuildMessage;
import net.i2p.data.i2np.VariableTunnelBuildReplyMessage;
import net.i2p.router.HandlerJobBuilder;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
@ -87,12 +89,18 @@ class BuildHandler {
_context.statManager().createRateStat("tunnel.receiveRejectionTransient", "How often we are rejected due to transient overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tunnel.receiveRejectionBandwidth", "How often we are rejected due to bandwidth overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
_context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l });
_processor = new BuildMessageProcessor(ctx);
_buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx);
_buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx);
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildMessage.MESSAGE_TYPE, new TunnelBuildMessageHandlerJobBuilder());
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, new TunnelBuildReplyMessageHandlerJobBuilder());
TunnelBuildMessageHandlerJobBuilder tbmhjb = new TunnelBuildMessageHandlerJobBuilder();
TunnelBuildReplyMessageHandlerJobBuilder tbrmhjb = new TunnelBuildReplyMessageHandlerJobBuilder();
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildMessage.MESSAGE_TYPE, tbmhjb);
ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildMessage.MESSAGE_TYPE, tbmhjb);
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
}
private static final int MAX_HANDLE_AT_ONCE = 2;
@ -219,28 +227,13 @@ class BuildHandler {
private void handleReply(BuildReplyMessageState state) {
// search through the tunnels for a reply
long replyMessageId = state.msg.getUniqueId();
PooledTunnelCreatorConfig cfg = null;
List building = _exec.locked_getCurrentlyBuilding();
PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(replyMessageId);
StringBuilder buf = null;
synchronized (building) {
for (int i = 0; i < building.size(); i++) {
PooledTunnelCreatorConfig cur = (PooledTunnelCreatorConfig)building.get(i);
if (cur.getReplyMessageId() == replyMessageId) {
building.remove(i);
cfg = cur;
break;
}
}
if ( (cfg == null) && (_log.shouldLog(Log.DEBUG)) )
buf = new StringBuilder(building.toString());
}
if (cfg == null) {
// cannot handle - not pending... took too long?
if (_log.shouldLog(Log.WARN))
_log.warn("The reply " + replyMessageId + " did not match any pending tunnels");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Pending tunnels: " + buf.toString());
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
} else {
handleReply(state.msg, cfg, System.currentTimeMillis()-state.recvTime);
@ -253,14 +246,19 @@ class BuildHandler {
if (_log.shouldLog(Log.INFO))
_log.info(msg.getUniqueId() + ": Handling the reply after " + rtt + ", delayed " + delay + " waiting for " + cfg);
BuildReplyHandler handler = new BuildReplyHandler();
List order = cfg.getReplyOrder();
int statuses[] = handler.decrypt(_context, msg, cfg, order);
List<Integer> order = cfg.getReplyOrder();
int statuses[] = BuildReplyHandler.decrypt(_context, msg, cfg, order);
if (statuses != null) {
boolean allAgree = true;
// For each peer in the tunnel
for (int i = 0; i < cfg.getLength(); i++) {
Hash peer = cfg.getPeer(i);
// If this tunnel member is us, skip this record, don't update profile or stats
// for ourselves, we always agree
// Why must we save a slot for ourselves anyway?
if (peer.equals(_context.routerHash()))
continue;
int record = order.indexOf(Integer.valueOf(i));
if (record < 0) {
_log.error("Bad status index " + i);
@ -268,9 +266,9 @@ class BuildHandler {
_exec.buildComplete(cfg, cfg.getTunnelPool());
return;
}
int howBad = statuses[record];
// If this tunnel member isn't ourselves
if (!peer.toBase64().equals(_context.routerHash().toBase64())) {
// Look up routerInfo
RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer);
// Default and detect bandwidth tier
@ -285,7 +283,6 @@ class BuildHandler {
}
if (_log.shouldLog(Log.INFO))
_log.info(msg.getUniqueId() + ": Peer " + peer.toBase64() + " replied with status " + howBad);
}
if (howBad == 0) {
// w3wt
@ -338,6 +335,7 @@ class BuildHandler {
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(msg.getUniqueId() + ": Tunnel reply could not be decrypted for tunnel " + cfg);
_context.statManager().addRateData("tunnel.corruptBuildReply", 1, 0);
// don't leak
_exec.buildComplete(cfg, cfg.getTunnelPool());
}
@ -403,8 +401,13 @@ class BuildHandler {
* This request is actually a reply, process it as such
*/
private void handleRequestAsInboundEndpoint(BuildEndMessageState state) {
TunnelBuildReplyMessage msg = new TunnelBuildReplyMessage(_context);
for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++)
int records = state.msg.getRecordCount();
TunnelBuildReplyMessage msg;
if (records == TunnelBuildMessage.MAX_RECORD_COUNT)
msg = new TunnelBuildReplyMessage(_context);
else
msg = new VariableTunnelBuildReplyMessage(_context, records);
for (int i = 0; i < records; i++)
msg.setRecord(i, state.msg.getRecord(i));
msg.setUniqueId(state.msg.getUniqueId());
handleReply(msg, state.cfg, System.currentTimeMillis() - state.recvTime);
@ -490,7 +493,6 @@ class BuildHandler {
* If we did credit the reply to the tunnel, it would
* prevent the classification of the tunnel as 'inactive' on tunnels.jsp.
*/
@SuppressWarnings("static-access")
private void handleReq(RouterInfo nextPeerInfo, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
long ourId = req.readReceiveTunnelId();
long nextId = req.readNextTunnelId();
@ -613,7 +615,8 @@ class BuildHandler {
}
byte reply[] = BuildResponseRecord.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId());
for (int j = 0; j < TunnelBuildMessage.RECORD_COUNT; j++) {
int records = state.msg.getRecordCount();
for (int j = 0; j < records; j++) {
if (state.msg.getRecord(j) == null) {
ourSlot = j;
state.msg.setRecord(j, new ByteArray(reply));
@ -648,9 +651,12 @@ class BuildHandler {
} else {
// send it to the reply tunnel on the reply peer within a new TunnelBuildReplyMessage
// (enough layers jrandom?)
TunnelBuildReplyMessage replyMsg = new TunnelBuildReplyMessage(_context);
/* FIXME Accessing static field "RECORD_COUNT" FIXME */
for (int i = 0; i < state.msg.RECORD_COUNT; i++)
TunnelBuildReplyMessage replyMsg;
if (records == TunnelBuildMessage.MAX_RECORD_COUNT)
replyMsg = new TunnelBuildReplyMessage(_context);
else
replyMsg = new VariableTunnelBuildReplyMessage(_context, records);
for (int i = 0; i < records; i++)
replyMsg.setRecord(i, state.msg.getRecord(i));
replyMsg.setUniqueId(req.readReplyMessageId());
replyMsg.setMessageExpiration(_context.clock().now() + 10*1000);
@ -693,28 +699,16 @@ class BuildHandler {
// need to figure out if this is a reply to an inbound tunnel request (where we are the
// endpoint, receiving the request at the last hop)
long reqId = receivedMessage.getUniqueId();
PooledTunnelCreatorConfig cfg = null;
List building = _exec.locked_getCurrentlyBuilding();
List ids = new ArrayList();
synchronized (building) {
for (int i = 0; i < building.size(); i++) {
PooledTunnelCreatorConfig cur = (PooledTunnelCreatorConfig)building.get(i);
ids.add(new Long(cur.getReplyMessageId()));
if ( (cur.isInbound()) && (cur.getReplyMessageId() == reqId) ) {
building.remove(i);
cfg = cur;
break;
} else if (cur.getReplyMessageId() == reqId) {
_log.error("received it, but its not inbound? " + cur);
}
}
}
PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(reqId);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive tunnel build message " + reqId + " from "
+ (from != null ? from.calculateHash().toBase64() : fromHash != null ? fromHash.toBase64() : "tunnels")
+ ", waiting ids: " + ids + ", found matching tunnel? " + (cfg != null),
null);//new Exception("source"));
+ ", found matching tunnel? " + (cfg != null));
if (cfg != null) {
if (!cfg.isInbound()) {
// shouldnt happen - should we put it back?
_log.error("received it, but its not inbound? " + cfg);
}
BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage);
if (HANDLE_REPLIES_INLINE) {
handleRequestAsInboundEndpoint(state);

View File

@ -12,18 +12,20 @@ import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelBuildMessage;
import net.i2p.data.i2np.VariableTunnelBuildMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.tunnel.BuildMessageGenerator;
import net.i2p.util.Log;
import net.i2p.util.VersionComparator;
/**
*
*/
class BuildRequestor {
private static final List ORDER = new ArrayList(BuildMessageGenerator.ORDER.length);
private static final List<Integer> ORDER = new ArrayList(BuildMessageGenerator.ORDER.length);
static {
for (int i = 0; i < BuildMessageGenerator.ORDER.length; i++)
ORDER.add(Integer.valueOf(i));
@ -40,7 +42,13 @@ class BuildRequestor {
*
*/
static final int REQUEST_TIMEOUT = 13*1000;
/** make this shorter than REQUEST_TIMEOUT */
private static final int FIRST_HOP_TIMEOUT = 10*1000;
/** some randomization is added on to this */
private static final int BUILD_MSG_TIMEOUT = 60*1000;
private static boolean usePairedTunnels(RouterContext ctx) {
String val = ctx.getProperty("router.usePairedTunnels");
if ( (val == null) || (Boolean.valueOf(val).booleanValue()) )
@ -50,7 +58,7 @@ class BuildRequestor {
}
/** new style requests need to fill in the tunnel IDs before hand */
public static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) {
private static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) {
for (int i = 0; i < cfg.getLength(); i++) {
if ( (!cfg.isInbound()) && (i == 0) ) {
// outbound gateway (us) doesn't receive on a tunnel id
@ -67,8 +75,14 @@ class BuildRequestor {
cfg.getConfig(i).setReplyIV(new ByteArray(iv));
cfg.getConfig(i).setReplyKey(ctx.keyGenerator().generateSessionKey());
}
cfg.setReplyMessageId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE));
// This is in BuildExecutor.buildTunnel() now
// And it was overwritten by the one in createTunnelBuildMessage() anyway!
//cfg.setReplyMessageId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE));
}
/**
* @param cfg ReplyMessageId must be set
*/
public static void request(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, BuildExecutor exec) {
// new style crypto fills in all the blanks, while the old style waits for replies to fill in the next hop, etc
prepare(ctx, cfg);
@ -136,8 +150,12 @@ class BuildRequestor {
+ " with msgId=" + msg.getUniqueId());
// send it directly to the first hop
OutNetMessage outMsg = new OutNetMessage(ctx);
// Todo: add some fuzz to the expiration to make it harder to guess how many hops?
outMsg.setExpiration(msg.getMessageExpiration());
// Add some fuzz to the TBM expiration to make it harder to guess how many hops
// or placement in the tunnel
msg.setMessageExpiration(ctx.clock().now() + BUILD_MSG_TIMEOUT + ctx.random().nextLong(20*1000));
// We set the OutNetMessage expiration much shorter, so that the
// TunnelBuildFirstHopFailJob fires before the 13s build expiration.
outMsg.setExpiration(ctx.clock().now() + FIRST_HOP_TIMEOUT);
outMsg.setMessage(msg);
outMsg.setPriority(PRIORITY);
RouterInfo peer = ctx.netDb().lookupRouterInfoLocally(cfg.getPeer(1));
@ -156,33 +174,97 @@ class BuildRequestor {
+ "ms and dispatched in " + (System.currentTimeMillis()-beforeDispatch));
}
private static final String MIN_VARIABLE_VERSION = "0.7.12";
/** change this to true in 0.7.13 if testing goes well */
private static final boolean SEND_VARIABLE = false;
/** 5 (~2600 bytes) fits nicely in 3 tunnel messages */
private static final int SHORT_RECORDS = 5;
private static final int LONG_RECORDS = TunnelBuildMessage.MAX_RECORD_COUNT;
private static final VersionComparator _versionComparator = new VersionComparator();
private static final List<Integer> SHORT_ORDER = new ArrayList(SHORT_RECORDS);
static {
for (int i = 0; i < SHORT_RECORDS; i++)
SHORT_ORDER.add(Integer.valueOf(i));
}
private static boolean supportsVariable(RouterContext ctx, Hash h) {
RouterInfo ri = ctx.netDb().lookupRouterInfoLocally(h);
if (ri == null)
return false;
String v = ri.getOption("router.version");
if (v == null)
return false;
return _versionComparator.compare(v, MIN_VARIABLE_VERSION) >= 0;
}
/**
* If the tunnel is short enough, and everybody in the tunnel, and the
* OBEP or IBGW for the paired tunnel, all support the new variable-sized tunnel build message,
* then use that, otherwise the old 8-entry version.
*/
private static TunnelBuildMessage createTunnelBuildMessage(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, TunnelInfo pairedTunnel, BuildExecutor exec) {
Log log = ctx.logManager().getLog(BuildRequestor.class);
long replyTunnel = 0;
Hash replyRouter = null;
boolean useVariable = SEND_VARIABLE && cfg.getLength() <= SHORT_RECORDS;
if (cfg.isInbound()) {
replyTunnel = 0;
//replyTunnel = 0; // as above
replyRouter = ctx.routerHash();
if (useVariable) {
// check the reply OBEP and all the tunnel peers except ourselves
if (!supportsVariable(ctx, pairedTunnel.getPeer(pairedTunnel.getLength() - 1))) {
useVariable = false;
} else {
for (int i = 0; i < cfg.getLength() - 1; i++) {
if (!supportsVariable(ctx, cfg.getPeer(i))) {
useVariable = false;
break;
}
}
}
}
} else {
replyTunnel = pairedTunnel.getReceiveTunnelId(0).getTunnelId();
replyRouter = pairedTunnel.getPeer(0);
if (useVariable) {
// check the reply IBGW and all the tunnel peers except ourselves
if (!supportsVariable(ctx, replyRouter)) {
useVariable = false;
} else {
for (int i = 1; i < cfg.getLength() - 1; i++) {
if (!supportsVariable(ctx, cfg.getPeer(i))) {
useVariable = false;
break;
}
}
}
}
}
// populate and encrypt the message
BuildMessageGenerator gen = new BuildMessageGenerator();
TunnelBuildMessage msg = new TunnelBuildMessage(ctx);
TunnelBuildMessage msg;
List<Integer> order;
if (useVariable) {
msg = new VariableTunnelBuildMessage(ctx, SHORT_RECORDS);
order = new ArrayList(SHORT_ORDER);
if (log.shouldLog(Log.INFO))
log.info("Using new VTBM");
} else {
msg = new TunnelBuildMessage(ctx);
order = new ArrayList(ORDER);
}
long replyMessageId = ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE);
cfg.setReplyMessageId(replyMessageId);
// This is in BuildExecutor.buildTunnel() now
//long replyMessageId = ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE);
//cfg.setReplyMessageId(replyMessageId);
List order = new ArrayList(ORDER);
Collections.shuffle(order, ctx.random()); // randomized placement within the message
cfg.setReplyOrder(order);
if (log.shouldLog(Log.DEBUG))
log.debug("Build order: " + order + " for " + cfg);
for (int i = 0; i < BuildMessageGenerator.ORDER.length; i++) {
for (int i = 0; i < msg.getRecordCount(); i++) {
int hop = ((Integer)order.get(i)).intValue();
PublicKey key = null;
@ -202,9 +284,9 @@ class BuildRequestor {
}
if (log.shouldLog(Log.DEBUG))
log.debug(cfg.getReplyMessageId() + ": record " + i + "/" + hop + " has key " + key);
gen.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel, ctx, key);
BuildMessageGenerator.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel, ctx, key);
}
gen.layeredEncrypt(ctx, msg, cfg, order);
BuildMessageGenerator.layeredEncrypt(ctx, msg, cfg, order);
return msg;
}