Write AWS Kinesis Firehose Lambda Translation Layer

You can use a Lambda function to transform the records flowing through Kinesis Data Firehose. This is the transform step of an ETL pipeline, and the AWS documentation officially calls it the "Data Transformation Flow".

https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

Data Structure

The data the Lambda function receives from Kinesis Data Firehose is in line-json format, that is, one JSON object per line. It looks like the following:

{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}

Line-json is a common format for handling JSON in streaming pipelines.

You can encode and decode line-json with Go's standard encoding/json package.

Let's assume the incoming JSON maps to the following struct:

type StreamData struct {
    AppVersion  string `json:"app_version"`
    SendingTime string `json:"sending_time"`
    LogID       string `json:"log_id"`
}

In this example, we want to convert the StreamData above, append a few extra columns, and have the result written to an S3 bucket.

Here is the struct for the resulting JSON:

type S3Object struct {
    AppVersion     string `json:"app_version"`
    LogID          string `json:"log_id"`
    Time           string `json:"time"`
    SendingTime    string `json:"sending_time"`
    ProcessingTime string `json:"processing_time"`
}

Time is the arrival time of the log record, taken from the record's ApproximateArrivalTimestamp.

ProcessingTime is the time at which the log record was processed, that is, when the Lambda function ran.
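
To make the two timestamp fields concrete, here is a minimal sketch using only the standard library. It assumes the arrival timestamp is available as milliseconds since the Unix epoch, which is what the name of the events.MilliSecondsEpochTime type suggests. The helper name formatArrival and the sample value are made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// formatArrival is a hypothetical helper: it converts a millisecond epoch
// value into an RFC 3339 string in UTC.
func formatArrival(ms int64) string {
	return time.UnixMilli(ms).UTC().Format(time.RFC3339)
}

func main() {
	// Sample arrival timestamp, chosen for illustration only.
	fmt.Println("time:", formatArrival(1665705600000))

	// The processing time is simply the clock reading while the function runs.
	fmt.Println("processing_time:", time.Now().UTC().Format(time.RFC3339))
}
```

Formatting both fields in UTC keeps them directly comparable, whatever time zone the Lambda runtime happens to use.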

Handler

You can use https://github.com/aws/aws-lambda-go to handle Lambda event sources. The Firehose event types are in the aws-lambda-go/events/ directory.

https://github.com/aws/aws-lambda-go/blob/master/events/README_KinesisFirehose.md

func main() {
    lambda.Start(handler)
}

func handler(evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
    var response events.KinesisFirehoseResponse

    for _, record := range evnt.Records {
        transformedRecord, err := buildResponseRecord(record)
        if err != nil {
            // Return the error to the Lambda runtime; log.Fatalln would exit the whole process.
            return response, err
        }

        response.Records = append(response.Records, transformedRecord)
    }

    return response, nil
}

func buildResponseRecord(record events.KinesisFirehoseEventRecord) (events.KinesisFirehoseResponseRecord, error) {
    var transformedRecord events.KinesisFirehoseResponseRecord

    data, err := transform(record.Data, record.ApproximateArrivalTimestamp)
    if err != nil {
        return transformedRecord, err
    }

    transformedRecord.RecordID = record.RecordID
    transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
    transformedRecord.Data = data

    return transformedRecord, nil
}

buildResponseRecord holds the core logic of this Lambda function.

Transform

Next, let's look at the transform() function. It is simple:

  • decode the incoming stream data bytes
  • convert the decoded records to the outgoing struct
  • encode the result back to data bytes

func transform(dataBytes []byte, arrivalTimestamp events.MilliSecondsEpochTime) ([]byte, error) {
    records, err := decode(dataBytes)
    if err != nil {
        return nil, err
    }

    objects, err := convertStreamDataToS3Objects(records, arrivalTimestamp)
    if err != nil {
        return nil, err
    }

    outBytes, err := encode(objects)
    if err != nil {
        return nil, err
    }

    return outBytes, nil
}

Decode

Decoding is also simple. You can use json.Decoder with its More and Decode methods.

func decode(dataBytes []byte) ([]*StreamData, error) {
    decoder := json.NewDecoder(bytes.NewReader(dataBytes))

    var records []*StreamData
    for decoder.More() {
        var d StreamData
        err := decoder.Decode(&d)
        if err != nil {
            return nil, err
        }
        records = append(records, &d)
    }

    return records, nil
}

Converter

This is where the application-specific logic comes in.

func convertStreamDataToS3Objects(records []*StreamData, arrivalTimestamp events.MilliSecondsEpochTime) ([]*S3Object, error) {
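    // Length 0 with capacity len(records): append fills the slice without leaving leading nil entries.
    objects := make([]*S3Object, 0, len(records))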

    for _, streamData := range records {
        obj := &S3Object{
            AppVersion:     streamData.AppVersion,
            LogID:          streamData.LogID,
            Time:           arrivalTimestamp.Format(time.RFC3339),
            SendingTime:    streamData.SendingTime,
            ProcessingTime: time.Now().Format(time.RFC3339),
        }

        objects = append(objects, obj)
    }
    return filterOutNil(objects), nil
}

func filterOutNil(objects []*S3Object) []*S3Object {
    filteredObjects := objects[:0]

    // filtering without allocating (https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating)
    for _, o := range objects {
        if o != nil {
            filteredObjects = append(filteredObjects, o)
        }
    }

    return filteredObjects
}

Encode

Finally, encode the outgoing structs back to data bytes.

func encode(objects []*S3Object) ([]byte, error) {
    var outBytes bytes.Buffer
    encoder := json.NewEncoder(&outBytes)

    for _, obj := range objects {
        err := encoder.Encode(obj)
        if err != nil {
            return nil, err
        }
    }

    return outBytes.Bytes(), nil
}
October 14, 2022