diff --git a/src/InfluxDB.Collector/InfluxDB.Collector.csproj b/src/InfluxDB.Collector/InfluxDB.Collector.csproj index e902178..7885c5a 100644 --- a/src/InfluxDB.Collector/InfluxDB.Collector.csproj +++ b/src/InfluxDB.Collector/InfluxDB.Collector.csproj @@ -20,6 +20,7 @@ + diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index 521ca6f..4d2f177 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using InfluxDB.Collector.Diagnostics; using InfluxDB.Collector.Platform; @@ -10,8 +12,7 @@ namespace InfluxDB.Collector.Pipeline.Batch { class IntervalBatcher : IPointEmitter, IDisposable { - readonly object _queueLock = new object(); - Queue _queue = new Queue(); + ConcurrentQueue _queue = new ConcurrentQueue(); readonly TimeSpan _interval; readonly int? _maxBatchSize; @@ -54,19 +55,17 @@ Task OnTick() { try { - Queue batch; - lock (_queueLock) - { - if (_queue.Count == 0) - return Task.Delay(0); - - batch = _queue; - _queue = new Queue(); - } + if (_queue.IsEmpty) + return Task.Delay(0); + + var batch = Interlocked.Exchange(ref _queue, new ConcurrentQueue()).ToArray(); + + if (batch.Length == 0) + return Task.Delay(0); - if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) + if (_maxBatchSize == null || batch.Length <= _maxBatchSize.Value) { - _parent.Emit(batch.ToArray()); + _parent.Emit(batch); } else { @@ -104,11 +103,8 @@ public void Emit(PointData[] points) } } - lock (_queueLock) - { - foreach(var point in points) - _queue.Enqueue(point); - } + foreach (var point in points) + _queue.Enqueue(point); } } }