Write AWS Kinesis Firehose Lambda Translation Layer
2022-10-14
A detailed guide to implementing a Lambda transformation layer for Kinesis Data Firehose using Golang, with code examples for processing line-JSON data and preparing it for S3 storage in an ETL pipeline.
You can use Lambda to transform the data flowing through Kinesis Data Firehose. This is the transform step of ETL, and it is officially called the "Data Transformation Flow":
https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
Data Structure
The data that Lambda pulls from Kinesis Data Firehose is in line-JSON format, which looks like the following:
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
Line-JSON (also known as newline-delimited JSON, or NDJSON) is the basic format for handling JSON in streaming pipelines.
You can encode and decode line-JSON with Golang's standard json package.
Let's assume the incoming JSON maps to the following struct:
type StreamData struct {
	AppVersion  string `json:"app_version"`
	SendingTime string `json:"sending_time"`
	LogID       string `json:"log_id"`
}
In this example, we want to transform the above StreamData, append some additional columns, and have Firehose deliver the result to the S3 bucket.
Here is the struct for the outgoing JSON:
type S3Object struct {
	AppVersion     string `json:"app_version"`
	LogID          string `json:"log_id"`
	Time           string `json:"time"`
	SendingTime    string `json:"sending_time"`
	ProcessingTime string `json:"processing_time"`
}
Time is the arrival time of the log record, which is taken from the Kinesis ApproximateArrivalTimestamp.
ProcessingTime is the processing time of the log record, which is the invocation time of the Lambda.
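For reference, events.MilliSecondsEpochTime in aws-lambda-go embeds time.Time, so you can format it directly with the standard library. A minimal standalone sketch (the epoch value is an arbitrary example):

package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-lambda-go/events"
)

func main() {
	// MilliSecondsEpochTime embeds time.Time, so Format works directly on it.
	arrival := events.MilliSecondsEpochTime{Time: time.Unix(1665705600, 0).UTC()}
	fmt.Println(arrival.Format(time.RFC3339)) // prints 2022-10-14T00:00:00Z
}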
Handler
You can use https://github.com/aws/aws-lambda-go to handle Lambda event sources. You can find the Firehose event types in the aws-lambda-go/events/ directory.
https://github.com/aws/aws-lambda-go/blob/master/events/README_KinesisFirehose.md
package main

import (
	"log"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func main() {
	lambda.Start(handler)
}
func handler(event events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
	var response events.KinesisFirehoseResponse
	for _, record := range event.Records {
		transformedRecord, err := buildResponseRecord(record)
		if err != nil {
			// Mark this record as failed instead of crashing the whole invocation.
			log.Println(err)
			transformedRecord = events.KinesisFirehoseResponseRecord{
				RecordID: record.RecordID,
				Result:   events.KinesisFirehoseTransformedStateProcessingFailed,
				Data:     record.Data,
			}
		}
		response.Records = append(response.Records, transformedRecord)
	}
	return response, nil
}
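Note that you don't have to base64-decode record.Data yourself: it is declared as []byte in aws-lambda-go, and encoding/json decodes the base64-encoded payload automatically when the event is unmarshaled.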
func buildResponseRecord(record events.KinesisFirehoseEventRecord) (events.KinesisFirehoseResponseRecord, error) {
	var transformedRecord events.KinesisFirehoseResponseRecord
	data, err := transform(record.Data, record.ApproximateArrivalTimestamp)
	if err != nil {
		return transformedRecord, err
	}
	transformedRecord.RecordID = record.RecordID
	transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
	transformedRecord.Data = data
	return transformedRecord, nil
}
buildResponseRecord is the core logic of this Lambda:
- transform the incoming stream data into the outgoing data
  - in this example, this is the S3Object struct
- set RecordID, Result, and Data on the returned record
  - these fields are required for a Data-Transformer Lambda
  - read the "Data Transformation and Status Model" section of https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
Transform
Then, let's have a look at the transform() function. It is simple:
- decode the incoming stream data bytes
- convert them to the outgoing structs
- encode the structs back to data bytes
func transform(dataBytes []byte, arrivalTimestamp events.MilliSecondsEpochTime) ([]byte, error) {
	records, err := decode(dataBytes)
	if err != nil {
		return nil, err
	}
	objects, err := convertStreamDataToS3Objects(records, arrivalTimestamp)
	if err != nil {
		return nil, err
	}
	outBytes, err := encode(objects)
	if err != nil {
		return nil, err
	}
	return outBytes, nil
}
Decode
The decoding process is also simple. You can use json.Decoder with its (*Decoder).More and (*Decoder).Decode methods.
func decode(dataBytes []byte) ([]*StreamData, error) {
	decoder := json.NewDecoder(bytes.NewReader(dataBytes))
	var records []*StreamData
	// More reports whether another JSON value follows in the stream,
	// which makes iterating over line-JSON straightforward.
	for decoder.More() {
		var d StreamData
		err := decoder.Decode(&d)
		if err != nil {
			return nil, err
		}
		records = append(records, &d)
	}
	return records, nil
}
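As a quick sanity check, a hypothetical test (not part of the original post; it assumes a _test.go file in the same package, and the sample field values are made up) can feed a two-line payload through decode:

func TestDecode(t *testing.T) {
	payload := []byte(`{"app_version":"10.0.0","sending_time":"2022-10-14T00:00:00Z","log_id":"foo"}
{"app_version":"10.0.0","sending_time":"2022-10-14T00:00:01Z","log_id":"bar"}`)
	records, err := decode(payload)
	if err != nil {
		t.Fatal(err)
	}
	if len(records) != 2 {
		t.Fatalf("want 2 records, got %d", len(records))
	}
	if records[0].LogID != "foo" {
		t.Fatalf("want log_id foo, got %s", records[0].LogID)
	}
}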
Converter
Here is where the application-specific logic comes in.
func convertStreamDataToS3Objects(records []*StreamData, arrivalTimestamp events.MilliSecondsEpochTime) ([]*S3Object, error) {
	// Allocate with zero length and full capacity: make([]*S3Object, len(records))
	// would leave len(records) nil pointers in front of the appended objects.
	objects := make([]*S3Object, 0, len(records))
	for _, streamData := range records {
		obj := &S3Object{
			AppVersion:     streamData.AppVersion,
			LogID:          streamData.LogID,
			Time:           arrivalTimestamp.Format(time.RFC3339),
			SendingTime:    streamData.SendingTime,
			ProcessingTime: time.Now().Format(time.RFC3339),
		}
		objects = append(objects, obj)
	}
	return filterOutNil(objects), nil
}
func filterOutNil(objects []*S3Object) []*S3Object {
	// filtering without allocating (https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating)
	filteredObjects := objects[:0]
	for _, o := range objects {
		if o != nil {
			filteredObjects = append(filteredObjects, o)
		}
	}
	return filteredObjects
}
Encode
Finally, encode the outgoing structs back to data bytes.
func encode(objects []*S3Object) ([]byte, error) {
	var outBytes bytes.Buffer
	encoder := json.NewEncoder(&outBytes)
	for _, obj := range objects {
		// Encode appends a newline after each value,
		// so the output stays in line-JSON format.
		err := encoder.Encode(obj)
		if err != nil {
			return nil, err
		}
	}
	return outBytes.Bytes(), nil
}
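To tie everything together, here is a hypothetical end-to-end check (again assuming a _test.go file next to the code above; the payload and arrival timestamp are arbitrary example values):

func TestTransform(t *testing.T) {
	payload := []byte(`{"app_version":"10.0.0","sending_time":"2022-10-14T00:00:00Z","log_id":"foobar"}`)
	arrival := events.MilliSecondsEpochTime{Time: time.Unix(1665705600, 0).UTC()}
	out, err := transform(payload, arrival)
	if err != nil {
		t.Fatal(err)
	}
	var obj S3Object
	if err := json.Unmarshal(out, &obj); err != nil {
		t.Fatal(err)
	}
	if obj.Time != "2022-10-14T00:00:00Z" {
		t.Fatalf("unexpected arrival time: %s", obj.Time)
	}
}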