在上一篇中,我們有介紹到如何建立一個最基本的 Firehose,這篇主要講進階功能,使用 Dynamic Partitioning。
S3 根據 Folder 的分類,對 performance 是有影響的,細節可以參考這篇 文章。
Firehose 預設函式
Firehose 已經內建一些函式,可以不用透過 Dynamic Partitioning 就能做一些 S3 基本的 Prefix 分類。
這邊是 官方文件。
例如,在建立 Firehose 的時候,S3 Prefix 設定以下路徑:
1!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/testing-
產生出來的檔案會有年月日的 Folder,然後檔案的 prefix 會是 testing-
,後面接 Firehose 預設的檔案名稱。
Firehose 使用 JQ
如果 Payload 是 JSON 格式,可以使用 Firehose 內建的 Dynamic Partitioning,這邊使用 JQ 1.6 作為 Parsing Payload 的工具。
若使用 JQ,需啟用 Dynamic Partitioning,並且打開 Inline Parsing for JSON。
接著設定 JQ 的 Key Name,這個 Key 會對應到 Firehose Prefix 的 Key Name。
在設定 S3 Prefix 的地方,namespace
必須填 partitionKeyFromQuery
,接著填 Key Name。
這邊附上我的 S3 Prefix:
1!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:s3_prefix}-
如果配上圖片上的設定,加上我用以下 Payload 打 Firehose:
1aws firehose put-record \
2 --cli-binary-format raw-in-base64-out \
3 --delivery-stream-name JQ-Example \
4 --record "{\"Data\": \"{\\\"month\\\":12, \\\"day\\\":5, \\\"s3Prefix\\\":\\\"example\\\"}\\n\"}"
會看到 S3 的路徑是 12/5/
,而檔案名稱是 example-
開頭的。
Firehose with Lambda
如果要透過 Lambda 做 Dynamic Partitioning,需要打開 transformation 的功能。
這邊提供範例程式碼:
1import json
2import logging
3import base64
4
5def lambda_handler(event, context):
6 output = []
7 logger = logging.getLogger()
8 logger.setLevel(logging.INFO)
9
10 logger.info("Lambda Start")
11 logger.info(f"event: {json.dumps(event)}")
12
13 for record in event['records']:
14 logger.info(f"record['data']: {record['data']}")
15 decodeResult = base64.b64decode(record['data']).decode('utf-8')
16
17 logger.info(f"base64 decode : {decodeResult}")
18
19 payload = json.loads(decodeResult)
20 payload['firehose'] = 'true'
21 partition_key = payload.get('key', 'default')
22 newData = json.dumps(payload)
23 logger.info(f"newData : {newData}")
24
25 output_record = {
26 'recordId': record['recordId'],
27 'result': 'Ok',
28 'data': base64.b64encode(newData.encode()).decode(),
29 'metadata': {
30 'partitionKeys': {
31 'partitionKey': partition_key,
32 'file_prefix': payload.get('file_prefix', '2024'),
33 'year': payload.get('year', '2024'),
34 'month': payload.get('month', '09'),
35 'day': payload.get('day', '01'),
36 'hour': payload.get('hour', '01')
37 }
38 }
39 }
40 output.append(output_record)
41
42 return {'records': output}
解釋:
- Line 15:送到 Firehose 的 Data 必須經過 Base64 Encode,所以取出時需要 Decode。
- Line 20:在 Transformation 過程中,對 Payload 新增 Key。
- Line 26:須將原本的 Record 返回給 Firehose。
- Line 27:
result
參數可設為 “Ok”(保留)或 “Dropped”(丟棄)。 - Line 28:
data
參數是最後存入檔案的資料,需再次進行 Base64 Encode。 - Line 29-36:
metadata
底下的partitionKeys
是用來設定 S3 Prefix。
建立好 Lambda 之後,就可以在 transformation 設定裡,填入 Lambda 的 ARN。
啟用 Dynamic Partitioning,這次 Inline Parsing for JSON 不需要勾選。
在 S3 Prefix 的 namespace
使用 partitionKeyFromLambda
,這樣就能取出 Lambda 返回的結果。
注意:
partitionKeyFromQuery
和 partitionKeyFromLambda
是不同的!
這邊附上我的 S3 Prefix 設定:
1!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:day}/!{partitionKeyFromLambda:file_prefix}-!{partitionKeyFromLambda:partitionKey}-!{partitionKeyFromLambda:year}-!{partitionKeyFromLambda:month}-!{partitionKeyFromLambda:day}-!{partitionKeyFromLambda:hour}
測試 Firehose with Lambda
使用 AWS CLI 來放入測試資料:
1aws firehose put-record \
2 --cli-binary-format raw-in-base64-out \
3 --delivery-stream-name Lambda-Example \
4 --record "{\"Data\": \"{\\\"month\\\":12, \\\"day\\\":5, \\\"key\\\":\\\"example1\\\"}\\n\"}"
檢查 S3 prefix 是否符合預期:
檔案內容
1{"month": 12, "day": 5, "key": "example1", "firehose": "true"}
這樣就能成功使用 Lambda + Firehose 進行 Dynamic Partitioning! 🚀
評論