propagate from branch 'i2p.i2p.zzz.test' (head b1e81b14fbaafdc188ae4d312f843c38b65cc310)

to branch 'i2p.i2p' (head 010351f9470b0e699e17447c87daf6c67e5e5dcc)
This commit is contained in:
zzz
2009-08-24 00:06:48 +00:00
22 changed files with 295 additions and 131 deletions

View File

@ -16,9 +16,9 @@ net.i2p.client.streaming.I2PServerSocket#accept} method, which will provide an
application wants to create a new stream to a peer, it should do so with the
appropriate {@link net.i2p.client.streaming.I2PSocketManager#connect} call.</p>
<p>There is a simple pair of demo applications available as well - {@link
net.i2p.client.streaming.StreamSinkServer} listens to a destination and dumps
the data from all sockets it accepts to individual files, while {@link
net.i2p.client.streaming.StreamSinkClient} connects to a particular destination
<p>There is a simple pair of demo applications available as well -
net.i2p.client.streaming.StreamSinkServer listens to a destination and dumps
the data from all sockets it accepts to individual files, while
net.i2p.client.streaming.StreamSinkClient connects to a particular destination
and sends a specific amount of random data then disconnects.</p>
</body></html>

View File

@ -0,0 +1,30 @@
<%@page contentType="text/html"%>
<%@page pageEncoding="UTF-8"%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html><head><title>DEBUG</title>
<%@include file="css.jsp" %>
</head><body>
<%@include file="summary.jsp" %>
<div class="main" id="main">
<%
/*
* Quick and easy place to put debugging stuff
*/
net.i2p.router.RouterContext ctx = (net.i2p.router.RouterContext) net.i2p.I2PAppContext.getGlobalContext();
/*
* Print out the status for all the SessionKeyManagers
*/
out.print("<h1>Router SKM</h1>");
ctx.sessionKeyManager().renderStatusHTML(out);
java.util.Set<net.i2p.data.Destination> clients = ctx.clientManager().listClients();
for (net.i2p.data.Destination dest : clients) {
net.i2p.data.Hash h = dest.calculateHash();
net.i2p.crypto.SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(h);
if (skm != null) {
out.print("<h1>" + h.toBase64().substring(0,6) + " SKM</h1>");
skm.renderStatusHTML(out);
}
}
%>
</div></body></html>

View File

@ -354,6 +354,7 @@ public class Connection {
*/
}
/*********
private class PingNotifier implements ConnectionManager.PingNotifier {
private long _startedPingOn;
public PingNotifier() {
@ -367,6 +368,7 @@ public class Connection {
_options.updateRTT((int)time*2);
}
}
*********/
List ackPackets(long ackThrough, long nacks[]) {
if (ackThrough < _highestAckedThrough) {
@ -548,20 +550,21 @@ public class Connection {
killOutstandingPackets();
}
/** ignore tag issues */
private void killOutstandingPackets() {
boolean tagsCancelled = false;
//boolean tagsCancelled = false;
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = (PacketLocal)iter.next();
if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
tagsCancelled = true;
//if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
// tagsCancelled = true;
pl.cancelled();
}
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
if (tagsCancelled)
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
//if (tagsCancelled)
// _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
private class DisconnectEvent implements SimpleTimer.TimedEvent {
@ -1140,12 +1143,12 @@ public class Connection {
// in case things really suck, the other side may have lost thier
// session tags (e.g. they restarted), so jump back to ElGamal.
int failTagsAt = _options.getMaxResends() - 2;
if ( (newWindowSize == 1) && (numSends == failTagsAt) ) {
if (_log.shouldLog(Log.WARN))
_log.warn("Optimistically failing tags at resend " + numSends);
_context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
}
//int failTagsAt = _options.getMaxResends() - 2;
//if ( (newWindowSize == 1) && (numSends == failTagsAt) ) {
// if (_log.shouldLog(Log.WARN))
// _log.warn("Optimistically failing tags at resend " + numSends);
// _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
//}
if (numSends - 1 > _options.getMaxResends()) {
if (_log.shouldLog(Log.DEBUG))

View File

@ -349,24 +349,35 @@ public class ConnectionManager {
return new HashSet(_connectionByInboundId.values());
}
}
/** blocking */
public boolean ping(Destination peer, long timeoutMs) {
return ping(peer, timeoutMs, true);
return ping(peer, timeoutMs, true, null);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
return ping(peer, timeoutMs, blocking, null, null, null);
return ping(peer, timeoutMs, blocking, null);
}
/**
* @deprecated I2PSession ignores tags, use non-tag variant
* @param keyToUse ignored
* @param tagsToSend ignored
*/
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
return ping(peer, timeoutMs, blocking, notifier);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, PingNotifier notifier) {
Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
if ( (keyToUse != null) && (tagsToSend != null) ) {
packet.setKeyUsed(keyToUse);
packet.setTagsSent(tagsToSend);
}
//if ( (keyToUse != null) && (tagsToSend != null) ) {
// packet.setKeyUsed(keyToUse);
// packet.setTagsSent(tagsToSend);
//}
PingRequest req = new PingRequest(peer, packet, notifier);
@ -435,7 +446,7 @@ public class ConnectionManager {
}
public void pong() {
_log.debug("Ping successful");
_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
//_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
synchronized (ConnectionManager.PingRequest.this) {
_ponged = true;
ConnectionManager.PingRequest.this.notifyAll();

View File

@ -263,12 +263,12 @@ public class ConnectionPacketHandler {
numResends++;
// ACK the tags we delivered so we can use them
if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
&& (p.getTagsSent().size() > 0) ) {
_context.sessionKeyManager().tagsDelivered(p.getTo().getPublicKey(),
p.getKeyUsed(),
p.getTagsSent());
}
//if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null)
// && (p.getTagsSent().size() > 0) ) {
// _context.sessionKeyManager().tagsDelivered(p.getTo().getPublicKey(),
// p.getKeyUsed(),
// p.getTagsSent());
//}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
}

View File

@ -47,11 +47,31 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
public Destination getTo() { return _to; }
public void setTo(Destination to) { _to = to; }
/**
* @deprecated should always return null
*/
public SessionKey getKeyUsed() { return _keyUsed; }
public void setKeyUsed(SessionKey key) { _keyUsed = key; }
/**
* @deprecated I2PSession throws out the tags
*/
public void setKeyUsed(SessionKey key) {
if (key != null)
_log.error("Who is sending tags thru the streaming lib?");
_keyUsed = key;
}
/**
* @deprecated should always return null or an empty set
*/
public Set getTagsSent() { return _tagsSent; }
/**
* @deprecated I2PSession throws out the tags
*/
public void setTagsSent(Set tags) {
if (tags != null && tags.size() > 0)
_log.error("Who is sending tags thru the streaming lib? " + tags.size());
if ( (_tagsSent != null) && (_tagsSent.size() > 0) && (tags.size() > 0) ) {
//int old = _tagsSent.size();
//_tagsSent.addAll(tags);

View File

@ -36,16 +36,18 @@ public class PacketQueue {
/**
* Add a new packet to be sent out ASAP
*
* keys and tags disabled since dropped in I2PSession
*/
public void enqueue(PacketLocal packet) {
packet.prepare();
SessionKey keyUsed = packet.getKeyUsed();
if (keyUsed == null)
keyUsed = new SessionKey();
Set tagsSent = packet.getTagsSent();
if (tagsSent == null)
tagsSent = new HashSet(0);
//SessionKey keyUsed = packet.getKeyUsed();
//if (keyUsed == null)
// keyUsed = new SessionKey();
//Set tagsSent = packet.getTagsSent();
//if (tagsSent == null)
// tagsSent = new HashSet(0);
// cache this from before sendMessage
String conStr = null;
@ -92,13 +94,19 @@ public class PacketQueue {
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
// I2PSessionMuxedImpl
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires,
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires,
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
else
// I2PSessionImpl2
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0);
// I2PSessionMuxedImpl
sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent,
//sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent,
// I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
// I2PSessionMuxedImpl no tags
sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null,
I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
end = _context.clock().now();
@ -129,13 +137,11 @@ public class PacketQueue {
if (c != null) // handle race on b0rk
c.disconnect(false);
} else {
packet.setKeyUsed(keyUsed);
packet.setTagsSent(tagsSent);
//packet.setKeyUsed(keyUsed);
//packet.setTagsSent(tagsSent);
packet.incrementSends();
if (_log.shouldLog(Log.DEBUG)) {
String msg = "SEND " + packet + (tagsSent.size() > 0
? " with " + tagsSent.size() + " tags"
: "")
String msg = "SEND " + packet
+ " send # " + packet.getNumSends()
+ " sendTime: " + (end-begin)
+ " con: " + conStr;