2005-10-19 22:02:37 +00:00
|
|
|
/* PeerConnectionOut - Keeps a queue of outgoing messages and delivers them.
   Copyright (C) 2003 Mark J. Wielaard

   This file is part of Snark.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software Foundation,
   Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
|
|
|
|
|
|
|
|
package org.klomp.snark;
|
|
|
|
|
|
|
|
import java.io.*;
|
|
|
|
import java.net.*;
|
|
|
|
import java.util.*;
|
|
|
|
|
2005-12-16 23:18:56 +00:00
|
|
|
import net.i2p.util.I2PThread;
|
2005-12-16 03:00:48 +00:00
|
|
|
import net.i2p.util.Log;
|
2005-12-17 03:47:02 +00:00
|
|
|
import net.i2p.util.SimpleTimer;
|
2005-12-16 03:00:48 +00:00
|
|
|
|
2005-10-19 22:02:37 +00:00
|
|
|
class PeerConnectionOut implements Runnable
|
|
|
|
{
|
2005-12-16 03:00:48 +00:00
|
|
|
private Log _log = new Log(PeerConnectionOut.class);
|
2005-10-19 22:02:37 +00:00
|
|
|
private final Peer peer;
|
|
|
|
private final DataOutputStream dout;
|
|
|
|
|
|
|
|
private Thread thread;
|
|
|
|
private boolean quit;
|
|
|
|
|
|
|
|
// Contains Messages.
|
|
|
|
private List sendQueue = new ArrayList();
|
2005-12-16 03:00:48 +00:00
|
|
|
|
|
|
|
private static long __id = 0;
|
|
|
|
private long _id;
|
2005-12-19 13:34:52 +00:00
|
|
|
|
|
|
|
long lastSent;
|
2005-10-19 22:02:37 +00:00
|
|
|
|
|
|
|
public PeerConnectionOut(Peer peer, DataOutputStream dout)
|
|
|
|
{
|
|
|
|
this.peer = peer;
|
|
|
|
this.dout = dout;
|
2005-12-16 03:00:48 +00:00
|
|
|
_id = ++__id;
|
2005-10-19 22:02:37 +00:00
|
|
|
|
2005-12-19 13:34:52 +00:00
|
|
|
lastSent = System.currentTimeMillis();
|
2005-10-19 22:02:37 +00:00
|
|
|
quit = false;
|
2005-12-20 02:01:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public void startup() {
|
2005-12-21 12:04:54 +00:00
|
|
|
thread = new I2PThread(this, "Snark sender " + _id + ": " + peer);
|
2005-10-19 22:02:37 +00:00
|
|
|
thread.start();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Continuesly monitors for more outgoing messages that have to be send.
|
|
|
|
* Stops if quit is true of an IOException occurs.
|
|
|
|
*/
|
|
|
|
public void run()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2005-12-18 05:39:52 +00:00
|
|
|
while (!quit && peer.isConnected())
|
2005-10-19 22:02:37 +00:00
|
|
|
{
|
|
|
|
Message m = null;
|
|
|
|
PeerState state = null;
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
2005-12-18 05:39:52 +00:00
|
|
|
while (!quit && peer.isConnected() && sendQueue.isEmpty())
|
2005-10-19 22:02:37 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
// Make sure everything will reach the other side.
|
|
|
|
dout.flush();
|
|
|
|
|
|
|
|
// Wait till more data arrives.
|
2005-12-21 12:04:54 +00:00
|
|
|
sendQueue.wait(60*1000);
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
catch (InterruptedException ie)
|
|
|
|
{
|
|
|
|
/* ignored */
|
|
|
|
}
|
|
|
|
}
|
|
|
|
state = peer.state;
|
2005-12-18 05:39:52 +00:00
|
|
|
if (!quit && state != null && peer.isConnected())
|
2005-10-19 22:02:37 +00:00
|
|
|
{
|
|
|
|
// Piece messages are big. So if there are other
|
|
|
|
// (control) messages make sure they are send first.
|
|
|
|
// Also remove request messages from the queue if
|
|
|
|
// we are currently being choked to prevent them from
|
|
|
|
// being send even if we get unchoked a little later.
|
|
|
|
// (Since we will resent them anyway in that case.)
|
|
|
|
// And remove piece messages if we are choking.
|
2005-12-17 03:47:02 +00:00
|
|
|
|
|
|
|
// this should get fixed for starvation
|
2005-10-19 22:02:37 +00:00
|
|
|
Iterator it = sendQueue.iterator();
|
|
|
|
while (m == null && it.hasNext())
|
|
|
|
{
|
|
|
|
Message nm = (Message)it.next();
|
|
|
|
if (nm.type == Message.PIECE)
|
|
|
|
{
|
2005-12-19 13:34:52 +00:00
|
|
|
if (state.choking) {
|
2005-10-19 22:02:37 +00:00
|
|
|
it.remove();
|
2005-12-19 13:34:52 +00:00
|
|
|
SimpleTimer.getInstance().removeEvent(nm.expireEvent);
|
|
|
|
}
|
2005-10-19 22:02:37 +00:00
|
|
|
nm = null;
|
|
|
|
}
|
|
|
|
else if (nm.type == Message.REQUEST && state.choked)
|
|
|
|
{
|
|
|
|
it.remove();
|
2005-12-19 13:34:52 +00:00
|
|
|
SimpleTimer.getInstance().removeEvent(nm.expireEvent);
|
2005-10-19 22:02:37 +00:00
|
|
|
nm = null;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (m == null && nm != null)
|
|
|
|
{
|
|
|
|
m = nm;
|
2005-12-19 13:34:52 +00:00
|
|
|
SimpleTimer.getInstance().removeEvent(nm.expireEvent);
|
2005-10-19 22:02:37 +00:00
|
|
|
it.remove();
|
|
|
|
}
|
|
|
|
}
|
2005-12-19 13:34:52 +00:00
|
|
|
if (m == null && sendQueue.size() > 0) {
|
2005-10-19 22:02:37 +00:00
|
|
|
m = (Message)sendQueue.remove(0);
|
2005-12-19 13:34:52 +00:00
|
|
|
SimpleTimer.getInstance().removeEvent(m.expireEvent);
|
|
|
|
}
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if (m != null)
|
|
|
|
{
|
2005-12-16 03:00:48 +00:00
|
|
|
if (_log.shouldLog(Log.DEBUG))
|
|
|
|
_log.debug("Send " + peer + ": " + m + " on " + peer.metainfo.getName());
|
2005-10-19 22:02:37 +00:00
|
|
|
m.sendMessage(dout);
|
2005-12-19 13:34:52 +00:00
|
|
|
lastSent = System.currentTimeMillis();
|
2005-10-19 22:02:37 +00:00
|
|
|
|
|
|
|
// Remove all piece messages after sending a choke message.
|
|
|
|
if (m.type == Message.CHOKE)
|
|
|
|
removeMessage(Message.PIECE);
|
|
|
|
|
|
|
|
// XXX - Should also register overhead...
|
|
|
|
if (m.type == Message.PIECE)
|
|
|
|
state.uploaded(m.len);
|
|
|
|
|
|
|
|
m = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (IOException ioe)
|
|
|
|
{
|
|
|
|
// Ignore, probably other side closed connection.
|
2005-12-19 13:34:52 +00:00
|
|
|
if (_log.shouldLog(Log.INFO))
|
|
|
|
_log.info("IOError sending to " + peer, ioe);
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
catch (Throwable t)
|
|
|
|
{
|
2005-12-19 13:34:52 +00:00
|
|
|
_log.error("Error sending to " + peer, t);
|
2005-12-18 05:39:52 +00:00
|
|
|
if (t instanceof OutOfMemoryError)
|
|
|
|
throw (OutOfMemoryError)t;
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
finally
|
|
|
|
{
|
|
|
|
quit = true;
|
|
|
|
peer.disconnect();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void disconnect()
|
|
|
|
{
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
2005-12-19 13:34:52 +00:00
|
|
|
//if (quit == true)
|
|
|
|
// return;
|
2005-10-19 22:02:37 +00:00
|
|
|
|
|
|
|
quit = true;
|
2005-12-20 02:29:09 +00:00
|
|
|
if (thread != null)
|
|
|
|
thread.interrupt();
|
2005-10-19 22:02:37 +00:00
|
|
|
|
|
|
|
sendQueue.clear();
|
|
|
|
sendQueue.notify();
|
|
|
|
}
|
2005-12-21 12:04:54 +00:00
|
|
|
if (dout != null) {
|
|
|
|
try {
|
|
|
|
dout.close();
|
|
|
|
} catch (IOException ioe) {
|
|
|
|
_log.warn("Error closing the stream to " + peer, ioe);
|
|
|
|
}
|
|
|
|
}
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Adds a message to the sendQueue and notifies the method waiting
|
|
|
|
* on the sendQueue to change.
|
|
|
|
*/
|
|
|
|
private void addMessage(Message m)
|
|
|
|
{
|
2005-12-17 03:47:02 +00:00
|
|
|
SimpleTimer.getInstance().addEvent(new RemoveTooSlow(m), SEND_TIMEOUT);
|
2005-10-19 22:02:37 +00:00
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
sendQueue.add(m);
|
2005-12-17 03:47:02 +00:00
|
|
|
sendQueue.notifyAll();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2005-12-22 10:04:12 +00:00
|
|
|
/** remove messages not sent in 3m */
|
|
|
|
private static final int SEND_TIMEOUT = 3*60*1000;
|
2005-12-17 03:47:02 +00:00
|
|
|
private class RemoveTooSlow implements SimpleTimer.TimedEvent {
|
|
|
|
private Message _m;
|
|
|
|
public RemoveTooSlow(Message m) {
|
|
|
|
_m = m;
|
2005-12-19 13:34:52 +00:00
|
|
|
m.expireEvent = RemoveTooSlow.this;
|
2005-12-17 03:47:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
public void timeReached() {
|
|
|
|
boolean removed = false;
|
|
|
|
synchronized (sendQueue) {
|
|
|
|
removed = sendQueue.remove(_m);
|
|
|
|
sendQueue.notifyAll();
|
|
|
|
}
|
|
|
|
if (removed)
|
|
|
|
_log.info("Took too long to send " + _m + " to " + peer);
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Removes a particular message type from the queue.
|
|
|
|
*
|
|
|
|
* @param type the Message type to remove.
|
|
|
|
* @returns true when a message of the given type was removed, false
|
|
|
|
* otherwise.
|
|
|
|
*/
|
|
|
|
private boolean removeMessage(int type)
|
|
|
|
{
|
|
|
|
boolean removed = false;
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
Iterator it = sendQueue.iterator();
|
|
|
|
while (it.hasNext())
|
|
|
|
{
|
|
|
|
Message m = (Message)it.next();
|
|
|
|
if (m.type == type)
|
|
|
|
{
|
|
|
|
it.remove();
|
|
|
|
removed = true;
|
|
|
|
}
|
|
|
|
}
|
2005-12-17 03:47:02 +00:00
|
|
|
sendQueue.notifyAll();
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
return removed;
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendAlive()
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.KEEP_ALIVE;
|
2006-09-06 06:32:53 +00:00
|
|
|
// addMessage(m);
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
if(sendQueue.isEmpty())
|
|
|
|
sendQueue.add(m);
|
|
|
|
sendQueue.notifyAll();
|
|
|
|
}
|
2005-10-19 22:02:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void sendChoke(boolean choke)
|
|
|
|
{
|
|
|
|
// We cancel the (un)choke but keep PIECE messages.
|
|
|
|
// PIECE messages are purged if a choke is actually send.
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
int inverseType = choke ? Message.UNCHOKE
|
|
|
|
: Message.CHOKE;
|
|
|
|
if (!removeMessage(inverseType))
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
if (choke)
|
|
|
|
m.type = Message.CHOKE;
|
|
|
|
else
|
|
|
|
m.type = Message.UNCHOKE;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendInterest(boolean interest)
|
|
|
|
{
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
int inverseType = interest ? Message.UNINTERESTED
|
|
|
|
: Message.INTERESTED;
|
|
|
|
if (!removeMessage(inverseType))
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
if (interest)
|
|
|
|
m.type = Message.INTERESTED;
|
|
|
|
else
|
|
|
|
m.type = Message.UNINTERESTED;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendHave(int piece)
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.HAVE;
|
|
|
|
m.piece = piece;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendBitfield(BitField bitfield)
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.BITFIELD;
|
|
|
|
m.data = bitfield.getFieldBytes();
|
|
|
|
m.off = 0;
|
|
|
|
m.len = m.data.length;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendRequests(List requests)
|
|
|
|
{
|
|
|
|
Iterator it = requests.iterator();
|
|
|
|
while (it.hasNext())
|
|
|
|
{
|
|
|
|
Request req = (Request)it.next();
|
|
|
|
sendRequest(req);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendRequest(Request req)
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.REQUEST;
|
|
|
|
m.piece = req.piece;
|
|
|
|
m.begin = req.off;
|
|
|
|
m.length = req.len;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendPiece(int piece, int begin, int length, byte[] bytes)
|
|
|
|
{
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.PIECE;
|
|
|
|
m.piece = piece;
|
|
|
|
m.begin = begin;
|
|
|
|
m.length = length;
|
|
|
|
m.data = bytes;
|
|
|
|
m.off = begin;
|
|
|
|
m.len = length;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
|
|
|
|
void sendCancel(Request req)
|
|
|
|
{
|
|
|
|
// See if it is still in our send queue
|
|
|
|
synchronized(sendQueue)
|
|
|
|
{
|
|
|
|
Iterator it = sendQueue.iterator();
|
|
|
|
while (it.hasNext())
|
|
|
|
{
|
|
|
|
Message m = (Message)it.next();
|
|
|
|
if (m.type == Message.REQUEST
|
|
|
|
&& m.piece == req.piece
|
|
|
|
&& m.begin == req.off
|
|
|
|
&& m.length == req.len)
|
|
|
|
it.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Always send, just to be sure it it is really canceled.
|
|
|
|
Message m = new Message();
|
|
|
|
m.type = Message.CANCEL;
|
|
|
|
m.piece = req.piece;
|
|
|
|
m.begin = req.off;
|
|
|
|
m.length = req.len;
|
|
|
|
addMessage(m);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Called by the PeerState when the other side doesn't want this
|
|
|
|
// request to be handled anymore. Removes any pending Piece Message
|
|
|
|
// from out send queue.
|
|
|
|
void cancelRequest(int piece, int begin, int length)
|
|
|
|
{
|
|
|
|
synchronized (sendQueue)
|
|
|
|
{
|
|
|
|
Iterator it = sendQueue.iterator();
|
|
|
|
while (it.hasNext())
|
|
|
|
{
|
|
|
|
Message m = (Message)it.next();
|
|
|
|
if (m.type == Message.PIECE
|
|
|
|
&& m.piece == piece
|
|
|
|
&& m.begin == begin
|
|
|
|
&& m.length == length)
|
|
|
|
it.remove();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|