You can use AWS Lambda to transform the data flowing through Kinesis Data Firehose. This is the "T" (transform) step of ETL, and the official documentation calls it "Data Transformation Flow":
https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
In this example, the data in each record that Kinesis Data Firehose passes to Lambda is in line-json (newline-delimited JSON) format, which looks like the following:
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
{"app_version":"10.0.0","sending_time":"***","log_id":"foobar"}
Line-json is a common format for handling JSON in streaming pipelines.
You can encode and decode line-json with Go's standard encoding/json package.
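As a minimal, self-contained sketch of that round trip, json.Decoder reads one value after another (treating the newlines as whitespace), and json.Encoder writes each value followed by a newline. The `line` type and the `decodeLines`/`encodeLines` helpers are hypothetical names used only for this illustration:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// line is a hypothetical two-field record used only for this illustration.
type line struct {
	AppVersion string `json:"app_version"`
	LogID      string `json:"log_id"`
}

// decodeLines reads consecutive JSON values; json.Decoder treats the newlines
// between them as whitespace, so no manual splitting is needed.
func decodeLines(data []byte) ([]line, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	var out []line
	for dec.More() {
		var l line
		if err := dec.Decode(&l); err != nil {
			return nil, err
		}
		out = append(out, l)
	}
	return out, nil
}

// encodeLines writes each value with json.Encoder, which appends '\n'
// after every value, producing line-json again.
func encodeLines(lines []line) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, l := range lines {
		if err := enc.Encode(l); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	in := []byte(`{"app_version":"10.0.0","log_id":"foobar"}` + "\n" +
		`{"app_version":"10.0.1","log_id":"baz"}` + "\n")
	lines, err := decodeLines(in)
	if err != nil {
		panic(err)
	}
	out, err := encodeLines(lines)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %d values\n%s", len(lines), out)
}
```

Because the struct fields are encoded in declaration order without extra spaces, this particular input comes back byte-for-byte identical.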
Let's assume the incoming JSON maps to the following struct:
type StreamData struct {
	AppVersion  string `json:"app_version"`
	SendingTime string `json:"sending_time"`
	LogID       string `json:"log_id"`
}
In this example, we want to convert each StreamData, append some extra columns, and have Firehose write the result to an S3 bucket.
Here is the struct for the outgoing JSON:
type S3Object struct {
	AppVersion     string `json:"app_version"`
	LogID          string `json:"log_id"`
	Time           string `json:"time"`
	SendingTime    string `json:"sending_time"`
	ProcessingTime string `json:"processing_time"`
}
Time is the arrival time of the log record, taken from the ApproximateArrivalTimestamp of the Kinesis record.
ProcessingTime is the processing time of the log record, i.e. the time at which the Lambda function handles it.
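To illustrate the Time column, here is a small sketch that assumes the timestamp arrives as milliseconds since the Unix epoch, which is how approximateArrivalTimestamp appears in the raw Firehose event. `arrivalTimeRFC3339` is a hypothetical helper, and time.UnixMilli requires Go 1.17 or later. In the real handler you don't need it: aws-lambda-go's events.MilliSecondsEpochTime embeds a time.Time, so you can call Format on it directly.

```go
package main

import (
	"fmt"
	"time"
)

// arrivalTimeRFC3339 is a hypothetical helper: it turns a millisecond Unix
// timestamp into an RFC 3339 string in UTC. time.RFC3339 has no fractional
// seconds, so the milliseconds are dropped.
func arrivalTimeRFC3339(ms int64) string {
	return time.UnixMilli(ms).UTC().Format(time.RFC3339)
}

func main() {
	fmt.Println(arrivalTimeRFC3339(1495072949453)) // 2017-05-18T02:02:29Z
	// A ProcessingTime-style value is formatted the same way, from time.Now().
	fmt.Println(time.Now().UTC().Format(time.RFC3339))
}
```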
You can use https://github.com/aws/aws-lambda-go to handle Lambda event sources. The Firehose event types are in the aws-lambda-go/events/ directory:
https://github.com/aws/aws-lambda-go/blob/master/events/README_KinesisFirehose.md
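Under the hood, the invocation payload is JSON in which each record's data field is base64-encoded. encoding/json decodes a base64 string into a []byte field automatically, which is why the record's Data is already raw bytes inside the handler. The sketch below shows this with simplified, hypothetical stand-in types (`firehoseEvent`, `firehoseRecord`) and a hypothetical `samplePayload` helper instead of the real aws-lambda-go types, so it runs with the standard library only:

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// firehoseRecord and firehoseEvent are simplified, hypothetical stand-ins for
// events.KinesisFirehoseEventRecord and events.KinesisFirehoseEvent.
type firehoseRecord struct {
	RecordID                    string `json:"recordId"`
	ApproximateArrivalTimestamp int64  `json:"approximateArrivalTimestamp"` // ms since epoch
	Data                        []byte `json:"data"`                        // base64 on the wire, raw bytes after Unmarshal
}

type firehoseEvent struct {
	InvocationID      string           `json:"invocationId"`
	DeliveryStreamArn string           `json:"deliveryStreamArn"`
	Region            string           `json:"region"`
	Records           []firehoseRecord `json:"records"`
}

// samplePayload builds a one-record payload shaped like a Firehose
// invocation, base64-encoding the record data.
func samplePayload(data string) []byte {
	encoded := base64.StdEncoding.EncodeToString([]byte(data))
	return []byte(`{"invocationId":"inv-1","region":"us-east-1","records":[` +
		`{"recordId":"r1","approximateArrivalTimestamp":1495072949453,"data":"` + encoded + `"}]}`)
}

// parseEvent unmarshals the payload; encoding/json base64-decodes into []byte fields.
func parseEvent(payload []byte) (firehoseEvent, error) {
	var ev firehoseEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

func main() {
	ev, err := parseEvent(samplePayload(`{"log_id":"foobar"}` + "\n"))
	if err != nil {
		panic(err)
	}
	r := ev.Records[0]
	fmt.Printf("%s %d %q\n", r.RecordID, r.ApproximateArrivalTimestamp, r.Data)
}
```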
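package main

import (
	"bytes"
	"encoding/json"
	"log"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func main() {
	lambda.Start(handler)
}

// handler must return one response record, with the same RecordID, for every input record.
func handler(event events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
	var response events.KinesisFirehoseResponse
	for _, record := range event.Records {
		transformedRecord, err := buildResponseRecord(record)
		if err != nil {
			// Mark only this record as failed so the rest of the batch is
			// still delivered; its original data is passed back unchanged.
			log.Printf("record %s: %v", record.RecordID, err)
			transformedRecord = events.KinesisFirehoseResponseRecord{
				RecordID: record.RecordID,
				Result:   events.KinesisFirehoseTransformedStateProcessingFailed,
				Data:     record.Data,
			}
		}
		response.Records = append(response.Records, transformedRecord)
	}
	return response, nil
}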

func buildResponseRecord(record events.KinesisFirehoseEventRecord) (events.KinesisFirehoseResponseRecord, error) {
	var transformedRecord events.KinesisFirehoseResponseRecord
	data, err := transform(record.Data, record.ApproximateArrivalTimestamp)
	if err != nil {
		return transformedRecord, err
	}
	// The response must carry the same RecordID as the incoming record.
	transformedRecord.RecordID = record.RecordID
	transformedRecord.Result = events.KinesisFirehoseTransformedStateOk
	transformedRecord.Data = data
	return transformedRecord, nil
}
buildResponseRecord is the core logic of this Lambda: it transforms the record's data and returns it under the original RecordID with the result Ok.
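Firehose expects exactly one response record per input record, carrying the same recordId and a result of Ok, Dropped, or ProcessingFailed. A standard-library-only sketch of that contract could look like this; `inRecord`, `outRecord`, and `respond` are hypothetical stand-ins for the aws-lambda-go types and handler loop, and json.Compact is used only as a toy transform that fails on invalid JSON:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// inRecord and outRecord are hypothetical stand-ins for the aws-lambda-go
// Firehose record types.
type inRecord struct {
	RecordID string
	Data     []byte
}

type outRecord struct {
	RecordID string `json:"recordId"`
	Result   string `json:"result"` // "Ok", "Dropped" or "ProcessingFailed"
	Data     []byte `json:"data"`   // marshalled as base64
}

// respond emits exactly one outRecord per input, in order and with the same
// RecordID; a transform error fails only that record.
func respond(records []inRecord, transform func([]byte) ([]byte, error)) []outRecord {
	out := make([]outRecord, 0, len(records))
	for _, r := range records {
		data, err := transform(r.Data)
		if err != nil {
			out = append(out, outRecord{RecordID: r.RecordID, Result: "ProcessingFailed", Data: r.Data})
			continue
		}
		out = append(out, outRecord{RecordID: r.RecordID, Result: "Ok", Data: data})
	}
	return out
}

// compact is a toy transform: it strips insignificant whitespace from a
// single JSON value and returns an error for invalid JSON.
func compact(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	if err := json.Compact(&buf, src); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out := respond([]inRecord{
		{RecordID: "r1", Data: []byte(`{ "log_id" : "foobar" }`)},
		{RecordID: "r2", Data: []byte("not json")},
	}, compact)
	body, err := json.Marshal(map[string][]outRecord{"records": out})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Failing a single record rather than the whole invocation lets the rest of the batch go through; the Firehose documentation describes delivering ProcessingFailed records to an error location in the S3 destination.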
Next, let's look at the transform() function. It is simple: decode the incoming bytes, convert the records, and encode the result.
func transform(dataBytes []byte, arrivalTimestamp events.MilliSecondsEpochTime) ([]byte, error) {
	records, err := decode(dataBytes)
	if err != nil {
		return nil, err
	}
	objects, err := convertStreamDataToS3Objects(records, arrivalTimestamp)
	if err != nil {
		return nil, err
	}
	outBytes, err := encode(objects)
	if err != nil {
		return nil, err
	}
	return outBytes, nil
}
Decoding is also simple: create a json.Decoder and call its More and Decode methods in a loop, reading one JSON value at a time.
func decode(dataBytes []byte) ([]*StreamData, error) {
	decoder := json.NewDecoder(strings.NewReader(string(dataBytes)))
	var records []*StreamData
	for decoder.More() {
		var d StreamData
		err := decoder.Decode(&d)
		if err != nil {
			return nil, err
		}
		records = append(records, &d)
	}
	return records, nil
}
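Because decode returns on the first error, a single malformed line makes the whole Firehose record fail, not just that line, while blank lines are simply skipped as whitespace. The sketch below checks both cases with a stripped-down copy of the same loop; `decodeAll` and the one-field `streamData` type are illustrative names:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// streamData is a one-field, illustrative version of StreamData.
type streamData struct {
	LogID string `json:"log_id"`
}

// decodeAll follows the same loop as decode(): it stops at the first error.
func decodeAll(s string) ([]streamData, error) {
	dec := json.NewDecoder(strings.NewReader(s))
	var out []streamData
	for dec.More() {
		var d streamData
		if err := dec.Decode(&d); err != nil {
			return nil, err
		}
		out = append(out, d)
	}
	return out, nil
}

func main() {
	// Blank lines are only whitespace to the decoder.
	good := `{"log_id":"a"}` + "\n\n" + `{"log_id":"b"}` + "\n"
	// The second value is truncated.
	bad := `{"log_id":"a"}` + "\n" + `{"log_id":` + "\n"

	recs, err := decodeAll(good)
	fmt.Println(len(recs), err)
	recs, err = decodeAll(bad)
	fmt.Println(len(recs), err != nil)
}
```

If you would rather drop only the bad lines, one option is to split the data with bufio.Scanner and call json.Unmarshal on each line, since a json.Decoder keeps returning the same error once it hits a syntax error.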
Here is where the application-specific logic comes in.
func convertStreamDataToS3Objects(records []*StreamData, arrivalTimestamp events.MilliSecondsEpochTime) ([]*S3Object, error) {
	// Length 0, capacity len(records): a non-zero length here would leave
	// nil entries in front of the appended objects.
	objects := make([]*S3Object, 0, len(records))
	processingTime := time.Now().Format(time.RFC3339)
	for _, streamData := range records {
		obj := &S3Object{
			AppVersion:     streamData.AppVersion,
			LogID:          streamData.LogID,
			Time:           arrivalTimestamp.Format(time.RFC3339),
			SendingTime:    streamData.SendingTime,
			ProcessingTime: processingTime,
		}
		objects = append(objects, obj)
	}
	return objects, nil
}
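A common Go pitfall when building a result slice like this is to call make with a length and then append: the slice already holds len(records) zero values (nil pointers for a []*T), and append adds the real values after them. The sketch below contrasts the two forms; `item`, `withLength`, and `withCapacity` are hypothetical names for illustration:

```go
package main

import "fmt"

type item struct{ ID string }

// withLength reproduces the pitfall: make with a length creates len(ids) nil
// pointers, and append adds the real values after them.
func withLength(ids []string) []*item {
	out := make([]*item, len(ids))
	for _, id := range ids {
		out = append(out, &item{ID: id})
	}
	return out
}

// withCapacity allocates the backing array once but starts at length 0.
func withCapacity(ids []string) []*item {
	out := make([]*item, 0, len(ids))
	for _, id := range ids {
		out = append(out, &item{ID: id})
	}
	return out
}

func main() {
	ids := []string{"a", "b", "c"}
	a := withLength(ids)
	b := withCapacity(ids)
	fmt.Println(len(a), a[0] == nil, a[3].ID) // 6 true a
	fmt.Println(len(b), b[0].ID)              // 3 a
}
```

Preallocating capacity keeps the single allocation that make with a length was presumably meant to buy, without the leading nils.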
Finally, encode the output structs back into line-json bytes.
func encode(objects []*S3Object) ([]byte, error) {
	var outBytes bytes.Buffer
	encoder := json.NewEncoder(&outBytes)
	for _, obj := range objects {
		// Encode writes obj followed by a newline, so the output stays line-json.
		err := encoder.Encode(obj)
		if err != nil {
			return nil, err
		}
	}
	return outBytes.Bytes(), nil
}
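json.Encoder.Encode terminates every value with a newline, so the bytes returned to Firehose stay line-delimited. Assuming Firehose concatenates the returned data without adding delimiters of its own, this newline is what separates one log record from the next once the data lands in S3. The encoder also escapes <, > and & by default, which SetEscapeHTML can turn off. A small sketch of both behaviors, with a hypothetical `s3Object` type (its Note field exists only to show escaping) and a hypothetical `encodeLines` helper:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// s3Object is a hypothetical two-field stand-in for S3Object.
type s3Object struct {
	LogID string `json:"log_id"`
	Note  string `json:"note"`
}

// encodeLines writes one JSON value per line. json.Encoder.Encode appends
// '\n' after each value; SetEscapeHTML controls whether <, > and & are
// escaped as \u003c, \u003e and \u0026 (the default is to escape them).
func encodeLines(objs []*s3Object, escapeHTML bool) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(escapeHTML)
	for _, o := range objs {
		if err := enc.Encode(o); err != nil {
			return "", err
		}
	}
	return buf.String(), nil
}

func main() {
	objs := []*s3Object{{LogID: "a", Note: "x<y"}, {LogID: "b", Note: "ok"}}
	escaped, _ := encodeLines(objs, true)
	raw, _ := encodeLines(objs, false)
	fmt.Print(escaped, raw)
}
```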