2006-01-01 jrandom
    * Disable multifile torrent creation in I2PSnark's web UI for the moment
      (though it can still seed and participate in multifile swarms)
    * Enable a new speed calculation for profiling peers, using their peak
      1 minute average tunnel throughput as their speed.
commit 23723b56ca (parent 76f89ac93c)
Authored by jrandom on 2006-01-01 17:23:26 +00:00, committed by zzz
13 changed files with 114 additions and 24 deletions
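
The second change above replaces the old round-trip-message speed metric with the peer's peak one-minute tunnel throughput. A minimal standalone sketch of the idea (illustrative class and field names only; the router's actual implementation is PeerProfile.coalesceThroughput() and SpeedCalculator.calc() in the diffs below):

    // Illustrative sketch only -- not router code. Bytes relayed for a peer are
    // accumulated, and once a minute the total is compared against the best
    // minute seen so far; that peak (bytes per minute) becomes the peer's speed.
    public class PeakThroughputTracker {
        private volatile long currentMinuteTotal;   // bytes pushed during the current minute
        private volatile double peakBytesPerMinute; // best one-minute total observed so far
        private long lastCoalesce = System.currentTimeMillis();

        public void dataPushed(int size) { currentMinuteTotal += size; }

        /** call roughly once a minute, like PeerProfile.coalesceStats() */
        public void coalesce() {
            long now = System.currentTimeMillis();
            if (now - lastCoalesce >= 60*1000) {
                if (currentMinuteTotal >= peakBytesPerMinute)
                    peakBytesPerMinute = currentMinuteTotal;
                currentMinuteTotal = 0;
                lastCoalesce = now;
            }
        }

        /** peak throughput expressed in KBytes per second */
        public double getPeakKBps() { return peakBytesPerMinute / (60d * 1024d); }
    }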

File: I2PSnarkServlet.java

@@ -240,7 +240,7 @@ public class I2PSnarkServlet extends HttpServlet {
         if ( (announceURLOther != null) && (announceURLOther.trim().length() > "http://.i2p/announce".length()) )
             announceURL = announceURLOther;
-        if (baseFile.exists()) {
+        if (baseFile.exists() && baseFile.isFile()) {
             try {
                 Storage s = new Storage(baseFile, announceURL, null);
                 s.create();
@@ -258,6 +258,8 @@ public class I2PSnarkServlet extends HttpServlet {
             } catch (IOException ioe) {
                 _manager.addMessage("Error creating a torrent for " + baseFile.getAbsolutePath() + ": " + ioe.getMessage());
             }
+        } else if (baseFile.exists()) {
+            _manager.addMessage("I2PSnark doesn't yet support creating multifile torrents");
         } else {
             _manager.addMessage("Cannot create a torrent for the nonexistant data: " + baseFile.getAbsolutePath());
         }

File: history.txt

@@ -1,4 +1,10 @@
-$Id: history.txt,v 1.375 2005/12/30 18:33:54 jrandom Exp $
+$Id: history.txt,v 1.376 2005/12/31 18:40:23 jrandom Exp $
 
+2006-01-01 jrandom
+    * Disable multifile torrent creation in I2PSnark's web UI for the moment
+      (though it can still seed and participate in multifile swarms)
+    * Enable a new speed calculation for profiling peers, using their peak
+      1 minute average tunnel throughput as their speed.
+
 2005-12-31 jrandom
     * Include a simple torrent creator in the I2PSnark web UI

File: LoadTestManager.java

@@ -399,7 +399,7 @@ public class LoadTestManager {
             else
                 buf.append("[unknown_peer]");
             buf.append(" ");
-            TunnelId id = tunnel.getReceiveTunnelId(i);
+            TunnelId id = info.getReceiveTunnelId(i);
             if (id != null)
                 buf.append(id.getTunnelId());
             else
@@ -418,7 +418,7 @@ public class LoadTestManager {
             else
                 buf.append("[unknown_peer]");
             buf.append(" ");
-            TunnelId id = tunnel.getReceiveTunnelId(i);
+            TunnelId id = info.getReceiveTunnelId(i);
             if (id != null)
                 buf.append(id.getTunnelId());
             else

File: ProfileManager.java

@@ -60,6 +60,13 @@ public interface ProfileManager {
      *
      */
     void tunnelTestSucceeded(Hash peer, long responseTimeMs);
 
+    /**
+     * Note that we were able to push some data through a tunnel that the peer
+     * is participating in (detected after rtt).
+     *
+     */
+    void tunnelDataPushed(Hash peer, long rtt, int size);
+
     /**
      * Note that the peer participated in a tunnel that failed. Its failure may not have

File: RouterVersion.java

@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.322 $ $Date: 2005/12/30 18:33:54 $";
+    public final static String ID = "$Revision: 1.323 $ $Date: 2005/12/31 18:40:22 $";
     public final static String VERSION = "0.6.1.8";
-    public final static long BUILD = 6;
+    public final static long BUILD = 7;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
         System.out.println("Router ID: " + RouterVersion.ID);

File: OutboundClientMessageOneShotJob.java

@@ -562,12 +562,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             getContext().messageHistory().sendPayloadMessage(dataMsgId, true, sendTime);
             getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, true);
             _lease.setNumSuccess(_lease.getNumSuccess()+1);
+
+            int size = _clientMessageSize;
             getContext().statManager().addRateData("client.sendAckTime", sendTime, 0);
             getContext().statManager().addRateData("client.sendMessageSize", _clientMessageSize, sendTime);
-            if (_outTunnel != null)
-                for (int i = 0; i < _outTunnel.getLength(); i++)
+            if (_outTunnel != null) {
+                if (_outTunnel.getLength() > 0)
+                    size = ((size + 1023) / 1024) * 1024; // messages are in ~1KB blocks
+                for (int i = 0; i < _outTunnel.getLength(); i++) {
                     getContext().profileManager().tunnelTestSucceeded(_outTunnel.getPeer(i), sendTime);
+                    getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), sendTime, size);
+                }
+            }
             if (_inTunnel != null)
                 for (int i = 0; i < _inTunnel.getLength(); i++)
                     getContext().profileManager().tunnelTestSucceeded(_inTunnel.getPeer(i), sendTime);
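
The "((size + 1023) / 1024) * 1024" expression above rounds the client payload up to the next whole kilobyte before crediting the tunnel peers, since messages travel through tunnels in roughly 1 KB blocks. A tiny standalone check of that arithmetic (RoundingDemo is an illustrative name, not part of this commit):

    public class RoundingDemo {
        // round a byte count up to the next multiple of 1024
        static int roundUpTo1KB(int size) {
            return ((size + 1023) / 1024) * 1024;
        }
        public static void main(String[] args) {
            System.out.println(roundUpTo1KB(1));    // 1024
            System.out.println(roundUpTo1KB(1500)); // 2048
            System.out.println(roundUpTo1KB(2048)); // 2048
        }
    }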

File: StoreJob.java

@@ -136,12 +136,12 @@ class StoreJob extends JobImpl {
         List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
         if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
             if (_state.getPending().size() <= 0) {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn(getJobId() + ": No more peers left and none pending");
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(getJobId() + ": No more peers left and none pending");
                 fail();
             } else {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn(getJobId() + ": No more peers left but some are pending, so keep waiting");
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(getJobId() + ": No more peers left but some are pending, so keep waiting");
                 return;
             }
         } else {
@@ -152,8 +152,8 @@ class StoreJob extends JobImpl {
             Hash peer = (Hash)iter.next();
             DataStructure ds = _facade.getDataStore().get(peer);
             if ( (ds == null) || !(ds instanceof RouterInfo) ) {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds);
                 _state.addSkipped(peer);
             } else {
                 int peerTimeout = _facade.getPeerTimeout(peer);
@@ -295,10 +295,6 @@ class StoreJob extends JobImpl {
         _state.addPending(peer.getIdentity().getHash());
 
-        SendSuccessJob onReply = new SendSuccessJob(getContext(), peer);
-        FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
-        StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
-
         TunnelInfo outTunnel = selectOutboundTunnel();
         if (outTunnel != null) {
             //if (_log.shouldLog(Log.DEBUG))
@@ -306,7 +302,11 @@ class StoreJob extends JobImpl {
             //            + peer.getIdentity().getHash().toBase64());
             TunnelId targetTunnelId = null; // not needed
             Job onSend = null; // not wanted
+            SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, msg.getMessageSize());
+            FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
+            StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
+
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
             getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now()));
@@ -333,10 +333,20 @@ class StoreJob extends JobImpl {
      */
     private class SendSuccessJob extends JobImpl implements ReplyJob {
         private RouterInfo _peer;
+        private TunnelInfo _sendThrough;
+        private int _msgSize;
 
         public SendSuccessJob(RouterContext enclosingContext, RouterInfo peer) {
+            this(enclosingContext, peer, null, 0);
+        }
+        public SendSuccessJob(RouterContext enclosingContext, RouterInfo peer, TunnelInfo sendThrough, int size) {
             super(enclosingContext);
             _peer = peer;
+            _sendThrough = sendThrough;
+            if (size <= 0)
+                _msgSize = 0;
+            else
+                _msgSize = ((size + 1023) / 1024) * 1024;
         }
 
         public String getName() { return "Kademlia Store Send Success"; }
@@ -348,6 +358,13 @@ class StoreJob extends JobImpl {
             getContext().profileManager().dbStoreSent(_peer.getIdentity().getHash(), howLong);
             getContext().statManager().addRateData("netDb.ackTime", howLong, howLong);
+
+            if ( (_sendThrough != null) && (_msgSize > 0) ) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("sent a " + _msgSize + "byte netDb message through tunnel " + _sendThrough + " after " + howLong);
+                for (int i = 0; i < _sendThrough.getLength(); i++)
+                    getContext().profileManager().tunnelDataPushed(_sendThrough.getPeer(i), howLong, _msgSize);
+            }
             if (_state.getCompleteCount() >= getRedundancy()) {
                 succeed();
             } else {
@@ -375,8 +392,8 @@ class StoreJob extends JobImpl {
             _sendOn = sendOn;
         }
         public void runJob() {
-            if (_log.shouldLog(Log.WARN))
-                _log.warn(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64()
+            if (_log.shouldLog(Log.INFO))
+                _log.info(StoreJob.this.getJobId() + ": Peer " + _peer.getIdentity().getHash().toBase64()
                           + " timed out sending " + _state.getTarget());
             _state.replyTimeout(_peer.getIdentity().getHash());
             getContext().profileManager().dbStoreFailed(_peer.getIdentity().getHash());
@@ -406,8 +423,8 @@ class StoreJob extends JobImpl {
      * Send totally failed
      */
     protected void fail() {
-        if (_log.shouldLog(Log.WARN))
-            _log.warn(getJobId() + ": Failed sending key " + _state.getTarget());
+        if (_log.shouldLog(Log.INFO))
+            _log.info(getJobId() + ": Failed sending key " + _state.getTarget());
         if (_log.shouldLog(Log.DEBUG))
             _log.debug(getJobId() + ": State of failed send: " + _state, new Exception("Who failed me?"));
         if (_onFailure != null)

File: PeerProfile.java

@@ -233,6 +233,13 @@ public class PeerProfile {
             _log.info("Updating tunnel test time for " + _peer.toBase64().substring(0,6)
                       + " to " + _tunnelTestResponseTimeAvg + " via " + ms);
     }
+
+    /** bytes per minute */
+    private volatile double _peakThroughput;
+    private volatile long _peakThroughputCurrentTotal;
+    public double getPeakThroughputKBps() { return _peakThroughput / (60d*1024d); }
+    public void setPeakThroughputKBps(double kBps) { _peakThroughput = kBps*60; }
+    void dataPushed(int size) { _peakThroughputCurrentTotal += size; }
 
     /**
      * when the given peer is performing so poorly that we don't want to bother keeping
@@ -301,6 +308,22 @@ public class PeerProfile {
         _expanded = true;
     }
+
+    private long _lastCoalesceDate = System.currentTimeMillis();
+    private void coalesceThroughput() {
+        long now = System.currentTimeMillis();
+        long measuredPeriod = now - _lastCoalesceDate;
+        if (measuredPeriod >= 60*1000) {
+            long tot = _peakThroughputCurrentTotal;
+            double peak = _peakThroughput;
+            if (tot >= peak)
+                _peakThroughput = tot;
+            _peakThroughputCurrentTotal = 0;
+            if ( (tot > 0) && _log.shouldLog(Log.WARN) )
+                _log.warn("updating throughput after " + tot + " to " + (_peakThroughput/60d) + " for " + _peer.toBase64());
+            _lastCoalesceDate = now;
+        }
+    }
 
     /** update the stats and rates (this should be called once a minute) */
     public void coalesceStats() {
         if (!_expanded) return;
@@ -316,6 +339,8 @@ public class PeerProfile {
         _dbHistory.coalesceStats();
         _tunnelHistory.coalesceStats();
+
+        coalesceThroughput();
 
         _speedValue = calculateSpeed();
         _oldSpeedValue = calculateOldSpeed();
         _reliabilityValue = calculateReliability();

File: ProfileManagerImpl.java

@@ -117,6 +117,14 @@ public class ProfileManagerImpl implements ProfileManager {
         data.getTunnelTestResponseTimeSlow().addData(responseTimeMs, responseTimeMs);
     }
 
+    public void tunnelDataPushed(Hash peer, long rtt, int size) {
+        if (_context.routerHash().equals(peer))
+            return;
+        PeerProfile data = getProfile(peer);
+        if (data != null)
+            data.dataPushed(size); // ignore rtt, as we are averaging over a minute
+    }
+
     private int getSlowThreshold() {
         // perhaps we should have this compare vs. tunnel.testSuccessTime?
         return 5*1000;

File: ProfileOrganizerRenderer.java

@@ -124,7 +124,7 @@ class ProfileOrganizerRenderer {
         }
         buf.append("</table>");
         buf.append("<i>Definitions:<ul>");
-        buf.append("<li><b>speed</b>: how many round trip messages can we pump through the peer per minute?</li>");
+        buf.append("<li><b>speed</b>: peak throughput (bytes per second) over a 1 minute period that the peer has sustained</li>");
         buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>");
         buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>");
         buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>");

File: ProfilePersistenceHelper.java

@@ -122,6 +122,7 @@ class ProfilePersistenceHelper {
         buf.append("lastHeardFrom=").append(profile.getLastHeardFrom()).append(NL);
         buf.append("# moving average as to how fast the peer replies").append(NL);
         buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL);
+        buf.append("tunnelPeakThroughput=").append(profile.getPeakThroughputKBps()).append(NL);
         buf.append(NL);
 
         out.write(buf.toString().getBytes());
@@ -207,6 +208,7 @@ class ProfilePersistenceHelper {
         profile.setLastSendFailed(getLong(props, "lastFailedSend"));
         profile.setLastHeardFrom(getLong(props, "lastHeardFrom"));
         profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage"));
+        profile.setPeakThroughputKBps(getDouble(props, "tunnelPeakThroughput"));
 
         profile.getTunnelHistory().load(props);
         profile.getDBHistory().load(props);

File: SpeedCalculator.java

@@ -38,6 +38,7 @@ public class SpeedCalculator extends Calculator {
     }
 
     public double calc(PeerProfile profile) {
+        if (true) return profile.getPeakThroughputKBps()*1024d;
         if (true) return calcAverage(profile);
         long threshold = getEventThreshold();
         boolean tunnelTestOnly = getUseTunnelTestOnly();
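
With this change the speed value is simply the peer's peak one-minute tunnel throughput converted to bytes per second. An illustrative calculation (not commit code; it just mirrors dataPushed(), coalesceThroughput(), getPeakThroughputKBps() and the line added above):

    public class SpeedExample {
        public static void main(String[] args) {
            long pushedInOneMinute = 300 * 1024;              // 307200 bytes reported via dataPushed()
            double peakBytesPerMinute = pushedInOneMinute;    // kept by coalesceThroughput() as the new peak
            double kBps = peakBytesPerMinute / (60d * 1024d); // getPeakThroughputKBps() -> 5.0
            double speedValue = kBps * 1024d;                 // SpeedCalculator.calc() -> 5120.0, roughly bytes per second
            System.out.println(kBps + " KBps -> speed value " + speedValue);
        }
    }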

File: InboundEndpointProcessor.java

@@ -5,6 +5,7 @@ import net.i2p.data.ByteArray;
 import net.i2p.data.Hash;
 import net.i2p.util.ByteCache;
 import net.i2p.util.Log;
+import net.i2p.router.RouterContext;
 
 /**
  * Receive the inbound tunnel message, removing all of the layers
@@ -71,6 +72,19 @@ public class InboundEndpointProcessor {
         decrypt(_context, _config, iv, orig, offset, length);
 
         _cache.release(ba);
+
+        // now for a little bookkeeping
+        RouterContext ctx = null;
+        if (_context instanceof RouterContext)
+            ctx = (RouterContext)_context;
+        if ( (ctx != null) && (_config != null) && (_config.getLength() > 0) ) {
+            int rtt = 0; // dunno... may not be related to an rtt
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Received a " + length + "byte message through tunnel " + _config);
+            for (int i = 0; i < _config.getLength(); i++)
+                ctx.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length);
+        }
+
         return true;
     }