About
AWS를 이용하여 kinesis -> firehose -> elasticsearch로 보내는 방식으로 log를 많이 저장한다. 이때 elasticsearch로 firehose가 보내는 것을 실패하거나 혹은 설정에 따라서는 성공한 모든 경우에 s3에 데이터를 자동으로 backup하게 할 수 있다.
하지만, s3에 들어가 있는 데이터를 가져오는 것은 아래 3가지 이유로 인해 약간 번거롭다.
- s3에서 object_name이
bucketname/year/month/day/hour
와 같은 형식으로 들어가있어서 한번에 list로 다 들고와야하는 형식이다. - 데이터가
bytes
타입으로 들어가있다. -
아래와 같은 형식으로 들어가있다. 우리가 원하는 것은 dictionary의 list이지만 delimiter 없이 연결되어 있어서 따로 전처리가 필요하다.
b'{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{}'
Solution
위의 문제점들을 하나씩 해결해보자.
import json
import gzip
import boto3
from io import BytesIO
from collection import defaultdict
먼저 사용하게될 패키지들을 import한다.
bucket_name='bucket_name'
s3 = boto3.client(
's3',
region_name='ap-northeast-2',
)
def object_list_from_s3(
prefix,
):
global bucket_name, s3
objs = s3.list_objects(
Bucket=bucket_name,
Prefix=prefix,
)
if 'Contents' in objs:
return [
obj['Key']
for obj in objs['Contents']
]
return []
s3에서 object_name이 bucketname/year/month/day/hour
와 같은 형식으로 들어가있으므로 object
를 한번에 list로 가져올 수 있는
method가 필요하다. 여기서 prefix
에 2020/05/24
를 넣으면 해당 날짜의 object list를 가져올 수 있다.
def s3_get_object(
file_name,
):
global bucket_name, s3
obj = s3.get_object(
Bucket=bucket_name,
Key=file_name
)
return obj['Body'].read()
위에서 얻은 list를 돌면서 각 object name을 이용하여 s3에서 object를 불러오는 method이다.
def object_written_by_firehose_decoder(data):
result = []
content = data.decode('utf-8')
decoder = json.JSONDecoder()
content_length = len(content)
decode_index = 0
while decode_index < content_length:
try:
obj, decode_index = decoder.raw_decode(
content,
decode_index,
)
result.append(obj)
except JSONDecodeError as e:
print('JSONDecodeError :', e)
decode_index += 1
return result
이 method는 위에서 설명한 3번째 문제점을 해결하기 위해서 사용되는데, 예외사항이 있을 수도 있으니 주의하자. 필자도 링크를 통해서 찾은 것인데 필자의 경우에는 문제없이 잘 parsing을 했기 때문에 사용하고 있으나 경우에 따라서는 독자분께 적절한 method를 만들어야 할 수도 있다.
어쨌든 이 method가 기본적으로 해주는 역할은 {}{}{}
형태의 bytes 값들을 dictionary list로 반환해준다.
def get_saved_objects_by_firehose(
prefix,
):
global bucket_name
bodies = defaultdict(list)
obj_names = object_list_from_s3(
bucket_name=bucket_name,
prefix=prefix,
)
for obj_name in obj_names:
log_date = '-'.join(obj_name.split('/')[:3])
is_gzip = True if obj_name[-3:] == '.gz' else False
try:
obj = s3_get_object(
file_name=obj_name,
)
fileobj = BytesIO(obj)
if is_gzip:
gzipfile = gzip.GzipFile(fileobj=fileobj)
content = gzipfile.read()
else:
content = fileobj.read()
content = object_written_by_firehose_decoder(
content
)
bodies[log_date] += content
except Exception as err:
print(err)
return bodies
끝으로 전체 데이터를 가져오는 method이다. 여기서 bodies는 dictionary로 반환하게 되어있는데 역시 자신의 상황에 맞게 수정해서 사용하면 되겠다. 이제 사용하는 예제 코드는 단순하다.
import pandas as pd
bodies = get_saved_objects_by_firehose(
prefix='2020/05/26',
)
# 바로 dataframe으로도 사용할 수 있다.
df = pd.DataFrame(bodies['2020/05/26'])
Conclusion
s3에 bytes형태로 backup된 데이터를 가져오는 일은 어렵지 않지만 원하는 형태로 처리해서 가져오는 것은 은근 귀찮은 부분이었다. 필자와 같은 상황에 처한 사람들에게 이 블로그가 도움이 되었길 바란다.