* EepGet:

- Add gunzip support (nonproxied only)
    - Clean up progress formatting
  * SSLEepGet:
    - Add gunzip support
    - Increase buffer size
This commit is contained in:
zzz
2011-10-15 17:20:30 +00:00
parent 104594ed59
commit e8712a3a11
3 changed files with 158 additions and 32 deletions

View File

@ -4,6 +4,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.Socket;
@ -11,6 +13,7 @@ import java.net.URL;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Formatter;
import java.util.List;
import java.util.StringTokenizer;
@ -72,6 +75,9 @@ public class EepGet {
protected long _fetchInactivityTimeout;
protected int _redirects;
protected String _redirectLocation;
protected boolean _isGzippedResponse;
protected IOException _decompressException;
/** this will be replaced by the HTTP Proxy if we are using it */
protected static final String USER_AGENT = "Wget/1.11.4";
protected static final long CONNECT_TIMEOUT = 45*1000;
@ -82,30 +88,39 @@ public class EepGet {
public EepGet(I2PAppContext ctx, String proxyHost, int proxyPort, int numRetries, String outputFile, String url) {
this(ctx, true, proxyHost, proxyPort, numRetries, outputFile, url);
}
public EepGet(I2PAppContext ctx, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching) {
this(ctx, true, proxyHost, proxyPort, numRetries, outputFile, url, allowCaching, null);
}
public EepGet(I2PAppContext ctx, int numRetries, String outputFile, String url) {
this(ctx, false, null, -1, numRetries, outputFile, url);
}
public EepGet(I2PAppContext ctx, int numRetries, String outputFile, String url, boolean allowCaching) {
this(ctx, false, null, -1, numRetries, outputFile, url, allowCaching, null);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url) {
this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, outputFile, url, true, null);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, String postData) {
this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, true, null, postData);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching, String etag) {
this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, allowCaching, etag, null);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching, String etag, String lastModified) {
this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, allowCaching, etag, lastModified, null);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, long minSize, long maxSize, String outputFile, OutputStream outputStream, String url, boolean allowCaching, String etag, String postData) {
this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, minSize, maxSize, outputFile, outputStream, url, allowCaching, etag, null, postData);
}
public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, long minSize, long maxSize,
String outputFile, OutputStream outputStream, String url, boolean allowCaching,
String etag, String lastModified, String postData) {
@ -149,10 +164,9 @@ public class EepGet {
try {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-p")) {
proxyHost = args[i+1].substring(0, args[i+1].indexOf(':'));
String port = args[i+1].substring(args[i+1].indexOf(':')+1);
proxyHost = args[++i].substring(0, args[i].indexOf(':'));
String port = args[i].substring(args[i].indexOf(':')+1);
proxyPort = Integer.parseInt(port);
i++;
} else if (args[i].equals("-n")) {
numRetries = Integer.parseInt(args[i+1]);
i++;
@ -239,9 +253,10 @@ public class EepGet {
}
private static void usage() {
System.err.println("EepGet [-p 127.0.0.1:4444] [-n #retries] [-o outputFile] " +
"[-m markSize lineLen] [-t timeout] [-h headerKey headerValue] " +
"[-u username password] url");
System.err.println("EepGet [-p 127.0.0.1:4444] [-n #retries] [-o outputFile]\n" +
" [-m markSize lineLen] [-t timeout] [-h headerKey headerValue]\n" +
" [-u username password] url]\n" +
" (use -p :0 for no proxy)");
}
public static interface StatusListener {
@ -323,31 +338,26 @@ public class EepGet {
long timeToSend = now - _lastComplete;
if (timeToSend > 0) {
StringBuilder buf = new StringBuilder(50);
Formatter fmt = new Formatter(buf);
buf.append(" ");
if ( bytesRemaining > 0 ) {
double pct = ((double)_written + _previousWritten) /
double pct = 100 * ((double)_written + _previousWritten) /
((double)alreadyTransferred + (double)currentWrite + (double)bytesRemaining);
synchronized (_pct) {
buf.append(_pct.format(pct));
}
buf.append(": ");
fmt.format("%4.1f", Double.valueOf(pct));
buf.append("%: ");
}
buf.append(_written);
fmt.format("%8d", Long.valueOf(_written));
buf.append(" @ ");
double lineKBytes = ((double)_markSize * (double)_lineSize)/1024.0d;
double kbps = lineKBytes/((double)timeToSend/1000.0d);
synchronized (_kbps) {
buf.append(_kbps.format(kbps));
}
buf.append("KBps");
fmt.format("%7.2f", Double.valueOf(kbps));
buf.append(" KBps");
buf.append(" / ");
long lifetime = _context.clock().now() - _startedOn;
double lifetimeKBps = (1000.0d*(double)(_written)/((double)lifetime*1024.0d));
synchronized (_kbps) {
buf.append(_kbps.format(lifetimeKBps));
}
buf.append("KBps");
fmt.format("%7.2f", Double.valueOf(lifetimeKBps));
buf.append(" KBps");
System.out.println(buf.toString());
}
_lastComplete = now;
@ -589,8 +599,18 @@ public class EepGet {
if ((_maxSize > -1) && (_bytesRemaining > _maxSize))
throw new IOException("HTTP response size " + _bytesRemaining + " violates maximum of " + _maxSize + " bytes");
Thread pusher = null;
_decompressException = null;
if (_isGzippedResponse) {
PipedInputStream pi = BigPipedInputStream.getInstance();
PipedOutputStream po = new PipedOutputStream(pi);
pusher = new I2PAppThread(new Gunzipper(pi, _out), "EepGet Decompressor");
_out = po;
pusher.start();
}
int remaining = (int)_bytesRemaining;
byte buf[] = new byte[8*1024];
byte buf[] = new byte[16*1024];
while (_keepFetching && ( (remaining > 0) || !strictSize ) && !_aborted) {
int toRead = buf.length;
if (strictSize && toRead > remaining)
@ -648,6 +668,18 @@ public class EepGet {
_out.close();
_out = null;
if (_isGzippedResponse) {
try {
pusher.join();
} catch (InterruptedException ie) {}
pusher = null;
if (_decompressException != null) {
// we can't resume from here
_keepFetching = false;
throw _decompressException;
}
}
if (_aborted)
throw new IOException("Timed out reading the HTTP data");
@ -785,6 +817,7 @@ public class EepGet {
_transferFailed = true;
}
_isGzippedResponse = false;
// clear out the arguments, as we use the same variables for return values
_etag = null;
_lastModified = null;
@ -898,28 +931,35 @@ public class EepGet {
}
private void handle(String key, String val) {
key = key.trim();
val = val.trim();
for (int i = 0; i < _listeners.size(); i++)
_listeners.get(i).headerReceived(_url, _currentAttempt, key.trim(), val.trim());
_listeners.get(i).headerReceived(_url, _currentAttempt, key, val);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Header line: [" + key + "] = [" + val + "]");
if (key.equalsIgnoreCase("Content-length")) {
try {
_bytesRemaining = Long.parseLong(val.trim());
_bytesRemaining = Long.parseLong(val);
} catch (NumberFormatException nfe) {
nfe.printStackTrace();
}
} else if (key.equalsIgnoreCase("ETag")) {
_etag = val.trim();
_etag = val;
} else if (key.equalsIgnoreCase("Last-Modified")) {
_lastModified = val.trim();
_lastModified = val;
} else if (key.equalsIgnoreCase("Transfer-encoding")) {
if (val.indexOf("chunked") != -1)
_encodingChunked = true;
_encodingChunked = val.toLowerCase().contains("chunked");
} else if (key.equalsIgnoreCase("Content-encoding")) {
// This is kindof a hack, but if we are downloading a gzip file
// we don't want to transparently gunzip it and save it as a .gz file.
// A query string will also mess this up
if ((!_actualURL.endsWith(".gz")) && (!_actualURL.endsWith(".tgz")))
_isGzippedResponse = val.toLowerCase().contains("gzip");
} else if (key.equalsIgnoreCase("Content-Type")) {
_contentType=val.trim();
_contentType=val;
} else if (key.equalsIgnoreCase("Location")) {
_redirectLocation=val.trim();
_redirectLocation=val;
} else {
// ignore the rest
}
@ -1041,8 +1081,13 @@ public class EepGet {
if (post)
buf.append("Content-length: ").append(_postData.length()).append("\r\n");
// This will be replaced if we are going through I2PTunnelHTTPClient
buf.append("User-Agent: " + USER_AGENT + "\r\n" +
"Accept-Encoding: \r\n" +
buf.append("Accept-Encoding: ");
if ((!_shouldProxy) &&
// This is kindof a hack, but if we are downloading a gzip file
// we don't want to transparently gunzip it and save it as a .gz file.
(!path.endsWith(".gz")) && (!path.endsWith(".tgz")))
buf.append("gzip");
buf.append("\r\nUser-Agent: " + USER_AGENT + "\r\n" +
"Connection: close\r\n");
if (_extraHeaders != null) {
for (String hdr : _extraHeaders) {
@ -1127,4 +1172,48 @@ public class EepGet {
addHeader("Proxy-Authorization",
"Basic " + Base64.encode((userName + ':' + password).getBytes(), true)); // true = use standard alphabet
}
/**
* Decompressor thread.
* Copied / modified from i2ptunnel HTTPResponseOutputStream (GPL)
*
* @since 0.8.10
*/
protected class Gunzipper implements Runnable {
private final InputStream _inRaw;
private final OutputStream _out;
public Gunzipper(InputStream in, OutputStream out) {
_inRaw = in;
_out = out;
}
public void run() {
ReusableGZIPInputStream in = null;
long written = 0;
try {
in = ReusableGZIPInputStream.acquire();
// blocking
in.initialize(_inRaw);
byte buf[] = new byte[8*1024];
int read = -1;
while ( (read = in.read(buf)) != -1) {
_out.write(buf, 0, read);
}
} catch (IOException ioe) {
_decompressException = ioe;
if (_log.shouldLog(Log.WARN))
_log.warn("Error decompressing: " + written + ", " + (in != null ? in.getTotalRead() + "/" + in.getTotalExpanded() : ""), ioe);
} catch (OutOfMemoryError oom) {
_decompressException = new IOException("OOM in HTTP Decompressor");
_log.error("OOM in HTTP Decompressor", oom);
} finally {
if (_out != null) try {
_out.close();
} catch (IOException ioe) {}
if (in != null)
ReusableGZIPInputStream.release(in);
}
}
}
}

View File

@ -43,6 +43,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintWriter;
import java.net.MalformedURLException;
import java.net.URL;
@ -499,8 +501,18 @@ public class SSLEepGet extends EepGet {
boolean strictSize = (_bytesRemaining >= 0);
Thread pusher = null;
_decompressException = null;
if (_isGzippedResponse) {
PipedInputStream pi = BigPipedInputStream.getInstance();
PipedOutputStream po = new PipedOutputStream(pi);
pusher = new I2PAppThread(new Gunzipper(pi, _out), "EepGet Decompressor");
_out = po;
pusher.start();
}
int remaining = (int)_bytesRemaining;
byte buf[] = new byte[1024];
byte buf[] = new byte[16*1024];
while (_keepFetching && ( (remaining > 0) || !strictSize ) && !_aborted) {
int toRead = buf.length;
if (strictSize && toRead > remaining)
@ -557,6 +569,18 @@ public class SSLEepGet extends EepGet {
_out.close();
_out = null;
if (_isGzippedResponse) {
try {
pusher.join();
} catch (InterruptedException ie) {}
pusher = null;
if (_decompressException != null) {
// we can't resume from here
_keepFetching = false;
throw _decompressException;
}
}
if (_aborted)
throw new IOException("Timed out reading the HTTP data");