2005-12-30 jrandom

* Close streams more gracefully
This commit is contained in:
jrandom
2005-12-30 23:33:52 +00:00
committed by zzz
parent 8e87ae08fb
commit 0f8611e465
12 changed files with 52 additions and 25 deletions

View File

@ -117,7 +117,8 @@ public class BitField
public String toString() public String toString()
{ {
// Not very efficient // Not very efficient
StringBuffer sb = new StringBuffer("BitField["); StringBuffer sb = new StringBuffer("BitField(");
sb.append(size).append(")[");
for (int i = 0; i < size; i++) for (int i = 0; i < size; i++)
if (get(i)) if (get(i))
{ {

View File

@ -116,7 +116,7 @@ class PeerConnectionIn implements Runnable
din.readFully(bitmap); din.readFully(bitmap);
ps.bitfieldMessage(bitmap); ps.bitfieldMessage(bitmap);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received bitmap from " + peer + " on " + peer.metainfo.getName()); _log.debug("Received bitmap from " + peer + " on " + peer.metainfo.getName() + ": size=" + (i-1) + ": " + ps.bitfield);
break; break;
case 6: case 6:
piece = din.readInt(); piece = din.readInt();

View File

@ -365,8 +365,11 @@ public class PeerCoordinator implements PeerListener
*/ */
public int wantPiece(Peer peer, BitField havePieces) public int wantPiece(Peer peer, BitField havePieces)
{ {
if (halted) if (halted) {
if (_log.shouldLog(Log.WARN))
_log.warn("We don't want anything from the peer, as we are halted! peer=" + peer);
return -1; return -1;
}
synchronized(wantedPieces) synchronized(wantedPieces)
{ {
@ -398,7 +401,12 @@ public class PeerCoordinator implements PeerListener
piece = p; piece = p;
} }
} }
if (piece == null) return -1; //If we still can't find a piece we want, so be it. if (piece == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("nothing to even rerequest from " + peer + ": requested = " + requested
+ " wanted = " + wantedPieces + " peerHas = " + havePieces);
return -1; //If we still can't find a piece we want, so be it.
}
} }
piece.setRequested(true); piece.setRequested(true);
return piece.getId(); return piece.getId();

View File

@ -35,4 +35,8 @@ public class Piece implements Comparable {
public boolean removePeer(Peer peer) { return this.peers.remove(peer.getPeerID()); } public boolean removePeer(Peer peer) { return this.peers.remove(peer.getPeerID()); }
public boolean isRequested() { return this.requested; } public boolean isRequested() { return this.requested; }
public void setRequested(boolean requested) { this.requested = requested; } public void setRequested(boolean requested) { this.requested = requested; }
public String toString() {
return String.valueOf(id);
}
} }

View File

@ -26,6 +26,7 @@ import java.util.*;
import org.klomp.snark.bencode.*; import org.klomp.snark.bencode.*;
import net.i2p.data.Destination;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
@ -287,8 +288,10 @@ public class Snark
I2PServerSocket serversocket = I2PSnarkUtil.instance().getServerSocket(); I2PServerSocket serversocket = I2PSnarkUtil.instance().getServerSocket();
if (serversocket == null) if (serversocket == null)
fatal("Unable to listen for I2P connections"); fatal("Unable to listen for I2P connections");
else else {
debug("Listening on I2P destination " + serversocket.getManager().getSession().getMyDestination().toBase64(), NOTICE); Destination d = serversocket.getManager().getSession().getMyDestination();
debug("Listening on I2P destination " + d.toBase64() + " / " + d.calculateHash().toBase64(), NOTICE);
}
// Figure out what the torrent argument represents. // Figure out what the torrent argument represents.
meta = null; meta = null;

View File

@ -107,7 +107,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
useGZIP = true; useGZIP = true;
if (allowGZIP && useGZIP) { if (allowGZIP && useGZIP) {
I2PThread req = new I2PThread(new CompressedRequestor(s, socket, modifiedHeader), "http compressor"); I2PThread req = new I2PThread(new CompressedRequestor(s, socket, modifiedHeader), Thread.currentThread().getName()+".hc");
req.start(); req.start();
} else { } else {
new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null); new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null);
@ -154,7 +154,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
_log.info("request headers: " + _headers); _log.info("request headers: " + _headers);
serverout.write(_headers.getBytes()); serverout.write(_headers.getBytes());
browserin = _browser.getInputStream(); browserin = _browser.getInputStream();
I2PThread sender = new I2PThread(new Sender(serverout, browserin, "server: browser to server"), "http compressed sender"); I2PThread sender = new I2PThread(new Sender(serverout, browserin, "server: browser to server"), Thread.currentThread().getName() + "hcs");
sender.start(); sender.start();
browserout = _browser.getOutputStream(); browserout = _browser.getOutputStream();

View File

@ -211,14 +211,20 @@ public class Connection {
if (evt != null) { if (evt != null) {
boolean sent = evt.retransmit(false); boolean sent = evt.retransmit(false);
if (sent) { if (sent) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Retransmitting " + packet + " as an ack");
return; return;
} else { } else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not retransmitting " + packet + " as an ack");
//SimpleTimer.getInstance().addEvent(evt, evt.getNextSendTime()); //SimpleTimer.getInstance().addEvent(evt, evt.getNextSendTime());
} }
} }
} }
// if we don't have anything to retransmit, send a small ack // if we don't have anything to retransmit, send a small ack
packet = _receiver.send(null, 0, 0); packet = _receiver.send(null, 0, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending new ack: " + packet);
//packet.releasePayload(); //packet.releasePayload();
} }
@ -286,6 +292,8 @@ public class Connection {
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
packet.setOptionalDelay(0); packet.setOptionalDelay(0);
packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requesting no ack delay for packet " + packet);
} else { } else {
int delay = _options.getRTO() / 2; int delay = _options.getRTO() / 2;
packet.setOptionalDelay(delay); packet.setOptionalDelay(delay);
@ -1031,10 +1039,9 @@ public class Connection {
+ newWindowSize + " lifetime " + newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet); _outboundQueue.enqueue(_packet);
_lastSendTime = _context.clock().now();
} }
_lastSendTime = _context.clock().now();
// acked during resending (... or somethin') // acked during resending (... or somethin')
if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) { if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) {
_activeResends--; _activeResends--;

View File

@ -184,8 +184,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is set for a new packet on " + con + ": " + packet); _log.debug("Closed is set for a new packet on " + con + ": " + packet);
} else { } else {
//if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
// _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
} }
return packet; return packet;
} }
@ -202,6 +202,6 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
} }
void destroy() { void destroy() {
_connection = null; //_connection = null;
} }
} }

View File

@ -183,10 +183,14 @@ public class ConnectionPacketHandler {
// we've already scheduled an ack above, so there is no need to schedule // we've already scheduled an ack above, so there is no need to schedule
// a fast ack (we can wait a few ms) // a fast ack (we can wait a few ms)
} else { } else {
if (con.getLastSendTime() + 2000 < _context.clock().now()) { long timeSinceSend = _context.clock().now() - con.getLastSendTime();
if (timeSinceSend >= 2000) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast ack for dup " + packet); _log.debug("Fast ack for dup " + packet);
con.ackImmediately(); con.ackImmediately();
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Not fast acking dup " + packet + " since we last sent " + timeSinceSend + "ms ago");
} }
} }
} }
@ -440,7 +444,8 @@ public class ConnectionPacketHandler {
} }
public void timeReached() { public void timeReached() {
if (_con.getLastSendTime() <= _created) { if (_con.getLastSendTime() <= _created) {
if (!_con.getIsConnected()) return; if (_con.getResetReceived() || _con.getResetSent() || (_con.getUnackedPacketsReceived() <= 0) )
return;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Last sent was a while ago, and we want to ack a dup"); _log.debug("Last sent was a while ago, and we want to ack a dup");

View File

@ -610,7 +610,7 @@ public class EepGet {
_log.debug("Request flushed"); _log.debug("Request flushed");
} }
private String getRequest() { private String getRequest() throws IOException {
StringBuffer buf = new StringBuffer(512); StringBuffer buf = new StringBuffer(512);
boolean post = false; boolean post = false;
if ( (_postData != null) && (_postData.length() > 0) ) if ( (_postData != null) && (_postData.length() > 0) )
@ -620,12 +620,8 @@ public class EepGet {
} else { } else {
buf.append("GET ").append(_url).append(" HTTP/1.1\r\n"); buf.append("GET ").append(_url).append(" HTTP/1.1\r\n");
} }
try { URL url = new URL(_url);
URL url = new URL(_url); buf.append("Host: ").append(url.getHost()).append("\r\n");
buf.append("Host: ").append(url.getHost()).append("\r\n");
} catch (MalformedURLException mue) {
mue.printStackTrace();
}
if (_alreadyTransferred > 0) { if (_alreadyTransferred > 0) {
buf.append("Range: bytes="); buf.append("Range: bytes=");
buf.append(_alreadyTransferred); buf.append(_alreadyTransferred);

View File

@ -1,4 +1,7 @@
$Id: history.txt,v 1.373 2005/12/30 13:16:46 jrandom Exp $ $Id: history.txt,v 1.374 2005/12/30 15:57:53 jrandom Exp $
2005-12-30 jrandom
* Close streams more gracefully
2005-12-30 jrandom 2005-12-30 jrandom
* Small streaming lib bugfixes for the modified timeouts * Small streaming lib bugfixes for the modified timeouts

View File

@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
* *
*/ */
public class RouterVersion { public class RouterVersion {
public final static String ID = "$Revision: 1.320 $ $Date: 2005/12/29 08:07:24 $"; public final static String ID = "$Revision: 1.321 $ $Date: 2005/12/30 15:57:53 $";
public final static String VERSION = "0.6.1.8"; public final static String VERSION = "0.6.1.8";
public final static long BUILD = 4; public final static long BUILD = 5;
public static void main(String args[]) { public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID); System.out.println("Router ID: " + RouterVersion.ID);