diff --git a/src/InfluxDB.Collector/InfluxDB.Collector.csproj b/src/InfluxDB.Collector/InfluxDB.Collector.csproj
index e902178..7885c5a 100644
--- a/src/InfluxDB.Collector/InfluxDB.Collector.csproj
+++ b/src/InfluxDB.Collector/InfluxDB.Collector.csproj
@@ -20,6 +20,7 @@
+
diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs
index 521ca6f..4d2f177 100644
--- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs
+++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs
@@ -1,6 +1,8 @@
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Collector.Diagnostics;
using InfluxDB.Collector.Platform;
@@ -10,8 +12,7 @@ namespace InfluxDB.Collector.Pipeline.Batch
{
class IntervalBatcher : IPointEmitter, IDisposable
{
- readonly object _queueLock = new object();
- Queue _queue = new Queue();
+ ConcurrentQueue _queue = new ConcurrentQueue();
readonly TimeSpan _interval;
readonly int? _maxBatchSize;
@@ -54,19 +55,17 @@ Task OnTick()
{
try
{
- Queue batch;
- lock (_queueLock)
- {
- if (_queue.Count == 0)
- return Task.Delay(0);
-
- batch = _queue;
- _queue = new Queue();
- }
+ if (_queue.IsEmpty)
+ return Task.Delay(0);
+
+ var batch = Interlocked.Exchange(ref _queue, new ConcurrentQueue()).ToArray();
+
+ if (batch.Length == 0)
+ return Task.Delay(0);
- if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value)
+ if (_maxBatchSize == null || batch.Length <= _maxBatchSize.Value)
{
- _parent.Emit(batch.ToArray());
+ _parent.Emit(batch);
}
else
{
@@ -104,11 +103,8 @@ public void Emit(PointData[] points)
}
}
- lock (_queueLock)
- {
- foreach(var point in points)
- _queue.Enqueue(point);
- }
+ foreach (var point in points)
+ _queue.Enqueue(point);
}
}
}