propagate from branch 'i2p.i2p.zzz.test2' (head ec8e362ce8b93280b518c599a3cc075b89085d2b)

to branch 'i2p.i2p' (head c9b1eef91f61f4482ad11c4f2b2d01be67a17ad2)
This commit is contained in:
zzz
2013-12-10 02:26:32 +00:00
54 changed files with 362 additions and 281 deletions

View File

@ -229,13 +229,28 @@ public class Router implements RouterClock.ClockShiftListener {
// for the ping file
// Check for other router but do not start a thread yet so the update doesn't cause
// a NCDFE
if (!isOnlyRouterRunning()) {
_eventLog.addEvent(EventLog.ABORTED, "Another router running");
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
System.err.println(" please delete the file " + getPingFile().getAbsolutePath());
System.exit(-1);
for (int i = 0; i < 14; i++) {
// Wrapper can start us up too quickly after a crash, the ping file
// may still be less than LIVELINESS_DELAY (60s) old.
// So wait at least 60s to be sure.
if (isOnlyRouterRunning()) {
if (i > 0)
System.err.println("INFO: No, there wasn't another router already running. Proceeding with startup.");
break;
}
if (i < 13) {
if (i == 0)
System.err.println("WARN: There may be another router already running. Waiting a while to be sure...");
// yes this is ugly to sleep in the constructor.
try { Thread.sleep(5000); } catch (InterruptedException ie) {}
} else {
_eventLog.addEvent(EventLog.ABORTED, "Another router running");
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
System.err.println(" please delete the file " + getPingFile().getAbsolutePath());
System.exit(-1);
}
}
if (_config.get("router.firstVersion") == null) {

View File

@ -289,14 +289,22 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi
if ( (message.getLeaseSet() == null) || (message.getPrivateKey() == null) || (message.getSigningPrivateKey() == null) ) {
if (_log.shouldLog(Log.ERROR))
_log.error("Null lease set granted: " + message);
_runner.disconnectClient("Invalid CreateLeaseSetMessage");
return;
}
_context.keyManager().registerKeys(message.getLeaseSet().getDestination(), message.getSigningPrivateKey(), message.getPrivateKey());
try {
_context.netDb().publish(message.getLeaseSet());
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.ERROR))
_log.error("Invalid leaseset from client", iae);
_runner.disconnectClient("Invalid leaseset: " + iae);
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("New lease set granted for destination "
+ message.getLeaseSet().getDestination().calculateHash().toBase64());
_context.keyManager().registerKeys(message.getLeaseSet().getDestination(), message.getSigningPrivateKey(), message.getPrivateKey());
_context.netDb().publish(message.getLeaseSet());
// leaseSetCreated takes care of all the LeaseRequestState stuff (including firing any jobs)
_runner.leaseSetCreated(message.getLeaseSet());

View File

@ -527,14 +527,17 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
private static final long PUBLISH_DELAY = 3*1000;
public void publish(LeaseSet localLeaseSet) {
/**
* @throws IllegalArgumentException if the leaseSet is not valid
*/
public void publish(LeaseSet localLeaseSet) throws IllegalArgumentException {
if (!_initialized) return;
Hash h = localLeaseSet.getDestination().calculateHash();
try {
store(h, localLeaseSet);
} catch (IllegalArgumentException iae) {
_log.error("wtf, locally published leaseSet is not valid?", iae);
return;
throw iae;
}
if (!_context.clientManager().shouldPublishLeaseSet(h))
return;

View File

@ -25,6 +25,7 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
@ -89,6 +90,24 @@ class PeerManager {
super(_context.simpleTimer2(), REORGANIZE_TIME);
}
public void timeReached() {
(new ReorgThread(this)).start();
}
}
/**
* This takes too long to run on the SimpleTimer2 queue
* @since 0.9.10
*/
private class ReorgThread extends I2PThread {
private SimpleTimer2.TimedEvent _event;
public ReorgThread(SimpleTimer2.TimedEvent event) {
super("PeerManager Reorg");
setDaemon(true);
_event = event;
}
public void run() {
long start = System.currentTimeMillis();
try {
_organizer.reorganize(true);
@ -104,7 +123,7 @@ class PeerManager {
delay = REORGANIZE_TIME_MEDIUM;
else
delay = REORGANIZE_TIME;
schedule(delay);
_event.schedule(delay);
}
}

View File

@ -25,6 +25,7 @@ import net.i2p.router.RouterContext;
import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.router.util.EventLog;
import net.i2p.util.Addresses;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
@ -222,6 +223,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
/* We hope the routerinfos are read in and things have settled down by now, but it's not required to be so */
private static final int START_DELAY = 5*60*1000;
private static final int LOOKUP_TIME = 30*60*1000;
private void startGeoIP() {
_context.simpleScheduler().addEvent(new QueueAll(), START_DELAY);
}
@ -247,7 +249,26 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
private class Lookup implements SimpleTimer.TimedEvent {
public void timeReached() {
(new LookupThread()).start();
}
}
/**
* This takes too long to run on the SimpleTimer2 queue
* @since 0.9.10
*/
private class LookupThread extends I2PThread {
public LookupThread() {
super("GeoIP Lookup");
setDaemon(true);
}
public void run() {
long start = System.currentTimeMillis();
_geoIP.blockingLookup();
if (_log.shouldLog(Log.INFO))
_log.info("GeoIP lookup took " + (System.currentTimeMillis() - start));
}
}

View File

@ -211,7 +211,7 @@ class EventPumper implements Runnable {
int failsafeInvalid = 0;
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (_transport.haveCapacity(60))
if (_transport.haveCapacity(45))
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);

View File

@ -11,6 +11,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@ -119,9 +120,9 @@ class PeerState {
*/
//private boolean _remoteWantsPreviousACKs;
/** how many bytes should we send to the peer in a second */
private volatile int _sendWindowBytes;
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private volatile int _sendWindowBytesRemaining;
private int _sendWindowBytesRemaining;
private long _lastSendRefill;
private int _sendBps;
private int _sendBytes;
@ -225,13 +226,13 @@ class PeerState {
/** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */
private static final int MIN_CONCURRENT_MSGS = 8;
/** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
private int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
/**
* how many outbound messages are currently being transmitted. Not thread safe, as we're not strict
*/
private volatile int _concurrentMessagesActive = 0;
private int _concurrentMessagesActive;
/** how many concurrency rejections have we had in a row */
private volatile int _consecutiveRejections = 0;
private int _consecutiveRejections;
/** is it inbound? **/
private final boolean _isInbound;
/** Last time it was made an introducer **/
@ -436,9 +437,19 @@ class PeerState {
//public boolean getRemoteWantsPreviousACKs() { return _remoteWantsPreviousACKs; }
/** how many bytes should we send to the peer in a second */
public int getSendWindowBytes() { return _sendWindowBytes; }
public int getSendWindowBytes() {
synchronized(_outboundMessages) {
return _sendWindowBytes;
}
}
/** how many bytes can we send to the peer in the current second */
public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; }
public int getSendWindowBytesRemaining() {
synchronized(_outboundMessages) {
return _sendWindowBytesRemaining;
}
}
/** what IP is the peer sending and receiving packets on? */
public byte[] getRemoteIP() { return _remoteIP; }
@ -580,20 +591,24 @@ class PeerState {
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
public int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
synchronized(_outboundMessages) {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
}
}
public long getInactivityTime() {
long now = _context.clock().now();
long lastActivity = Math.max(_lastReceiveTime, _lastSendFullyTime);
@ -620,15 +635,17 @@ class PeerState {
* returning true if the full size can be decremented, false if it
* cannot. If it is not decremented, the window size remaining is
* not adjusted at all.
*
* Caller should synch
*/
public boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
private boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
public boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
//private boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
/**
* Caller should synch
*/
public boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
long now = _context.clock().now();
long duration = now - _lastSendRefill;
if (duration >= 1000) {
@ -694,9 +711,25 @@ class PeerState {
****/
public int getSlowStartThreshold() { return _slowStartThreshold; }
public int getConcurrentSends() { return _concurrentMessagesActive; }
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
public int getConsecutiveSendRejections() { return _consecutiveRejections; }
public int getConcurrentSends() {
synchronized(_outboundMessages) {
return _concurrentMessagesActive;
}
}
public int getConcurrentSendWindow() {
synchronized(_outboundMessages) {
return _concurrentMessagesAllowed;
}
}
public int getConsecutiveSendRejections() {
synchronized(_outboundMessages) {
return _consecutiveRejections;
}
}
public boolean isInbound() { return _isInbound; }
/** @since IPv6 */
@ -1674,6 +1707,8 @@ class PeerState {
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*
* Caller should synch
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();

View File

@ -2846,7 +2846,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
if (haveCapacity(60)) {
if (haveCapacity(45)) {
long inc;
// don't adjust too quickly if we are looping fast
if (_lastLoopShort)