merge of 'b002d4a942128fdd4994a2cfba1c554ba9cb81d8' and 'e6547920e2da9f540c79fcafc7ca7c82d25eae23'
zzz
2010-11-26 15:57:05 +00:00
37 changed files with 607 additions and 383 deletions


@@ -191,7 +191,9 @@ public class Router {
// This is here so that we can get the directory location from the context
// for the ping file
if (!beginMarkingLiveliness()) {
// Check for another router, but do not start a thread yet so the update doesn't cause
// an NCDFE
if (!isOnlyRouterRunning()) {
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
@@ -215,6 +217,11 @@ public class Router {
// overwrite an existing running router's jar files. Other than ours.
installUpdates();
// ********* Start no threads before here ********* //
//
// NOW we can start the ping file thread.
beginMarkingLiveliness();
// Apps may use this as an easy way to determine if they are in the router JVM
// But context.isRouterContext() is even easier...
// Both of these as of 0.7.9
@@ -1163,38 +1170,50 @@ public class Router {
// verify the whole thing first
// we could remember that this failed, and not bother restarting, but who cares...
boolean ok = FileUtil.verifyZip(updateFile);
if (ok)
ok = FileUtil.extractZip(updateFile, _context.getBaseDir());
if (ok)
System.out.println("INFO: Update installed");
else
System.out.println("ERROR: Update failed!");
if (!ok) {
// we can't leave the file in place or we'll continually restart, so rename it
File bad = new File(_context.getRouterDir(), "BAD-" + UPDATE_FILE);
boolean renamed = updateFile.renameTo(bad);
if (renamed) {
System.out.println("Moved update file to " + bad.getAbsolutePath());
} else {
System.out.println("Deleting file " + updateFile.getAbsolutePath());
ok = true; // so it will be deleted
}
}
if (ok) {
// This may be useful someday. First added in 0.8.2
// Moved above the extract so we don't NCDFE
_config.put("router.updateLastInstalled", "" + System.currentTimeMillis());
saveConfig();
boolean deleted = updateFile.delete();
if (!deleted) {
System.out.println("ERROR: Unable to delete the update file!");
updateFile.deleteOnExit();
}
ok = FileUtil.extractZip(updateFile, _context.getBaseDir());
}
// Very important - we have now trashed our jars.
// After this point, do not use any new I2P classes, or they will fail to load
// and we will die with NCDFE.
// Ideally, do not use I2P classes at all, new or not.
try {
if (ok)
System.out.println("INFO: Update installed");
else
System.out.println("ERROR: Update failed!");
if (!ok) {
// we can't leave the file in place or we'll continually restart, so rename it
File bad = new File(_context.getRouterDir(), "BAD-" + UPDATE_FILE);
boolean renamed = updateFile.renameTo(bad);
if (renamed) {
System.out.println("Moved update file to " + bad.getAbsolutePath());
} else {
System.out.println("Deleting file " + updateFile.getAbsolutePath());
ok = true; // so it will be deleted
}
}
if (ok) {
boolean deleted = updateFile.delete();
if (!deleted) {
System.out.println("ERROR: Unable to delete the update file!");
updateFile.deleteOnExit();
}
}
// exit whether ok or not
if (System.getProperty("wrapper.version") != null)
System.out.println("INFO: Restarting after update");
else
System.out.println("WARNING: Exiting after update, restart I2P");
} catch (Throwable t) {
// hide the NCDFE
// hopefully the update file got deleted or we will loop
}
// exit whether ok or not
if (System.getProperty("wrapper.version") != null)
System.out.println("INFO: Restarting after update");
else
System.out.println("WARNING: Exiting after update, restart I2P");
System.exit(EXIT_HARD_RESTART);
}
}
@@ -1230,13 +1249,14 @@ public class Router {
static final long LIVELINESS_DELAY = 60*1000;
/**
* Start a thread that will periodically update the file "router.ping", but if
* Check the file "router.ping", but if
* that file already exists and was recently written to, return false as there is
* another instance running
* another instance running.
*
* @return true if the router is the only one running
* @since 0.8.2
*/
private boolean beginMarkingLiveliness() {
private boolean isOnlyRouterRunning() {
File f = getPingFile();
if (f.exists()) {
long lastWritten = f.lastModified();
@@ -1247,12 +1267,20 @@ public class Router {
return false;
}
}
return true;
}
/**
* Start a thread that will periodically update the file "router.ping".
* isOnlyRouterRunning() MUST have been called previously.
*/
private void beginMarkingLiveliness() {
File f = getPingFile();
// not an I2PThread for context creation issues
Thread t = new Thread(new MarkLiveliness(_context, this, f));
t.setName("Mark router liveliness");
t.setDaemon(true);
t.start();
return true;
}
public static final String PROP_BANDWIDTH_SHARE_PERCENTAGE = "router.sharePercentage";
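The change above splits the old beginMarkingLiveliness() into two steps: isOnlyRouterRunning() checks the ping file's timestamp before installUpdates() runs, and beginMarkingLiveliness() starts the marker thread only after the update is installed, so no thread is running while the jars are replaced (which could otherwise trigger a NoClassDefFoundError). The MarkLiveliness runnable referenced here is not part of this diff; the following is only a rough sketch of what such a periodic ping-file writer could look like, with an illustrative class name and no I2P dependencies:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustrative sketch only -- not I2P's MarkLiveliness implementation.
// Rewrites the ping file periodically so a second instance can see a recent
// lastModified() time and refuse to start.
class PingFileWriter implements Runnable {
    static final long LIVELINESS_DELAY = 60*1000;
    private final File pingFile;
    private volatile boolean alive = true;

    PingFileWriter(File pingFile) { this.pingFile = pingFile; }

    public void run() {
        while (alive) {
            FileOutputStream out = null;
            try {
                out = new FileOutputStream(pingFile);
                out.write(Long.toString(System.currentTimeMillis()).getBytes("UTF-8"));
            } catch (IOException ioe) {
                // best effort; the startup check just becomes less reliable
            } finally {
                if (out != null) try { out.close(); } catch (IOException ioe) {}
            }
            try { Thread.sleep(LIVELINESS_DELAY); } catch (InterruptedException ie) { break; }
        }
        pingFile.delete();
    }

    void stop() { alive = false; }
}

A runnable like this would be wired up the same way the diff shows: wrapped in a plain Thread (not an I2PThread), marked as a daemon, and started only after installUpdates() has finished.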


@@ -686,6 +686,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
return rv;
}
private static final int MIN_ROUTERS = 90;
/**
* Determine whether this routerInfo will be accepted as valid and current
* given what we know now.
@@ -694,9 +696,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
String validate(Hash key, RouterInfo routerInfo) throws IllegalArgumentException {
long now = _context.clock().now();
boolean upLongEnough = _context.router().getUptime() > 60*60*1000;
// Once we're over 120 routers, reduce the expiration time down from the default,
// Once we're over MIN_ROUTERS routers, reduce the expiration time down from the default,
// as a crude way of limiting memory usage.
// i.e. at 300 routers the expiration time will be about half the default, etc.
// i.e. at 2*MIN_ROUTERS routers the expiration time will be about half the default, etc.
// And if we're floodfill, we can keep the expiration really short, since
// we are always getting the latest published to us.
// As the net grows this won't be sufficient, and we'll have to implement
@@ -708,7 +710,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
// _kb.size() includes leasesets but that's ok
adjustedExpiration = Math.min(ROUTER_INFO_EXPIRATION,
ROUTER_INFO_EXPIRATION_MIN +
((ROUTER_INFO_EXPIRATION - ROUTER_INFO_EXPIRATION_MIN) * 120 / (_kb.size() + 1)));
((ROUTER_INFO_EXPIRATION - ROUTER_INFO_EXPIRATION_MIN) * MIN_ROUTERS / (_kb.size() + 1)));
if (!key.equals(routerInfo.getIdentity().getHash())) {
if (_log.shouldLog(Log.WARN))
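The new MIN_ROUTERS constant replaces the hard-coded 120 in the expiration formula: once the netDb holds more than MIN_ROUTERS entries, RouterInfo expiration shrinks roughly inversely with the table size. Below is a small worked example of that scaling; the two expiration constants are assumed values for illustration only, since their real definitions are elsewhere in the class and not shown in this diff:

// Worked illustration of the netDb expiration scaling from validate().
// ROUTER_INFO_EXPIRATION and ROUTER_INFO_EXPIRATION_MIN are assumed values here.
class ExpirationScalingDemo {
    static final long ROUTER_INFO_EXPIRATION = 27*60*60*1000L;     // assumption: 27 hours
    static final long ROUTER_INFO_EXPIRATION_MIN = 90*60*1000L;    // assumption: 90 minutes
    static final int MIN_ROUTERS = 90;                             // value added in this commit

    public static void main(String[] args) {
        for (int kbSize : new int[] { 50, 90, 180, 500 }) {
            long adjusted = Math.min(ROUTER_INFO_EXPIRATION,
                    ROUTER_INFO_EXPIRATION_MIN +
                    ((ROUTER_INFO_EXPIRATION - ROUTER_INFO_EXPIRATION_MIN) * MIN_ROUTERS / (kbSize + 1)));
            // at 2*MIN_ROUTERS entries this comes out to roughly half the default
            System.out.println(kbSize + " known routers -> expire after " + (adjusted / (60*1000)) + " minutes");
        }
    }
}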


@@ -149,7 +149,8 @@ public class WorkingDir {
// this one must be after MIGRATE_BASE
success &= migrateJettyXml(oldDirf, dirf);
success &= migrateClientsConfig(oldDirf, dirf);
success &= copy(new File(oldDirf, "docs/news.xml"), new SecureDirectory(dirf, "docs"));
// for later news.xml updates (we don't copy initialNews.xml over anymore)
success &= (new SecureDirectory(dirf, "docs")).mkdir();
// Report success or failure
if (success) {


@@ -234,7 +234,7 @@ public class EstablishState {
System.arraycopy(_X, 0, xy, 0, _X.length);
System.arraycopy(_Y, 0, xy, _X.length, _Y.length);
Hash hxy = _context.sha().calculateHash(xy);
_tsB = _context.clock().now()/1000l; // our (Bob's) timestamp in seconds
_tsB = (_context.clock().now() + 500) / 1000l; // our (Bob's) timestamp in seconds
byte padding[] = new byte[12]; // the encrypted data needs an extra 12 bytes
_context.random().nextBytes(padding);
byte toEncrypt[] = new byte[hxy.getData().length+4+padding.length];
@@ -387,7 +387,7 @@ public class EstablishState {
return;
}
_tsB = DataHelper.fromLong(hXY_tsB, Hash.HASH_LENGTH, 4); // their (Bob's) timestamp in seconds
_tsA = _context.clock().now()/1000; // our (Alice's) timestamp in seconds
_tsA = (_context.clock().now() + 500) / 1000; // our (Alice's) timestamp in seconds
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
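The timestamp changes in this file (and the matching ones in the NTCP and SSU code further down) replace truncating millisecond-to-second division with round-to-nearest, so the value sent in the handshake is off by at most 500 ms instead of up to a full second, which tightens the clock-skew estimate derived from tsA and tsB. A trivial standalone illustration of the difference:

// Truncation vs. rounding to the nearest second (standalone illustration).
class RoundingDemo {
    public static void main(String[] args) {
        long now = 1290787025789L;              // some millisecond timestamp
        long truncated = now / 1000;            // 1290787025 -- error can approach 999 ms
        long rounded = (now + 500) / 1000;      // 1290787026 -- error is at most 500 ms
        System.out.println(truncated + " vs " + rounded);
    }
}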


@@ -1035,7 +1035,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*
*/
private void readMeta(byte unencrypted[]) {
long ourTs = _context.clock().now()/1000;
long ourTs = (_context.clock().now() + 500) / 1000;
long ts = DataHelper.fromLong(unencrypted, 2, 4);
Adler32 crc = new Adler32();
crc.update(unencrypted, 0, unencrypted.length-4);
@@ -1068,7 +1068,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_meta) {
_context.random().nextBytes(_meta); // randomize the uninterpreted, then overwrite w/ data
DataHelper.toLong(_meta, 0, 2, 0);
DataHelper.toLong(_meta, 2, 4, _context.clock().now()/1000);
DataHelper.toLong(_meta, 2, 4, (_context.clock().now() + 500) / 1000);
Adler32 crc = new Adler32();
crc.update(_meta, 0, _meta.length-4);
DataHelper.toLong(_meta, _meta.length-4, 4, crc.getValue());


@@ -72,9 +72,6 @@ class InboundEstablishState {
_alicePort = remotePort;
_remoteHostId = new RemoteHostId(_aliceIP, _alicePort);
_bobPort = localPort;
_keyBuilder = null;
_verificationAttempted = false;
_complete = false;
_currentState = STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
}
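This constructor cleanup, like the similar ones in the other establishment and transport classes in this commit, only removes assignments that restate the language defaults: Java initializes instance fields to null, 0, or false before the constructor body runs, so dropping the explicit assignments changes nothing. A minimal standalone illustration of that rule (the field names are just examples):

// Java default field initialization: the constructor does not need to zero anything.
class DefaultsDemo {
    private Object keyBuilder;     // defaults to null
    private long establishBegin;   // defaults to 0
    private boolean complete;      // defaults to false

    public static void main(String[] args) {
        DefaultsDemo d = new DefaultsDemo();
        System.out.println(d.keyBuilder + " " + d.establishBegin + " " + d.complete);
        // prints: null 0 false
    }
}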


@@ -86,13 +86,11 @@ class OutboundEstablishState {
}
_remotePeer = remotePeer;
_introKey = introKey;
_keyBuilder = null;
_queuedMessages = new LinkedBlockingQueue();
_currentState = STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
_remoteAddress = addr;
_introductionNonce = -1;
_complete = false;
prepareSessionRequest();
if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG))


@@ -47,7 +47,6 @@ class OutboundMessageFragments {
_transport = transport;
// _throttle = throttle;
_activePeers = new ArrayList(256);
_nextPeer = 0;
_builder = new PacketBuilder(ctx, transport);
_alive = true;
// _allowExcess = false;


@@ -47,9 +47,6 @@ class OutboundMessageState {
public OutboundMessageState(I2PAppContext context) {
_context = context;
_log = _context.logManager().getLog(OutboundMessageState.class);
_pushCount = 0;
_maxSends = 0;
// _nextSendFragment = 0;
}
public boolean initialize(OutNetMessage msg) {


@@ -1084,7 +1084,7 @@ class PacketBuilder {
// header
data[off] = flagByte;
off++;
long now = _context.clock().now() / 1000;
long now = (_context.clock().now() + 500) / 1000;
DataHelper.toLong(data, off, 4, now);
// todo: add support for rekeying and extended options
return packet;


@@ -63,8 +63,13 @@ class PeerState {
private boolean _rekeyBeganLocally;
/** when were the current cipher and MAC keys established/rekeyed? */
private long _keyEstablishedTime;
/** how far off is the remote peer from our clock, in milliseconds? */
/**
* How far off is the remote peer from our clock, in milliseconds?
* A positive number means our clock is ahead of theirs.
*/
private long _clockSkew;
/** what is the current receive second, for congestion control? */
private long _currentReceiveSecond;
/** when did we last send them a packet? */
@@ -241,36 +246,19 @@ class PeerState {
_context = ctx;
_log = ctx.logManager().getLog(PeerState.class);
_transport = transport;
_remotePeer = null;
_currentMACKey = null;
_currentCipherKey = null;
_nextMACKey = null;
_nextCipherKey = null;
_nextKeyingMaterial = null;
_rekeyBeganLocally = false;
_keyEstablishedTime = -1;
_clockSkew = 0;
_currentReceiveSecond = -1;
_lastSendTime = -1;
_lastReceiveTime = -1;
_currentACKs = new ConcurrentHashSet();
_currentACKsResend = new LinkedBlockingQueue();
_currentSecondECNReceived = false;
_remoteWantsPreviousACKs = false;
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = _context.clock().now();
_receivePeriodBegin = _lastSendRefill;
_sendBps = 0;
_sendBytes = 0;
_receiveBps = 0;
_lastCongestionOccurred = -1;
_remoteIP = null;
_remotePort = -1;
_remoteRequiresIntroduction = false;
_weRelayToThemAs = 0;
_theyRelayToUsAs = 0;
_mtu = getDefaultMTU();
_mtuReceive = _mtu;
_mtuLastChecked = -1;
@@ -278,19 +266,8 @@ class PeerState {
_rto = MIN_RTO;
_rtt = _rto/2;
_rttDeviation = _rtt;
_messagesReceived = 0;
_messagesSent = 0;
_packetsTransmitted = 0;
_packetsRetransmitted = 0;
_packetRetransmissionRate = 0;
_retransmissionPeriodStart = 0;
_packetsReceived = 0;
_packetsReceivedDuplicate = 0;
_inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32);
_dead = false;
_isInbound = false;
_lastIntroducerTime = 0;
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
@@ -346,8 +323,13 @@ class PeerState {
public boolean getRekeyBeganLocally() { return _rekeyBeganLocally; }
/** when were the current cipher and MAC keys established/rekeyed? */
public long getKeyEstablishedTime() { return _keyEstablishedTime; }
/** how far off is the remote peer from our clock, in milliseconds? */
/**
* How far off is the remote peer from our clock, in milliseconds?
* A positive number means our clock is ahead of theirs.
*/
public long getClockSkew() { return _clockSkew ; }
/** what is the current receive second, for congestion control? */
public long getCurrentReceiveSecond() { return _currentReceiveSecond; }
/** when did we last send them a packet? */
@@ -444,10 +426,17 @@ class PeerState {
public void setRekeyBeganLocally(boolean local) { _rekeyBeganLocally = local; }
/** when were the current cipher and MAC keys established/rekeyed? */
public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; }
/** how far off is the remote peer from our clock, in milliseconds? */
/**
* Update the moving-average clock skew based on the current difference.
* The raw skew will be adjusted for RTT/2 here.
* @param skew milliseconds, NOT adjusted for RTT.
* A positive number means our clock is ahead of theirs.
*/
public void adjustClockSkew(long skew) {
_clockSkew = (long) (0.9*(float)_clockSkew + 0.1*(float)skew);
_clockSkew = (long) (0.9*(float)_clockSkew + 0.1*(float)(skew - (_rtt / 2)));
}
/** what is the current receive second, for congestion control? */
public void setCurrentReceiveSecond(long sec) { _currentReceiveSecond = sec; }
/** when did we last send them a packet? */
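adjustClockSkew() keeps an exponentially weighted moving average of the measured offset, and the change above subtracts half of the current round-trip time from each raw sample so that one-way transit delay is no longer counted as clock skew. A standalone sketch of that smoothing, using made-up sample and RTT values:

// Standalone sketch of the EWMA skew smoothing; the numbers are made up.
class SkewSmoothingDemo {
    private long clockSkew = 0;   // ms; positive means our clock is ahead of theirs
    private long rtt = 200;       // ms; assumed current round-trip estimate

    void adjustClockSkew(long rawSkew) {
        // remove the estimated one-way delay (rtt/2), then fold the sample
        // into the running average: 90% old value, 10% new sample
        clockSkew = (long) (0.9f * clockSkew + 0.1f * (rawSkew - (rtt / 2)));
    }

    public static void main(String[] args) {
        SkewSmoothingDemo d = new SkewSmoothingDemo();
        for (long sample : new long[] { 350, 320, 340, 310 })
            d.adjustClockSkew(sample);
        System.out.println("smoothed skew: " + d.clockSkew + " ms");
    }
}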
@@ -679,6 +668,7 @@ class PeerState {
*
*/
public List<Long> getCurrentFullACKs() {
// no such element exception seen here
ArrayList<Long> rv = new ArrayList(_currentACKs);
// include some for retransmission
rv.addAll(_currentACKsResend);


@@ -118,8 +118,6 @@ class PeerTestManager {
_activeTests = new ConcurrentHashMap();
_recentTests = new LinkedBlockingQueue();
_packetBuilder = new PacketBuilder(context, transport);
_currentTest = null;
_currentTestComplete = false;
_context.statManager().createRateStat("udp.statusKnownCharlie", "How often the bob we pick passes us to a charlie we already have a session with?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveTestReply", "How often we get a reply to our peer test?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveTest", "How often we get a packet requesting us to participate in a peer test?", "udp", UDPTransport.RATES);


@@ -76,7 +76,6 @@ class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessage
}
_alive = true;
_nextLock = this;
_nextQueue = 0;
_chokedPeers = Collections.synchronizedSet(new HashSet(16));
_listener = lsnr;
_context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", UDPTransport.RATES);


@@ -177,7 +177,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_peersByIdent = new ConcurrentHashMap(128);
_peersByRemoteHost = new ConcurrentHashMap(128);
_dropList = new ConcurrentHashSet(2);
_endpoint = null;
// See comments in DQAT.java
if (USE_PRIORITY) {