More v3 support

Convert IDs from ints to Strings
Wait for STREAM STATUS
Open 2nd socket for sender
v3 sender working
This commit is contained in:
zzz
2015-11-25 22:59:41 +00:00
parent 612e01cbbf
commit 868e5e988c
5 changed files with 192 additions and 137 deletions

View File

@ -10,9 +10,9 @@ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListe
public void helloReplyReceived(boolean ok, String version) {} public void helloReplyReceived(boolean ok, String version) {}
public void namingReplyReceived(String name, String result, String value, String message) {} public void namingReplyReceived(String name, String result, String value, String message) {}
public void sessionStatusReceived(String result, String destination, String message) {} public void sessionStatusReceived(String result, String destination, String message) {}
public void streamClosedReceived(String result, int id, String message) {} public void streamClosedReceived(String result, String id, String message) {}
public void streamConnectedReceived(String remoteDestination, int id) {} public void streamConnectedReceived(String remoteDestination, String id) {}
public void streamDataReceived(int id, byte[] data, int offset, int length) {} public void streamDataReceived(String id, byte[] data, int offset, int length) {}
public void streamStatusReceived(String result, int id, String message) {} public void streamStatusReceived(String result, String id, String message) {}
public void unknownMessageReceived(String major, String minor, Properties params) {} public void unknownMessageReceived(String major, String minor, Properties params) {}
} }

View File

@ -18,8 +18,10 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
private String _version; private String _version;
private final Object _helloLock = new Object(); private final Object _helloLock = new Object();
private Boolean _sessionCreateOk; private Boolean _sessionCreateOk;
private Boolean _streamStatusOk;
private final Object _sessionCreateLock = new Object(); private final Object _sessionCreateLock = new Object();
private final Object _namingReplyLock = new Object(); private final Object _namingReplyLock = new Object();
private final Object _streamStatusLock = new Object();
private final Map<String,String> _namingReplies = new HashMap<String,String>(); private final Map<String,String> _namingReplies = new HashMap<String,String>();
public SAMEventHandler(I2PAppContext ctx) { public SAMEventHandler(I2PAppContext ctx) {
@ -27,7 +29,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
} }
@Override @Override
public void helloReplyReceived(boolean ok, String version) { public void helloReplyReceived(boolean ok, String version) {
synchronized (_helloLock) { synchronized (_helloLock) {
if (ok) if (ok)
@ -39,7 +41,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
} }
} }
@Override @Override
public void sessionStatusReceived(String result, String destination, String msg) { public void sessionStatusReceived(String result, String destination, String msg) {
synchronized (_sessionCreateLock) { synchronized (_sessionCreateLock) {
if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result)) if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
@ -50,7 +52,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
} }
} }
@Override @Override
public void namingReplyReceived(String name, String result, String value, String msg) { public void namingReplyReceived(String name, String result, String value, String msg) {
synchronized (_namingReplyLock) { synchronized (_namingReplyLock) {
if (SAMReader.SAMClientEventListener.NAMING_REPLY_OK.equals(result)) if (SAMReader.SAMClientEventListener.NAMING_REPLY_OK.equals(result))
@ -61,7 +63,18 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
} }
} }
@Override @Override
public void streamStatusReceived(String result, String id, String message) {
synchronized (_streamStatusLock) {
if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
_streamStatusOk = Boolean.TRUE;
else
_streamStatusOk = Boolean.FALSE;
_streamStatusLock.notifyAll();
}
}
@Override
public void unknownMessageReceived(String major, String minor, Properties params) { public void unknownMessageReceived(String major, String minor, Properties params) {
_log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]"); _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]");
} }
@ -106,6 +119,24 @@ public class SAMEventHandler extends SAMClientEventListenerImpl {
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
} }
/**
* Wait for the stream to be created, returning true if everything went ok
*
* @return true if everything ok
*/
public boolean waitForStreamStatusReply() {
while (true) {
try {
synchronized (_streamStatusLock) {
if (_streamStatusOk == null)
_streamStatusLock.wait();
else
return _streamStatusOk.booleanValue();
}
} catch (InterruptedException ie) {}
}
}
/** /**
* Return the destination found matching the name, or null if the key was * Return the destination found matching the name, or null if the key was

View File

@ -73,10 +73,10 @@ public class SAMReader {
public void helloReplyReceived(boolean ok, String version); public void helloReplyReceived(boolean ok, String version);
public void sessionStatusReceived(String result, String destination, String message); public void sessionStatusReceived(String result, String destination, String message);
public void streamStatusReceived(String result, int id, String message); public void streamStatusReceived(String result, String id, String message);
public void streamConnectedReceived(String remoteDestination, int id); public void streamConnectedReceived(String remoteDestination, String id);
public void streamClosedReceived(String result, int id, String message); public void streamClosedReceived(String result, String id, String message);
public void streamDataReceived(int id, byte data[], int offset, int length); public void streamDataReceived(String id, byte data[], int offset, int length);
public void namingReplyReceived(String name, String result, String value, String message); public void namingReplyReceived(String name, String result, String value, String message);
public void destReplyReceived(String publicKey, String privateKey); public void destReplyReceived(String publicKey, String privateKey);
@ -181,24 +181,17 @@ public class SAMReader {
String result = params.getProperty("RESULT"); String result = params.getProperty("RESULT");
String id = params.getProperty("ID"); String id = params.getProperty("ID");
String msg = params.getProperty("MESSAGE"); String msg = params.getProperty("MESSAGE");
if (id != null) { // id is null in v3, so pass it through regardless
try { //if (id != null) {
_listener.streamStatusReceived(result, Integer.parseInt(id), msg); _listener.streamStatusReceived(result, id, msg);
} catch (NumberFormatException nfe) { //} else {
_listener.unknownMessageReceived(major, minor, params); // _listener.unknownMessageReceived(major, minor, params);
} //}
} else {
_listener.unknownMessageReceived(major, minor, params);
}
} else if ("CONNECTED".equals(minor)) { } else if ("CONNECTED".equals(minor)) {
String dest = params.getProperty("DESTINATION"); String dest = params.getProperty("DESTINATION");
String id = params.getProperty("ID"); String id = params.getProperty("ID");
if (id != null) { if (id != null) {
try { _listener.streamConnectedReceived(dest, id);
_listener.streamConnectedReceived(dest, Integer.parseInt(id));
} catch (NumberFormatException nfe) {
_listener.unknownMessageReceived(major, minor, params);
}
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }
@ -207,11 +200,7 @@ public class SAMReader {
String id = params.getProperty("ID"); String id = params.getProperty("ID");
String msg = params.getProperty("MESSAGE"); String msg = params.getProperty("MESSAGE");
if (id != null) { if (id != null) {
try { _listener.streamClosedReceived(result, id, msg);
_listener.streamClosedReceived(result, Integer.parseInt(id), msg);
} catch (NumberFormatException nfe) {
_listener.unknownMessageReceived(major, minor, params);
}
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }
@ -220,7 +209,6 @@ public class SAMReader {
String size = params.getProperty("SIZE"); String size = params.getProperty("SIZE");
if (id != null) { if (id != null) {
try { try {
int idVal = Integer.parseInt(id);
int sizeVal = Integer.parseInt(size); int sizeVal = Integer.parseInt(size);
byte data[] = new byte[sizeVal]; byte data[] = new byte[sizeVal];
@ -228,7 +216,7 @@ public class SAMReader {
if (read != sizeVal) { if (read != sizeVal) {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} else { } else {
_listener.streamDataReceived(idVal, data, 0, sizeVal); _listener.streamDataReceived(id, data, 0, sizeVal);
} }
} catch (NumberFormatException nfe) { } catch (NumberFormatException nfe) {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);

View File

@ -30,14 +30,12 @@ public class SAMStreamSend {
private final String _destFile; private final String _destFile;
private final String _dataFile; private final String _dataFile;
private String _conOptions; private String _conOptions;
private Socket _samSocket; private SAMReader _reader, _reader2;
private OutputStream _samOut; private boolean _isV3;
private InputStream _samIn; private String _v3ID;
private SAMReader _reader;
//private boolean _dead; //private boolean _dead;
private final SAMEventHandler _eventHandler;
/** Connection id (Integer) to peer (Flooder) */ /** Connection id (Integer) to peer (Flooder) */
private final Map<Integer, Sender> _remotePeers; private final Map<String, Sender> _remotePeers;
public static void main(String args[]) { public static void main(String args[]) {
if (args.length < 4) { if (args.length < 4) {
@ -60,27 +58,48 @@ public class SAMStreamSend {
_destFile = destFile; _destFile = destFile;
_dataFile = dataFile; _dataFile = dataFile;
_conOptions = ""; _conOptions = "";
_eventHandler = new SendEventHandler(_context); _remotePeers = new HashMap<String, Sender>();
_remotePeers = new HashMap<Integer,Sender>();
} }
public void startup(String version) { public void startup(String version) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
boolean ok = connect(); try {
if (_log.shouldLog(Log.DEBUG)) Socket sock = connect();
_log.debug("Connected: " + ok); SAMEventHandler eventHandler = new SendEventHandler(_context);
if (ok) { _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
_reader = new SAMReader(_context, _samIn, _eventHandler);
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
String ourDest = handshake(version); OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true, eventHandler);
if (ourDest == null)
throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (ourDest != null) { if (_isV3) {
send(); Socket sock2 = connect();
eventHandler = new SendEventHandler(_context);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader2 created");
out = sock2.getOutputStream();
String ok = handshake(out, version, false, eventHandler);
if (ok == null)
throw new IOException("2nd handshake failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake2 complete.");
} }
if (ourDest != null) {
send(out, eventHandler);
}
} catch (IOException e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
if (_reader != null)
_reader.stopReading();
if (_reader2 != null)
_reader2.stopReading();
} }
} }
@ -88,10 +107,10 @@ public class SAMStreamSend {
public SendEventHandler(I2PAppContext ctx) { super(ctx); } public SendEventHandler(I2PAppContext ctx) { super(ctx); }
@Override @Override
public void streamClosedReceived(String result, int id, String message) { public void streamClosedReceived(String result, String id, String message) {
Sender sender = null; Sender sender = null;
synchronized (_remotePeers) { synchronized (_remotePeers) {
sender = _remotePeers.remove(Integer.valueOf(id)); sender = _remotePeers.remove(id);
} }
if (sender != null) { if (sender != null) {
sender.closed(); sender.closed();
@ -103,53 +122,49 @@ public class SAMStreamSend {
} }
} }
private boolean connect() { private Socket connect() throws IOException {
try { return new Socket(_samHost, Integer.parseInt(_samPort));
_samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
_samOut = _samSocket.getOutputStream();
_samIn = _samSocket.getInputStream();
return true;
} catch (Exception e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
return false;
}
} }
private String handshake(String version) { /** @return our b64 dest or null */
synchronized (_samOut) { private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
synchronized (samOut) {
try { try {
_samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
String hisVersion = _eventHandler.waitForHelloReply(); String hisVersion = eventHandler.waitForHelloReply();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello reply found: " + hisVersion); _log.debug("Hello reply found: " + hisVersion);
if (hisVersion == null) if (hisVersion == null)
throw new IOException("Hello failed"); throw new IOException("Hello failed");
boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; if (!isMaster)
if (isV3) { return "OK";
_isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
if (_isV3) {
byte[] id = new byte[5]; byte[] id = new byte[5];
_context.random().nextBytes(id); _context.random().nextBytes(id);
_conOptions = "ID=" + Base32.encode(id); _v3ID = Base32.encode(id);
_conOptions = "ID=" + _v3ID;
} }
String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
_samOut.write(req.getBytes()); samOut.write(req.getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
boolean ok = _eventHandler.waitForSessionCreateReply(); boolean ok = eventHandler.waitForSessionCreateReply();
if (!ok) if (!ok)
throw new IOException("Session create failed"); throw new IOException("Session create failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
_samOut.write(req.getBytes()); samOut.write(req.getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Naming lookup sent"); _log.debug("Naming lookup sent");
String destination = _eventHandler.waitForNamingReply("ME"); String destination = eventHandler.waitForNamingReply("ME");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Naming lookup reply found: " + destination); _log.debug("Naming lookup reply found: " + destination);
if (destination == null) { if (destination == null) {
@ -166,8 +181,8 @@ public class SAMStreamSend {
} }
} }
private void send() { private void send(OutputStream samOut, SAMEventHandler eventHandler) {
Sender sender = new Sender(); Sender sender = new Sender(samOut, eventHandler);
boolean ok = sender.openConnection(); boolean ok = sender.openConnection();
if (ok) { if (ok) {
I2PAppThread t = new I2PAppThread(sender, "Sender"); I2PAppThread t = new I2PAppThread(sender, "Sender");
@ -176,14 +191,26 @@ public class SAMStreamSend {
} }
private class Sender implements Runnable { private class Sender implements Runnable {
private int _connectionId; private final String _connectionId;
private String _remoteDestination; private String _remoteDestination;
private InputStream _in; private InputStream _in;
private volatile boolean _closed; private volatile boolean _closed;
private long _started; private long _started;
private long _totalSent; private long _totalSent;
private final OutputStream _samOut;
private final SAMEventHandler _eventHandler;
public Sender() {} public Sender(OutputStream samOut, SAMEventHandler eventHandler) {
_samOut = samOut;
_eventHandler = eventHandler;
synchronized (_remotePeers) {
if (_v3ID != null)
_connectionId = _v3ID;
else
_connectionId = Integer.toString(_remotePeers.size() + 1);
_remotePeers.put(_connectionId, Sender.this);
}
}
public boolean openConnection() { public boolean openConnection() {
FileInputStream fin = null; FileInputStream fin = null;
@ -193,10 +220,6 @@ public class SAMStreamSend {
int read = DataHelper.read(fin, dest); int read = DataHelper.read(fin, dest);
_remoteDestination = new String(dest, 0, read); _remoteDestination = new String(dest, 0, read);
synchronized (_remotePeers) {
_connectionId = _remotePeers.size() + 1;
_remotePeers.put(Integer.valueOf(_connectionId), Sender.this);
}
_context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
_context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 });
@ -207,6 +230,10 @@ public class SAMStreamSend {
_samOut.write(msg); _samOut.write(msg);
_samOut.flush(); _samOut.flush();
} }
_log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
boolean ok = _eventHandler.waitForStreamStatusReply();
if (!ok)
throw new IOException("STREAM CONNECT failed");
_in = new FileInputStream(_dataFile); _in = new FileInputStream(_dataFile);
return true; return true;
@ -222,7 +249,7 @@ public class SAMStreamSend {
} }
} }
public int getConnectionId() { return _connectionId; } public String getConnectionId() { return _connectionId; }
public String getDestination() { return _remoteDestination; } public String getDestination() { return _remoteDestination; }
public void closed() { public void closed() {
@ -252,9 +279,11 @@ public class SAMStreamSend {
_log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend)); _log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend));
lastSend = now; lastSend = now;
byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
synchronized (_samOut) { synchronized (_samOut) {
_samOut.write(msg); if (!_isV3) {
byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
_samOut.write(msg);
}
_samOut.write(data, 0, read); _samOut.write(data, 0, read);
_samOut.flush(); _samOut.flush();
} }
@ -268,14 +297,23 @@ public class SAMStreamSend {
} }
} }
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes(); if (_isV3) {
try { try {
synchronized (_samOut) { _samOut.close();
_samOut.write(msg); } catch (IOException ioe) {
_samOut.flush(); _log.info("Error closing", ioe);
}
} else {
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
try {
synchronized (_samOut) {
_samOut.write(msg);
_samOut.flush();
_samOut.close();
}
} catch (IOException ioe) {
_log.info("Error closing", ioe);
} }
} catch (IOException ioe) {
_log.info("Error closing", ioe);
} }
closed(); closed();
@ -283,6 +321,8 @@ public class SAMStreamSend {
_log.debug("Runner exiting"); _log.debug("Runner exiting");
if (toSend != _totalSent) if (toSend != _totalSent)
_log.error("Only sent " + _totalSent + " of " + toSend + " bytes"); _log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
if (_reader2 != null)
_reader2.stopReading();
// stop the reader, since we're only doing this once for testing // stop the reader, since we're only doing this once for testing
// you wouldn't do this in a real application // you wouldn't do this in a real application
_reader.stopReading(); _reader.stopReading();

View File

@ -31,14 +31,12 @@ public class SAMStreamSink {
private final String _destFile; private final String _destFile;
private final String _sinkDir; private final String _sinkDir;
private String _conOptions; private String _conOptions;
private Socket _samSocket;
private OutputStream _samOut;
private InputStream _samIn;
private SAMReader _reader; private SAMReader _reader;
private boolean _isV3;
//private boolean _dead; //private boolean _dead;
private final SAMEventHandler _eventHandler; private final SAMEventHandler _eventHandler;
/** Connection id (Integer) to peer (Flooder) */ /** Connection id (Integer) to peer (Flooder) */
private final Map<Integer, Sink> _remotePeers; private final Map<String, Sink> _remotePeers;
public static void main(String args[]) { public static void main(String args[]) {
if (args.length < 4) { if (args.length < 4) {
@ -61,21 +59,20 @@ public class SAMStreamSink {
_sinkDir = sinkDir; _sinkDir = sinkDir;
_conOptions = ""; _conOptions = "";
_eventHandler = new SinkEventHandler(_context); _eventHandler = new SinkEventHandler(_context);
_remotePeers = new HashMap<Integer,Sink>(); _remotePeers = new HashMap<String, Sink>();
} }
public void startup(String version) { public void startup(String version) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
boolean ok = connect(); try {
if (_log.shouldLog(Log.DEBUG)) Socket sock = connect();
_log.debug("Connected: " + ok); _reader = new SAMReader(_context, sock.getInputStream(), _eventHandler);
if (ok) {
_reader = new SAMReader(_context, _samIn, _eventHandler);
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
String ourDest = handshake(version); OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (ourDest != null) { if (ourDest != null) {
@ -84,6 +81,8 @@ public class SAMStreamSink {
} else { } else {
_reader.stopReading(); _reader.stopReading();
} }
} catch (IOException e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
} }
} }
@ -92,10 +91,10 @@ public class SAMStreamSink {
public SinkEventHandler(I2PAppContext ctx) { super(ctx); } public SinkEventHandler(I2PAppContext ctx) { super(ctx); }
@Override @Override
public void streamClosedReceived(String result, int id, String message) { public void streamClosedReceived(String result, String id, String message) {
Sink sink = null; Sink sink = null;
synchronized (_remotePeers) { synchronized (_remotePeers) {
sink = _remotePeers.remove(Integer.valueOf(id)); sink = _remotePeers.remove(id);
} }
if (sink != null) { if (sink != null) {
sink.closed(); sink.closed();
@ -107,10 +106,10 @@ public class SAMStreamSink {
} }
@Override @Override
public void streamDataReceived(int id, byte data[], int offset, int length) { public void streamDataReceived(String id, byte data[], int offset, int length) {
Sink sink = null; Sink sink = null;
synchronized (_remotePeers) { synchronized (_remotePeers) {
sink = _remotePeers.get(Integer.valueOf(id)); sink = _remotePeers.get(id);
} }
if (sink != null) { if (sink != null) {
sink.received(data, offset, length); sink.received(data, offset, length);
@ -120,14 +119,14 @@ public class SAMStreamSink {
} }
@Override @Override
public void streamConnectedReceived(String dest, int id) { public void streamConnectedReceived(String dest, String id) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection " + id + " received from " + dest); _log.debug("Connection " + id + " received from " + dest);
try { try {
Sink sink = new Sink(id, dest); Sink sink = new Sink(id, dest);
synchronized (_remotePeers) { synchronized (_remotePeers) {
_remotePeers.put(Integer.valueOf(id), sink); _remotePeers.put(id, sink);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("Error creating a new sink", ioe); _log.error("Error creating a new sink", ioe);
@ -135,23 +134,16 @@ public class SAMStreamSink {
} }
} }
private boolean connect() { private Socket connect() throws IOException {
try { return new Socket(_samHost, Integer.parseInt(_samPort));
_samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
_samOut = _samSocket.getOutputStream();
_samIn = _samSocket.getInputStream();
return true;
} catch (Exception e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
return false;
}
} }
private String handshake(String version) { /** @return our b64 dest or null */
synchronized (_samOut) { private String handshake(OutputStream samOut, String version, boolean isMaster) {
synchronized (samOut) {
try { try {
_samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
String hisVersion = _eventHandler.waitForHelloReply(); String hisVersion = _eventHandler.waitForHelloReply();
@ -159,9 +151,9 @@ public class SAMStreamSink {
_log.debug("Hello reply found: " + hisVersion); _log.debug("Hello reply found: " + hisVersion);
if (hisVersion == null) if (hisVersion == null)
throw new IOException("Hello failed"); throw new IOException("Hello failed");
boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
String dest; String dest;
if (isV3) { if (_isV3) {
// we use the filename as the name in sam.keys // we use the filename as the name in sam.keys
// and read it in ourselves // and read it in ourselves
File keys = new File("sam.keys"); File keys = new File("sam.keys");
@ -183,17 +175,19 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting new transient destination"); _log.debug("Requesting new transient destination");
} }
byte[] id = new byte[5]; if (isMaster) {
_context.random().nextBytes(id); byte[] id = new byte[5];
_conOptions = "ID=" + Base32.encode(id); _context.random().nextBytes(id);
_conOptions = "ID=" + Base32.encode(id);
}
} else { } else {
// we use the filename as the name in sam.keys // we use the filename as the name in sam.keys
// and give it to the SAM server // and give it to the SAM server
dest = _destFile; dest = _destFile;
} }
String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n"; String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n";
_samOut.write(req.getBytes()); samOut.write(req.getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
boolean ok = _eventHandler.waitForSessionCreateReply(); boolean ok = _eventHandler.waitForSessionCreateReply();
@ -203,8 +197,8 @@ public class SAMStreamSink {
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
_samOut.write(req.getBytes()); samOut.write(req.getBytes());
_samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Naming lookup sent"); _log.debug("Naming lookup sent");
String destination = _eventHandler.waitForNamingReply("ME"); String destination = _eventHandler.waitForNamingReply("ME");
@ -227,11 +221,13 @@ public class SAMStreamSink {
private boolean writeDest(String dest) { private boolean writeDest(String dest) {
File f = new File(_destFile); File f = new File(_destFile);
/*
if (f.exists()) { if (f.exists()) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Destination file exists, not overwriting:" + _destFile); _log.debug("Destination file exists, not overwriting: " + _destFile);
return false; return false;
} }
*/
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(f); fos = new FileOutputStream(f);
@ -248,14 +244,14 @@ public class SAMStreamSink {
} }
private class Sink { private class Sink {
private final int _connectionId; private final String _connectionId;
private final String _remoteDestination; private final String _remoteDestination;
private volatile boolean _closed; private volatile boolean _closed;
private final long _started; private final long _started;
private long _lastReceivedOn; private long _lastReceivedOn;
private final OutputStream _out; private final OutputStream _out;
public Sink(int conId, String remDest) throws IOException { public Sink(String conId, String remDest) throws IOException {
_connectionId = conId; _connectionId = conId;
_remoteDestination = remDest; _remoteDestination = remDest;
_closed = false; _closed = false;
@ -273,7 +269,7 @@ public class SAMStreamSink {
_started = _context.clock().now(); _started = _context.clock().now();
} }
public int getConnectionId() { return _connectionId; } public String getConnectionId() { return _connectionId; }
public String getDestination() { return _remoteDestination; } public String getDestination() { return _remoteDestination; }
public void closed() { public void closed() {