More SAM client cleanup and fixes, beginning of v3 support

v3 unfinished, does not work yet
This commit is contained in:
zzz
2015-11-25 20:46:21 +00:00
parent 13fd613bb8
commit 612e01cbbf
5 changed files with 116 additions and 47 deletions

View File

@ -7,7 +7,7 @@ import java.util.Properties;
*/ */
public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener { public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener {
public void destReplyReceived(String publicKey, String privateKey) {} public void destReplyReceived(String publicKey, String privateKey) {}
public void helloReplyReceived(boolean ok) {} public void helloReplyReceived(boolean ok, String version) {}
public void namingReplyReceived(String name, String result, String value, String message) {} public void namingReplyReceived(String name, String result, String value, String message) {}
public void sessionStatusReceived(String result, String destination, String message) {} public void sessionStatusReceived(String result, String destination, String message) {}
public void streamClosedReceived(String result, int id, String message) {} public void streamClosedReceived(String result, int id, String message) {}

View File

@ -15,6 +15,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
//private I2PAppContext _context; //private I2PAppContext _context;
private final Log _log; private final Log _log;
private Boolean _helloOk; private Boolean _helloOk;
private String _version;
private final Object _helloLock = new Object(); private final Object _helloLock = new Object();
private Boolean _sessionCreateOk; private Boolean _sessionCreateOk;
private final Object _sessionCreateLock = new Object(); private final Object _sessionCreateLock = new Object();
@ -27,12 +28,13 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
} }
@Override @Override
public void helloReplyReceived(boolean ok) { public void helloReplyReceived(boolean ok, String version) {
synchronized (_helloLock) { synchronized (_helloLock) {
if (ok) if (ok)
_helloOk = Boolean.TRUE; _helloOk = Boolean.TRUE;
else else
_helloOk = Boolean.FALSE; _helloOk = Boolean.FALSE;
_version = version;
_helloLock.notifyAll(); _helloLock.notifyAll();
} }
} }
@ -61,7 +63,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
@Override @Override
public void unknownMessageReceived(String major, String minor, Properties params) { public void unknownMessageReceived(String major, String minor, Properties params) {
_log.error("wrt, [" + major + "] [" + minor + "] [" + params + "]"); _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]");
} }
@ -70,18 +72,18 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
// //
/** /**
* Wait for the connection to be established, returning true if everything * Wait for the connection to be established, returning the server version if everything
* went ok * went ok
* @return true if everything ok * @return SAM server version if everything ok, or null on failure
*/ */
public boolean waitForHelloReply() { public String waitForHelloReply() {
while (true) { while (true) {
try { try {
synchronized (_helloLock) { synchronized (_helloLock) {
if (_helloOk == null) if (_helloOk == null)
_helloLock.wait(); _helloLock.wait();
else else
return _helloOk.booleanValue(); return _helloOk.booleanValue() ? _version : null;
} }
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }

View File

@ -71,7 +71,7 @@ public class SAMReader {
public static final String NAMING_REPLY_INVALID_KEY = "INVALID_KEY"; public static final String NAMING_REPLY_INVALID_KEY = "INVALID_KEY";
public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND"; public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND";
public void helloReplyReceived(boolean ok); public void helloReplyReceived(boolean ok, String version);
public void sessionStatusReceived(String result, String destination, String message); public void sessionStatusReceived(String result, String destination, String message);
public void streamStatusReceived(String result, int id, String message); public void streamStatusReceived(String result, int id, String message);
public void streamConnectedReceived(String remoteDestination, int id); public void streamConnectedReceived(String remoteDestination, int id);
@ -159,10 +159,11 @@ public class SAMReader {
if ("HELLO".equals(major)) { if ("HELLO".equals(major)) {
if ("REPLY".equals(minor)) { if ("REPLY".equals(minor)) {
String result = params.getProperty("RESULT"); String result = params.getProperty("RESULT");
if ("OK".equals(result)) String version= params.getProperty("VERSION");
_listener.helloReplyReceived(true); if ("OK".equals(result) && version != null)
_listener.helloReplyReceived(true, version);
else else
_listener.helloReplyReceived(false); _listener.helloReplyReceived(false, version);
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }

View File

@ -1,5 +1,6 @@
package net.i2p.sam.client; package net.i2p.sam.client;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -9,9 +10,11 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Base32;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.VersionComparator;
/** /**
* Send a file to a peer * Send a file to a peer
@ -26,7 +29,7 @@ public class SAMStreamSend {
private final String _samPort; private final String _samPort;
private final String _destFile; private final String _destFile;
private final String _dataFile; private final String _dataFile;
private final String _conOptions; private String _conOptions;
private Socket _samSocket; private Socket _samSocket;
private OutputStream _samOut; private OutputStream _samOut;
private InputStream _samIn; private InputStream _samIn;
@ -38,13 +41,14 @@ public class SAMStreamSend {
public static void main(String args[]) { public static void main(String args[]) {
if (args.length < 4) { if (args.length < 4) {
System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile"); System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile [version]");
return; return;
} }
I2PAppContext ctx = new I2PAppContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
//String files[] = new String[args.length - 3]; //String files[] = new String[args.length - 3];
SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]); SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
sender.startup(); String version = (args.length >= 5) ? args[4] : "1.0";
sender.startup(version);
} }
public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) {
@ -60,7 +64,7 @@ public class SAMStreamSend {
_remotePeers = new HashMap<Integer,Sender>(); _remotePeers = new HashMap<Integer,Sender>();
} }
public void startup() { public void startup(String version) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
boolean ok = connect(); boolean ok = connect();
@ -71,7 +75,7 @@ public class SAMStreamSend {
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
String ourDest = handshake(); String ourDest = handshake(version);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (ourDest != null) { if (ourDest != null) {
@ -82,6 +86,8 @@ public class SAMStreamSend {
private class SendEventHandler extends SAMEventHandler { private class SendEventHandler extends SAMEventHandler {
public SendEventHandler(I2PAppContext ctx) { super(ctx); } public SendEventHandler(I2PAppContext ctx) { super(ctx); }
@Override
public void streamClosedReceived(String result, int id, String message) { public void streamClosedReceived(String result, int id, String message) {
Sender sender = null; Sender sender = null;
synchronized (_remotePeers) { synchronized (_remotePeers) {
@ -92,7 +98,7 @@ public class SAMStreamSend {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination()); _log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination());
} else { } else {
_log.error("wtf, not connected to " + id + " but we were just closed?"); _log.error("not connected to " + id + " but we were just closed?");
} }
} }
} }
@ -109,24 +115,32 @@ public class SAMStreamSend {
} }
} }
private String handshake() { private String handshake(String version) {
synchronized (_samOut) { synchronized (_samOut) {
try { try {
_samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
_samOut.flush(); _samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
boolean ok = _eventHandler.waitForHelloReply(); String hisVersion = _eventHandler.waitForHelloReply();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello reply found: " + ok); _log.debug("Hello reply found: " + hisVersion);
if (!ok) if (hisVersion == null)
throw new IOException("wtf, hello failed?"); throw new IOException("Hello failed");
boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
if (isV3) {
byte[] id = new byte[5];
_context.random().nextBytes(id);
_conOptions = "ID=" + Base32.encode(id);
}
String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
_samOut.write(req.getBytes()); _samOut.write(req.getBytes());
_samOut.flush(); _samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
ok = _eventHandler.waitForSessionCreateReply(); boolean ok = _eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
@ -222,6 +236,7 @@ public class SAMStreamSend {
public void run() { public void run() {
_started = _context.clock().now(); _started = _context.clock().now();
_context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
final long toSend = (new File(_dataFile)).length();
byte data[] = new byte[1024]; byte data[] = new byte[1024];
long lastSend = _context.clock().now(); long lastSend = _context.clock().now();
while (!_closed) { while (!_closed) {
@ -249,6 +264,7 @@ public class SAMStreamSend {
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error sending", ioe); _log.error("Error sending", ioe);
break;
} }
} }
@ -259,12 +275,14 @@ public class SAMStreamSend {
_samOut.flush(); _samOut.flush();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error closing", ioe); _log.info("Error closing", ioe);
} }
closed(); closed();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner exiting"); _log.debug("Runner exiting");
if (toSend != _totalSent)
_log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
// stop the reader, since we're only doing this once for testing // stop the reader, since we're only doing this once for testing
// you wouldn't do this in a real application // you wouldn't do this in a real application
_reader.stopReading(); _reader.stopReading();

View File

@ -8,9 +8,13 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Base32;
import net.i2p.data.DataHelper;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.VersionComparator;
/** /**
* Sit around on a SAM destination, receiving lots of data and * Sit around on a SAM destination, receiving lots of data and
@ -26,7 +30,7 @@ public class SAMStreamSink {
private final String _samPort; private final String _samPort;
private final String _destFile; private final String _destFile;
private final String _sinkDir; private final String _sinkDir;
private final String _conOptions; private String _conOptions;
private Socket _samSocket; private Socket _samSocket;
private OutputStream _samOut; private OutputStream _samOut;
private InputStream _samIn; private InputStream _samIn;
@ -38,12 +42,13 @@ public class SAMStreamSink {
public static void main(String args[]) { public static void main(String args[]) {
if (args.length < 4) { if (args.length < 4) {
System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir"); System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir [version]");
return; return;
} }
I2PAppContext ctx = new I2PAppContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]); SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]);
sink.startup(); String version = (args.length >= 5) ? args[4] : "1.0";
sink.startup(version);
} }
public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) {
@ -59,7 +64,7 @@ public class SAMStreamSink {
_remotePeers = new HashMap<Integer,Sink>(); _remotePeers = new HashMap<Integer,Sink>();
} }
public void startup() { public void startup(String version) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
boolean ok = connect(); boolean ok = connect();
@ -70,14 +75,14 @@ public class SAMStreamSink {
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
String ourDest = handshake(); String ourDest = handshake(version);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (ourDest != null) { if (ourDest != null) {
//boolean written = //boolean written =
writeDest(ourDest); writeDest(ourDest);
if (_log.shouldLog(Log.DEBUG)) } else {
_log.debug("My destination written to " + _destFile); _reader.stopReading();
} }
} }
} }
@ -97,7 +102,7 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination()); _log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination());
} else { } else {
_log.error("wtf, not connected to " + id + " but we were just closed?"); _log.error("not connected to " + id + " but we were just closed?");
} }
} }
@ -110,7 +115,7 @@ public class SAMStreamSink {
if (sink != null) { if (sink != null) {
sink.received(data, offset, length); sink.received(data, offset, length);
} else { } else {
_log.error("wtf, not connected to " + id + " but we received " + length + "?"); _log.error("not connected to " + id + " but we received " + length + "?");
} }
} }
@ -142,24 +147,58 @@ public class SAMStreamSink {
} }
} }
private String handshake() { private String handshake(String version) {
synchronized (_samOut) { synchronized (_samOut) {
try { try {
_samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
_samOut.flush(); _samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
boolean ok = _eventHandler.waitForHelloReply(); String hisVersion = _eventHandler.waitForHelloReply();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello reply found: " + ok); _log.debug("Hello reply found: " + hisVersion);
if (!ok) if (hisVersion == null)
throw new IOException("wtf, hello failed?"); throw new IOException("Hello failed");
String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n"; boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
String dest;
if (isV3) {
// we use the filename as the name in sam.keys
// and read it in ourselves
File keys = new File("sam.keys");
if (keys.exists()) {
Properties opts = new Properties();
DataHelper.loadProps(opts, keys);
String s = opts.getProperty(_destFile);
if (s != null) {
dest = s;
} else {
dest = "TRANSIENT";
(new File(_destFile)).delete();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting new transient destination");
}
} else {
dest = "TRANSIENT";
(new File(_destFile)).delete();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting new transient destination");
}
byte[] id = new byte[5];
_context.random().nextBytes(id);
_conOptions = "ID=" + Base32.encode(id);
} else {
// we use the filename as the name in sam.keys
// and give it to the SAM server
dest = _destFile;
}
String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n";
_samOut.write(req.getBytes()); _samOut.write(req.getBytes());
_samOut.flush(); _samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
ok = _eventHandler.waitForSessionCreateReply(); boolean ok = _eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
@ -175,7 +214,8 @@ public class SAMStreamSink {
_log.error("No naming lookup reply found!"); _log.error("No naming lookup reply found!");
return null; return null;
} else { } else {
_log.info(_destFile + " is located at " + destination); if (_log.shouldInfo())
_log.info(_destFile + " is located at " + destination);
} }
return destination; return destination;
} catch (Exception e) { } catch (Exception e) {
@ -186,10 +226,18 @@ public class SAMStreamSink {
} }
private boolean writeDest(String dest) { private boolean writeDest(String dest) {
File f = new File(_destFile);
if (f.exists()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination file exists, not overwriting:" + _destFile);
return false;
}
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(_destFile); fos = new FileOutputStream(f);
fos.write(dest.getBytes()); fos.write(dest.getBytes());
if (_log.shouldLog(Log.DEBUG))
_log.debug("My destination written to " + _destFile);
} catch (Exception e) { } catch (Exception e) {
_log.error("Error writing to " + _destFile, e); _log.error("Error writing to " + _destFile, e);
return false; return false;
@ -236,7 +284,7 @@ public class SAMStreamSink {
try { try {
_out.close(); _out.close();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error closing", ioe); _log.info("Error closing", ioe);
} }
} }
public void received(byte data[], int offset, int len) { public void received(byte data[], int offset, int len) {