diff --git a/apps/heartbeat/doc/readme.gui.txt b/apps/heartbeat/doc/readme.gui.txt new file mode 100644 index 000000000..fde06a336 --- /dev/null +++ b/apps/heartbeat/doc/readme.gui.txt @@ -0,0 +1,39 @@ +The Heartbeat GUI loads up the stat files generated by the Heartbeat +engine and renders them visually, offering a way to drill through different +data points and take snapshots as things change (by saving particular stat +files for later). The GUI itself doesn't need to be on the same machine +as the Heartbeat engine - it pulls the stat files through any URL - even +through the EepProxy. + +An example Heartbeat GUI config file follows + + # how often do we want to pull new data to render + refreshFrequency=60 + ## for each peer test we may want to include in the GUI: + # where to find the current stat file (URL or filename) + stat.0.location=http://dev.i2p.net/stats/heartbeatStat_khWY_30s_1kb.txt + ## optional entries for each peer test describing what we want shown + ## (and how we want it shown) + # do we want to plot the send time (from when the ping was sent until the pong server got it)? + stat.0.plot.current.send=true + # do we want to plot the receive time (from when the pong was sent until reception)? + stat.0.plot.current.receive=true + # do we want to plot the lost messages? + stat.0.plot.current.lost=true + # what color should the current lines be rendered in? + stat.0.plot.current.color=BLUE + ## optional entries for each peer test describing what averages we want + ## rendered + # plot 1 minute send average? + stat.0.plot.1m.send=true + # plot 1 minute receive average? + stat.0.plot.1m.receive=true + # plot 1 minute lost message average? + stat.0.plot.1m.lost=true + # what color should the 1 minute averages be rendered as? + stat.0.plot.1m.color=GREEN + ## repeated for all of the averaged periods, e.g. + ## stat.0.plot.30m, .60m, 1440m (1 day) + +There may be some other options, such as where to store snapshot files, whether +to generate PNG images, etc. \ No newline at end of file diff --git a/apps/heartbeat/doc/readme.txt b/apps/heartbeat/doc/readme.txt new file mode 100644 index 000000000..d9b667a1a --- /dev/null +++ b/apps/heartbeat/doc/readme.txt @@ -0,0 +1,122 @@ +Heartbeat + +Application layer tool for monitoring the long term health of the +network by periodically testing peers, generating stats, and +rendering them visually. The engine (both server and client) should +work headless and seperate from the GUI, exposing the data in a simple +to parse (and human readable) text file for each peer being tested. +The GUI then periodically refreshes itself by loading those files ( +either locally or from a URL) and renders the current state accordingly, +giving users a way to check that the network is alive, devs a tool to +both monitor the state of the network and to debug different situations (by +accessing the stat file - either live or archived). + +The heartbeat configuration file is organized as a standard properties +file (by default located at heartbeat.config, but that can be overridden by +passing a filename as the first argument to the Heartbeat command): + + # where the router is located (default is localhost) + i2cpHost=localhost + # I2CP port for the router (default is 7654) + i2cpPort=4001 + # How many hops we want the router to put in our tunnels (default is 2) + numHops=2 + # where our private destination keys are located - if this doesn't exist, + # a new one will be created and saved there (by default, heartbeat.keys) + privateDestinationFile=heartbeat_r2.keys + + ## peer tests configured below: + + # destination peer for test 0 + peer.0.peer=[destination in base64] + # where will we write out the stat data? + peer.0.statFile=heartbeatStat_khWY_30s_1kb.txt + # how many minutes will we keep stats for? + peer.0.statDuration=30 + # how often will we write out new stat data (in seconds)? + peer.0.statFrequency=60 + # how often will we send a ping to the peer (in seconds)? + peer.0.sendFrequency=30 + # how many bytes will be included in the ping? + peer.0.sendSize=1024 + # take a guess... + peer.0.comment=Test with localhost sending 1KB of data every 30 seconds + # we can keep track of a few moving averages - this value includes a whitespace + # delimited list of numbers, each specifying a period to calculate the average + # over (in minutes) + peer.0.averagePeriods=1 5 30 + ## repeat the peer.0.* for as many tests as desired, incrementing as necessary + +If there are no peer.* lines, it will simply run a pong server. If any data is +missing, it will use the defaults (though there are no defaults for peer.* lines) - +running the Heartbeat app with no heartbeat configuration file whatsoever will create +a new pong server (storing its keys at heartbeat.keys) and using the I2P router at +localhost:7654. + +The stat file generated for each set of peer.n.* lines contains the current state +of the test, its averages, as well as any other interesting data points. An example +stat file follows (hopefully it is self explanatory): + + peer khWYqCETu9YtPUvGV92ocsbEW5DezhKlIG7ci8RLX3g= + local u-9hlR1ik2hemXf0HvKMfeRgrS86CbNQh25e7XBhaQE= + peerDest [base 64 of the full destination] + localDest [base 64 of the full destination] + numTunnelHops 2 + comment Test with localhost sending 30KB every 20 seconds + sendFrequency 20 + sendSize 30720 + sessionStart 20040409.22:51:10.915 + currentTime 20040409.23:31:39.607 + numPending 2 + lifetimeSent 118 + lifetimeRecv 113 + #averages minutes sendMs recvMs numLost + periodAverage 1 1843 771 0 + periodAverage 5 786 752 1 + periodAverage 30 855 735 3 + #action status date and time sent sendMs replyMs + EVENT OK 20040409.23:21:44.742 691 670 + EVENT OK 20040409.23:22:05.201 671 581 + EVENT OK 20040409.23:22:26.301 1182 1452 + EVENT OK 20040409.23:22:47.322 24304 1723 + EVENT OK 20040409.23:23:08.232 2293 1081 + EVENT OK 20040409.23:23:29.332 1392 641 + EVENT OK 20040409.23:23:50.262 641 761 + EVENT OK 20040409.23:24:11.102 651 701 + EVENT OK 20040409.23:24:31.401 841 621 + EVENT OK 20040409.23:24:52.061 651 681 + EVENT OK 20040409.23:25:12.480 701 1623 + EVENT OK 20040409.23:25:32.990 1442 1212 + EVENT OK 20040409.23:25:54.230 591 631 + EVENT OK 20040409.23:26:14.620 620 691 + EVENT OK 20040409.23:26:35.199 1793 1432 + EVENT OK 20040409.23:26:56.570 661 641 + EVENT OK 20040409.23:27:17.200 641 660 + EVENT OK 20040409.23:27:38.120 611 921 + EVENT OK 20040409.23:27:58.699 831 621 + EVENT OK 20040409.23:28:19.559 801 661 + EVENT OK 20040409.23:28:40.279 601 611 + EVENT OK 20040409.23:29:00.648 601 621 + EVENT OK 20040409.23:29:21.288 701 661 + EVENT LOST 20040409.23:29:41.828 + EVENT LOST 20040409.23:30:02.327 + EVENT LOST 20040409.23:30:22.656 + EVENT OK 20040409.23:31:24.305 1843 771 + +The actual ping and pong messages sent are formatted trivially - +ping messages contain + $from $series $type $sentOn $size $payload +while pong messages contain + $from $series $type $sentOn $receivedOn $size $payload + +$series is a number describing the sending client's test (so that you can +ping the same peer with different configurations concurrently, varying things +like the frequency and size of the message, window, etc). + +They are sent as raw binary messages though, so see I2PAdapter.sendPing(..) +and I2PAdapter.sendPong(..) for the details. + +To get valid measurements, of course, you will want to make sure that +both the heartbeat client and pong server have synchronized clocks (even +more so than I2P requires). It is highly recommended that only NTP +synchronized peers be used for heartbeat tests. \ No newline at end of file diff --git a/apps/heartbeat/java/build.xml b/apps/heartbeat/java/build.xml new file mode 100644 index 000000000..567c476d7 --- /dev/null +++ b/apps/heartbeat/java/build.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/ClientConfig.java b/apps/heartbeat/java/src/net/i2p/heartbeat/ClientConfig.java new file mode 100644 index 000000000..6056ad21f --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/ClientConfig.java @@ -0,0 +1,251 @@ +package net.i2p.heartbeat; + +import net.i2p.data.Destination; +import net.i2p.data.DataFormatException; +import net.i2p.util.Log; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.StringTokenizer; + +/** + * Define the configuration for testing against one particular peer as a client + * + */ +public class ClientConfig { + private static final Log _log = new Log(ClientConfig.class); + private Destination _peer; + private Destination _us; + private String _statFile; + private int _statDuration; + private int _statFrequency; + private int _sendFrequency; + private int _sendSize; + private int _numHops; + private String _comment; + private int _averagePeriods[]; + + public static final String PROP_PREFIX = "peer."; + + public static final String PROP_PEER = ".peer"; + public static final String PROP_STATFILE = ".statFile"; + public static final String PROP_STATDURATION = ".statDuration"; + public static final String PROP_STATFREQUENCY = ".statFrequency"; + public static final String PROP_SENDFREQUENCY = ".sendFrequency"; + public static final String PROP_SENDSIZE = ".sendSize"; + public static final String PROP_COMMENT = ".comment"; + public static final String PROP_AVERAGEPERIODS = ".averagePeriods"; + + public ClientConfig() { + this(null, null, null, -1, -1, -1, -1, 0, null, null); + } + + /** + * @param peer who we will test against + * @param us who we are + * @param duration how many minutes to keep events for + * @param statFreq how often to write out stats + * @param sendFreq how often to send pings + * @param sendSize how large the pings should be + * @param numHops how many hops is the current Heartbeat app using + * @param comment describe this test + * @param averagePeriods list of minutes to summarize over + */ + public ClientConfig(Destination peer, Destination us, String statFile, int duration, int statFreq, int sendFreq, int sendSize, int numHops, String comment, int averagePeriods[]) { + _peer = peer; + _us = us; + _statFile = statFile; + _statDuration = duration; + _statFrequency = statFreq; + _sendFrequency = sendFreq; + _sendSize = sendSize; + _numHops = numHops; + _comment = comment; + _averagePeriods = averagePeriods; + } + + /** peer to test against */ + public Destination getPeer() { return _peer; } + public void setPeer(Destination peer) { _peer = peer; } + + /** who we are when we test */ + public Destination getUs() { return _us; } + public void setUs(Destination us) { _us = us; } + + /** location to write the current stats to */ + public String getStatFile() { return _statFile; } + public void setStatFile(String statFile) { _statFile = statFile; } + + /** how many minutes of statistics should be maintained within the window for this client? */ + public int getStatDuration() { return _statDuration; } + public void setStatDuration(int durationMinutes) { _statDuration = durationMinutes; } + + /** how frequenty the stats are written out (in seconds) */ + public int getStatFrequency() { return _statFrequency; } + public void setStatFrequency(int freqSeconds) { _statFrequency = freqSeconds; } + + /** how frequenty we send messages to the peer (in seconds) */ + public int getSendFrequency() { return _sendFrequency; } + public void setSendFrequency(int freqSeconds) { _sendFrequency = freqSeconds; } + + /** + * How many bytes should the ping messages be (min values ~700, max ~32KB)? + * + */ + public int getSendSize() { return _sendSize; } + public void setSendSize(int numBytes) { _sendSize = numBytes; } + + /** + * Brief 1 line description of the test. Useful comments are along the lines + * of "The peer is located on a fast router and connection with 2 hop tunnels". + * + */ + public String getComment() { return _comment; } + public void setComment(String comment) { _comment = comment; } + + /** + * Periods that the client's tests should be averaged over. + * + * @return list of periods (in minutes) that the data should be averaged over, or null + */ + public int[] getAveragePeriods() { return _averagePeriods; } + public void setAveragePeriods(int periods[]) { _averagePeriods = periods; } + + /** + * How many hops is this test engine configured to use for its outbound and inbound tunnels? + * + */ + public int getNumHops() { return _numHops; } + public void setNumHops(int numHops) { _numHops = numHops; } + + /** + * Load the client config from the properties specified, deriving the current + * config entry from the peer number. + * + * @return true if it was loaded correctly, false if there were errors + */ + public boolean load(Properties clientConfig, int peerNum) { + if ( (clientConfig == null) || (peerNum < 0) ) return false; + String peerVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_PEER); + String statFileVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATFILE); + String statDurationVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATDURATION); + String statFrequencyVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_STATFREQUENCY); + String sendFrequencyVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_SENDFREQUENCY); + String sendSizeVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_SENDSIZE); + String commentVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_COMMENT); + String periodsVal = clientConfig.getProperty(PROP_PREFIX + peerNum + PROP_AVERAGEPERIODS); + + if ( (peerVal == null) || (statFileVal == null) || (statDurationVal == null) || + (statFrequencyVal == null) || (sendFrequencyVal == null) || (sendSizeVal == null) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer number "+ peerNum + " does not exist"); + return false; + } + + + try { + int duration = getInt(statDurationVal); + int statFreq = getInt(statFrequencyVal); + int sendFreq = getInt(sendFrequencyVal); + int sendSize = getInt(sendSizeVal); + + if ( (duration <= 0) || (statFreq <= 0) || (sendFreq <= 0) || (sendSize <= 0) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid client config: duration [" + statDurationVal + "] stat frequency [" + statFrequencyVal + + "] send frequency [" + sendFrequencyVal + "] send size [" + sendSizeVal + "]"); + return false; + } + + statFileVal = statFileVal.trim(); + if (statFileVal.length() <= 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Stat file is blank for peer " + peerNum); + return false; + } + + Destination d = new Destination(); + d.fromBase64(peerVal); + + if (commentVal == null) + commentVal = ""; + commentVal = commentVal.trim(); + commentVal = commentVal.replace('\n', '_'); + + List periods = new ArrayList(4); + if (periodsVal != null) { + StringTokenizer tok = new StringTokenizer(periodsVal); + while (tok.hasMoreTokens()) { + String periodVal = tok.nextToken(); + int minutes = getInt(periodVal); + if (minutes > 0) + periods.add(new Integer(minutes)); + } + } + int avgPeriods[] = new int[periods.size()]; + for (int i = 0; i < periods.size(); i++) + avgPeriods[i] = ((Integer)periods.get(i)).intValue(); + + _comment = commentVal; + _statDuration = duration; + _statFrequency = statFreq; + _sendFrequency = sendFreq; + _sendSize = sendSize; + _statFile = statFileVal; + _peer = d; + _averagePeriods = avgPeriods; + return true; + } catch (DataFormatException dfe) { + _log.error("Peer destination for " + peerNum + " was invalid: " + peerVal); + return false; + } + } + + /** + * Store the client config to the properties specified, deriving the current + * config entry from the peer number. + * + * @return true if it was stored correctly, false if there were errors + */ + public boolean store(Properties clientConfig, int peerNum) { + if ( (_peer == null) || (_sendFrequency <= 0) || (_sendSize <= 0) || + (_statDuration <= 0) || (_statFrequency <= 0) || (_statFile == null) ) { + return false; + } + + String comment = _comment; + if (comment == null) + comment = ""; + comment = comment.trim(); + comment = comment.replace('\n', '_'); + + StringBuffer buf = new StringBuffer(32); + if (_averagePeriods != null) { + for (int i = 0; i < _averagePeriods.length; i++) { + buf.append(_averagePeriods[i]).append(' '); + } + } + + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_PEER, _peer.toBase64()); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATFILE, _statFile); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATDURATION, _statDuration + ""); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_STATFREQUENCY, _statFrequency + ""); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_SENDFREQUENCY, _sendFrequency + ""); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_SENDSIZE, _sendSize + ""); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_COMMENT, comment); + clientConfig.setProperty(PROP_PREFIX + peerNum + PROP_AVERAGEPERIODS, buf.toString()); + return true; + } + + private static final int getInt(String val) { + if (val == null) return -1; + try { + int i = Integer.parseInt(val); + return i; + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Value [" + val + "] is not a valid integer"); + return -1; + } + } +} \ No newline at end of file diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/ClientEngine.java b/apps/heartbeat/java/src/net/i2p/heartbeat/ClientEngine.java new file mode 100644 index 000000000..ee5c0b237 --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/ClientEngine.java @@ -0,0 +1,110 @@ +package net.i2p.heartbeat; + +import net.i2p.data.Destination; +import net.i2p.util.Clock; +import net.i2p.util.Log; +import net.i2p.util.I2PThread; + +/** + * Responsible for actually conducting the tests, coordinating the storing of the + * stats, and the management of the rates. This has its own thread specific for + * pumping data around as well. + * + */ +class ClientEngine { + private static final Log _log = new Log(ClientEngine.class); + /** who can send our pings/ */ + private Heartbeat _heartbeat; + /** actual test state */ + private PeerData _data; + /** have we been stopped? */ + private boolean _active; + /** used to generate engine IDs */ + private static int __id = 0; + /** this engine's id, unique to the {test,sendingClient,startTime} */ + private int _id; + private static PeerDataWriter writer = new PeerDataWriter(); + + /** + * Create a new engine that will send its pings through the given heartbeat + * system, and will coordinate the test according to the configuration specified. + * + */ + public ClientEngine(Heartbeat heartbeat, ClientConfig config) { + _heartbeat = heartbeat; + _data = new PeerData(config); + _active = false; + _id = ++__id; + } + + /** stop sending any more pings or writing any more state */ + public void stopEngine() { + _active = false; + if (_log.shouldLog(Log.INFO)) + _log.info("Stopping engine talking to peer " + _data.getConfig().getPeer().calculateHash().toBase64()); + } + /** start up the test (this does not block, as it fires up the test thread) */ + public void startEngine() { + _active = true; + I2PThread t = new I2PThread(new ClientRunner()); + t.setName("HeartbeatClient " + _id); + t.start(); + } + /** who are we testing? */ + public Destination getPeer() { return _data.getConfig().getPeer(); } + /** what is our series identifier (used to locally identify a test) */ + public int getSeriesNum() { return _id; } + /** + * receive notification from the heartbeat system that a pong was received in + * reply to a ping we have sent. + * + * @param sentOn when did we send the ping? + * @param replyOn when did the peer send the pong? + */ + public void receivePong(long sentOn, long replyOn) { + _data.pongReceived(sentOn, replyOn); + } + + /** fire off a new ping */ + private void doSend() { + long now = Clock.getInstance().now(); + _heartbeat.sendPing(_data.getConfig().getPeer(), _id, now, _data.getConfig().getSendSize()); + _data.addPing(now); + } + + /** our actual heartbeat pumper - this drives the test */ + private class ClientRunner implements Runnable { + public void run() { + if (_log.shouldLog(Log.INFO)) + _log.info("Starting engine talking to peer " + _data.getConfig().getPeer().calculateHash().toBase64()); + + // when do we need to send the next PING? + long nextSend = Clock.getInstance().now(); + // when do we need to write out the next state data? + long nextWrite = Clock.getInstance().now(); + + while (_active) { + + if (Clock.getInstance().now() >= nextSend) { + doSend(); + nextSend = Clock.getInstance().now() + _data.getConfig().getSendFrequency()*1000; + } + + if (Clock.getInstance().now() >= nextWrite) { + boolean written = writer.persist(_data); + if (!written) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Unable to write the client state data"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Client state data written"); + } + } + + _data.cleanup(); + + try { Thread.sleep(1000); } catch (InterruptedException ie) {} + } + } + } +} \ No newline at end of file diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/Heartbeat.java b/apps/heartbeat/java/src/net/i2p/heartbeat/Heartbeat.java new file mode 100644 index 000000000..ec53b0c6f --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/Heartbeat.java @@ -0,0 +1,233 @@ +package net.i2p.heartbeat; + +import net.i2p.util.Log; +import net.i2p.util.Clock; +import net.i2p.data.Destination; + +import java.io.IOException; +import java.io.File; +import java.io.FileInputStream; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Iterator; +import java.util.Date; + +/** + * Main driver for the heartbeat engine, loading 0 or more tests, firing + * up a ClientEngine for each, and serving as a pong server. If there isn't + * a configuration file, or if the configuration file doesn't specify any tests, + * it simply sits around as a pong server, passively responding to whatever is + * sent its way.

+ * + * The config file format is examplified below: + *

+ *    # where the router is located (default is localhost)
+ *    i2cpHost=localhost
+ *    # I2CP port for the router (default is 7654)
+ *    i2cpPort=4001
+ *    # How many hops we want the router to put in our tunnels (default is 2)
+ *    numHops=2
+ *    # where our private destination keys are located - if this doesn't exist,
+ *    # a new one will be created and saved there (by default, heartbeat.keys)
+ *    privateDestinationFile=heartbeat_r2.keys
+ *
+ *    ## peer tests configured below:
+ *    
+ *    # destination peer for test 0
+ *    peer.0.peer=[destination in base64]
+ *    # where will we write out the stat data?
+ *    peer.0.statFile=heartbeatStat_khWY_30s_1kb.txt
+ *    # how many minutes will we keep stats for?
+ *    peer.0.statDuration=30
+ *    # how often will we write out new stat data (in seconds)?
+ *    peer.0.statFrequency=60
+ *    # how often will we send a ping to the peer (in seconds)?
+ *    peer.0.sendFrequency=30
+ *    # how many bytes will be included in the ping?
+ *    peer.0.sendSize=1024
+ *    # take a guess...
+ *    peer.0.comment=Test with localhost sending 1KB of data every 30 seconds
+ *    # we can keep track of a few moving averages - this value includes a whitespace
+ *    # delimited list of numbers, each specifying a period to calculate the average
+ *    # over (in minutes)
+ *    peer.0.averagePeriods=1 5 30
+ *    ## repeat the peer.0.* for as many tests as desired, incrementing as necessary
+ * 
+ * + */ +public class Heartbeat { + private static final Log _log = new Log(Heartbeat.class); + /** location containing this heartbeat's config */ + private String _configFile; + /** clientNum (Integer) to ClientConfig mapping */ + private Map _clientConfigs; + /** series num (Integer) to ClientEngine mapping */ + private Map _clientEngines; + /** helper class for managing our I2P send/receive and message formatting */ + private I2PAdapter _adapter; + /** our own callback that the I2PAdapter notifies on ping or pong messages */ + private PingPongAdapter _eventAdapter; + + /** if there are no command line arguments, load the config from "heartbeat.config" */ + public static final String CONFIG_FILE_DEFAULT = "heartbeat.config"; + + /** build up a new heartbeat manager, but don't actually do anything */ + public Heartbeat(String configFile) { + _configFile = configFile; + _clientConfigs = new HashMap(); + _clientEngines = new HashMap(); + _eventAdapter = new PingPongAdapter(); + _adapter = new I2PAdapter(); + _adapter.setListener(_eventAdapter); + } + private Heartbeat() {} + + /** load up the config data (but don't build any engines or start them up) */ + public void loadConfig() { + Properties props = new Properties(); + FileInputStream fin = null; + File configFile = new File (_configFile); + if (configFile.exists()) { + try { + fin = new FileInputStream(_configFile); + props.load(fin); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error reading the config data", ioe); + } finally { + if (fin != null) try { fin.close(); } catch (IOException ioe) {} + } + } + + loadBaseConfig(props); + loadClientConfigs(props); + } + + + /** + * send a ping message to the peer + * + * @param peer peer to ping + * @param seriesNum id used to keep track of multiple pings (of different size/frequency) to a peer + * @param now current time to be sent in the ping (so we can watch for it in the pong) + * @param size total message size to send + */ + void sendPing(Destination peer, int seriesNum, long now, int size) { + if (_adapter.getIsConnected()) + _adapter.sendPing(peer, seriesNum, now, size); + } + + /** load up the base data (I2CP config, etc) */ + private void loadBaseConfig(Properties props) { + _adapter.loadConfig(props); + } + + /** load up all of the test config data */ + private void loadClientConfigs(Properties props) { + int i = 0; + while (true) { + ClientConfig config = new ClientConfig(); + if (!config.load(props, i)) + break; + _clientConfigs.put(new Integer(i), config); + i++; + } + } + + /** connect to the network */ + private void connect() { + boolean connected = _adapter.connect(); + if (!connected) + _log.error("Unable to connect to the router"); + } + /** disconnect from the network */ + private void disconnect() { + _adapter.disconnect(); + } + + /** start up all of the tests */ + public void startEngines() { + for (Iterator iter = _clientConfigs.values().iterator(); iter.hasNext(); ) { + ClientConfig config = (ClientConfig)iter.next(); + ClientEngine engine = new ClientEngine(this, config); + config.setUs(_adapter.getLocalDestination()); + config.setNumHops(_adapter.getNumHops()); + _clientEngines.put(new Integer(engine.getSeriesNum()), engine); + engine.startEngine(); + } + } + /** stop all of the tests */ + public void stopEngines() { + for (Iterator iter = _clientEngines.values().iterator(); iter.hasNext(); ) { + ClientEngine engine = (ClientEngine)iter.next(); + engine.stopEngine(); + } + _clientEngines.clear(); + } + + /** + * Fire up a new heartbeat system, waiting until, well, forever. Builds + * a new heartbeat system, loads the config, connects to the network, starts + * the engines, and then sits back and relaxes, responding to any pings and + * running any tests.

+ * + * Usage: Heartbeat [configFileName]

+ */ + public static void main(String args[]) { + String configFile = CONFIG_FILE_DEFAULT; + if (args.length == 1) + configFile = args[0]; + + if (_log.shouldLog(Log.INFO)) + _log.info("Starting up with config file " + configFile); + Heartbeat heartbeat = new Heartbeat(configFile); + heartbeat.loadConfig(); + heartbeat.connect(); + heartbeat.startEngines(); + Object o = new Object(); + while (true) { + try { + synchronized (o) { + o.wait(); + } + } catch (InterruptedException ie) {} + } + } + + /** + * Receive event notification from the I2PAdapter + * + */ + private class PingPongAdapter implements I2PAdapter.PingPongEventListener { + /** + * We were pinged, so always just send a pong back. + * + * @param from who sent us the ping? + * @param seriesNum what series did the sender specify? + * @param sentOn when did the sender say they sent their ping? + * @param data arbitrary payload data + */ + public void receivePing(Destination from, int seriesNum, Date sentOn, byte[] data) { + if (_adapter.getIsConnected()) + _adapter.sendPong(from, seriesNum, sentOn, data); + } + + /** + * We received a pong, so find the right client engine and tell it about the pong. + * + * @param from who sent us the pong + * @param seriesNum our client ID + * @param sentOn when did we send the ping? + * @param replyOn when did they send their pong? + * @param data the arbitrary data we sent in the ping (that they sent back in the pong) + */ + public void receivePong(Destination from, int seriesNum, Date sentOn, Date replyOn, byte[] data) { + ClientEngine engine = (ClientEngine)_clientEngines.get(new Integer(seriesNum)); + if (engine.getPeer().equals(from)) + engine.receivePong(sentOn.getTime(), replyOn.getTime()); + } + } + +} \ No newline at end of file diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/I2PAdapter.java b/apps/heartbeat/java/src/net/i2p/heartbeat/I2PAdapter.java new file mode 100644 index 000000000..a347e66bc --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/I2PAdapter.java @@ -0,0 +1,462 @@ +package net.i2p.heartbeat; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import java.util.Properties; +import java.util.Date; +import java.util.Arrays; + +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.client.I2PClientFactory; +import net.i2p.client.I2PClient; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.client.I2PSessionListener; +import net.i2p.I2PException; + +import net.i2p.data.Destination; +import net.i2p.util.Log; +import net.i2p.util.Clock; + +/** + * Tie-in to the I2P SDK for the Heartbeat system, talking to the I2PSession and + * dealing with the raw ping and pong messages. + * + */ +class I2PAdapter { + private final static Log _log = new Log(I2PAdapter.class); + /** I2CP host */ + private String _i2cpHost; + /** I2CP port */ + private int _i2cpPort; + /** how long do we want our tunnels to be? */ + private int _numHops; + /** filename containing the heartbeat engine's private destination info */ + private String _privateDestFile; + /** our destination */ + private Destination _localDest; + /** who do we tell? */ + private PingPongEventListener _listener; + /** how do we talk to the router */ + private I2PSession _session; + /** object that receives our i2cp notifications from the session and tells us */ + private I2PListener _i2pListener; + + /** + * This config property tells us where the private destination data for our + * connection (or if it doesn't exist, where will we save it) + */ + private static final String DEST_FILE_PROP = "privateDestinationFile"; + /** by default, the private destination data is in "heartbeat.keys" */ + private static final String DEST_FILE_DEFAULT = "heartbeat.keys"; + /** This config property defines where the I2P router is */ + private static final String I2CP_HOST_PROP = "i2cpHost"; + /** by default, the I2P host is "localhost" */ + private static final String I2CP_HOST_DEFAULT = "localhost"; + /** This config property defines the I2CP port on the router */ + private static final String I2CP_PORT_PROP = "i2cpPort"; + /** by default, the I2CP port is 7654 */ + private static final int I2CP_PORT_DEFAULT = 7654; + + /** This property defines how many hops we want in our tunnels. */ + public static final String NUMHOPS_PROP = "numHops"; + /** by default, use 2 hop tunnels */ + public static final int NUMHOPS_DEFAULT = 2; + + public I2PAdapter() { + _privateDestFile = null; + _i2cpHost = null; + _i2cpPort = -1; + _localDest = null; + _listener = null; + _session = null; + _numHops = 0; + } + + /** who are we? */ + public Destination getLocalDestination() { return _localDest; } + + /** who gets notified when we receive a ping or a pong? */ + public PingPongEventListener getListener() { return _listener; } + public void setListener(PingPongEventListener listener) { _listener = listener; } + + /** how many hops do we want in our tunnels? */ + public int getNumHops() { return _numHops; } + + /** are we connected? */ + public boolean getIsConnected() { return _session != null; } + + /** + * Read in all of the config data + * + */ + void loadConfig(Properties props) { + String privDestFile = props.getProperty(DEST_FILE_PROP, DEST_FILE_DEFAULT); + String host = props.getProperty(I2CP_HOST_PROP, I2CP_HOST_DEFAULT); + String port = props.getProperty(I2CP_PORT_PROP, ""+I2CP_PORT_DEFAULT); + String numHops = props.getProperty(NUMHOPS_PROP, ""+NUMHOPS_DEFAULT); + + int portNum = -1; + try { + portNum = Integer.parseInt(port); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid I2CP port specified [" + port + "]"); + portNum = I2CP_PORT_DEFAULT; + } + int hops = -1; + try { + hops = Integer.parseInt(numHops); + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid # hops specified [" + numHops + "]"); + hops = NUMHOPS_DEFAULT; + } + + _numHops = hops; + _privateDestFile = privDestFile; + _i2cpHost = host; + _i2cpPort = portNum; + } + + /** write out the config to the props */ + void storeConfig(Properties props) { + if (_privateDestFile != null) + props.setProperty(DEST_FILE_PROP, _privateDestFile); + else + props.setProperty(DEST_FILE_PROP, DEST_FILE_DEFAULT); + if (_i2cpHost != null) + props.setProperty(I2CP_HOST_PROP, _i2cpHost); + else + props.setProperty(I2CP_HOST_PROP, I2CP_HOST_DEFAULT); + if (_i2cpPort > 0) + props.setProperty(I2CP_PORT_PROP, ""+_i2cpPort); + else + props.setProperty(I2CP_PORT_PROP, ""+I2CP_PORT_DEFAULT); + props.setProperty(NUMHOPS_PROP, ""+_numHops); + } + + private static final int TYPE_PING = 0; + private static final int TYPE_PONG = 1; + + /** + * send a ping message to the peer + * + * @param peer peer to ping + * @param seriesNum id used to keep track of multiple pings (of different size/frequency) to a peer + * @param now current time to be sent in the ping (so we can watch for it in the pong) + * @param size total message size to send + * + * @throws IllegalStateException if we are not connected to the router + */ + public void sendPing(Destination peer, int seriesNum, long now, int size) { + if (_session == null) throw new IllegalStateException("Not connected to the router"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(size); + try { + _localDest.writeBytes(baos); + DataHelper.writeLong(baos, 2, seriesNum); + DataHelper.writeLong(baos, 1, TYPE_PING); + DataHelper.writeDate(baos, new Date(now)); + int padding = size - baos.size(); + byte paddingData[] = new byte[padding]; + Arrays.fill(paddingData, (byte)0x2A); + DataHelper.writeLong(baos, 2, padding); + baos.write(paddingData); + boolean sent = _session.sendMessage(peer, baos.toByteArray()); + if (!sent) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error sending the ping to " + peer.calculateHash().toBase64() + " for series " + seriesNum); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Ping sent to " + peer.calculateHash().toBase64() + " for series " + seriesNum); + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error sending the ping", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the ping message", dfe); + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the ping message", ise); + } + } + + /** + * send a pong message to the peer + * + * @param peer peer to pong + * @param seriesNum id given to us in the ping + * @param sentOn date the peer said they sent us the message + * @param data payload the peer sent us in the ping + * + * @throws IllegalStateException if we are not connected to the router + */ + public void sendPong(Destination peer, int seriesNum, Date sentOn, byte data[]) { + if (_session == null) throw new IllegalStateException("Not connected to the router"); + ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length + 768); + try { + _localDest.writeBytes(baos); + DataHelper.writeLong(baos, 2, seriesNum); + DataHelper.writeLong(baos, 1, TYPE_PONG); + DataHelper.writeDate(baos, sentOn); + DataHelper.writeDate(baos, new Date(Clock.getInstance().now())); + DataHelper.writeLong(baos, 2, data.length); + baos.write(data); + boolean sent = _session.sendMessage(peer, baos.toByteArray()); + if (!sent) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error sending the pong to " + peer.calculateHash().toBase64() + " for series " + seriesNum + " which was sent on " + sentOn); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Pong sent to " + peer.calculateHash().toBase64() + " for series " + seriesNum + " which was sent on " + sentOn); + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error sending the ping", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the pong message", dfe); + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the pong message", ise); + } + } + + /** + * We've received this data from I2P - parse it into a ping or a pong + * and notify accordingly + */ + private void handleMessage(byte data[]) { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + try { + Destination from = new Destination(); + from.readBytes(bais); + int series = (int)DataHelper.readLong(bais, 2); + long type = DataHelper.readLong(bais, 1); + Date sentOn = DataHelper.readDate(bais); + Date receivedOn = null; + if (type == TYPE_PONG) { + receivedOn = DataHelper.readDate(bais); + } + int size = (int)DataHelper.readLong(bais, 2); + byte payload[] = new byte[size]; + int read = DataHelper.read(bais, payload); + if (read != size) + throw new IOException("Malformed payload - read " + read + " instead of " + size); + + if (_listener == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Listener isn't set, but we received a valid message of type " + type + " sent from " + from.calculateHash().toBase64()); + return; + } + + if (type == TYPE_PING) { + if (_log.shouldLog(Log.INFO)) + _log.info("Ping received from " + from.calculateHash().toBase64() + " on series " + series + " sent on " + sentOn + " containing " + size + " bytes"); + _listener.receivePing(from, series, sentOn, payload); + } else if (type == TYPE_PONG) { + if (_log.shouldLog(Log.INFO)) + _log.info("Pong received from " + from.calculateHash().toBase64() + " on series " + series + " sent on " + sentOn + " with pong sent on " + receivedOn + " containing " + size + " bytes"); + _listener.receivePong(from, series, sentOn, receivedOn, payload); + } else { + throw new IOException("Invalid message type " + type); + } + + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error handling the message", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error parsing the message", dfe); + } + } + + + /** + * connect to the I2P router and either authenticate ourselves with the + * destination we're given, or create a new one and write that to the + * destination file. + * + * @return true if we connect successfully, false otherwise + */ + boolean connect() { + I2PClient client = I2PClientFactory.createClient(); + Destination us = null; + File destFile = new File(_privateDestFile); + us = verifyDestination(client, destFile); + if (us == null) return false; + + // if we're here, we got a destination. lets connect + FileInputStream fin = null; + try { + fin = new FileInputStream(destFile); + Properties options = getOptions(); + I2PSession session = client.createSession(fin, options); + I2PListener lsnr = new I2PListener(); + session.setSessionListener(lsnr); + session.connect(); + _localDest = session.getMyDestination(); + if (_log.shouldLog(Log.INFO)) + _log.info("I2CP Session created and connected as " + _localDest.calculateHash().toBase64()); + _session = session; + _i2pListener = lsnr; + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error connecting", ise); + return false; + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error loading the destionation", ioe); + return false; + } finally { + if (fin != null) try { fin.close(); } catch (IOException ioe) {} + } + + return true; + } + + /** + * load, verify, or create a destination + * + * @return the destination loaded, or null if there was an error + */ + private Destination verifyDestination(I2PClient client, File destFile) { + Destination us = null; + FileInputStream fin = null; + if (destFile.exists()) { + try { + fin = new FileInputStream(destFile); + us = new Destination(); + us.readBytes(fin); + if (_log.shouldLog(Log.INFO)) + _log.info("Existing destination loaded: [" + us.toBase64() + "]"); + } catch (IOException ioe) { + if (fin != null) try { fin.close(); } catch (IOException ioe2) {} + fin = null; + destFile.delete(); + us = null; + } catch (DataFormatException dfe) { + if (fin != null) try { fin.close(); } catch (IOException ioe2) {} + fin = null; + destFile.delete(); + us = null; + } finally { + if (fin != null) try { fin.close(); } catch (IOException ioe2) {} + fin = null; + } + } + + if (us == null) { + // need to create a new one + FileOutputStream fos = null; + try { + fos = new FileOutputStream(destFile); + us = client.createDestination(fos); + if (_log.shouldLog(Log.INFO)) + _log.info("New destination created: [" + us.toBase64() + "]"); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the destination keys being created", ioe); + return null; + } catch (I2PException ie) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error creating the destination", ie); + return null; + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } + } + return us; + } + + /** + * I2PSession connect options + */ + private Properties getOptions() { + Properties props = new Properties(); + props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); + props.setProperty(I2PClient.PROP_TCP_HOST, _i2cpHost); + props.setProperty(I2PClient.PROP_TCP_PORT, _i2cpPort + ""); + props.setProperty("tunnels.depthInbound", ""+_numHops); + props.setProperty("tunnels.depthOutbound", ""+_numHops); + return props; + } + + /** disconnect from the I2P router */ + void disconnect() { + if (_session != null) { + try { + _session.destroySession(); + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error destroying the session", ise); + } + _session = null; + } + } + + /** + * Defines an event notification system for receiving pings and pongs + * + */ + public interface PingPongEventListener { + /** + * receive a ping message from the peer + * + * @param from peer that sent us the ping + * @param seriesNum id the peer sent us in the ping + * @param sentOn date the peer said they sent us the message + * @param data payload from the ping + */ + void receivePing(Destination from, int seriesNum, Date sentOn, byte data[]); + + /** + * receive a pong message from the peer + * + * @param from peer that sent us the pong + * @param seriesNum id the peer sent us in the pong (that we sent them in the ping) + * @param sentOn when we sent out the ping + * @param replyOn when they sent out the pong + * @param data payload from the ping/pong + */ + void receivePong(Destination from, int seriesNum, Date sentOn, Date replyOn, byte data[]); + } + + /** + * Receive data from the session and pass it along to handleMessage for parsing/dispersal + * + */ + private class I2PListener implements I2PSessionListener { + public void disconnected(I2PSession session) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Session disconnected"); + disconnect(); + } + public void errorOccurred(I2PSession session, String message, Throwable error) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error occurred", error); + } + public void reportAbuse(I2PSession session, int severity) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Abuse reported"); + } + + public void messageAvailable(I2PSession session, int msgId, long size) { + try { + byte data[] = session.receiveMessage(msgId); + handleMessage(data); + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error receiving the message", ise); + disconnect(); + } + } + } +} \ No newline at end of file diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/PeerData.java b/apps/heartbeat/java/src/net/i2p/heartbeat/PeerData.java new file mode 100644 index 000000000..8f38e27e6 --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/PeerData.java @@ -0,0 +1,268 @@ +package net.i2p.heartbeat; + +import net.i2p.util.Clock; +import net.i2p.util.Log; +import net.i2p.stat.Rate; +import net.i2p.stat.RateStat; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Contain the current window of data for a particular series of ping/pong stats + * sent to a peer. This should be periodically kept clean by calling cleanup() + * to timeout expired pings and to drop data outside the window. + * + */ +public class PeerData { + private final static Log _log = new Log(PeerData.class); + /** peer / sequence / config in this data series */ + private ClientConfig _peer; + /** date sent (Long) to EventDataPoint containing the datapoints sent in the current period */ + private Map _dataPoints; + /** date sent (Long) to EventDataPoint containing pings that haven't yet timed out or been ponged */ + private Map _pendingPings; + private long _sessionStart; + private long _lifetimeSent; + private long _lifetimeReceived; + /** rate averaging the time to send over a variety of periods */ + private RateStat _sendRate; + /** rate averaging the time to receive over a variety of periods */ + private RateStat _receiveRate; + /** rate averaging the frequency of lost messages over a variety of periods */ + private RateStat _lostRate; + + /** how long we wait before timing out pending pings (30 seconds) */ + private static final long TIMEOUT_PERIOD = 30*1000; + + /** synchronize on this when updating _dataPoints or _pendingPings */ + private Object _updateLock = new Object(); + + public PeerData(ClientConfig config) { + _peer = config; + _dataPoints = new TreeMap(); + _pendingPings = new TreeMap(); + _sessionStart = Clock.getInstance().now(); + _lifetimeSent = 0; + _lifetimeReceived = 0; + _sendRate = new RateStat("sendRate", "How long it takes to send", "peer", getPeriods(config.getAveragePeriods())); + _receiveRate = new RateStat("receiveRate", "How long it takes to receive", "peer", getPeriods(config.getAveragePeriods())); + _lostRate = new RateStat("lostRate", "How frequently we lose messages", "peer", getPeriods(config.getAveragePeriods())); + } + + /** turn the periods (# minutes) into rate periods (# milliseconds) */ + private static long[] getPeriods(int periods[]) { + long rv[] = null; + if (periods == null) periods = new int[0]; + rv = new long[periods.length]; + for (int i = 0; i < periods.length; i++) + rv[i] = (long)periods[i] * 60*1000; // they're in minutes + Arrays.sort(rv); + return rv; + } + + /** how many pings are still outstanding? */ + public int getPendingCount() { synchronized (_updateLock) { return _pendingPings.size(); } } + /** how many data points are available in the current window? */ + public int getDataPointCount() { synchronized (_updateLock) { return _dataPoints.size(); } } + /** when did this test begin? */ + public long getSessionStart() { return _sessionStart; } + /** how many pings have we sent for this test? */ + public long getLifetimeSent() { return _lifetimeSent; } + /** how many pongs have we received for this test? */ + public long getLifetimeReceived() { return _lifetimeReceived; } + public ClientConfig getConfig() { return _peer; } + + /** + * What periods are we averaging the data over (in minutes)? + */ + public int[] getAveragePeriods() { return (_peer.getAveragePeriods() != null ? _peer.getAveragePeriods() : new int[0]); } + /** + * average time to send over the given period. + * + * @param period number of minutes to retrieve the average for + * @return milliseconds average, or -1 if we dont track that period + */ + public double getAverageSendTime(int period) { return getAverage(_sendRate, period); } + /** + * average time to receive over the given period. + * + * @param period number of minutes to retrieve the average for + * @return milliseconds average, or -1 if we dont track that period + */ + public double getAverageReceiveTime(int period) { return getAverage(_receiveRate, period); } + /** + * number of lost messages over the given period. + * + * @param period number of minutes to retrieve the average for + * @return number of lost messages in the period, or -1 if we dont track that period + */ + public double getLostMessages(int period) { + Rate rate = _lostRate.getRate(period * 60*1000); + if (rate == null) + return -1; + return rate.getCurrentTotalValue(); + } + + private double getAverage(RateStat stat, int period) { + Rate rate = stat.getRate(period * 60*1000); + if (rate == null) + return -1; + return rate.getAverageValue(); + } + + /** + * Return an ordered list of data points in the current window (after doing a cleanup) + * + * @return list of EventDataPoint objects + */ + public List getDataPoints() { + cleanup(); + synchronized (_updateLock) { + return new ArrayList(_dataPoints.values()); + } + } + + /** + * We have sent the peer a ping on this series (using the send time as given) + * + */ + public void addPing(long dateSent) { + EventDataPoint sent = new EventDataPoint(dateSent); + synchronized (_updateLock) { + _pendingPings.put(new Long(dateSent), sent); + } + _lifetimeSent++; + } + + /** + * we have received a pong from the peer on this series + * + * @param dateSent when we sent the ping + * @param pongSent when the peer received the ping and sent the pong + */ + public void pongReceived(long dateSent, long pongSent) { + long now = Clock.getInstance().now(); + synchronized (_updateLock) { + EventDataPoint data = (EventDataPoint)_pendingPings.remove(new Long(dateSent)); + if (data != null) { + data.setPongReceived(now); + data.setPongSent(pongSent); + data.setWasPonged(true); + _dataPoints.put(new Long(dateSent), data); + } + } + _sendRate.addData(pongSent-dateSent, 0); + _receiveRate.addData(now-pongSent, 0); + _lifetimeReceived++; + } + + /** + * drop all datapoints outside the window we're watching, and timeout all + * pending pings not ponged in the TIMEOUT_PERIOD, both updating the lost message + * rate and coallescing all of the rates. + * + */ + public void cleanup() { + long dropBefore = Clock.getInstance().now() - _peer.getStatDuration() * 60*1000; + long timeoutBefore = Clock.getInstance().now() - TIMEOUT_PERIOD; + long numDropped = 0; + long numTimedOut = 0; + + synchronized (_updateLock) { + List toTimeout = new ArrayList(4); + List toDrop = new ArrayList(4); + for (Iterator iter = _pendingPings.keySet().iterator(); iter.hasNext(); ) { + Long when = (Long)iter.next(); + if (when.longValue() < dropBefore) + toDrop.add(when); + else if (when.longValue() < timeoutBefore) + toTimeout.add(when); + else + break; // its ordered, so once we are past timeoutBefore, no need + } + for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) { + _pendingPings.remove(iter.next()); + } + + List toAdd = new ArrayList(toTimeout.size()); + for (Iterator iter = toTimeout.iterator(); iter.hasNext(); ) { + Long when = (Long)iter.next(); + EventDataPoint data = (EventDataPoint)_pendingPings.remove(when); + data.setWasPonged(false); + toAdd.add(data); + } + + numDropped = toDrop.size(); + numTimedOut = toDrop.size(); + toDrop.clear(); + + for (Iterator iter = _dataPoints.keySet().iterator(); iter.hasNext(); ) { + Long when = (Long)iter.next(); + if (when.longValue() < dropBefore) + toDrop.add(when); + else + break; // ordered + } + for (Iterator iter = toDrop.iterator(); iter.hasNext(); ) { + _dataPoints.remove(iter.next()); + } + + numDropped += toDrop.size(); + + for (Iterator iter = toAdd.iterator(); iter.hasNext(); ) { + EventDataPoint data = (EventDataPoint)iter.next(); + _dataPoints.put(new Long(data.getPingSent()), data); + } + + numTimedOut += toAdd.size(); + } + + _lostRate.addData(numTimedOut, 0); + + _receiveRate.coallesceStats(); + _sendRate.coallesceStats(); + _lostRate.coallesceStats(); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer data cleaned up " + numTimedOut + " timed out pings and removed " + numDropped + " old entries"); + } + + /** actual data point for the peer */ + public class EventDataPoint { + private boolean _wasPonged; + private long _pingSent; + private long _pongSent; + private long _pongReceived; + + public EventDataPoint() { + this(-1); + } + public EventDataPoint(long pingSentOn) { + _wasPonged = false; + _pingSent = pingSentOn; + _pongSent = -1; + _pongReceived = -1; + } + + /** when did we send this ping? */ + public long getPingSent() { return _pingSent; } + public void setPingSent(long when) { _pingSent = when; } + + /** when did the peer receive the ping? */ + public long getPongSent() { return _pongSent; } + public void setPongSent(long when) { _pongSent = when; } + + /** when did we receive the peer's pong? */ + public long getPongReceived() { return _pongReceived; } + public void setPongReceived(long when) { _pongReceived = when; } + + /** did the peer reply in time? */ + public boolean getWasPonged() { return _wasPonged; } + public void setWasPonged(boolean pong) { _wasPonged = pong; } + } +} \ No newline at end of file diff --git a/apps/heartbeat/java/src/net/i2p/heartbeat/PeerDataWriter.java b/apps/heartbeat/java/src/net/i2p/heartbeat/PeerDataWriter.java new file mode 100644 index 000000000..bee02f26f --- /dev/null +++ b/apps/heartbeat/java/src/net/i2p/heartbeat/PeerDataWriter.java @@ -0,0 +1,107 @@ +package net.i2p.heartbeat; + +import net.i2p.util.Log; +import net.i2p.util.Clock; + +import java.io.IOException; +import java.io.File; +import java.io.FileOutputStream; + +import java.text.SimpleDateFormat; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.Locale; +import java.util.Date; +import java.util.Iterator; + +/** + * Actually write out the stats for peer test + * + */ +class PeerDataWriter { + private final static Log _log = new Log(PeerDataWriter.class); + + /** + * persist the peer state to the location specified in the peer config + * + * @return true if it was persisted correctly, false on error + */ + public boolean persist(PeerData data) { + String filename = data.getConfig().getStatFile(); + String header = getHeader(data); + File statFile = new File(filename); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(statFile); + fos.write(header.getBytes()); + fos.write("#action\tstatus\tdate and time sent \tsendMs\treplyMs\n".getBytes()); + for (Iterator iter = data.getDataPoints().iterator(); iter.hasNext(); ) { + PeerData.EventDataPoint point = (PeerData.EventDataPoint)iter.next(); + String line = getEvent(point); + fos.write(line.getBytes()); + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error persisting the peer data for " + data.getConfig().getPeer().calculateHash().toBase64(), ioe); + return false; + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } + return true; + } + + private String getHeader(PeerData data) { + StringBuffer buf = new StringBuffer(1024); + buf.append("peer \t").append(data.getConfig().getPeer().calculateHash().toBase64()).append('\n'); + buf.append("local \t").append(data.getConfig().getUs().calculateHash().toBase64()).append('\n'); + buf.append("peerDest \t").append(data.getConfig().getPeer().toBase64()).append('\n'); + buf.append("localDest \t").append(data.getConfig().getUs().toBase64()).append('\n'); + buf.append("numTunnelHops\t").append(data.getConfig().getNumHops()).append('\n'); + buf.append("comment \t").append(data.getConfig().getComment()).append('\n'); + buf.append("sendFrequency\t").append(data.getConfig().getSendFrequency()).append('\n'); + buf.append("sendSize \t").append(data.getConfig().getSendSize()).append('\n'); + buf.append("sessionStart \t").append(getTime(data.getSessionStart())).append('\n'); + buf.append("currentTime \t").append(getTime(Clock.getInstance().now())).append('\n'); + buf.append("numPending \t").append(data.getPendingCount()).append('\n'); + buf.append("lifetimeSent \t").append(data.getLifetimeSent()).append('\n'); + buf.append("lifetimeRecv \t").append(data.getLifetimeReceived()).append('\n'); + int periods[] = data.getAveragePeriods(); + buf.append("#averages\tminutes\tsendMs\trecvMs\tnumLost\n"); + for (int i = 0; i < periods.length; i++) { + buf.append("periodAverage\t").append(periods[i]).append('\t'); + buf.append(getNum(data.getAverageSendTime(periods[i]))).append('\t'); + buf.append(getNum(data.getAverageReceiveTime(periods[i]))).append('\t'); + buf.append(getNum(data.getLostMessages(periods[i]))).append('\n'); + } + return buf.toString(); + } + + private String getEvent(PeerData.EventDataPoint point) { + StringBuffer buf = new StringBuffer(128); + buf.append("EVENT\t"); + if (point.getWasPonged()) + buf.append("OK\t"); + else + buf.append("LOST\t"); + buf.append(getTime(point.getPingSent())).append('\t'); + if (point.getWasPonged()) { + buf.append(point.getPongSent() - point.getPingSent()).append('\t'); + buf.append(point.getPongReceived() - point.getPongSent()).append('\t'); + } + buf.append('\n'); + return buf.toString(); + } + + private static final SimpleDateFormat _fmt = new SimpleDateFormat("yyyyMMdd.HH:mm:ss.SSS", Locale.UK); + public String getTime(long when) { + synchronized (_fmt) { + return _fmt.format(new Date(when)); + } + } + private static final DecimalFormat _numFmt = new DecimalFormat("#0", new DecimalFormatSymbols(Locale.UK)); + public String getNum(double val) { + synchronized (_numFmt) { + return _numFmt.format(val); + } + } +} \ No newline at end of file