2004-10-24 jrandom

* Allow explicit inclusion of session tags in the SDK, enabling the
      resending of tags bundled with messages that would not otherwise
      be ACKed.
    * Don't force mode=guaranteed for end to end delivery - if mode=bestEffort
      no DeliveryStatusMessage will be bundled (and as such, client apps using
      it will need to do their own session tag ack/nack).
    * Handle client errors when notifying them of message availability.
    * New StreamSinkSend which sends a file to a destination and disconnects.
    * Update the I2PSocketManagerFactory to build the specific
      I2PSocketManager instance based on the "i2p.streaming.manager" property,
      containing the class name of the I2PSocketManager implementation to instantiate.
This commit is contained in:
jrandom
2004-10-24 23:00:44 +00:00
committed by zzz
parent 40df846e3f
commit 9680effb9f
10 changed files with 236 additions and 18 deletions

View File

@ -13,6 +13,7 @@ import java.net.NoRouteToHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import net.i2p.I2PAppContext;
@ -99,4 +100,6 @@ public interface I2PSocketManager {
public String getName();
public void setName(String name);
public void init(I2PAppContext context, I2PSession session, Properties opts, String name);
}

View File

@ -6,6 +6,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
@ -23,6 +24,9 @@ import net.i2p.util.Log;
public class I2PSocketManagerFactory {
private final static Log _log = new Log(I2PSocketManagerFactory.class);
public static final String PROP_MANAGER = "i2p.streaming.manager";
public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerImpl";
/**
* Create a socket manager using a brand new destination connected to the
* I2CP router on the local machine on the default port (7654).
@ -76,23 +80,60 @@ public class I2PSocketManagerFactory {
public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort,
Properties opts) {
I2PClient client = I2PClientFactory.createClient();
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
if (true) {
// for the old streaming lib
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
} else {
// for new streaming lib:
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
}
opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
try {
I2PSession session = client.createSession(myPrivateKeyStream, opts);
session.connect();
return createManager(session);
return createManager(session, opts, "manager");
} catch (I2PSessionException ise) {
_log.error("Error creating session for socket manager", ise);
return null;
}
}
private static I2PSocketManager createManager(I2PSession session) {
I2PSocketManagerImpl mgr = new I2PSocketManagerImpl();
mgr.setSession(session);
mgr.setDefaultOptions(new I2PSocketOptions());
return mgr;
private static I2PSocketManager createManager(I2PSession session, Properties opts, String name) {
if (false) {
I2PSocketManagerImpl mgr = new I2PSocketManagerImpl();
mgr.setSession(session);
mgr.setDefaultOptions(new I2PSocketOptions());
return mgr;
} else {
String classname = opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER);
if (classname != null) {
try {
Class cls = Class.forName(classname);
Object obj = cls.newInstance();
if (obj instanceof I2PSocketManager) {
I2PSocketManager mgr = (I2PSocketManager)obj;
I2PAppContext context = I2PAppContext.getGlobalContext();
mgr.init(context, session, opts, name);
return mgr;
} else {
throw new IllegalStateException("Invalid manager class [" + classname + "]");
}
} catch (ClassNotFoundException cnfe) {
_log.error("Error loading " + classname, cnfe);
throw new IllegalStateException("Invalid manager class [" + classname + "] - not found");
} catch (InstantiationException ie) {
_log.error("Error loading " + classname, ie);
throw new IllegalStateException("Invalid manager class [" + classname + "] - unable to instantiate");
} catch (IllegalAccessException iae) {
_log.error("Error loading " + classname, iae);
throw new IllegalStateException("Invalid manager class [" + classname + "] - illegal access");
}
} else {
throw new IllegalStateException("No manager class specified");
}
}
}
}

View File

@ -13,6 +13,7 @@ import java.net.NoRouteToHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import net.i2p.I2PAppContext;
@ -65,13 +66,18 @@ public class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListene
this("SocketManager " + (++__managerId));
}
public I2PSocketManagerImpl(String name) {
init(I2PAppContext.getGlobalContext(), null, null, name);
}
public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
_name = name;
_session = null;
_context = context;
_log = _context.logManager().getLog(I2PSocketManager.class);
_inSockets = new HashMap(16);
_outSockets = new HashMap(16);
_acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(I2PSocketManager.class);
setSession(session);
setDefaultOptions(new I2PSocketOptions());
_context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.sent", "How many bytes are sent in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("streaming.received", "How many bytes are received in the stream?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });

View File

@ -0,0 +1,132 @@
package net.i2p.client.streaming;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.Random;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.Destination;
import net.i2p.data.DataFormatException;
import net.i2p.util.Log;
/**
* Simple streaming lib test app that connects to a given destination and sends
* the contents of a file, then disconnects. See the {@link #main}
*
*/
public class StreamSinkSend {
private Log _log;
private String _sendFile;
private int _writeDelay;
private String _peerDestFile;
/**
* Build the client but don't fire it up.
* @param filename file to send
* @param writeDelayMs how long to wait between each .write (0 for no delay)
* @param serverDestFile file containing the StreamSinkServer's binary Destination
*/
public StreamSinkSend(String filename, int writeDelayMs, String serverDestFile) {
_sendFile = filename;
_writeDelay = writeDelayMs;
_peerDestFile = serverDestFile;
_log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkClient.class);
}
/**
* Actually connect and run the client - this call blocks until completion.
*
*/
public void runClient() {
I2PSocketManager mgr = I2PSocketManagerFactory.createManager();
Destination peer = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(_peerDestFile);
peer = new Destination();
peer.readBytes(fis);
} catch (IOException ioe) {
_log.error("Error finding the peer destination to contact in " + _peerDestFile, ioe);
return;
} catch (DataFormatException dfe) {
_log.error("Peer destination is not valid in " + _peerDestFile, dfe);
return;
} finally {
if (fis == null) try { fis.close(); } catch (IOException ioe) {}
}
System.out.println("Send " + _sendFile + " to " + peer.calculateHash().toBase64());
try {
I2PSocket sock = mgr.connect(peer);
byte buf[] = new byte[32*1024];
OutputStream out = sock.getOutputStream();
long beforeSending = System.currentTimeMillis();
fis = new FileInputStream(_sendFile);
long size = 0;
while (true) {
int read = fis.read(buf);
if (read < 0)
break;
out.write(buf, 0, read);
size += read;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Wrote " + read);
if (_writeDelay > 0) {
try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {}
}
}
fis.close();
long afterSending = System.currentTimeMillis();
System.out.println("Sent " + (size / 1024) + "KB in " + (afterSending-beforeSending) + "ms");
sock.close();
} catch (InterruptedIOException iie) {
_log.error("Timeout connecting to the peer", iie);
return;
} catch (NoRouteToHostException nrthe) {
_log.error("Unable to connect to the peer", nrthe);
return;
} catch (ConnectException ce) {
_log.error("Connection already dropped", ce);
return;
} catch (I2PException ie) {
_log.error("Error connecting to the peer", ie);
return;
} catch (IOException ioe) {
_log.error("IO error sending", ioe);
return;
}
}
/**
* Fire up the client. <code>Usage: StreamSinkClient sendFile writeDelayMs serverDestFile</code> <br />
* <ul>
* <li><b>sendFile</b>: filename to send</li>
* <li><b>writeDelayMs</b>: how long to wait between each .write (0 for no delay)</li>
* <li><b>serverDestFile</b>: file containing the StreamSinkServer's binary Destination</li>
* </ul>
*/
public static void main(String args[]) {
if (args.length != 3) {
System.out.println("Usage: StreamSinkClient sendFile writeDelayMs serverDestFile");
} else {
int writeDelayMs = -1;
try {
writeDelayMs = Integer.parseInt(args[1]);
} catch (NumberFormatException nfe) {
System.err.println("Write delay ms invalid [" + args[1] + "]");
return;
}
StreamSinkSend client = new StreamSinkSend(args[0], writeDelayMs, args[2]);
client.runClient();
}
}
}

View File

@ -109,10 +109,13 @@ public class StreamSinkServer {
while ( (read = in.read(buf)) != -1) {
_fos.write(buf, 0, read);
}
_log.error("Got EOF from client socket");
} catch (IOException ioe) {
_log.error("Error writing the sink", ioe);
} finally {
if (_fos != null) try { _fos.close(); } catch (IOException ioe) {}
if (_sock != null) try { _sock.close(); } catch (IOException ioe) {}
_log.error("Client socket closed");
}
}
}

View File

@ -386,8 +386,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
}
if ( (msgId != null) && (size != null) ) {
if (_sessionListener != null)
_sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue());
if (_sessionListener != null) {
try {
_sessionListener.messageAvailable(I2PSessionImpl.this, msgId.intValue(), size.intValue());
} catch (Exception e) {
_log.log(Log.CRIT, "Error notifying app of message availability", e);
}
}
}
}
}

View File

@ -82,7 +82,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
// success or accepted). we may want to break this out into a seperate
// attribute, allowing both nonblocking sends and transparently managed keys,
// as well as the nonblocking sends with application managed keys. Later.
if (isGuaranteed() || true) {
if (isGuaranteed() || false) {
//_log.error("sendGuaranteed");
return sendGuaranteed(dest, payload, keyUsed, tagsSent);
}
return sendBestEffort(dest, payload, keyUsed, tagsSent);
@ -111,16 +112,28 @@ class I2PSessionImpl2 extends I2PSessionImpl {
if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
Set sentTags = null;
if (_context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key) < 10) {
int oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key);
long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key);
if (oldTags < 10) {
sentTags = createNewTags(50);
} else if (_context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key) < 30 * 1000) {
//_log.error("** sendBestEffort only had " + oldTags + " adding 50");
} else if (availTimeLeft < 30 * 1000) {
// if we have > 10 tags, but they expire in under 30 seconds, we want more
sentTags = createNewTags(50);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Tags are almost expired, adding 50 new ones");
//_log.error("** sendBestEffort available time left " + availTimeLeft);
} else {
//_log.error("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
}
SessionKey newKey = null;
if (false) // rekey
newKey = _context.keyGenerator().generateSessionKey();
if ( (tagsSent != null) && (tagsSent.size() > 0) ) {
if (sentTags == null)
sentTags = new HashSet();
sentTags.addAll(tagsSent);
}
long nonce = _context.random().nextInt(Integer.MAX_VALUE);
MessageState state = new MessageState(nonce, getPrefix());

View File

@ -207,6 +207,8 @@ class TransientSessionKeyManager extends SessionKeyManager {
sess.addTags(set);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tags delivered to set " + set + " on session " + sess);
if (sessionTags.size() > 0)
_log.debug("Tags delivered: " + sessionTags.size() + " total = " + sess.availableTags());
}
/**

View File

@ -1,4 +1,17 @@
$Id: history.txt,v 1.54 2004/10/18 14:08:01 jrandom Exp $
$Id: history.txt,v 1.55 2004/10/23 20:42:35 jrandom Exp $
2004-10-24 jrandom
* Allow explicit inclusion of session tags in the SDK, enabling the
resending of tags bundled with messages that would not otherwise
be ACKed.
* Don't force mode=guaranteed for end to end delivery - if mode=bestEffort
no DeliveryStatusMessage will be bundled (and as such, client apps using
it will need to do their own session tag ack/nack).
* Handle client errors when notifying them of message availability.
* New StreamSinkSend which sends a file to a destination and disconnects.
* Update the I2PSocketManagerFactory to build the specific
I2PSocketManager instance based on the "i2p.streaming.manager" property,
containing the class name of the I2PSocketManager to instantiate.
2004-10-23 jrandom
* Minor ministreaming lib refactoring to simplify integration of the full

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
public final static String ID = "$Revision: 1.60 $ $Date: 2004/10/18 14:08:00 $";
public final static String ID = "$Revision: 1.61 $ $Date: 2004/10/23 20:42:35 $";
public final static String VERSION = "0.4.1.3";
public final static long BUILD = 1;
public final static long BUILD = 2;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION);
System.out.println("Router ID: " + RouterVersion.ID);