上一篇中,我們有介紹到如何建立一個最基本的 Firehose,這篇主要講進階功能,使用 Dynamic Partitioning。

S3 根據 Folder 的分類,對 performance 是有影響的,細節可以參考這篇 文章


Firehose 預設函式

Firehose 已經內建一些函式,可以不用透過 Dynamic Partitioning 就能做一些 S3 基本的 Prefix 分類。

這邊是 官方文件

例如,在建立 Firehose 的時候,S3 Prefix 設定以下路徑:

1!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/testing-

產生出來的檔案會有年月日的 Folder,然後檔案的 prefix 會是 testing-,後面接 Firehose 預設的檔案名稱。


Firehose 使用 JQ

如果 Payload 是 JSON 格式,可以使用 Firehose 內建的 Dynamic Partitioning,這邊使用 JQ 1.6 作為 Parsing Payload 的工具。

若使用 JQ,需啟用 Dynamic Partitioning,並且打開 Inline Parsing for JSON

接著設定 JQ 的 Key Name,這個 Key 會對應到 Firehose Prefix 的 Key Name。

在設定 S3 Prefix 的地方,namespace 必須填 partitionKeyFromQuery,接著填 Key Name。

firehoseWithJQ

這邊附上我的 S3 Prefix:

1!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:s3_prefix}-

如果配上圖片上的設定,加上我用以下 Payload 打 Firehose:

1aws firehose put-record \
2    --cli-binary-format raw-in-base64-out \
3    --delivery-stream-name JQ-Example \
4    --record "{\"Data\": \"{\\\"month\\\":12, \\\"day\\\":5, \\\"s3Prefix\\\":\\\"example\\\"}\\n\"}"

會看到 S3 的路徑是 12/5/,而檔案名稱是 example- 開頭的。

firehoseWithJQResult


Firehose with Lambda

如果要透過 Lambda 做 Dynamic Partitioning,需要打開 transformation 的功能。

這邊提供範例程式碼:

 1import json
 2import logging
 3import base64
 4
 5def lambda_handler(event, context):
 6   output = []
 7   logger = logging.getLogger()
 8   logger.setLevel(logging.INFO)
 9
10   logger.info("Lambda Start")
11   logger.info(f"event: {json.dumps(event)}")
12
13   for record in event['records']:
14       logger.info(f"record['data']: {record['data']}")
15       decodeResult = base64.b64decode(record['data']).decode('utf-8')
16
17       logger.info(f"base64 decode : {decodeResult}")
18
19       payload = json.loads(decodeResult)
20       payload['firehose'] = 'true'
21       partition_key = payload.get('key', 'default')
22       newData = json.dumps(payload)
23       logger.info(f"newData : {newData}")
24
25       output_record = {
26           'recordId': record['recordId'],
27           'result': 'Ok',
28           'data': base64.b64encode(newData.encode()).decode(),
29           'metadata': {
30               'partitionKeys': {
31                   'partitionKey': partition_key,
32                   'file_prefix': payload.get('file_prefix', '2024'),
33                   'year': payload.get('year', '2024'),
34                   'month': payload.get('month', '09'),
35                   'day': payload.get('day', '01'),
36                   'hour': payload.get('hour', '01')
37               }
38           }
39       }
40       output.append(output_record)
41
42   return {'records': output}

解釋:

  • Line 15:送到 Firehose 的 Data 必須經過 Base64 Encode,所以取出時需要 Decode
  • Line 20:在 Transformation 過程中,對 Payload 新增 Key
  • Line 26:須將原本的 Record 返回給 Firehose。
  • Line 27result 參數可設為 “Ok”(保留)或 “Dropped”(丟棄)。
  • Line 28data 參數是最後存入檔案的資料,需再次進行 Base64 Encode
  • Line 29-36metadata 底下的 partitionKeys 是用來設定 S3 Prefix

建立好 Lambda 之後,就可以在 transformation 設定裡,填入 Lambda 的 ARN

firehoseWithLambda

啟用 Dynamic Partitioning,這次 Inline Parsing for JSON 不需要勾選。

firehoseWithLambda2

S3 Prefixnamespace 使用 partitionKeyFromLambda,這樣就能取出 Lambda 返回的結果。

注意: partitionKeyFromQuerypartitionKeyFromLambda 是不同的!

這邊附上我的 S3 Prefix 設定:

1!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:day}/!{partitionKeyFromLambda:file_prefix}-!{partitionKeyFromLambda:partitionKey}-!{partitionKeyFromLambda:year}-!{partitionKeyFromLambda:month}-!{partitionKeyFromLambda:day}-!{partitionKeyFromLambda:hour}

測試 Firehose with Lambda

使用 AWS CLI 來放入測試資料:

1aws firehose put-record \
2    --cli-binary-format raw-in-base64-out \
3    --delivery-stream-name Lambda-Example \
4    --record "{\"Data\": \"{\\\"month\\\":12, \\\"day\\\":5, \\\"key\\\":\\\"example1\\\"}\\n\"}"

檢查 S3 prefix 是否符合預期:

firehoseWithLambdaResult

檔案內容

1{"month": 12, "day": 5, "key": "example1", "firehose": "true"}

這樣就能成功使用 Lambda + Firehose 進行 Dynamic Partitioning! 🚀