From 80084fafbe6aa6f427a8ea3027c0e800689c9e75 Mon Sep 17 00:00:00 2001 From: Arup Malakar Date: Wed, 20 Nov 2024 19:11:51 -0800 Subject: [PATCH 1/2] Support otfMetadata in Kinesis firehose response metadata for iceberg table routing --- events/firehose.go | 9 ++++++++- .../testdata/kinesis-firehose-response.json | 19 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/events/firehose.go b/events/firehose.go index 85b8fd18..9e63020c 100644 --- a/events/firehose.go +++ b/events/firehose.go @@ -37,7 +37,14 @@ type KinesisFirehoseResponseRecord struct { } type KinesisFirehoseResponseRecordMetadata struct { - PartitionKeys map[string]string `json:"partitionKeys"` + PartitionKeys map[string]string `json:"partitionKeys"` + OTFMetadata KinesisFirehoseResponseOTFMetadata `json:"otfMetadata"` +} + +type KinesisFirehoseResponseOTFMetadata struct { + DestinationTableName string `json:"destinationTableName"` + DestinationDatabaseName string `json:"destinationDatabaseName"` + Operation string `json:"operation"` } type KinesisFirehoseRecordMetadata struct { diff --git a/events/testdata/kinesis-firehose-response.json b/events/testdata/kinesis-firehose-response.json index c7c4466c..2de412d7 100644 --- a/events/testdata/kinesis-firehose-response.json +++ b/events/testdata/kinesis-firehose-response.json @@ -5,7 +5,12 @@ "recordId": "record1", "result": "TRANSFORMED_STATE_OK", "metadata": { - "partitionKeys": {} + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } } }, { @@ -13,7 +18,12 @@ "recordId": "record2", "result": "TRANSFORMED_STATE_DROPPED", "metadata": { - "partitionKeys": {} + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } } }, { @@ -24,6 +34,11 @@ "partitionKeys": { "iamKey1": "iamValue1", "iamKey2": "iamValue2" + }, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" } } } From 88cdb7b06e73c99ecbfdde5350be4400a64a0239 Mon Sep 17 00:00:00 2001 From: Arup Malakar Date: Wed, 20 Nov 2024 19:18:34 -0800 Subject: [PATCH 2/2] Add constant for otf operation --- events/firehose.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/events/firehose.go b/events/firehose.go index 9e63020c..583c6599 100644 --- a/events/firehose.go +++ b/events/firehose.go @@ -25,6 +25,13 @@ const ( KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed" ) +// Constants used for otf operation for the record +const ( + KinesisFirehoseOtfOperationInsert = "insert" + KinesisFirehoseOtfOperationUpdate = "update" + KinesisFirehoseOtfOperationDelete = "delete" +) + type KinesisFirehoseResponse struct { Records []KinesisFirehoseResponseRecord `json:"records"` }