About

AWS를 이용하여 kinesis -> firehose -> elasticsearch로 보내는 방식으로 log를 많이 저장한다. 이때 elasticsearch로 firehose가 보내는 것을 실패하거나 혹은 설정에 따라서는 성공한 모든 경우에 s3에 데이터를 자동으로 backup하게 할 수 있다.

하지만, s3에 들어가 있는 데이터를 가져오는 것은 아래 3가지 이유로 인해 약간 번거롭다.

  1. s3에서 object_name이 bucketname/year/month/day/hour 와 같은 형식으로 들어가있어서 한번에 list로 다 들고와야하는 형식이다.
  2. 데이터가 bytes 타입으로 들어가있다.
  3. 아래와 같은 형식으로 들어가있다. 우리가 원하는 것은 dictionary의 list이지만 delimiter 없이 연결되어 있어서 따로 전처리가 필요하다.

    b'{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{}'

Solution

위의 문제점들을 하나씩 해결해보자.

import json
import gzip
import boto3
from io import BytesIO
from collection import defaultdict

먼저 사용하게될 패키지들을 import한다.

bucket_name='bucket_name'
s3 = boto3.client(
  's3',
  region_name='ap-northeast-2',
)


def object_list_from_s3(
    prefix,
):
    global bucket_name, s3

    objs = s3.list_objects(
        Bucket=bucket_name,
        Prefix=prefix,
    )
    if 'Contents' in objs:
        return [
            obj['Key']
            for obj in objs['Contents']
        ]

    return []

s3에서 object_name이 bucketname/year/month/day/hour 와 같은 형식으로 들어가있으므로 object를 한번에 list로 가져올 수 있는 method가 필요하다. 여기서 prefix2020/05/24를 넣으면 해당 날짜의 object list를 가져올 수 있다.

def s3_get_object(
    file_name,
):
    global bucket_name, s3

    obj = s3.get_object(
        Bucket=bucket_name,
        Key=file_name
    )

    return obj['Body'].read()

위에서 얻은 list를 돌면서 각 object name을 이용하여 s3에서 object를 불러오는 method이다.

def object_written_by_firehose_decoder(data):
    result = []
    content = data.decode('utf-8')
    decoder = json.JSONDecoder()
    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(
                content,
                decode_index,
            )
            result.append(obj)
        except JSONDecodeError as e:
            print('JSONDecodeError :', e)
            decode_index += 1

    return result

이 method는 위에서 설명한 3번째 문제점을 해결하기 위해서 사용되는데, 예외사항이 있을 수도 있으니 주의하자. 필자도 링크를 통해서 찾은 것인데 필자의 경우에는 문제없이 잘 parsing을 했기 때문에 사용하고 있으나 경우에 따라서는 독자분께 적절한 method를 만들어야 할 수도 있다.

어쨌든 이 method가 기본적으로 해주는 역할은 {}{}{}형태의 bytes 값들을 dictionary list로 반환해준다.

def get_saved_objects_by_firehose(
    prefix,
):
    global bucket_name

    bodies = defaultdict(list)
    obj_names = object_list_from_s3(
        bucket_name=bucket_name,
        prefix=prefix,
    )

    for obj_name in obj_names:
        log_date = '-'.join(obj_name.split('/')[:3])
        is_gzip = True if obj_name[-3:] == '.gz' else False

        try:
            obj = s3_get_object(
                file_name=obj_name,
            )
            fileobj = BytesIO(obj)
            if is_gzip:
                gzipfile = gzip.GzipFile(fileobj=fileobj)
                content = gzipfile.read()
            else:
                content = fileobj.read()

            content = object_written_by_firehose_decoder(
                content
            )
            bodies[log_date] += content

        except Exception as err:
            print(err)

    return bodies

끝으로 전체 데이터를 가져오는 method이다. 여기서 bodies는 dictionary로 반환하게 되어있는데 역시 자신의 상황에 맞게 수정해서 사용하면 되겠다. 이제 사용하는 예제 코드는 단순하다.

import pandas as pd
bodies = get_saved_objects_by_firehose(
    prefix='2020/05/26',
)
# 바로 dataframe으로도 사용할 수 있다.
df = pd.DataFrame(bodies['2020/05/26'])

Conclusion

s3에 bytes형태로 backup된 데이터를 가져오는 일은 어렵지 않지만 원하는 형태로 처리해서 가져오는 것은 은근 귀찮은 부분이었다. 필자와 같은 상황에 처한 사람들에게 이 블로그가 도움이 되었길 바란다.