v1 datagram and raw support for sink

This commit is contained in:
zzz
2015-11-26 21:39:18 +00:00
parent bd048b04cc
commit 6373c8a9ed
3 changed files with 115 additions and 12 deletions

View File

@ -14,6 +14,8 @@ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListe
public void streamConnectedReceived(String remoteDestination, String id) {} public void streamConnectedReceived(String remoteDestination, String id) {}
public void streamDataReceived(String id, byte[] data, int offset, int length) {} public void streamDataReceived(String id, byte[] data, int offset, int length) {}
public void streamStatusReceived(String result, String id, String message) {} public void streamStatusReceived(String result, String id, String message) {}
public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort) {}
public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol) {}
public void pingReceived(String data) {} public void pingReceived(String data) {}
public void pongReceived(String data) {} public void pongReceived(String data) {}
public void unknownMessageReceived(String major, String minor, Properties params) {} public void unknownMessageReceived(String major, String minor, Properties params) {}

View File

@ -80,6 +80,8 @@ public class SAMReader {
public void streamDataReceived(String id, byte data[], int offset, int length); public void streamDataReceived(String id, byte data[], int offset, int length);
public void namingReplyReceived(String name, String result, String value, String message); public void namingReplyReceived(String name, String result, String value, String message);
public void destReplyReceived(String publicKey, String privateKey); public void destReplyReceived(String publicKey, String privateKey);
public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort);
public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol);
public void pingReceived(String data); public void pingReceived(String data);
public void pongReceived(String data); public void pongReceived(String data);
@ -231,6 +233,73 @@ public class SAMReader {
} else { } else {
_listener.unknownMessageReceived(major, minor, params); _listener.unknownMessageReceived(major, minor, params);
} }
} else if ("DATAGRAM".equals(major)) {
if ("RECEIVED".equals(minor)) {
String dest = params.getProperty("DESTINATION");
String size = params.getProperty("SIZE");
String fp = params.getProperty("FROM_PORT");
String tp = params.getProperty("TO_PORT");
int fromPort = 0;
int toPort = 0;
if (dest != null) {
try {
if (fp != null)
fromPort = Integer.parseInt(fp);
if (tp != null)
toPort = Integer.parseInt(tp);
int sizeVal = Integer.parseInt(size);
byte data[] = new byte[sizeVal];
int read = DataHelper.read(_inRaw, data);
if (read != sizeVal) {
_listener.unknownMessageReceived(major, minor, params);
} else {
_listener.datagramReceived(dest, data, 0, sizeVal, fromPort, toPort);
}
} catch (NumberFormatException nfe) {
_listener.unknownMessageReceived(major, minor, params);
} catch (IOException ioe) {
_live = false;
_listener.unknownMessageReceived(major, minor, params);
}
} else {
_listener.unknownMessageReceived(major, minor, params);
}
} else {
_listener.unknownMessageReceived(major, minor, params);
}
} else if ("RAW".equals(major)) {
if ("RECEIVED".equals(minor)) {
String size = params.getProperty("SIZE");
String fp = params.getProperty("FROM_PORT");
String tp = params.getProperty("TO_PORT");
String pr = params.getProperty("PROTOCOL");
int fromPort = 0;
int toPort = 0;
int protocol = 18;
try {
if (fp != null)
fromPort = Integer.parseInt(fp);
if (tp != null)
toPort = Integer.parseInt(tp);
if (pr != null)
protocol = Integer.parseInt(pr);
int sizeVal = Integer.parseInt(size);
byte data[] = new byte[sizeVal];
int read = DataHelper.read(_inRaw, data);
if (read != sizeVal) {
_listener.unknownMessageReceived(major, minor, params);
} else {
_listener.rawReceived(data, 0, sizeVal, fromPort, toPort, protocol);
}
} catch (NumberFormatException nfe) {
_listener.unknownMessageReceived(major, minor, params);
} catch (IOException ioe) {
_live = false;
_listener.unknownMessageReceived(major, minor, params);
}
} else {
_listener.unknownMessageReceived(major, minor, params);
}
} else if ("NAMING".equals(major)) { } else if ("NAMING".equals(major)) {
if ("REPLY".equals(minor)) { if ("REPLY".equals(minor)) {
String name = params.getProperty("NAME"); String name = params.getProperty("NAME");

View File

@ -127,7 +127,7 @@ public class SAMStreamSink {
throw new IOException("handshake failed"); throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (_isV3) { if (_isV3 && mode != V1DG && mode != V1RAW) {
Socket sock2 = connect(isSSL); Socket sock2 = connect(isSSL);
out = sock2.getOutputStream(); out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
@ -158,7 +158,7 @@ public class SAMStreamSink {
@Override @Override
public void streamClosedReceived(String result, String id, String message) { public void streamClosedReceived(String result, String id, String message) {
Sink sink = null; Sink sink;
synchronized (_remotePeers) { synchronized (_remotePeers) {
sink = _remotePeers.remove(id); sink = _remotePeers.remove(id);
} }
@ -173,7 +173,7 @@ public class SAMStreamSink {
@Override @Override
public void streamDataReceived(String id, byte data[], int offset, int length) { public void streamDataReceived(String id, byte data[], int offset, int length) {
Sink sink = null; Sink sink;
synchronized (_remotePeers) { synchronized (_remotePeers) {
sink = _remotePeers.get(id); sink = _remotePeers.get(id);
} }
@ -212,6 +212,34 @@ public class SAMStreamSink {
} }
} }
} }
@Override
public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort) {
// just get the first
Sink sink;
synchronized (_remotePeers) {
if (_remotePeers.isEmpty()) {
_log.error("not connected but we received datagram " + length + "?");
return;
}
sink = _remotePeers.values().iterator().next();
}
sink.received(data, offset, length);
}
@Override
public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol) {
// just get the first
Sink sink;
synchronized (_remotePeers) {
if (_remotePeers.isEmpty()) {
_log.error("not connected but we received raw " + length + "?");
return;
}
sink = _remotePeers.values().iterator().next();
}
sink.received(data, offset, length);
}
} }
private class SinkEventHandler2 extends SinkEventHandler { private class SinkEventHandler2 extends SinkEventHandler {
@ -369,12 +397,13 @@ public class SAMStreamSink {
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
boolean ok = eventHandler.waitForSessionCreateReply(); if (mode == STREAM) {
if (!ok) boolean ok = eventHandler.waitForSessionCreateReply();
throw new IOException("Session create failed"); if (!ok)
if (_log.shouldLog(Log.DEBUG)) throw new IOException("Session create failed");
_log.debug("Session create reply found: " + ok); if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok);
}
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes()); samOut.write(req.getBytes());
samOut.flush(); samOut.flush();
@ -386,9 +415,12 @@ public class SAMStreamSink {
if (destination == null) { if (destination == null) {
_log.error("No naming lookup reply found!"); _log.error("No naming lookup reply found!");
return null; return null;
} else { }
if (_log.shouldInfo()) if (_log.shouldInfo())
_log.info(_destFile + " is located at " + destination); _log.info(_destFile + " is located at " + destination);
if (mode != STREAM) {
// fake it so the sink starts
eventHandler.streamConnectedReceived(destination, "FAKE");
} }
return destination; return destination;
} catch (IOException e) { } catch (IOException e) {