From 03a515d772ab4d043f563815dd2278a04976b593 Mon Sep 17 00:00:00 2001 From: Alexander Moerman Date: Tue, 14 May 2019 09:46:12 +0200 Subject: [PATCH] Use ConcurrentQueue instead of Queue + lock in IntervalBatcher --- .../InfluxDB.Collector.csproj | 1 + .../Pipeline/Batch/IntervalBatcher.cs | 32 ++++++++----------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/InfluxDB.Collector/InfluxDB.Collector.csproj b/src/InfluxDB.Collector/InfluxDB.Collector.csproj index e902178..7885c5a 100644 --- a/src/InfluxDB.Collector/InfluxDB.Collector.csproj +++ b/src/InfluxDB.Collector/InfluxDB.Collector.csproj @@ -20,6 +20,7 @@ + diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index 521ca6f..4d2f177 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using InfluxDB.Collector.Diagnostics; using InfluxDB.Collector.Platform; @@ -10,8 +12,7 @@ namespace InfluxDB.Collector.Pipeline.Batch { class IntervalBatcher : IPointEmitter, IDisposable { - readonly object _queueLock = new object(); - Queue _queue = new Queue(); + ConcurrentQueue _queue = new ConcurrentQueue(); readonly TimeSpan _interval; readonly int? _maxBatchSize; @@ -54,19 +55,17 @@ Task OnTick() { try { - Queue batch; - lock (_queueLock) - { - if (_queue.Count == 0) - return Task.Delay(0); - - batch = _queue; - _queue = new Queue(); - } + if (_queue.IsEmpty) + return Task.Delay(0); + + var batch = Interlocked.Exchange(ref _queue, new ConcurrentQueue()).ToArray(); + + if (batch.Length == 0) + return Task.Delay(0); - if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) + if (_maxBatchSize == null || batch.Length <= _maxBatchSize.Value) { - _parent.Emit(batch.ToArray()); + _parent.Emit(batch); } else { @@ -104,11 +103,8 @@ public void Emit(PointData[] points) } } - lock (_queueLock) - { - foreach(var point in points) - _queue.Enqueue(point); - } + foreach (var point in points) + _queue.Enqueue(point); } } }