From 0f7f45a38244dc4e1a50b8eaf1e8ebcfaac13dbe Mon Sep 17 00:00:00 2001 From: Lucas Angeli Date: Fri, 7 Mar 2025 16:40:29 -0300 Subject: [PATCH] Implement RdKafka getAssignment Method --- pkg/rdkafka/RdKafkaConsumer.php | 17 ++++++ pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 66 +++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 8b6cf12c6..1158942b5 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -10,6 +10,7 @@ use Interop\Queue\Queue; use RdKafka\KafkaConsumer; use RdKafka\TopicPartition; +use RdKafka\Exception as RdKafkaException; class RdKafkaConsumer implements Consumer { @@ -88,6 +89,22 @@ public function getQueue(): Queue return $this->topic; } + /** + * @return RdKafkaTopic[] + */ + public function getAssignment(): array + { + try { + return array_map(function (TopicPartition $partition) { + $topic = new RdKafkaTopic($partition->getTopic()); + $topic->setPartition($partition->getPartition()); + return $topic; + }, $this->consumer->getAssignment()); + } catch (RdKafkaException) { + return []; + } + } + /** * @return RdKafkaMessage */ diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index a0544da6c..dd73d751e 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -10,6 +10,8 @@ use PHPUnit\Framework\TestCase; use RdKafka\KafkaConsumer; use RdKafka\Message; +use RdKafka\TopicPartition; +use RdKafka\Exception as RdKafkaException; class RdKafkaConsumerTest extends TestCase { @@ -258,6 +260,62 @@ public function testShouldAllowGetPreviouslySetSerializer() $this->assertSame($expectedSerializer, $consumer->getSerializer()); } + public function testShouldGetAssignmentWhenThereAreNoPartitions(): void + { + $rdKafka = $this->createKafkaConsumerMock(); + $rdKafka->expects($this->once()) + ->method('getAssignment') + ->willReturn([]); + + $consumer = new RdKafkaConsumer( + $rdKafka, + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); + + $this->assertEquals([], $consumer->getAssignment()); + } + + public function testShouldGetAssignmentWhenThereArePartitions(): void + { + $partition = new TopicPartition('', 0); + + $rdKafka = $this->createKafkaConsumerMock(); + $rdKafka->expects($this->once()) + ->method('getAssignment') + ->willReturn([$partition]); + + $consumer = new RdKafkaConsumer( + $rdKafka, + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); + + $expected = new RdKafkaTopic(''); + $expected->setPartition(0); + + $this->assertEquals([$expected], $consumer->getAssignment()); + } + + public function testShouldGetAssignmentReturnEmptyArrayWhenThrowException(): void + { + $rdKafka = $this->createKafkaConsumerMock(); + $rdKafka->expects($this->once()) + ->method('getAssignment') + ->willThrowException($this->createExceptionMock()); + + $consumer = new RdKafkaConsumer( + $rdKafka, + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); + + $this->assertEquals([], $consumer->getAssignment()); + } + /** * @return \PHPUnit\Framework\MockObject\MockObject|KafkaConsumer */ @@ -281,4 +339,12 @@ private function createSerializerMock() { return $this->createMock(Serializer::class); } + + /** + * @return \PHPUnit\Framework\MockObject\MockObject|RdKafkaException + */ + private function createExceptionMock() + { + return $this->createMock(RdKafkaException::class); + } }