Fix ReadLine bug that buffered and lost input;

can't handle UTF-8 for now.
Start support of datagrams and raw in the client
This commit is contained in:
zzz
2015-11-26 20:55:10 +00:00
parent b9ab933550
commit bd048b04cc
3 changed files with 95 additions and 59 deletions

View File

@ -34,7 +34,10 @@ class ReadLine {
if (timeout <= 0) if (timeout <= 0)
throw new SocketTimeoutException(); throw new SocketTimeoutException();
long expires = System.currentTimeMillis() + timeout; long expires = System.currentTimeMillis() + timeout;
InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8"); // this reads and buffers extra bytes, so we can't use it
// unless we're going to decode UTF-8 on-the-fly, we're stuck with ASCII
//InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8");
InputStream in = socket.getInputStream();
int c; int c;
int i = 0; int i = 0;
socket.setSoTimeout(timeout); socket.setSoTimeout(timeout);

View File

@ -39,12 +39,15 @@ public class SAMStreamSend {
/** Connection id (Integer) to peer (Flooder) */ /** Connection id (Integer) to peer (Flooder) */
private final Map<String, Sender> _remotePeers; private final Map<String, Sender> _remotePeers;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir"; private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
" -s: use SSL";
public static void main(String args[]) { public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "drsb:p:v:"); Getopt g = new Getopt("SAM", args, "sb:m:p:v:");
boolean isSSL = false; boolean isSSL = false;
int mode = 0; // stream int mode = STREAM;
String version = "1.0"; String version = "1.0";
String host = "127.0.0.1"; String host = "127.0.0.1";
String port = "7656"; String port = "7656";
@ -55,12 +58,12 @@ public class SAMStreamSend {
isSSL = true; isSSL = true;
break; break;
case 'd': case 'm':
mode = 1; // datagram mode = Integer.parseInt(g.getOptarg());
break; if (mode < 0 || mode > V1RAW) {
System.err.println(USAGE);
case 'r': return;
mode = 2; // raw }
break; break;
case 'v': case 'v':
@ -92,7 +95,7 @@ public class SAMStreamSend {
I2PAppContext ctx = I2PAppContext.getGlobalContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
SAMStreamSend sender = new SAMStreamSend(ctx, host, port, SAMStreamSend sender = new SAMStreamSend(ctx, host, port,
args[startArgs], args[startArgs + 1]); args[startArgs], args[startArgs + 1]);
sender.startup(version); sender.startup(version, isSSL, mode);
} }
public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) {
@ -107,38 +110,38 @@ public class SAMStreamSend {
_remotePeers = new HashMap<String, Sender>(); _remotePeers = new HashMap<String, Sender>();
} }
public void startup(String version) { public void startup(String version, boolean isSSL, int mode) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
try { try {
Socket sock = connect(); Socket sock = connect(isSSL);
SAMEventHandler eventHandler = new SendEventHandler(_context); SAMEventHandler eventHandler = new SendEventHandler(_context);
_reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
OutputStream out = sock.getOutputStream(); OutputStream out = sock.getOutputStream();
String ourDest = handshake(out, version, true, eventHandler); String ourDest = handshake(out, version, true, eventHandler, mode);
if (ourDest == null) if (ourDest == null)
throw new IOException("handshake failed"); throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (_isV3) { if (_isV3 && mode != V1DG && mode != V1RAW) {
Socket sock2 = connect(); Socket sock2 = connect(isSSL);
eventHandler = new SendEventHandler(_context); eventHandler = new SendEventHandler(_context);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading(); _reader2.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader2 created"); _log.debug("Reader2 created");
out = sock2.getOutputStream(); out = sock2.getOutputStream();
String ok = handshake(out, version, false, eventHandler); String ok = handshake(out, version, false, eventHandler, mode);
if (ok == null) if (ok == null)
throw new IOException("2nd handshake failed"); throw new IOException("2nd handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake2 complete."); _log.debug("Handshake2 complete.");
} }
if (ourDest != null) { if (ourDest != null) {
send(out, eventHandler); send(out, eventHandler, mode);
} }
} catch (IOException e) { } catch (IOException e) {
_log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
@ -168,12 +171,12 @@ public class SAMStreamSend {
} }
} }
private Socket connect() throws IOException { private Socket connect(boolean isSSL) throws IOException {
return new Socket(_samHost, Integer.parseInt(_samPort)); return new Socket(_samHost, Integer.parseInt(_samPort));
} }
/** @return our b64 dest or null */ /** @return our b64 dest or null */
private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) {
synchronized (samOut) { synchronized (samOut) {
try { try {
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
@ -194,7 +197,14 @@ public class SAMStreamSend {
_v3ID = Base32.encode(id); _v3ID = Base32.encode(id);
_conOptions = "ID=" + _v3ID; _conOptions = "ID=" + _v3ID;
} }
String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; String style;
if (mode == STREAM)
style = "STREAM";
else if (mode == DG || mode == V1DG)
style = "DATAGRAM";
else
style = "RAW";
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + "\n";
samOut.write(req.getBytes()); samOut.write(req.getBytes());
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -227,8 +237,8 @@ public class SAMStreamSend {
} }
} }
private void send(OutputStream samOut, SAMEventHandler eventHandler) throws IOException { private void send(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException {
Sender sender = new Sender(samOut, eventHandler); Sender sender = new Sender(samOut, eventHandler, mode);
boolean ok = sender.openConnection(); boolean ok = sender.openConnection();
if (ok) { if (ok) {
I2PAppThread t = new I2PAppThread(sender, "Sender"); I2PAppThread t = new I2PAppThread(sender, "Sender");
@ -247,10 +257,12 @@ public class SAMStreamSend {
private long _totalSent; private long _totalSent;
private final OutputStream _samOut; private final OutputStream _samOut;
private final SAMEventHandler _eventHandler; private final SAMEventHandler _eventHandler;
private final int _mode;
public Sender(OutputStream samOut, SAMEventHandler eventHandler) { public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) {
_samOut = samOut; _samOut = samOut;
_eventHandler = eventHandler; _eventHandler = eventHandler;
_mode = mode;
synchronized (_remotePeers) { synchronized (_remotePeers) {
if (_v3ID != null) if (_v3ID != null)
_connectionId = _v3ID; _connectionId = _v3ID;
@ -273,21 +285,23 @@ public class SAMStreamSend {
_context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 });
_context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
StringBuilder buf = new StringBuilder(1024); if (_mode == STREAM) {
buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination); StringBuilder buf = new StringBuilder(1024);
// not supported until 3.2 but 3.0-3.1 will ignore buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination);
if (_isV3) // not supported until 3.2 but 3.0-3.1 will ignore
buf.append(" FROM_PORT=1234 TO_PORT=5678"); if (_isV3)
buf.append('\n'); buf.append(" FROM_PORT=1234 TO_PORT=5678");
byte[] msg = DataHelper.getASCII(buf.toString()); buf.append('\n');
synchronized (_samOut) { byte[] msg = DataHelper.getASCII(buf.toString());
_samOut.write(msg); synchronized (_samOut) {
_samOut.flush(); _samOut.write(msg);
_samOut.flush();
}
_log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
boolean ok = _eventHandler.waitForStreamStatusReply();
if (!ok)
throw new IOException("STREAM CONNECT failed");
} }
_log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
boolean ok = _eventHandler.waitForStreamStatusReply();
if (!ok)
throw new IOException("STREAM CONNECT failed");
_in = new FileInputStream(_dataFile); _in = new FileInputStream(_dataFile);
return true; return true;
@ -318,7 +332,7 @@ public class SAMStreamSend {
_started = _context.clock().now(); _started = _context.clock().now();
_context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
final long toSend = (new File(_dataFile)).length(); final long toSend = (new File(_dataFile)).length();
byte data[] = new byte[1024]; byte data[] = new byte[8192];
long lastSend = _context.clock().now(); long lastSend = _context.clock().now();
while (!_closed) { while (!_closed) {
try { try {
@ -334,8 +348,17 @@ public class SAMStreamSend {
lastSend = now; lastSend = now;
synchronized (_samOut) { synchronized (_samOut) {
if (!_isV3) { if (!_isV3 || _mode == V1DG || _mode == V1RAW) {
byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes(); String m;
if (_mode == STREAM)
m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n";
else if (_mode == V1DG)
m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
else if (_mode == V1RAW)
m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
else
throw new IOException("unsupported mode " + _mode);
byte msg[] = DataHelper.getASCII(m);
_samOut.write(msg); _samOut.write(msg);
} }
_samOut.write(data, 0, read); _samOut.write(data, 0, read);

View File

@ -40,12 +40,15 @@ public class SAMStreamSink {
/** Connection id (Integer) to peer (Flooder) */ /** Connection id (Integer) to peer (Flooder) */
private final Map<String, Sink> _remotePeers; private final Map<String, Sink> _remotePeers;
private static final String USAGE = "Usage: SAMStreamSink [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir"; private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
" -s: use SSL";
public static void main(String args[]) { public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "drsb:p:v:"); Getopt g = new Getopt("SAM", args, "sb:m:p:v:");
boolean isSSL = false; boolean isSSL = false;
int mode = 0; // stream int mode = STREAM;
String version = "1.0"; String version = "1.0";
String host = "127.0.0.1"; String host = "127.0.0.1";
String port = "7656"; String port = "7656";
@ -56,12 +59,12 @@ public class SAMStreamSink {
isSSL = true; isSSL = true;
break; break;
case 'd': case 'm':
mode = 1; // datagram mode = Integer.parseInt(g.getOptarg());
break; if (mode < 0 || mode > V1RAW) {
System.err.println(USAGE);
case 'r': return;
mode = 2; // raw }
break; break;
case 'v': case 'v':
@ -93,7 +96,7 @@ public class SAMStreamSink {
I2PAppContext ctx = I2PAppContext.getGlobalContext(); I2PAppContext ctx = I2PAppContext.getGlobalContext();
SAMStreamSink sink = new SAMStreamSink(ctx, host, port, SAMStreamSink sink = new SAMStreamSink(ctx, host, port,
args[startArgs], args[startArgs + 1]); args[startArgs], args[startArgs + 1]);
sink.startup(version); sink.startup(version, isSSL, mode);
} }
public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) {
@ -108,31 +111,31 @@ public class SAMStreamSink {
_remotePeers = new HashMap<String, Sink>(); _remotePeers = new HashMap<String, Sink>();
} }
public void startup(String version) { public void startup(String version, boolean isSSL, int mode) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting up"); _log.debug("Starting up");
try { try {
Socket sock = connect(); Socket sock = connect(isSSL);
OutputStream out = sock.getOutputStream(); OutputStream out = sock.getOutputStream();
SAMEventHandler eventHandler = new SinkEventHandler(_context, out); SAMEventHandler eventHandler = new SinkEventHandler(_context, out);
_reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created"); _log.debug("Reader created");
String ourDest = handshake(out, version, true, eventHandler); String ourDest = handshake(out, version, true, eventHandler, mode);
if (ourDest == null) if (ourDest == null)
throw new IOException("handshake failed"); throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake complete. we are " + ourDest); _log.debug("Handshake complete. we are " + ourDest);
if (_isV3) { if (_isV3) {
Socket sock2 = connect(); Socket sock2 = connect(isSSL);
out = sock2.getOutputStream(); out = sock2.getOutputStream();
eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
_reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
_reader2.startReading(); _reader2.startReading();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader2 created"); _log.debug("Reader2 created");
String ok = handshake(out, version, false, eventHandler); String ok = handshake(out, version, false, eventHandler, mode);
if (ok == null) if (ok == null)
throw new IOException("2nd handshake failed"); throw new IOException("2nd handshake failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -285,12 +288,12 @@ public class SAMStreamSink {
} }
} }
private Socket connect() throws IOException { private Socket connect(boolean isSSL) throws IOException {
return new Socket(_samHost, Integer.parseInt(_samPort)); return new Socket(_samHost, Integer.parseInt(_samPort));
} }
/** @return our b64 dest or null */ /** @return our b64 dest or null */
private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) {
synchronized (samOut) { synchronized (samOut) {
try { try {
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
@ -354,7 +357,14 @@ public class SAMStreamSink {
// and give it to the SAM server // and give it to the SAM server
dest = _destFile; dest = _destFile;
} }
String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n"; String style;
if (mode == STREAM)
style = "STREAM";
else if (mode == DG || mode == V1DG)
style = "DATAGRAM";
else
style = "RAW";
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + " " + _conOptions + "\n";
samOut.write(req.getBytes()); samOut.write(req.getBytes());
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))