diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 630b0a224..6ae569168 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -471,14 +471,14 @@ public class SAMStreamSession { ByteCache cache = ByteCache.getInstance(1024, 4); ByteArray ba = cache.acquire(); try { - int sent = 0; + int remaining = size; byte buf[] = ba.getData(); - while (sent < size) { - int read = in.read(buf); + while (remaining > 0) { + int read = in.read(buf, 0, remaining > buf.length ? buf.length : remaining); if (read == -1) - throw new IOException("Insufficient data from the SAM client (" + sent + "/" + size + ")"); + throw new IOException("Insufficient data from the SAM client (" + remaining + "/" + size + ")"); i2pSocketOS.write(buf, 0, read); - sent += read; + remaining -= read; } //i2pSocketOS.flush(); } catch (IOException e) { diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java new file mode 100644 index 000000000..8aa054d08 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -0,0 +1,262 @@ +package net.i2p.sam.client; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.FileOutputStream; +import java.io.FileInputStream; +import java.io.OutputStream; +import java.net.Socket; + +import java.util.Map; +import java.util.Collections; +import java.util.HashMap; +import java.util.Properties; +import java.util.StringTokenizer; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; +import net.i2p.util.I2PThread; + +import net.i2p.sam.client.SAMEventHandler; +import net.i2p.sam.client.SAMClientEventListenerImpl; +import net.i2p.sam.client.SAMReader; + +/** + * Send a file to a peer + * + * Usage: SAMStreamSend samHost samPort peerDestFile dataFile + * + */ +public class SAMStreamSend { + private I2PAppContext _context; + private Log _log; + private String _samHost; + private String _samPort; + private String _destFile; + private String _dataFile; + private String _conOptions; + private Socket _samSocket; + private OutputStream _samOut; + private InputStream _samIn; + private SAMReader _reader; + private boolean _dead; + private SAMEventHandler _eventHandler; + /** Connection id (Integer) to peer (Flooder) */ + private Map _remotePeers; + + public static void main(String args[]) { + if (args.length < 4) { + System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile"); + return; + } + I2PAppContext ctx = new I2PAppContext(); + String files[] = new String[args.length - 3]; + SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]); + sender.startup(); + } + + public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { + _context = ctx; + _log = ctx.logManager().getLog(SAMStreamSend.class); + _dead = false; + _samHost = samHost; + _samPort = samPort; + _destFile = destFile; + _dataFile = dataFile;; + _conOptions = ""; + _eventHandler = new SendEventHandler(_context); + _remotePeers = new HashMap(); + } + + public void startup() { + _log.debug("Starting up"); + boolean ok = connect(); + _log.debug("Connected: " + ok); + if (ok) { + _reader = new SAMReader(_context, _samIn, _eventHandler); + _reader.startReading(); + _log.debug("Reader created"); + String ourDest = handshake(); + _log.debug("Handshake complete. we are " + ourDest); + if (ourDest != null) { + send(); + } + } + } + + private class SendEventHandler extends SAMEventHandler { + public SendEventHandler(I2PAppContext ctx) { super(ctx); } + public void streamClosedReceived(String result, int id, String message) { + Sender sender = null; + synchronized (_remotePeers) { + sender = (Sender)_remotePeers.remove(new Integer(id)); + } + if (sender != null) { + sender.closed(); + _log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination()); + } else { + _log.error("wtf, not connected to " + id + " but we were just closed?"); + } + } + } + + private boolean connect() { + try { + _samSocket = new Socket(_samHost, Integer.parseInt(_samPort)); + _samOut = _samSocket.getOutputStream(); + _samIn = _samSocket.getInputStream(); + return true; + } catch (Exception e) { + _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); + return false; + } + } + + private String handshake() { + synchronized (_samOut) { + try { + _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); + _samOut.flush(); + _log.debug("Hello sent"); + boolean ok = _eventHandler.waitForHelloReply(); + _log.debug("Hello reply found: " + ok); + if (!ok) + throw new IOException("wtf, hello failed?"); + String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; + _samOut.write(req.getBytes()); + _samOut.flush(); + _log.debug("Session create sent"); + ok = _eventHandler.waitForSessionCreateReply(); + _log.debug("Session create reply found: " + ok); + + req = "NAMING LOOKUP NAME=ME\n"; + _samOut.write(req.getBytes()); + _samOut.flush(); + _log.debug("Naming lookup sent"); + String destination = _eventHandler.waitForNamingReply("ME"); + _log.debug("Naming lookup reply found: " + destination); + if (destination == null) { + _log.error("No naming lookup reply found!"); + return null; + } else { + _log.info("We are " + destination); + } + return destination; + } catch (Exception e) { + _log.error("Error handshaking", e); + return null; + } + } + } + + private void send() { + Sender sender = new Sender(); + boolean ok = sender.openConnection(); + if (ok) { + I2PThread t = new I2PThread(sender, "Sender"); + t.start(); + } + } + + private class Sender implements Runnable { + private int _connectionId; + private String _remoteDestination; + private InputStream _in; + private boolean _closed; + private long _started; + private long _totalSent; + + public Sender() { + _closed = false; + } + + public boolean openConnection() { + try { + FileInputStream fin = new FileInputStream(_destFile); + byte dest[] = new byte[1024]; + int read = DataHelper.read(fin, dest); + + _remoteDestination = new String(dest, 0, read); + synchronized (_remotePeers) { + _connectionId = _remotePeers.size() + 1; + _remotePeers.put(new Integer(_connectionId), Sender.this); + } + + _context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 }); + _context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); + _context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); + + byte msg[] = ("STREAM CONNECT ID=" + _connectionId + " DESTINATION=" + _remoteDestination + "\n").getBytes(); + synchronized (_samOut) { + _samOut.write(msg); + _samOut.flush(); + } + + _in = new FileInputStream(_dataFile); + return true; + } catch (IOException ioe) { + _log.error("Unable to connect", ioe); + return false; + } + } + + public int getConnectionId() { return _connectionId; } + public String getDestination() { return _remoteDestination; } + + public void closed() { + if (_closed) return; + _closed = true; + long lifetime = _context.clock().now() - _started; + _context.statManager().addRateData("send." + _connectionId + ".lifetime", lifetime, lifetime); + try { _in.close(); } catch (IOException ioe) {} + } + + public void run() { + _started = _context.clock().now(); + _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); + byte data[] = new byte[1024]; + long value = 0; + long lastSend = _context.clock().now(); + while (!_closed) { + try { + int read = _in.read(data); + long now = _context.clock().now(); + if (read == -1) { + _log.debug("EOF from the data for " + _connectionId + " after " + (now-lastSend)); + break; + } else if (read > 0) { + _log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend)); + lastSend = now; + + byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes(); + synchronized (_samOut) { + _samOut.write(msg); + _samOut.write(data, 0, read); + _samOut.flush(); + } + + _totalSent += read; + _context.statManager().addRateData("send." + _connectionId + ".totalSent", _totalSent, 0); + } + } catch (IOException ioe) { + _log.error("Error sending", ioe); + } + } + + byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes(); + try { + synchronized (_samOut) { + _samOut.write(msg); + _samOut.flush(); + } + } catch (IOException ioe) { + _log.error("Error closing", ioe); + } + + closed(); + } + } +} \ No newline at end of file diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java new file mode 100644 index 000000000..175232321 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -0,0 +1,247 @@ +package net.i2p.sam.client; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.FileOutputStream; +import java.io.File; +import java.io.OutputStream; +import java.net.Socket; + +import java.util.Map; +import java.util.Collections; +import java.util.HashMap; +import java.util.Properties; +import java.util.StringTokenizer; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; +import net.i2p.util.I2PThread; + +import net.i2p.sam.client.SAMEventHandler; +import net.i2p.sam.client.SAMClientEventListenerImpl; +import net.i2p.sam.client.SAMReader; + +/** + * Sit around on a SAM destination, receiving lots of data and + * writing it to disk + * + * Usage: SAMStreamSink samHost samPort myKeyFile sinkDir + * + */ +public class SAMStreamSink { + private I2PAppContext _context; + private Log _log; + private String _samHost; + private String _samPort; + private String _destFile; + private String _sinkDir; + private String _conOptions; + private Socket _samSocket; + private OutputStream _samOut; + private InputStream _samIn; + private SAMReader _reader; + private boolean _dead; + private SAMEventHandler _eventHandler; + /** Connection id (Integer) to peer (Flooder) */ + private Map _remotePeers; + + public static void main(String args[]) { + if (args.length < 4) { + System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir"); + return; + } + I2PAppContext ctx = new I2PAppContext(); + SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]); + sink.startup(); + } + + public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { + _context = ctx; + _log = ctx.logManager().getLog(SAMStreamSink.class); + _dead = false; + _samHost = samHost; + _samPort = samPort; + _destFile = destFile; + _sinkDir = sinkDir; + _conOptions = ""; + _eventHandler = new SinkEventHandler(_context); + _remotePeers = new HashMap(); + } + + public void startup() { + _log.debug("Starting up"); + boolean ok = connect(); + _log.debug("Connected: " + ok); + if (ok) { + _reader = new SAMReader(_context, _samIn, _eventHandler); + _reader.startReading(); + _log.debug("Reader created"); + String ourDest = handshake(); + _log.debug("Handshake complete. we are " + ourDest); + if (ourDest != null) { + boolean written = writeDest(ourDest); + _log.debug("Dest written"); + } + } + } + + private class SinkEventHandler extends SAMEventHandler { + public SinkEventHandler(I2PAppContext ctx) { super(ctx); } + public void streamClosedReceived(String result, int id, String message) { + Sink sink = null; + synchronized (_remotePeers) { + sink = (Sink)_remotePeers.remove(new Integer(id)); + } + if (sink != null) { + sink.closed(); + _log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination()); + } else { + _log.error("wtf, not connected to " + id + " but we were just closed?"); + } + } + public void streamDataReceived(int id, byte data[], int offset, int length) { + Sink sink = null; + synchronized (_remotePeers) { + sink = (Sink)_remotePeers.get(new Integer(id)); + } + if (sink != null) { + sink.received(data, offset, length); + } else { + _log.error("wtf, not connected to " + id + " but we received " + length + "?"); + } + } + public void streamConnectedReceived(String dest, int id) { + _log.debug("Connection " + id + " received from " + dest); + + try { + Sink sink = new Sink(id, dest); + synchronized (_remotePeers) { + _remotePeers.put(new Integer(id), sink); + } + } catch (IOException ioe) { + _log.error("Error creating a new sink", ioe); + } + } + } + + private boolean connect() { + try { + _samSocket = new Socket(_samHost, Integer.parseInt(_samPort)); + _samOut = _samSocket.getOutputStream(); + _samIn = _samSocket.getInputStream(); + return true; + } catch (Exception e) { + _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); + return false; + } + } + + private String handshake() { + synchronized (_samOut) { + try { + _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); + _samOut.flush(); + _log.debug("Hello sent"); + boolean ok = _eventHandler.waitForHelloReply(); + _log.debug("Hello reply found: " + ok); + if (!ok) + throw new IOException("wtf, hello failed?"); + String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n"; + _samOut.write(req.getBytes()); + _samOut.flush(); + _log.debug("Session create sent"); + ok = _eventHandler.waitForSessionCreateReply(); + _log.debug("Session create reply found: " + ok); + + req = "NAMING LOOKUP NAME=ME\n"; + _samOut.write(req.getBytes()); + _samOut.flush(); + _log.debug("Naming lookup sent"); + String destination = _eventHandler.waitForNamingReply("ME"); + _log.debug("Naming lookup reply found: " + destination); + if (destination == null) { + _log.error("No naming lookup reply found!"); + return null; + } else { + _log.info(_destFile + " is located at " + destination); + } + return destination; + } catch (Exception e) { + _log.error("Error handshaking", e); + return null; + } + } + } + + private boolean writeDest(String dest) { + try { + FileOutputStream fos = new FileOutputStream(_destFile); + fos.write(dest.getBytes()); + fos.close(); + return true; + } catch (Exception e) { + _log.error("Error writing to " + _destFile, e); + return false; + } + } + + private class Sink { + private int _connectionId; + private String _remoteDestination; + private boolean _closed; + private long _started; + private long _totalReceived; + private long _lastReceivedOn; + private OutputStream _out; + + public Sink(int conId, String remDest) throws IOException { + _connectionId = conId; + _remoteDestination = remDest; + _closed = false; + _lastReceivedOn = _context.clock().now(); + _context.statManager().createRateStat("sink." + conId + ".totalReceived", "Data size received", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 }); + _context.statManager().createRateStat("sink." + conId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); + _context.statManager().createRateStat("sink." + conId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); + + File sinkDir = new File(_sinkDir); + if (!sinkDir.exists()) + sinkDir.mkdirs(); + + File out = File.createTempFile("sink", ".dat", sinkDir); + _out = new FileOutputStream(out); + } + + public int getConnectionId() { return _connectionId; } + public String getDestination() { return _remoteDestination; } + + public void closed() { + if (_closed) return; + _closed = true; + long lifetime = _context.clock().now() - _started; + _context.statManager().addRateData("sink." + _connectionId + ".lifetime", lifetime, lifetime); + try { + _out.close(); + } catch (IOException ioe) { + _log.error("Error closing", ioe); + } + } + public void received(byte data[], int offset, int len) { + if (_closed) return; + _totalReceived += len; + try { + _out.write(data, offset, len); + } catch (IOException ioe) { + _log.error("Error writing received data"); + closed(); + return; + } + _log.debug("Received " + len + " on " + _connectionId + " after " + (_context.clock().now()-_lastReceivedOn) + + "ms with " + _remoteDestination.substring(0,6)); + + _lastReceivedOn = _context.clock().now(); + } + } +} \ No newline at end of file diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 753f0f57f..d6a9f3884 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -47,7 +47,7 @@ public class MessageOutputStream extends OutputStream { _written = 0; _closed = false; _writeTimeout = -1; - _passiveFlushDelay = 2*1000; + _passiveFlushDelay = 500; _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) _log.debug("MessageOutputStream created"); diff --git a/history.txt b/history.txt index 7e98de303..005249d66 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,11 @@ -$Id: history.txt,v 1.94 2004/12/04 18:40:53 jrandom Exp $ +$Id: history.txt,v 1.95 2004/12/05 05:22:58 jrandom Exp $ + +2004-12-05 jrandom + * Fix the recently broken SAM bridge (duh) + * Add a new pair of SAM apps - net.i2p.sam.client.SAMStreamSink and + net.i2p.sam.client.SAMStreamSend, mirroring the streaming lib's + StreamSink and StreamSend apps for transferring files. + * Make the passive flush timer fire more frequently. 2004-12-05 jrandom * Fixed some links in the console (thanks ugha!) and the javadoc diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 71c104a78..23926157c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.99 $ $Date: 2004/12/04 18:40:53 $"; + public final static String ID = "$Revision: 1.100 $ $Date: 2004/12/05 05:22:58 $"; public final static String VERSION = "0.4.2.2"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);