Skip to content

Commit

Permalink
Completed DB relay feature
Browse files Browse the repository at this point in the history
  • Loading branch information
lukevenediger committed Aug 25, 2014
1 parent ebd8688 commit 900bc65
Show file tree
Hide file tree
Showing 18 changed files with 530 additions and 245 deletions.
8 changes: 4 additions & 4 deletions DemoDataFeeder/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ static void Main(string[] args)
}
while ( true )
{
//client.LogCount( "test.count.one." + rnd.Next( 5 ) );
//client.LogCount( "test.count.bigValue", rnd.Next( 50 ) );
//client.LogTiming( "test.timing." + rnd.Next( 5 ), rnd.Next( 100, 2000 ) );
//client.LogGauge( "test.gauge." + rnd.Next( 5 ), rnd.Next( 100 ) );
client.LogCount( "test.count.one." + rnd.Next( 5 ) );
client.LogCount( "test.count.bigValue", rnd.Next( 50 ) );
client.LogTiming( "test.timing." + rnd.Next( 5 ), rnd.Next( 100, 2000 ) );
client.LogGauge( "test.gauge." + rnd.Next( 5 ), rnd.Next( 100 ) );
client.LogCalendargram("test.calendargram.users", letters.Next(), CalendargramRetentionPeriod.ONE_MINUTE);
Thread.Sleep( options.Delay );
Interlocked.Add( ref totalMetricsSent, 4 );
Expand Down
7 changes: 7 additions & 0 deletions statsd.net.shared/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using log4net;
using statsd.net.shared;
using statsd.net.shared.Messages;
using System;
using System.Collections.Concurrent;
Expand Down Expand Up @@ -92,6 +93,12 @@ public static bool ToBoolean(this XElement element, string attributeName)
return Boolean.Parse(element.Attribute(attributeName).Value);
}

public static TimeSpan ToTimeSpan(this XElement element, string attributeName)
{
var value = element.Attribute(attributeName).Value;
return Utility.ConvertToTimespan(value);
}

public static bool ToBoolean(this XElement element, string attributeName, bool defaultValue)
{
if (!element.Attributes().Any(p => p.Name == attributeName))
Expand Down
157 changes: 138 additions & 19 deletions statsd.net.shared/Listeners/MSSQLRelayListener.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
using statsd.net.shared.Services;
using statsd.net.core;
using statsd.net.shared.Messages;
using statsd.net.shared.Services;
using statsd.net.shared.Structures;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
Expand All @@ -10,49 +15,163 @@

namespace statsd.net.shared.Listeners
{
/// <summary>
/// Listens out for raw metrics in a SQL db and feeds them
/// in as raw metrics.
/// </summary>
public class MSSQLRelayListener : IListener
{
private static string[] SPACE_SPLITTER = new String[] { " " };

private string _connectionString;
private IIntervalService _intervalService;
private IntervalService _intervalService;
private int _batchSize;
private bool _deleteAfterSend;

private BufferBlock<RelayMetric> _buffer;
private ActionBlock<RelayMetric[]> _feeder;
private ISystemMetricsService _metrics;
private ITargetBlock<string> _target;
private CancellationToken _cancellationToken;

public MSSQLRelayListener(string connectionString,
IIntervalService intervalService,
int batchSize,
bool deleteAfterSend)
TimeSpan pollInterval,
CancellationToken cancellationToken,
int batchSize,
bool deleteAfterSend,
ISystemMetricsService metrics)
{
_connectionString = connectionString;
_intervalService = intervalService;
_intervalService = new IntervalService(pollInterval, cancellationToken);
_cancellationToken = cancellationToken;
_batchSize = batchSize;
_deleteAfterSend = deleteAfterSend;
_buffer = new BufferBlock<RelayMetric>();
_feeder = new ActionBlock<RelayMetric[]>(p => FeedMetrics(p));
_metrics = metrics;

var stopwatch = new Stopwatch();

_intervalService.Elapsed += (sender, e) =>
{
if (IsListening)
{
_intervalService.Cancel(true);
stopwatch.Restart();
ReadAndFeed();
stopwatch.Stop();
metrics.LogCount("listeners.mssql-relay.feedTimeSeconds", Convert.ToInt32(stopwatch.Elapsed.TotalSeconds));

// Only continue the interval service if cancellation
// isn't in progress
if (!cancellationToken.IsCancellationRequested)
{
_intervalService.Start();
}
}
};
}

private Task FeedMetrics(RelayMetric[] metrics)
public void LinkTo(ITargetBlock<string> target, CancellationToken token)
{
foreach(RelayMetric metric in metrics)
_target = target;
IsListening = true;
_intervalService.Start();
}

public bool IsListening { get; private set; }

private void ReadAndFeed()
{
try
{
var parts = line.Split(SPACE_SPLITTER, StringSplitOptions.RemoveEmptyEntries);
_target.Post(parts[0] + ":" + parts[1] + "|r|" + parts[2]);
_metrics.LogCount("listeners.mssql-relay.feed.attempt");
var lines = GetNewLinesFromDB();
foreach (String line in lines)
{
var parts = line.Split(SPACE_SPLITTER, StringSplitOptions.RemoveEmptyEntries);
_target.Post(parts[0] + ":" + parts[1] + "|r|" + parts[2]);
}
_metrics.LogCount("listeners.mssql-relay.lines.posted" + lines.Count);
_metrics.LogCount("listeners.mssql-relay.feed.success");
}
catch (Exception ex)
{
_metrics.LogCount("listeners.mssql-relay.error." + ex.GetType().Name);
_metrics.LogCount("listeners.mssql-relay.feed.failure");
}
}

public void LinkTo(ITargetBlock<string> target, CancellationToken token)
private List<String> GetNewLinesFromDB()
{
_target = target;
using (var conn = new SqlConnection( _connectionString ))
{
conn.Open();
var lastRowID = GetLastRowID(conn);
SqlCommand cmd = conn.CreateCommand();
cmd.CommandText =
String.Format(
"SELECT TOP {0} measure, rowid FROM tb_metrics WHERE rowid > {1} ORDER BY rowid ASC",
_batchSize,
lastRowID
);
cmd.CommandType = CommandType.Text;
lastRowID = 0;
var counter = 0;

var rows = new List<String>();
using (SqlDataReader reader = cmd.ExecuteReader())
{
string row;
while (reader.Read())
{
row = reader.GetString(0);
lastRowID = reader.GetInt64(1);
rows.Add(row);
counter++;
}
}

_metrics.LogCount("listeners.mssql-relay.action.fetchNewRows");
_metrics.LogCount("listeners.mssql-relay.lines.fetched" + rows.Count);

// Make note of the last row ID we updated
if (counter > 0)
{
UpdateLastRowID(conn, lastRowID);
}

return rows;
}
}

private long GetLastRowID(SqlConnection conn)
{
var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT ISNULL(lastRowId, -1) FROM tb_metricscontrol";
cmd.CommandType = CommandType.Text;
var result = cmd.ExecuteScalar();
_metrics.LogCount("listeners.mssql-relay.action.getLastRowID");
return result == null ? 0 : (long)result;
}

private void UpdateLastRowID(SqlConnection conn, long lastRowId)
{
var cmd = conn.CreateCommand();
cmd.CommandText =
String.Format(
"IF EXISTS (SELECT 1 FROM tb_metricscontrol) UPDATE tb_metricscontrol SET lastRowID = {0} " +
"ELSE INSERT INTO tb_metricscontrol (lastRowID) VALUES ({0})",
lastRowId);
cmd.CommandType = CommandType.Text;
cmd.ExecuteNonQuery();
_metrics.LogCount("listeners.mssql-relay.action.updateLastRowID");
}

public bool IsListening
private int DeleteProcessedRecords(SqlConnection conn, long lastRowId)
{
get { throw new NotImplementedException(); }
var cmd = conn.CreateCommand();
cmd.CommandText = String.Format("DELETE FROM tb_metrics WHERE rowid < {0}", lastRowId);
cmd.CommandType = CommandType.Text;
var rowsDeleted = cmd.ExecuteNonQuery();
_metrics.LogCount("listeners.mssql-relay.action.deleteProcessedRecords");
_metrics.LogCount("listeners.mssql-relay.rowsDeleted", rowsDeleted);
return rowsDeleted;
}
}
}
138 changes: 73 additions & 65 deletions statsd.net.shared/Services/IntervalService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,91 @@

namespace statsd.net.shared.Services
{
public interface IIntervalService
{
event EventHandler<IntervalFiredEventArgs> Elapsed;
void Start();
void Cancel();
void RunOnce();
int IntervalSeconds { get; }
}
public interface IIntervalService
{
event EventHandler<IntervalFiredEventArgs> Elapsed;
void Start();
void Cancel();
void RunOnce();
int IntervalSeconds { get; }
}

[DebuggerDisplay("Fires every {_timer.Interval} milliseconds.")]
public class IntervalService : IIntervalService
{
private System.Timers.Timer _timer;
private ManualResetEvent _callbackComplete;
[DebuggerDisplay("Fires every {_timer.Interval} milliseconds.")]
public class IntervalService : IIntervalService
{
private System.Timers.Timer _timer;
private ManualResetEvent _callbackComplete;

public int IntervalSeconds { get; private set; }
public int IntervalSeconds { get; private set; }

public IntervalService(TimeSpan delay, CancellationToken? cancellationToken = null)
{
_callbackComplete = new ManualResetEvent(true);
IntervalSeconds = Convert.ToInt32(delay.TotalSeconds);
_timer = new System.Timers.Timer(delay.TotalMilliseconds);
_timer.Elapsed += (sender, e) =>
public IntervalService(TimeSpan delay, CancellationToken? cancellationToken = null)
{
if (cancellationToken.HasValue && cancellationToken.Value.IsCancellationRequested)
{
_timer.Stop();
return;
}
_callbackComplete.Reset();
FireEvent( e.SignalTime.ToEpoch() );
_callbackComplete.Set();
};
_timer.AutoReset = true;
}
_callbackComplete = new ManualResetEvent(true);
IntervalSeconds = Convert.ToInt32(delay.TotalSeconds);
_timer = new System.Timers.Timer(delay.TotalMilliseconds);
_timer.Elapsed += (sender, e) =>
{
if (cancellationToken.HasValue && cancellationToken.Value.IsCancellationRequested)
{
_timer.Stop();
return;
}
_callbackComplete.Reset();
FireEvent(e.SignalTime.ToEpoch());
_callbackComplete.Set();
};
_timer.AutoReset = true;
}

public IntervalService(int delayInSeconds, CancellationToken? cancellationToken = null)
:this (new TimeSpan(0,0,delayInSeconds), cancellationToken)
{
}
public IntervalService(int delayInSeconds, CancellationToken? cancellationToken = null)
: this(new TimeSpan(0, 0, delayInSeconds), cancellationToken)
{
}

public void Start()
{
_timer.Start();
}
public void Start()
{
_timer.Start();
}

public void Cancel()
{
_timer.Stop();
// Wait until the callback has finised executing
_callbackComplete.WaitOne(new TimeSpan(0, 0, 30));
}
public void Cancel()
{
Cancel(false);
}

public void RunOnce()
{
FireEvent(DateTime.Now.ToEpoch());
}
public void Cancel(bool dontWaitForCallback = false)
{
_timer.Stop();
if (!dontWaitForCallback)
{
// Wait until the callback has finised executing
_callbackComplete.WaitOne(new TimeSpan(0, 0, 30));
}
}

private void FireEvent ( long epoch )
{
if ( Elapsed != null )
{
Elapsed( this, new IntervalFiredEventArgs( epoch ) );
}
}
public void RunOnce()
{
FireEvent(DateTime.Now.ToEpoch());
}

public event EventHandler<IntervalFiredEventArgs> Elapsed;
}
private void FireEvent(long epoch)
{
if (Elapsed != null)
{
Elapsed(this, new IntervalFiredEventArgs(epoch));
}
}

[DebuggerDisplay("{Epoch}")]
public class IntervalFiredEventArgs : EventArgs
{
public long Epoch { get; private set; }
public event EventHandler<IntervalFiredEventArgs> Elapsed;
}

public IntervalFiredEventArgs(long epoch)
[DebuggerDisplay("{Epoch}")]
public class IntervalFiredEventArgs : EventArgs
{
Epoch = epoch;
public long Epoch { get; private set; }

public IntervalFiredEventArgs(long epoch)
{
Epoch = epoch;
}
}
}
}
Loading

0 comments on commit 900bc65

Please sign in to comment.