Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit b47e85f

Browse files
committed
Implement Aggregation of Increment and Time measurements
Fixes #9
1 parent bec2ce3 commit b47e85f

File tree

9 files changed

+566
-91
lines changed

9 files changed

+566
-91
lines changed

src/InfluxDB.Collector/CollectorConfiguration.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class CollectorConfiguration
1010
readonly PipelinedCollectorTagConfiguration _tag;
1111
readonly PipelinedCollectorEmitConfiguration _emitter;
1212
readonly PipelinedCollectorBatchConfiguration _batcher;
13+
readonly PipelinedCollectorAggregateConfiguration _aggregator;
1314

1415
public CollectorConfiguration()
1516
: this(null)
@@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null)
2223
_tag = new PipelinedCollectorTagConfiguration(this);
2324
_emitter = new PipelinedCollectorEmitConfiguration(this);
2425
_batcher = new PipelinedCollectorBatchConfiguration(this);
26+
_aggregator = new PipelinedCollectorAggregateConfiguration(this);
2527
}
2628

2729
public CollectorTagConfiguration Tag => _tag;
@@ -30,6 +32,8 @@ internal CollectorConfiguration(IPointEmitter parent = null)
3032

3133
public CollectorBatchConfiguration Batch => _batcher;
3234

35+
public CollectorAggregateConfiguration Aggregate => _aggregator;
36+
3337
public MetricsCollector CreateCollector()
3438
{
3539
Action disposeEmitter;
@@ -38,6 +42,7 @@ public MetricsCollector CreateCollector()
3842
var emitter = _parent;
3943
emitter = _emitter.CreateEmitter(emitter, out disposeEmitter);
4044
emitter = _batcher.CreateEmitter(emitter, out disposeBatcher);
45+
emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter);
4146

4247
return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () =>
4348
{
@@ -46,4 +51,4 @@ public MetricsCollector CreateCollector()
4651
});
4752
}
4853
}
49-
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Configuration
5+
{
6+
public abstract class CollectorAggregateConfiguration
7+
{
8+
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
9+
10+
public abstract CollectorConfiguration SumIncrements();
11+
12+
public abstract CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func);
13+
}
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using InfluxDB.Collector.Pipeline;
4+
using InfluxDB.Collector.Pipeline.Aggregate;
5+
6+
namespace InfluxDB.Collector.Configuration
7+
{
8+
class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration
9+
{
10+
private readonly CollectorConfiguration _configuration;
11+
12+
bool _sumIncrements;
13+
Func<IEnumerable<long>, double> _timeAggregation;
14+
TimeSpan? _interval;
15+
16+
public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration)
17+
{
18+
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
19+
_configuration = configuration;
20+
}
21+
22+
public override CollectorConfiguration AtInterval(TimeSpan interval)
23+
{
24+
_interval = interval;
25+
return _configuration;
26+
}
27+
28+
public override CollectorConfiguration SumIncrements()
29+
{
30+
_sumIncrements = true;
31+
return _configuration;
32+
}
33+
34+
public override CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func)
35+
{
36+
_timeAggregation = func;
37+
return _configuration;
38+
}
39+
40+
public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
41+
{
42+
if (_interval == null)
43+
{
44+
dispose = null;
45+
return parent;
46+
}
47+
48+
var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent);
49+
dispose = aggregator.Dispose;
50+
return aggregator;
51+
}
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Collections.Generic;
3+
4+
namespace InfluxDB.Collector.Pipeline.Aggregate
5+
{
6+
struct GroupingKey : IEquatable<GroupingKey>
7+
{
8+
private static readonly Dictionary<string, string> EmptyDict = new Dictionary<string, string>();
9+
10+
public long Bucket { get; }
11+
12+
public MeasurementKind Kind { get; }
13+
14+
public string Measurement { get; }
15+
16+
public Dictionary<string, string> Tags { get; }
17+
18+
public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary<string, string> tags)
19+
{
20+
Bucket = bucket;
21+
Kind = kind;
22+
Measurement = measurement;
23+
Tags = tags ?? EmptyDict;
24+
}
25+
26+
public bool Equals(GroupingKey other)
27+
{
28+
return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags);
29+
}
30+
31+
public override bool Equals(object obj)
32+
{
33+
if (ReferenceEquals(null, obj))
34+
{
35+
return false;
36+
}
37+
38+
return obj is GroupingKey key && Equals(key);
39+
}
40+
41+
public override int GetHashCode()
42+
{
43+
unchecked
44+
{
45+
int hashCode = Bucket.GetHashCode();
46+
hashCode = (hashCode * 397) ^ (int) Kind;
47+
hashCode = (hashCode * 397) ^ Measurement.GetHashCode();
48+
hashCode = (hashCode * 397) ^ TagsHashCode();
49+
return hashCode;
50+
}
51+
}
52+
53+
int TagsHashCode()
54+
{
55+
unchecked
56+
{
57+
int hashCode = 1;
58+
foreach (var kvp in Tags)
59+
{
60+
hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode();
61+
}
62+
63+
return hashCode;
64+
}
65+
}
66+
67+
static bool DictionaryEquals(Dictionary<string, string> dict, Dictionary<string, string> dict2)
68+
{
69+
if (dict.Count != dict2.Count)
70+
{
71+
return false;
72+
}
73+
74+
foreach (var kvp in dict)
75+
{
76+
if (dict2.TryGetValue(kvp.Key, out string value))
77+
{
78+
if (value != kvp.Value)
79+
{
80+
return false;
81+
}
82+
}
83+
else
84+
{
85+
return false;
86+
}
87+
}
88+
89+
return true;
90+
}
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using InfluxDB.Collector.Pipeline.Common;
5+
6+
namespace InfluxDB.Collector.Pipeline.Aggregate
7+
{
8+
class AggregatePointEmitter : IntervalEmitterBase
9+
{
10+
readonly bool _sumIncrements;
11+
readonly Func<IEnumerable<long>, double> _timesAggregation;
12+
readonly IPointEmitter _parent;
13+
14+
public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumerable<long>, double> timesAggregation, IPointEmitter parent)
15+
: base(timeSpan)
16+
{
17+
_sumIncrements = sumIncrements;
18+
_timesAggregation = timesAggregation;
19+
_parent = parent;
20+
}
21+
22+
protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
23+
{
24+
var grouped = batch.GroupBy(x => new GroupingKey(
25+
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0,
26+
DetermineKind(x),
27+
x.Measurement,
28+
x.Tags
29+
));
30+
31+
var aggregated = grouped.SelectMany(Aggregate).ToArray();
32+
33+
_parent.Emit(aggregated);
34+
}
35+
36+
IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
37+
{
38+
GroupingKey key = group.Key;
39+
MeasurementKind kind = key.Kind;
40+
41+
if (kind == MeasurementKind.Increment && _sumIncrements)
42+
{
43+
long sum = group.Sum(x => (long) x.Fields["count"]);
44+
return new[]
45+
{
46+
new PointData(
47+
key.Measurement,
48+
new Dictionary<string, object> { { "count", sum } },
49+
key.Tags,
50+
AverageTime(key))
51+
};
52+
}
53+
54+
if (kind == MeasurementKind.Time && _timesAggregation != null)
55+
{
56+
long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks));
57+
return new[]
58+
{
59+
new PointData(
60+
key.Measurement,
61+
new Dictionary<string, object> { { "value", new TimeSpan(ticks) } },
62+
key.Tags,
63+
AverageTime(key))
64+
};
65+
}
66+
67+
return group;
68+
}
69+
70+
private DateTime AverageTime(GroupingKey key)
71+
{
72+
return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc);
73+
}
74+
75+
static MeasurementKind DetermineKind(PointData x)
76+
{
77+
if (x.Fields.Count != 1) return MeasurementKind.Other;
78+
79+
if (x.Fields.TryGetValue("count", out var count) && count is long)
80+
{
81+
return MeasurementKind.Increment;
82+
}
83+
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
84+
{
85+
return MeasurementKind.Time;
86+
}
87+
else
88+
{
89+
return MeasurementKind.Other;
90+
}
91+
}
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace InfluxDB.Collector.Pipeline.Aggregate
2+
{
3+
public enum MeasurementKind
4+
{
5+
Other = 0, Increment, Time
6+
}
7+
}

0 commit comments

Comments
 (0)