Skip to content

Commit c1289d4

Browse files
committed
Implement Aggregation of Increment and Time measurements
Fixes influxdata#9
1 parent 7dc6860 commit c1289d4

10 files changed

+492
-6
lines changed

src/InfluxDB.Collector/CollectorConfiguration.cs

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class CollectorConfiguration
1010
readonly PipelinedCollectorTagConfiguration _tag;
1111
readonly PipelinedCollectorEmitConfiguration _emitter;
1212
readonly PipelinedCollectorBatchConfiguration _batcher;
13+
readonly PipelinedCollectorAggregateConfiguration _aggregator;
1314

1415
public CollectorConfiguration()
1516
: this(null)
@@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null)
2223
_tag = new PipelinedCollectorTagConfiguration(this);
2324
_emitter = new PipelinedCollectorEmitConfiguration(this);
2425
_batcher = new PipelinedCollectorBatchConfiguration(this);
26+
_aggregator = new PipelinedCollectorAggregateConfiguration(this);
2527
}
2628

2729
public CollectorTagConfiguration Tag => _tag;
@@ -30,13 +32,16 @@ internal CollectorConfiguration(IPointEmitter parent = null)
3032

3133
public CollectorBatchConfiguration Batch => _batcher;
3234

35+
public CollectorAggregateConfiguration Aggregate => _aggregator;
36+
3337
public MetricsCollector CreateCollector()
3438
{
3539
Action disposeEmitter;
3640
Action disposeBatcher;
3741

3842
var emitter = _parent;
3943
emitter = _emitter.CreateEmitter(emitter, out disposeEmitter);
44+
emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter);
4045
emitter = _batcher.CreateEmitter(emitter, out disposeBatcher);
4146

4247
return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () =>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Configuration
5+
{
6+
public abstract class CollectorAggregateConfiguration
7+
{
8+
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
9+
10+
public abstract CollectorConfiguration SumIncrements();
11+
12+
public abstract CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func);
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using InfluxDB.Collector.Pipeline;
4+
using InfluxDB.Collector.Pipeline.Aggregate;
5+
6+
namespace InfluxDB.Collector.Configuration
7+
{
8+
class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration
9+
{
10+
private readonly CollectorConfiguration _configuration;
11+
12+
bool _sumIncrements;
13+
Func<IEnumerable<long>, double> _timeAggregation;
14+
TimeSpan? _interval;
15+
16+
public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration)
17+
{
18+
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
19+
_configuration = configuration;
20+
}
21+
22+
public override CollectorConfiguration AtInterval(TimeSpan interval)
23+
{
24+
_interval = interval;
25+
return _configuration;
26+
}
27+
28+
public override CollectorConfiguration SumIncrements()
29+
{
30+
_sumIncrements = true;
31+
return _configuration;
32+
}
33+
34+
public override CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func)
35+
{
36+
_timeAggregation = func;
37+
return _configuration;
38+
}
39+
40+
public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
41+
{
42+
if (_interval == null)
43+
{
44+
dispose = null;
45+
return parent;
46+
}
47+
48+
var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent);
49+
dispose = aggregator.Dispose;
50+
return aggregator;
51+
}
52+
}
53+
}

src/InfluxDB.Collector/MetricsCollector.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ public abstract class MetricsCollector : IPointEmitter, IDisposable
1111

1212
public void Increment(string measurement, long count = 1, IReadOnlyDictionary<string, string> tags = null)
1313
{
14-
Write(measurement, new Dictionary<string, object> { { "count", count } }, tags);
14+
Write(measurement, new Dictionary<string, object> { { "count", count } }, tags, kind: MeasurementKind.Increment);
1515
}
1616

17-
public void Measure(string measurement, object value, IReadOnlyDictionary<string, string> tags = null)
17+
public void Measure(string measurement, object value, IReadOnlyDictionary<string, string> tags = null, MeasurementKind kind = MeasurementKind.Other)
1818
{
19-
Write(measurement, new Dictionary<string, object> { { "value", value } }, tags);
19+
Write(measurement, new Dictionary<string, object> { { "value", value } }, tags, kind: kind);
2020
}
2121

2222
public IDisposable Time(string measurement, IReadOnlyDictionary<string, string> tags = null)
@@ -36,11 +36,11 @@ public void Dispose()
3636

3737
protected virtual void Dispose(bool disposing) { }
3838

39-
public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null)
39+
public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null, MeasurementKind kind = MeasurementKind.Other)
4040
{
4141
try
4242
{
43-
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow());
43+
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow(), kind);
4444
Emit(new[] { point });
4545
}
4646
catch (Exception ex)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Pipeline.Aggregate
5+
{
6+
struct GroupingKey : IEquatable<GroupingKey>
7+
{
8+
private static readonly Dictionary<string, string> EmptyDict = new Dictionary<string, string>();
9+
10+
public long Bucket { get; }
11+
12+
public MeasurementKind Kind { get; }
13+
14+
public string Measurement { get; }
15+
16+
public Dictionary<string, string> Tags { get; }
17+
18+
public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary<string, string> tags)
19+
{
20+
Bucket = bucket;
21+
Kind = kind;
22+
Measurement = measurement;
23+
Tags = tags ?? EmptyDict;
24+
}
25+
26+
public bool Equals(GroupingKey other)
27+
{
28+
return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags);
29+
}
30+
31+
public override bool Equals(object obj)
32+
{
33+
if (ReferenceEquals(null, obj))
34+
{
35+
return false;
36+
}
37+
38+
return obj is GroupingKey key && Equals(key);
39+
}
40+
41+
public override int GetHashCode()
42+
{
43+
unchecked
44+
{
45+
int hashCode = Bucket.GetHashCode();
46+
hashCode = (hashCode * 397) ^ (int) Kind;
47+
hashCode = (hashCode * 397) ^ Measurement.GetHashCode();
48+
hashCode = (hashCode * 397) ^ TagsHashCode();
49+
return hashCode;
50+
}
51+
}
52+
53+
int TagsHashCode()
54+
{
55+
unchecked
56+
{
57+
int hashCode = 1;
58+
foreach (var kvp in Tags)
59+
{
60+
hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode();
61+
}
62+
63+
return hashCode;
64+
}
65+
}
66+
67+
static bool DictionaryEquals(Dictionary<string, string> dict, Dictionary<string, string> dict2)
68+
{
69+
if (dict.Count != dict2.Count)
70+
{
71+
return false;
72+
}
73+
74+
foreach (var kvp in dict)
75+
{
76+
if (dict2.TryGetValue(kvp.Key, out string value))
77+
{
78+
if (value != kvp.Value)
79+
{
80+
return false;
81+
}
82+
}
83+
else
84+
{
85+
return false;
86+
}
87+
}
88+
89+
return true;
90+
}
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// ==========================================================================
2+
// AggregatePointEmitter.cs
3+
// Bus Portal (busliniensuche.de)
4+
// ==========================================================================
5+
// All rights reserved.
6+
// ==========================================================================
7+
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Linq;
11+
12+
namespace InfluxDB.Collector.Pipeline.Aggregate
13+
{
14+
class AggregatePointEmitter : IPointEmitter, IDisposable
15+
{
16+
readonly TimeSpan _timeSpan;
17+
readonly bool _sumIncrements;
18+
readonly Func<IEnumerable<long>, double> _timesAggregation;
19+
readonly IPointEmitter _parent;
20+
21+
public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumerable<long>, double> timesAggregation, IPointEmitter parent)
22+
{
23+
_timeSpan = timeSpan;
24+
_sumIncrements = sumIncrements;
25+
_timesAggregation = timesAggregation;
26+
_parent = parent;
27+
}
28+
29+
public void Emit(PointData[] points)
30+
{
31+
var grouped = points.GroupBy(x => new GroupingKey(
32+
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _timeSpan.Ticks : 0,
33+
DetermineKind(x),
34+
x.Measurement,
35+
x.Tags
36+
));
37+
38+
var aggregated = grouped.SelectMany(Aggregate).ToArray();
39+
40+
_parent.Emit(aggregated);
41+
}
42+
43+
IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
44+
{
45+
GroupingKey key = group.Key;
46+
MeasurementKind kind = key.Kind;
47+
48+
if (kind == MeasurementKind.Increment && _sumIncrements)
49+
{
50+
long sum = group.Sum(x => (long) x.Fields["count"]);
51+
return new[]
52+
{
53+
new PointData(
54+
key.Measurement,
55+
new Dictionary<string, object> { { "count", sum } },
56+
key.Tags,
57+
AverageTime(key),
58+
key.Kind)
59+
};
60+
}
61+
62+
if (kind == MeasurementKind.Time && _timesAggregation != null)
63+
{
64+
long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks));
65+
return new[]
66+
{
67+
new PointData(
68+
key.Measurement,
69+
new Dictionary<string, object> { { "value", new TimeSpan(ticks) } },
70+
key.Tags,
71+
AverageTime(key),
72+
key.Kind)
73+
};
74+
}
75+
76+
return group;
77+
}
78+
79+
private DateTime AverageTime(GroupingKey key)
80+
{
81+
return new DateTime(key.Bucket + _timeSpan.Ticks / 2, DateTimeKind.Utc);
82+
}
83+
84+
static MeasurementKind DetermineKind(PointData x)
85+
{
86+
if (x.Fields.Count != 1) return MeasurementKind.Other;
87+
88+
switch (x.Kind)
89+
{
90+
case MeasurementKind.Increment when x.Fields.TryGetValue("count", out var count) && count is long:
91+
return MeasurementKind.Increment;
92+
case MeasurementKind.Time when x.Fields.TryGetValue("value", out var value) && value is TimeSpan:
93+
return MeasurementKind.Time;
94+
default:
95+
return MeasurementKind.Other;
96+
}
97+
}
98+
99+
public void Dispose()
100+
{
101+
}
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace InfluxDB.Collector.Pipeline
2+
{
3+
public enum MeasurementKind
4+
{
5+
Other = 0, Increment, Time
6+
}
7+
}

src/InfluxDB.Collector/Pipeline/PointData.cs

+11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class PointData
1010
public Dictionary<string, object> Fields { get; }
1111
public Dictionary<string, string> Tags { get; set; }
1212
public DateTime? UtcTimestamp { get; }
13+
public MeasurementKind Kind { get; }
1314

1415
public PointData(
1516
string measurement,
@@ -25,5 +26,15 @@ public PointData(
2526
Tags = tags.ToDictionary(kv => kv.Key, kv => kv.Value);
2627
UtcTimestamp = utcTimestamp;
2728
}
29+
30+
public PointData(
31+
string measurement,
32+
IReadOnlyDictionary<string, object> fields,
33+
IReadOnlyDictionary<string, string> tags,
34+
DateTime utcTimestamp,
35+
MeasurementKind kind) : this(measurement, fields, tags, utcTimestamp)
36+
{
37+
Kind = kind;
38+
}
2839
}
2940
}

src/InfluxDB.Collector/StopwatchTimer.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
4+
using InfluxDB.Collector.Pipeline;
45

56
namespace InfluxDB.Collector
67
{
@@ -22,7 +23,7 @@ public StopwatchTimer(MetricsCollector collector, string measurement, IReadOnlyD
2223
public void Dispose()
2324
{
2425
_stopwatch.Stop();
25-
_collector.Measure(_measurement, _stopwatch.Elapsed, _tags);
26+
_collector.Measure(_measurement, _stopwatch.Elapsed, _tags, MeasurementKind.Time);
2627
}
2728
}
2829
}

0 commit comments

Comments
 (0)