Skip to content

Commit

Permalink
Merge pull request #8 from bloomberg/write_http
Browse files Browse the repository at this point in the history
Added support for WriteHttp plugin
  • Loading branch information
sh9189 committed Sep 9, 2015
2 parents 1675223 + e5468d8 commit 631ca1c
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/CollectdWinService/CollectdWinService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
<Compile Include="TypesDB.cs" />
<Compile Include="Util.cs" />
<Compile Include="WindowsPerformanceCounterPlugin.cs" />
<Compile Include="WriteHttpPlugin.cs" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="CollectdWinService.resx">
Expand Down
123 changes: 109 additions & 14 deletions src/CollectdWinService/MetricsConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@ public AmqpConfig Amqp
{
get { return (AmqpConfig) base["Amqp"]; }
set { base["Amqp"] = value; }
}

[ConfigurationProperty("WriteHttp", IsRequired = false)]
public WriteHttpConfig WriteHttp
{
get { return (WriteHttpConfig)base["WriteHttp"]; }
set { base["WriteHttp"] = value; }
}

[ConfigurationProperty("WindowsPerformanceCounters", IsRequired = false)]
public WindowsPerformanceCountersConfig WindowsPerformanceCounters
{
get { return (WindowsPerformanceCountersConfig) base["WindowsPerformanceCounters"]; }
set { base["WindowsPerformanceCounters"] = value; }
}
}

public static CollectdWinConfig GetConfig()
{
Expand Down Expand Up @@ -113,6 +120,17 @@ public string RoutingKeyPrefix
set { base["RoutingKeyPrefix"] = value; }
}
}
}

public sealed class WindowsPerformanceCountersConfig : ConfigurationElement
{
[ConfigurationProperty("Counters", IsRequired = false)]
[ConfigurationCollection(typeof(CounterConfigCollection), AddItemName = "Counter")]
public CounterConfigCollection Counters
{
get { return (CounterConfigCollection)base["Counters"]; }
set { base["Counters"] = value; }
}
}

public sealed class CounterConfig : ConfigurationElement
Expand Down Expand Up @@ -182,7 +200,7 @@ public uint ScaleDownFactor
}
}

public class CounterConfigCollection : ConfigurationElementCollection
public sealed class CounterConfigCollection : ConfigurationElementCollection
{
protected override ConfigurationElement CreateNewElement()
{
Expand Down Expand Up @@ -220,7 +238,7 @@ public bool StoreRates
}
}

public class PluginCollection : ConfigurationElementCollection
public sealed class PluginCollection : ConfigurationElementCollection
{
protected override ConfigurationElement CreateNewElement()
{
Expand Down Expand Up @@ -393,18 +411,95 @@ public PercentileCollection Percentiles
set { base["Percentiles"] = value; }
}
}
}

public sealed class WriteHttpNodeConfigCollection : ConfigurationElementCollection
{
protected override ConfigurationElement CreateNewElement()
{
return new WriteHttpNodeConfig();
}

protected override object GetElementKey(ConfigurationElement element)
{
var nodeConfig = (WriteHttpNodeConfig)element;
return (nodeConfig.Name);
}
}

public sealed class WriteHttpNodeConfig : ConfigurationElement
{
[ConfigurationProperty("Name", IsRequired = true)]
public string Name
{
get { return (string)base["Name"]; }
set { base["Name"] = value; }
}

[ConfigurationProperty("Url", IsRequired = true)]
public string Url
{
get { return (string)base["Url"]; }
set { base["Url"] = value; }
}

[ConfigurationProperty("Timeout", IsRequired = true)]
public int Timeout
{
get { return (int)base["Timeout"]; }
set { base["Timeout"] = value; }
}

[ConfigurationProperty("BatchSize", IsRequired = true)]
public int BatchSize
{
get { return (int)base["BatchSize"]; }
set { base["BatchSize"] = value; }
}

[ConfigurationProperty("MaxIdleTime", IsRequired = false)]
public int MaxIdleTime
{
get { return (int)base["MaxIdleTime"]; }
set { base["MaxIdleTime"] = value; }
}

[ConfigurationProperty("Proxy", IsRequired = true)]
public ProxyConfig Proxy
{
get { return (ProxyConfig)base["Proxy"]; }
set { base["Proxy"] = value; }
}

public sealed class ProxyConfig : ConfigurationElement
{
[ConfigurationProperty("Enable", IsRequired = true)]
public bool Enable
{
get { return (bool)base["Enable"]; }
set { base["Enable"] = value; }
}

[ConfigurationProperty("Url", IsRequired = true)]
public string Url
{
get { return (string)base["Url"]; }
set { base["Url"] = value; }
}
}
}

public sealed class WriteHttpConfig : ConfigurationElement
{
[ConfigurationProperty("Nodes", IsRequired = false)]
[ConfigurationCollection(typeof(WriteHttpNodeConfigCollection), AddItemName = "Node")]
public WriteHttpNodeConfigCollection Nodes
{
get { return (WriteHttpNodeConfigCollection)base["Nodes"]; }
set { base["Nodes"] = value; }
}
}

public sealed class WindowsPerformanceCountersConfig : ConfigurationElement
{
[ConfigurationProperty("Counters", IsRequired = false)]
[ConfigurationCollection(typeof (CounterConfigCollection), AddItemName = "Counter")]
public CounterConfigCollection Counters
{
get { return (CounterConfigCollection) base["Counters"]; }
set { base["Counters"] = value; }
}
}

}
}

Expand Down
187 changes: 187 additions & 0 deletions src/CollectdWinService/WriteHttpPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Net;
using System.Text;
using NLog;

namespace BloombergFLP.CollectdWin
{
internal class WriteHttpPlugin : IMetricsWritePlugin
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

private readonly IList<HttpWriter> _httpWriters;

public WriteHttpPlugin()
{
_httpWriters = new List<HttpWriter>();
}

public void Configure()
{
var config = ConfigurationManager.GetSection("CollectdWinConfig") as CollectdWinConfig;
if (config == null)
{
throw new Exception("WriteHttpPlugin - Cannot get configuration section : CollectdWinConfig");
}

_httpWriters.Clear();

foreach (CollectdWinConfig.WriteHttpNodeConfig node in config.WriteHttp.Nodes)
{
var writer = new HttpWriter
{
Url = node.Url,
Timeout = node.Timeout,
BatchSize = node.BatchSize,
MaxIdleTime = node.MaxIdleTime,
EnableProxy = node.Proxy.Enable
};

if (writer.EnableProxy)
{
writer.WebProxy = node.Proxy.Url.Length > 0 ? new WebProxy(node.Proxy.Url) : new WebProxy();
}
_httpWriters.Add(writer);
}

Logger.Info("WriteHttp plugin configured");
}

public void Start()
{
Logger.Info("WriteHttp - plugin started");
}

public void Stop()
{
Logger.Info("WriteHttp - plugin stopped");
}

public void Write(MetricValue metric)
{
if (metric == null)
{
Logger.Debug("write() - Invalid null metric");
return;
}
foreach (HttpWriter writer in _httpWriters)
{
writer.Write(metric);
}
}
}

internal class HttpWriter
{
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

public int BatchSize = 20;
public bool EnableProxy = false;
public int MaxIdleTime;
public int Timeout;
public string Url;
public WebProxy WebProxy = null;

private StringBuilder _batchedMetricStr;
private int _numMetrics;

public void Write(MetricValue metric)
{
string message = metric.GetMetricJsonStr();
if (_batchedMetricStr == null)
{
_batchedMetricStr = new StringBuilder("[").Append(message);
}
else
{
_batchedMetricStr.Append(",").Append(message);
}
_numMetrics++;

if (_numMetrics < BatchSize) return;

_batchedMetricStr.Append("]");
HttpPost(_batchedMetricStr.ToString());
_batchedMetricStr = null;
_numMetrics = 0;
}

public void HttpPost(string metricJsonStr)
{
HttpWebResponse response = null;
try
{
byte[] data = Encoding.UTF8.GetBytes(metricJsonStr);
var request = (HttpWebRequest) WebRequest.Create(Url);

request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = data.Length;
request.UserAgent = "CollectdWin/1.0";
request.Accept = "*/*";
request.KeepAlive = true;
request.Timeout = Timeout;

if (EnableProxy)
{
request.Proxy = WebProxy;
}
if (MaxIdleTime > 0)
{
request.ServicePoint.MaxIdleTime = MaxIdleTime;
}

// Display service point properties.
Logger.Trace("Connection properties: ServicePoint - HashCode:{0}, MaxIdleTime:{1}, IdleSince:{2}",
request.ServicePoint.GetHashCode(), request.ServicePoint.MaxIdleTime, request.ServicePoint.IdleSince);

using (Stream reqStream = request.GetRequestStream())
{
reqStream.Write(data, 0, data.Length);
}

response = (HttpWebResponse) request.GetResponse();
var statusCode = (int) response.StatusCode;
if (statusCode < 200 || statusCode >= 300)
{
Logger.Error("Got a bad response code : ", statusCode);
}

if (!Logger.IsTraceEnabled) return;

Stream respStream = response.GetResponseStream();
string responseString = new StreamReader(respStream).ReadToEnd();
Logger.Trace("Got response: " + responseString);
}
catch (Exception exp)
{
Logger.Error("Got exception in http post : ", exp);
}
finally
{
if (response != null)
{
response.Close();
}
}
}
}
}

// ----------------------------------------------------------------------------
// Copyright (C) 2015 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// ----------------------------- END-OF-FILE ----------------------------------
9 changes: 9 additions & 0 deletions src/CollectdWinService/app.config
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<Plugin Name="WindowsPerformanceCounter" Class="BloombergFLP.CollectdWin.WindowsPerformanceCounterPlugin"
Enable="true" />
<Plugin Name="Amqp" Class="BloombergFLP.CollectdWin.AmqpPlugin" Enable="false" />
<Plugin Name="WriteHttp" Class="BloombergFLP.CollectdWin.WriteHttpPlugin" Enable="false" />
<Plugin Name="Console" Class="BloombergFLP.CollectdWin.ConsolePlugin" Enable="true" />
</PluginRegistry>

Expand All @@ -47,6 +48,14 @@
Exchange="exchange" RoutingKeyPrefix="collectd" />
</Amqp>

<WriteHttp>
<Nodes>
<Node Name="example" Url="http://localhost:9223/collectd" Timeout="10000" BatchSize="25" MaxIdleTime="600000">
<Proxy Enable="false" Url="" />
</Node>
</Nodes>
</WriteHttp>

<WindowsPerformanceCounters>
<Counters>
<Counter Category="Processor" Name="% Processor Time" Instance="_Total" CollectdPlugin="aggregation"
Expand Down
2 changes: 1 addition & 1 deletion src/installer/Product.wxs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
IncludeMinimum="yes"
OnlyDetect="no"
Maximum="$(var.Version)"
IncludeMaximum="yes"
IncludeMaximum="no"
Property="PREVIOUSFOUND" />
</Upgrade>

Expand Down

0 comments on commit 631ca1c

Please sign in to comment.