SAM: Handle UTF-8 in ReadLine (ticket #1488)

Allow forever timeout in ReadLine
Use ReadLine in v1 and v3 handlers
Fix send client closing too fast in v1 stream mode
UTF-8 test and fixes in clients
This commit is contained in:
zzz
2015-11-28 21:25:44 +00:00
parent dffd441304
commit 513e1b9ff8
7 changed files with 221 additions and 72 deletions

View File

@ -3,13 +3,14 @@ package net.i2p.sam;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.Reader;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
/** /**
* Modified from I2PTunnelHTTPServer * Modified from I2PTunnelHTTPServer
* Handles UTF-8. Does not buffer past the end of line.
* *
* @since 0.9.24 * @since 0.9.24
*/ */
@ -24,28 +25,30 @@ class ReadLine {
* Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded * Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded
* *
* @param buf output * @param buf output
* @param timeout throws SocketTimeoutException immediately if zero or negative * @param timeout forever if if zero or negative
* @throws SocketTimeoutException if timeout is reached before newline * @throws SocketTimeoutException if timeout is reached before newline
* @throws EOFException if EOF is reached before newline * @throws EOFException if EOF is reached before newline
* @throws LineTooLongException if too long * @throws LineTooLongException if too long
* @throws IOException on other errors in the underlying stream * @throws IOException on other errors in the underlying stream
*/ */
public static void readLine(Socket socket, StringBuilder buf, int timeout) throws IOException { public static void readLine(Socket socket, StringBuilder buf, int timeout) throws IOException {
if (timeout <= 0) final int origTimeout = timeout;
throw new SocketTimeoutException();
long expires = System.currentTimeMillis() + timeout;
// this reads and buffers extra bytes, so we can't use it
// unless we're going to decode UTF-8 on-the-fly, we're stuck with ASCII
//InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8");
InputStream in = socket.getInputStream();
int c; int c;
int i = 0; int i = 0;
final long expires;
if (origTimeout > 0) {
socket.setSoTimeout(timeout); socket.setSoTimeout(timeout);
while ( (c = in.read()) != -1) { expires = System.currentTimeMillis() + timeout;
} else {
expires = 0;
}
final Reader reader = new UTF8Reader(socket.getInputStream());
while ( (c = reader.read()) != -1) {
if (++i > MAX_LINE_LENGTH) if (++i > MAX_LINE_LENGTH)
throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH); throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH);
if (c == '\n') if (c == '\n')
break; break;
if (origTimeout > 0) {
int newTimeout = (int) (expires - System.currentTimeMillis()); int newTimeout = (int) (expires - System.currentTimeMillis());
if (newTimeout <= 0) if (newTimeout <= 0)
throw new SocketTimeoutException(); throw new SocketTimeoutException();
@ -54,9 +57,12 @@ class ReadLine {
timeout = newTimeout; timeout = newTimeout;
socket.setSoTimeout(timeout); socket.setSoTimeout(timeout);
} }
} else {
buf.append((char)c);
}
} }
if (c == -1) { if (c == -1) {
if (System.currentTimeMillis() >= expires) if (origTimeout > 0 && System.currentTimeMillis() >= expires)
throw new SocketTimeoutException(); throw new SocketTimeoutException();
else else
throw new EOFException(); throw new EOFException();

View File

@ -57,6 +57,8 @@ class SAMHandlerFactory {
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw new SAMException("Unexpected error", e); throw new SAMException("Unexpected error", e);
} }
if (log.shouldDebug())
log.debug("New message received: [" + line + ']');
// Message format: HELLO VERSION [MIN=v1] [MAX=v2] // Message format: HELLO VERSION [MIN=v1] [MAX=v2]
Properties props = SAMUtils.parseParams(line); Properties props = SAMUtils.parseParams(line);

View File

@ -98,6 +98,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
String opcode = null; String opcode = null;
boolean canContinue = false; boolean canContinue = false;
Properties props; Properties props;
final StringBuilder buf = new StringBuilder(128);
this.thread.setName("SAMv1Handler " + _id); this.thread.setName("SAMv1Handler " + _id);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -120,19 +121,13 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
_log.info("Connection closed by client"); _log.info("Connection closed by client");
break; break;
} }
java.io.InputStream is = clientSocketChannel.socket().getInputStream(); buf.setLength(0);
if (is == null) { // TODO set timeout first time
_log.info("Connection closed by client"); ReadLine.readLine(clientSocketChannel.socket(), buf, 0);
break; msg = buf.toString();
}
msg = DataHelper.readLine(is);
if (msg == null) {
_log.info("Connection closed by client (line read : null)");
break;
}
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
_log.debug("New message received: [" + msg + "]"); _log.debug("New message received: [" + msg + ']');
} }
props = SAMUtils.parseParams(msg); props = SAMUtils.parseParams(msg);
domain = props.getProperty(SAMUtils.COMMAND); domain = props.getProperty(SAMUtils.COMMAND);

View File

@ -329,21 +329,13 @@ class SAMv3Handler extends SAMv1Handler
} }
} else { } else {
buf.setLength(0); buf.setLength(0);
if (DataHelper.readLine(in, buf)) // TODO first time, set a timeout
ReadLine.readLine(socket, buf, 0);
line = buf.toString(); line = buf.toString();
else
line = null;
}
if (line==null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection closed by client (line read : null)");
break;
} }
if (_log.shouldLog(Log.DEBUG)) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("New message received: [" + msg + "]"); _log.debug("New message received: [" + line + ']');
}
props = SAMUtils.parseParams(line); props = SAMUtils.parseParams(line);
domain = props.getProperty(SAMUtils.COMMAND); domain = props.getProperty(SAMUtils.COMMAND);
if (domain == null) { if (domain == null) {

View File

@ -0,0 +1,152 @@
package net.i2p.sam;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
/**
* An unbuffered version of InputStreamReader.
*
* Does not read any extra characters, as long as input is well-formed.
* This permits the partial reading of an InputStream as UTF-8
* and then passing the remainder of the input stream elsewhere.
* This isn't the most robust for malformed input, so it
* may not be appropriate for e.g. HTTP headers.
*
* Not thread-safe, obviously.
*
* May be moved to net.i2p.util if anybody else needs it.
*
* @since 0.9.24 somewhat adapted from net.i2p.util.TranslateReader
*/
public class UTF8Reader extends Reader {
private final InputStream _in;
// following three are lazily initialized when needed
private ByteBuffer _bb;
private CharBuffer _cb;
private CharsetDecoder _dc;
// Charset.forName("UTF-8").newDecoder().replacement().charAt(0) & 0xffff
private static final int REPLACEMENT = 0xfffd;
/**
* @param in UTF-8
*/
public UTF8Reader(InputStream in) {
super();
_in = in;
}
/**
* @return replacement character on decoding error
*/
@Override
public int read() throws IOException {
int b = _in.read();
if (b < 0)
return b;
// https://en.wikipedia.org/wiki/Utf-8
if ((b & 0x80) == 0)
return b;
if (_bb == null) {
_bb = ByteBuffer.allocate(6);
_cb = CharBuffer.allocate(1);
_dc = Charset.forName("UTF-8").newDecoder();
} else {
_bb.clear();
_cb.clear();
}
_bb.put((byte) b);
int end; // how many more
if ((b & 0xe0) == 0xc0)
end = 1;
else if ((b & 0xf0) == 0xe0)
end = 2;
else if ((b & 0xf8) == 0xf0)
end = 3;
else if ((b & 0xfc) == 0xf8)
end = 4;
else if ((b & 0xfe) == 0xfc)
end = 5;
else // error, 10xxxxxx
return REPLACEMENT;
for (int i = 0; i < end; i++) {
b = _in.read();
if (b < 0)
return REPLACEMENT; // next read will return EOF
// we aren't going to check for all errors,
// but let's fail fast on this one
if ((b & 0x80) == 0)
return REPLACEMENT;
_bb.put((byte) b);
}
_dc.reset();
_bb.flip();
CoderResult result = _dc.decode(_bb, _cb, true);
// Overflow and underflow are not errors.
// It seems to return underflow every time.
// So just check if we got a character back in the buffer.
_cb.flip();
if (result.isError() || !_cb.hasRemaining())
return REPLACEMENT;
// let underflow and overflow go, return first
return _cb.get() & 0xffff;
}
@Override
public int read(char cbuf[]) throws IOException {
return read(cbuf, 0, cbuf.length);
}
public int read(char cbuf[], int off, int len) throws IOException {
for (int i = 0; i < len; i++) {
int c = read();
if (c < 0) {
if (i == 0)
return -1;
return i;
}
cbuf[off + i] = (char) c;
}
return len;
}
public void close() throws IOException {
_in.close();
}
/****
public static void main(String[] args) {
try {
String s = "Consider the encoding of the Euro sign, €." +
" The Unicode code point for \"€\" is U+20AC.";
byte[] test = s.getBytes("UTF-8");
InputStream bais = new java.io.ByteArrayInputStream(test);
UTF8Reader r = new UTF8Reader(bais);
int b;
StringBuilder buf = new StringBuilder(128);
while ((b = r.read()) >= 0) {
buf.append((char) b);
}
System.out.println("Received: " + buf);
System.out.println("Test passed? " + buf.toString().equals(s));
buf.setLength(0);
bais = new java.io.ByteArrayInputStream(new byte[] { 'x', (byte) 0xcc, 'x' } );
r = new UTF8Reader(bais);
while ((b = r.read()) >= 0) {
buf.append((char) b);
}
System.out.println("Received: " + buf);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
****/
}

View File

@ -237,9 +237,10 @@ public class SAMStreamSend {
synchronized (samOut) { synchronized (samOut) {
try { try {
if (user != null && password != null) if (user != null && password != null)
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=\"" + user.replace("\"", "\\\"") +
"\" PASSWORD=\"" + password.replace("\"", "\\\"") + "\"\n").getBytes("UTF-8"));
else else
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
@ -256,6 +257,8 @@ public class SAMStreamSend {
byte[] id = new byte[5]; byte[] id = new byte[5];
_context.random().nextBytes(id); _context.random().nextBytes(id);
_v3ID = Base32.encode(id); _v3ID = Base32.encode(id);
if (_isV32)
_v3ID = "xx€€xx" + _v3ID;
_conOptions = "ID=" + _v3ID; _conOptions = "ID=" + _v3ID;
} }
String style; String style;
@ -266,7 +269,7 @@ public class SAMStreamSend {
else else
style = "RAW"; style = "RAW";
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n'; String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n';
samOut.write(req.getBytes()); samOut.write(req.getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
@ -277,7 +280,7 @@ public class SAMStreamSend {
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes()); samOut.write(req.getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Naming lookup sent"); _log.debug("Naming lookup sent");
@ -363,7 +366,7 @@ public class SAMStreamSend {
if (_isV3) if (_isV3)
buf.append(" FROM_PORT=1234 TO_PORT=5678"); buf.append(" FROM_PORT=1234 TO_PORT=5678");
buf.append('\n'); buf.append('\n');
byte[] msg = DataHelper.getASCII(buf.toString()); byte[] msg = DataHelper.getUTF8(buf.toString());
synchronized (_samOut) { synchronized (_samOut) {
_samOut.write(msg); _samOut.write(msg);
_samOut.flush(); _samOut.flush();
@ -431,7 +434,7 @@ public class SAMStreamSend {
} else { } else {
throw new IOException("unsupported mode " + _mode); throw new IOException("unsupported mode " + _mode);
} }
byte msg[] = DataHelper.getASCII(m); byte msg[] = DataHelper.getUTF8(m);
_samOut.write(msg); _samOut.write(msg);
} }
_samOut.write(data, 0, read); _samOut.write(data, 0, read);
@ -440,16 +443,16 @@ public class SAMStreamSend {
} else { } else {
// real datagrams // real datagrams
ByteArrayOutputStream baos = new ByteArrayOutputStream(read + 1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(read + 1024);
baos.write(DataHelper.getASCII("3.0 ")); baos.write(DataHelper.getUTF8("3.0 "));
baos.write(DataHelper.getASCII(_v3ID)); baos.write(DataHelper.getUTF8(_v3ID));
baos.write((byte) ' '); baos.write((byte) ' ');
baos.write(DataHelper.getASCII(_remoteDestination)); baos.write(DataHelper.getUTF8(_remoteDestination));
if (_isV32) { if (_isV32) {
// only set TO_PORT to test session setting of FROM_PORT // only set TO_PORT to test session setting of FROM_PORT
if (_mode == RAW) if (_mode == RAW)
baos.write(DataHelper.getASCII(" PROTOCOL=123 TO_PORT=5678")); baos.write(DataHelper.getUTF8(" PROTOCOL=123 TO_PORT=5678"));
else else
baos.write(DataHelper.getASCII(" TO_PORT=5678")); baos.write(DataHelper.getUTF8(" TO_PORT=5678"));
} }
baos.write((byte) '\n'); baos.write((byte) '\n');
baos.write(data, 0, read); baos.write(data, 0, read);
@ -476,12 +479,13 @@ public class SAMStreamSend {
_log.info("Error closing", ioe); _log.info("Error closing", ioe);
} }
} else { } else {
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
try { try {
byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes("UTF-8");
synchronized (_samOut) { synchronized (_samOut) {
_samOut.write(msg); _samOut.write(msg);
_samOut.flush(); _samOut.flush();
_samOut.close(); // we can't close this yet, we will lose data
//_samOut.close();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
_log.info("Error closing", ioe); _log.info("Error closing", ioe);
@ -492,20 +496,18 @@ public class SAMStreamSend {
} }
closed(); closed();
// stop the reader, since we're only doing this once for testing
// you wouldn't do this in a real application
// closing the master socket too fast will kill the data socket flushing through
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner exiting"); _log.debug("Runner exiting");
if (toSend != _totalSent) if (toSend != _totalSent)
_log.error("Only sent " + _totalSent + " of " + toSend + " bytes"); _log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
if (_reader2 != null) if (_reader2 != null)
_reader2.stopReading(); _reader2.stopReading();
// stop the reader, since we're only doing this once for testing
// you wouldn't do this in a real application
if (_isV3) {
// closing the master socket too fast will kill the data socket flushing through
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {}
}
_reader.stopReading(); _reader.stopReading();
} }
} }

View File

@ -307,7 +307,7 @@ public class SAMStreamSink {
try { try {
Thread.sleep(127*1000); Thread.sleep(127*1000);
synchronized(_out) { synchronized(_out) {
_out.write(DataHelper.getASCII("PING " + System.currentTimeMillis() + '\n')); _out.write(DataHelper.getUTF8("PING " + System.currentTimeMillis() + '\n'));
_out.flush(); _out.flush();
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -377,7 +377,7 @@ public class SAMStreamSink {
_log.info("Got PING " + data + ", sending PONG " + data); _log.info("Got PING " + data + ", sending PONG " + data);
synchronized (_out) { synchronized (_out) {
try { try {
_out.write(("PONG " + data + '\n').getBytes()); _out.write(("PONG " + data + '\n').getBytes("UTF-8"));
_out.flush(); _out.flush();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.error("PONG fail", ioe); _log.error("PONG fail", ioe);
@ -514,9 +514,9 @@ public class SAMStreamSink {
synchronized (samOut) { synchronized (samOut) {
try { try {
if (user != null && password != null) if (user != null && password != null)
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes("UTF-8"));
else else
samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Hello sent"); _log.debug("Hello sent");
@ -536,7 +536,7 @@ public class SAMStreamSink {
req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n'; req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n';
else else
throw new IllegalStateException("mode " + mode); throw new IllegalStateException("mode " + mode);
samOut.write(req.getBytes()); samOut.write(req.getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM ACCEPT/FORWARD sent"); _log.debug("STREAM ACCEPT/FORWARD sent");
@ -600,7 +600,7 @@ public class SAMStreamSink {
else else
style = "RAW HEADER=true PORT=" + V3DGPORT; style = "RAW HEADER=true PORT=" + V3DGPORT;
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n'; String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n';
samOut.write(req.getBytes()); samOut.write(req.getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent"); _log.debug("Session create sent");
@ -612,7 +612,7 @@ public class SAMStreamSink {
_log.debug("Session create reply found: " + ok); _log.debug("Session create reply found: " + ok);
} }
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes()); samOut.write(req.getBytes("UTF-8"));
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Naming lookup sent"); _log.debug("Naming lookup sent");
@ -649,7 +649,7 @@ public class SAMStreamSink {
FileOutputStream fos = null; FileOutputStream fos = null;
try { try {
fos = new FileOutputStream(f); fos = new FileOutputStream(f);
fos.write(dest.getBytes()); fos.write(dest.getBytes("UTF-8"));
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("My destination written to " + _destFile); _log.debug("My destination written to " + _destFile);
} catch (IOException e) { } catch (IOException e) {