
Write AWS Kinesis Firehose Lambda Translation Layer

October 14, 2022

You can use AWS Lambda to transform the data flowing through Kinesis Data Firehose before it is delivered to its destination. This is the transform step of ETL, and AWS officially calls it "Data Transformation Flow".


Data Structure

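In this example, the data that Kinesis Data Firehose passes to Lambda is in line-json format (also known as JSON Lines or newline-delimited JSON): each line holds one complete JSON object, and lines are separated by newline characters.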


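Line-json is a common format for handling JSON in streaming pipelines, because a consumer can parse one record at a time without loading the whole payload as a single document.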

You can encode and decode line-json with Go's standard encoding/json package.
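As a minimal sketch of that round trip (the function name roundTrip is hypothetical), json.Decoder reads successive JSON values from one byte stream, and json.Encoder writes each value followed by a newline, so the output is line-json again. Decoding into map[string]any only keeps the example generic.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// roundTrip decodes every JSON value in a line-json payload and re-encodes
// each one on its own line. Encoder.Encode always appends '\n', so the
// output is line-json even if the input values were simply concatenated.
func roundTrip(in []byte) ([]byte, error) {
	dec := json.NewDecoder(bytes.NewReader(in))
	var out bytes.Buffer
	enc := json.NewEncoder(&out)
	for dec.More() {
		var v map[string]any
		if err := dec.Decode(&v); err != nil {
			return nil, err
		}
		if err := enc.Encode(v); err != nil {
			return nil, err
		}
	}
	return out.Bytes(), nil
}

func main() {
	in := []byte("{\"log_id\":\"a\"}\n{\"log_id\":\"b\"}\n")
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```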

Let's assume the incoming JSON maps to the following struct:

type StreamData struct {
    AppVersion  string `json:"app_version"`
    SendingTime string `json:"sending_time"`
    LogID       string `json:"log_id"`
}
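To see how the struct tags line up with the incoming keys, here is a small sketch that decodes one line into StreamData. The sample values and the extra key are invented for illustration, and parseLine is a hypothetical helper. encoding/json matches keys through the json tags, leaves missing fields at their zero value, and by default silently ignores keys that have no matching field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StreamData mirrors the struct in the post; the json tags map the
// snake_case keys in the log line to Go fields.
type StreamData struct {
	AppVersion  string `json:"app_version"`
	SendingTime string `json:"sending_time"`
	LogID       string `json:"log_id"`
}

// parseLine decodes one line of line-json into StreamData.
// Keys without a matching field (such as "extra" below) are ignored.
func parseLine(line []byte) (StreamData, error) {
	var d StreamData
	err := json.Unmarshal(line, &d)
	return d, err
}

func main() {
	// Hypothetical sample line.
	d, err := parseLine([]byte(`{"app_version":"1.2.0","sending_time":"2022-10-14T09:00:00Z","log_id":"abc","extra":true}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", d)
}
```

If you would rather reject unexpected keys, json.Decoder has a DisallowUnknownFields method.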

In this example, we convert each StreamData record, add a few extra columns, and hand the result back to Firehose, which delivers it to an S3 bucket.

Here is the struct for the output JSON:

type S3Object struct {
    AppVersion     string `json:"app_version"`
    LogID          string `json:"log_id"`
    Time           string `json:"time"`
    SendingTime    string `json:"sending_time"`
    ProcessingTime string `json:"processing_time"`
}

Time is the time the log record arrived, taken from the ApproximateArrivalTimestamp that Firehose attaches to each record.

ProcessingTime is the time the log record was processed, that is, when the Lambda function ran.


You can use https://github.com/aws/aws-lambda-go to handle Lambda event sources. The Firehose event types, such as KinesisFirehoseEvent and KinesisFirehoseResponse, live in the aws-lambda-go/events/ directory.


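package main

import (
    "bytes"
    "encoding/json"
    "strings"
    "time"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

func main() {
    lambda.Start(handler)
}

// handler receives a batch of Firehose records and returns one transformed record per input record.
func handler(evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
    var response events.KinesisFirehoseResponse

    for _, record := range evnt.Records {
        transformedRecord, err := buildResponseRecord(record)
        if err != nil {
            // Returning an error fails the whole invocation.
            return response, err
        }

        response.Records = append(response.Records, transformedRecord)
    }

    return response, nil
}

// buildResponseRecord transforms one record and echoes its RecordID back to Firehose.
func buildResponseRecord(record events.KinesisFirehoseEventRecord) (events.KinesisFirehoseResponseRecord, error) {
    var transformedRecord events.KinesisFirehoseResponseRecord

    data, err := transform(record.Data, record.ApproximateArrivalTimestamp)
    if err != nil {
        return transformedRecord, err
    }

    transformedRecord.RecordID = record.RecordID
    transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
    transformedRecord.Data = data

    return transformedRecord, nil
}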

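buildResponseRecord is the core logic of this Lambda: it transforms the record's data, copies the RecordID back so Firehose can match each output record to its input, and marks the result as Ok.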


Next, let's look at the transform() function. It does three simple things:

  • decode the incoming stream data bytes
  • convert them to the outgoing struct
  • encode the result back to data bytes

func transform(dataBytes []byte, arrivalTimestamp events.MilliSecondsEpochTime) ([]byte, error) {
    records, err := decode(dataBytes)
    if err != nil {
        return nil, err
    }

    objects, err := convertStreamDataToS3Objects(records, arrivalTimestamp)
    if err != nil {
        return nil, err
    }

    outBytes, err := encode(objects)
    if err != nil {
        return nil, err
    }

    return outBytes, nil
}


Decoding is also simple: create a json.Decoder and loop with (*Decoder).More and (*Decoder).Decode until the input is exhausted.

func decode(dataBytes []byte) ([]*StreamData, error) {
    decoder := json.NewDecoder(strings.NewReader(string(dataBytes)))

    var records []*StreamData
    // More reports whether another JSON value follows, skipping newlines between values.
    for decoder.More() {
        var d StreamData
        err := decoder.Decode(&d)
        if err != nil {
            return nil, err
        }
        records = append(records, &d)
    }

    return records, nil
}
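A few edge cases of this decode loop are worth knowing, since record data may well end with a newline: More skips whitespace between values, an empty payload yields no records and no error, and a truncated object makes Decode return an error, which rejects the whole payload. The sketch repeats decode with one assumed change, bytes.NewReader, which reads the byte slice directly instead of first copying it into a string.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type StreamData struct {
	AppVersion  string `json:"app_version"`
	SendingTime string `json:"sending_time"`
	LogID       string `json:"log_id"`
}

// decode follows the post's loop, reading from a bytes.Reader.
func decode(dataBytes []byte) ([]*StreamData, error) {
	decoder := json.NewDecoder(bytes.NewReader(dataBytes))
	var records []*StreamData
	for decoder.More() {
		var d StreamData
		if err := decoder.Decode(&d); err != nil {
			return nil, err
		}
		records = append(records, &d)
	}
	return records, nil
}

func main() {
	// Blank lines and the trailing newline are skipped.
	recs, err := decode([]byte("{\"log_id\":\"a\"}\n\n{\"log_id\":\"b\"}\n"))
	fmt.Println(len(recs), err) // 2 <nil>

	// Empty input: no records, no error.
	recs, err = decode(nil)
	fmt.Println(len(recs), err) // 0 <nil>

	// A truncated object makes Decode fail.
	_, err = decode([]byte("{\"log_id\":\"a\"}\n{\"log_id\":"))
	fmt.Println(err != nil) // true
}
```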


This is where the application-specific logic comes in.

func convertStreamDataToS3Objects(records []*StreamData, arrivalTimestamp events.MilliSecondsEpochTime) ([]*S3Object, error) {
    // Length 0 with capacity len(records): append fills the slice from index 0.
    // make([]*S3Object, len(records)) would leave len(records) nil entries in front.
    objects := make([]*S3Object, 0, len(records))

    for _, streamData := range records {
        obj := &S3Object{
            AppVersion:     streamData.AppVersion,
            LogID:          streamData.LogID,
            Time:           arrivalTimestamp.Format(time.RFC3339),
            SendingTime:    streamData.SendingTime,
            ProcessingTime: time.Now().Format(time.RFC3339),
        }

        objects = append(objects, obj)
    }

    // Defensive: drop any nil entries before encoding.
    return filterOutNil(objects), nil
}
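The length-versus-capacity distinction behind that make call is easy to trip over, so here is a tiny standalone sketch with hypothetical helpers withLength and withCapacity. make([]T, n) creates n zero values and append adds after them; make([]T, 0, n) only reserves room, so append starts at index 0. With pointer elements the zero values are nil, which is exactly what filterOutNil removes.

```go
package main

import "fmt"

// withLength reproduces make(len) followed by append: the zero values
// (nil pointers) stay at the front and the appended items follow them.
func withLength(items []string) []*string {
	out := make([]*string, len(items))
	for i := range items {
		out = append(out, &items[i])
	}
	return out
}

// withCapacity preallocates capacity only, so append fills from index 0.
func withCapacity(items []string) []*string {
	out := make([]*string, 0, len(items))
	for i := range items {
		out = append(out, &items[i])
	}
	return out
}

func main() {
	items := []string{"a", "b", "c"}
	a := withLength(items)
	b := withCapacity(items)
	fmt.Println(len(a), a[0] == nil) // 6 true
	fmt.Println(len(b), *b[0])       // 3 a
}
```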

func filterOutNil(objects []*S3Object) []*S3Object {
    filteredObjects := objects[:0]

    // filtering without allocating (https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating)
    for _, o := range objects {
        if o != nil {
            filteredObjects = append(filteredObjects, o)
        }
    }

    return filteredObjects
}
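The filter-without-allocating trick reuses the input's backing array: the result overwrites the input from the front as it goes. That is safe because the write index never overtakes the read index, but it means the caller should not keep using the original slice afterwards. A small sketch with a trimmed-down, hypothetical S3Object:

```go
package main

import "fmt"

// S3Object is trimmed to a single field for this sketch.
type S3Object struct{ LogID string }

// filterOutNil is the same in-place filter as above.
func filterOutNil(objects []*S3Object) []*S3Object {
	filtered := objects[:0]
	for _, o := range objects {
		if o != nil {
			filtered = append(filtered, o)
		}
	}
	return filtered
}

func main() {
	a, b := &S3Object{LogID: "a"}, &S3Object{LogID: "b"}
	in := []*S3Object{nil, a, nil, b}
	out := filterOutNil(in)
	fmt.Println(len(out), out[0].LogID, out[1].LogID) // 2 a b

	// The input was overwritten in place: its first two slots now hold a and b.
	fmt.Println(in[0] == a, in[1] == b) // true true
}
```

After the call, the tail of the input (here in[2:]) still holds stale pointers; the SliceTricks page suggests setting those slots to nil when the backing array is long-lived, so the garbage collector can reclaim what they point to.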


Finally, encode the outgoing structs back into line-json bytes.

func encode(objects []*S3Object) ([]byte, error) {
    var outBytes bytes.Buffer
    encoder := json.NewEncoder(&outBytes)

    for _, obj := range objects {
        // Encode writes each object followed by '\n', so the output is line-json.
        err := encoder.Encode(obj)
        if err != nil {
            return nil, err
        }
    }

    return outBytes.Bytes(), nil
}
