About

이 글에서는 이전 블로그들을 작성할 때는 필자가 알지 못한 부분들을 보충 설명하고자 작성하게 되었다.

이번에 추가된 내용은

  • conda 데코레이터를 이용한 패키지 의존성 관리하는 법
  • Parameter 클래스를 이용하여 cli 실행시 argument를 넣어주는 법
  • 간단한 Client API를 이용하여 작업한 data 및 state snapshot을 jupyter notebook에서 확인하는 법

Metaflow를 처음 공부하는 분들은 이 글을 읽기전에 아래 순서대로 먼저 확인하는 것을 추천한다.

  1. Netflix 데이터 파이프라이닝 오픈소스 Metaflow 알아보기
  2. Linear, Branch, Foreach 등 기본 구조와 코드를 살펴보자.

모든 Metaflow 블로그들은 numpy, pandas 사용에 능숙하다는 가정하에 작성되었음을 미리 밝힌다.

Snapshot

Metaflow를 이용하여 run을 하면 로컬에서는 .metaflow라는 데이터 스토어를 만들게 된다. 실행할 때 사용된 metadata 및 data snapshot들이 저장되는데, 만약 aws를 사용하도록 연결한 상태라면 S3에 저장이 되게 된다.

snapshot들은 client API를 이용해서 jupyter notebook에서 확인 가능하다. 여기서 알아두면 좋은 점 중에 하나는 모든 step은 컨테이너로 이루어져있다는 점이다. 각 step들이 서로 다른 pid(process ID)를 갖는 것을 보면 알 수 있다. 이 특징에 의해서 각 step마다 다른 의존성을 부여한다거나 어떤 step은 로컬에서 어떤 step은 클라우드에서 실행하는 것이 가능하게 되는 것이다.

Metaflow를 이용하면 좋은 점

Learn Metaflow in 10 mins에서 소개한 “Metaflow를 사용하면 가능한 시나리오들”에 필자의 이해를 좀 더 덧붙인 내용을 통해서 Metaflow에 대해 다시 살펴보자.

Scenario Description
Collaboration 다른 Data scientist들과 협업하기 쉽다. S3에 기본적으로 실행하는 시기의 code, data, dependencies 들을 Snapshot으로 보관하기 때문에 팀원 중 누구나 S3 접근 권한만 있다면 원하는 시점부터 협업이 가능하기 때문.
Resuming a run 역시 협업에서 설명한 이유와 마찬가지로 S3에 실행한 모든 내용이 저장되기 때문에 원하는 지점부터 다시 시작할 수 있다.
Hybrid runs Workflow 중에 일부 step은 로컬 컴퓨터에서 연산하고 원하는 step에 대해서는 cloud에서 연산하게끔 설정할 수 있따.
Inspecting run metadata 만약 3명의 data scientist들이 같이 hyperparameter를 튜닝하고 있다고 가정해보자. 이 경우에도 역시 S3에 학습할 때마다 저장된 데이터에 접근하여 최고 성적을 내는 parameter를 찾을 수 있다.
Multiple versions of same package 전처리는 sklearn 0.20 버젼으로 진행하고 모델링은 0.22 버젼으로 진행하는 것이 가능하다.

자, 이제 밑에서는 코드와 함께 공부해보도록 하자. 아래 코드는 Metaflow 공식 문서에서 제공하는 Tutorial 중 하나이다.

Import Modules

from metaflow import FlowSpec, step, IncludeFile, Parameter, conda, conda_base

Class variables

코드는 step별로 살펴보자.

def get_python_version():
    import platform
    versions = {'2' : '2.7.15',
                '3' : '3.7.3'}
    return versions[platform.python_version_tuple()[0]]


@conda_base(python=get_python_version())
class PlayListFlow(FlowSpec):
    genre = Parameter('genre',
                      help="Filter movies for a particular genre.",
                      default='Sci-Fi')

    hint = Parameter('hint',
                     help="Give a hint to the bonus movie algorithm.",
                     default='Metaflow Release')

    recommendations = Parameter('recommendations',
                                help="The number of movies recommended for "
                                "the playlist.",
                                default=5)
  • @conda_base 데코레이터를 이용하여 python version을 설정할 수 있다.
  • Parameter 클래스로 cli로 실행할 때 argument로 넣을 parameter들을 설정할 수 있다. 예:) python tutorial.py run --genre=Comedy. 따로 넣어주지 않을 때는 default 값으로 설정된다.

start

참고로 이 tutorial은 Metaflow 공식 문서에서 소개한 tutorial 중 4번 째에 해당한다. 여기서는 2번 째 tutorial flow인 MovieStatsFlow가 이미 실행되어서 결과값이 .metaflow에 저장되어 있다는 가정하에 이뤄지고 있는 것임을 명심하자.

    @conda(libraries={'pandas' : '0.24.2'})
    @step
    def start(self):
        from metaflow import Flow, get_metadata

        print("Using metadata provider: %s" % get_metadata())

        run = Flow('MovieStatsFlow').latest_successful_run
        print("Using analysis from '%s'" % str(run))

        self.dataframe = run['start'].task.data.dataframe

        self.genre_stats = run.data.genre_stats

        self.next(self.bonus_movie, self.genre_movies)
  • 반드시 첫 번째 step의 이름은 start이어야 한다.
  • Flow, get_metadata는 Client API에 속한다. 이 예제에서는 MovieStatsFlow에서 작업 완료한 datastart에서 import했던 dataframe을 가져와서 쓰고 있다.

    # run에 사용된(self에 선언한 변수들) data
    run = Flow('MovieStatsFlow').latest_successful_run
    # start를 포함한 각 step마다 사용한 data
    run['function_name'].task.data
    # end에서 마지막으로 적용된 data
    run.data
  • self.next(self.bonus_movie, self.genre_movies)를 통해서 Branch 방식의 transition을 사용했다는 것을 알 수 있다. (이해가 안가는 분은 링크를 확인하시길)
  • @conda 데코레이터로 이 step에서는 pandas 버젼을 0.24.2로 진행하도록 설정했다.

Branch 1 - bonus_movie

    @conda(libraries={'editdistance': '0.5.3', 'pandas' : '0.24.2'})
    @step
    def bonus_movie(self):
        import pandas
        import editdistance

        def _edit_distance(movie_title):
            return editdistance.eval(self.hint, movie_title)

        distance = self.dataframe['movie_title'].apply(_edit_distance)
        index = distance.idxmin()
        self.bonus = (self.dataframe['movie_title'].values[index],
                      self.dataframe['genres'].values[index])

        self.next(self.join)
  • 위에서도 이미 설명했듯 각 step들은 서로 다른 process이다. 이번 step에서는 edit_distance, pandas를 사용한다고 @conda 데코레이터에 명시되어있다.
  • 참고로 edit_distance는 문자열간의 유사도를 측정하는 알고리즘이다. 이 예제에서는 class variable hint(default는 ‘Metaflow Release’이며, 이 값과 가장 비슷한 이름의 영화가 가장 낮은 값을 반환하는 알고리즘)과 가장 가까운 movie_title을 찾아서 bonus_movie를 설정하는 법을 보여주고 있다.

Branch 2 - genre_movies

    @conda(libraries={'pandas' : '0.24.2'})
    @step
    def genre_movies(self):
        import pandas
        from random import shuffle

        genre = self.genre.lower()
        if genre not in self.genre_stats:
            self.movies = []

        else:
            df = self.genre_stats[genre]['dataframe']
            quartiles = self.genre_stats[genre]['quartiles']
            selector = df['gross'] >= quartiles[-1]
            self.movies = list(df[selector]['movie_title'])

        shuffle(self.movies)

        self.next(self.join)
  • 여기서 self.genre_statsMovieStatsFlow를 통해 얻은 결과값이다. quartiles는 장르별로 상위 75%, 50%, 25% 순으로 총수익 값이 들어가 있다. selector = df['gross'] >= quartiles[-1]은 해당 장르에서 수익률이 상위 25%안에 속한 영화의 index값들만 남기겠다는 의미다.
  • 중요한 부분은 각 Flow는 서로 접근하여 각자의 결과물(artifact)을 사용할 수 있다는 점이다.

join

    @step
    def join(self, inputs):
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus

        self.next(self.end)
  • Branch방식의 transition은 항상 join으로 합쳐줘야한다.
  • inputs를 통해서 이전에 진행된 함수(process)들에 접근할 수 있다.

end

    @step
    def end(self):
        print("Playlist for movies in genre '%s'" % self.genre)
        for pick, movie in enumerate(self.playlist, start=1):
            print("Pick %d: '%s'" % (pick, movie))
            if pick >= self.recommendations:
                break

        print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))

if __name__ == '__main__':
    PlayListFlow()
  • Class variables 중 하나였던 self.recommendations(default는 5)개수만큼의 영화만 playlist에 남긴다.

Client API

Client API들은 Metaflow 공식 문서 Inspecting Flows and Results에서 자세히 확인이 가능하다. 이와 관련해서는 나중에 다른 블로그에서 자세히 설명하도록 하고 여기서는 간단한 코드만 살펴보도록 하자.

Client API는 jupyter notebook에서 사용하기 좋게 만들어졌으니 특별한 이유가 없다면 jupyter notebook을 사용하는 것을 추천한다.

from metaflow import Flow, get_metadata
print("Current metadata provider: %s" % get_metadata())

## runs를 통해서 지금까지 실행한 `PlayListFlow` 결과들을 다 가져올 수 있다.
## 이 방법을 사용하면 Hyperparameter 튜닝에도 도움이 될 것.
for run in Flow('PlayListFlow').runs():
    if run.successful:
        print("Playlist generated on %s" % run.finished_at)
        print("Playlist for movies in genre '%s'" % run.data.genre)
        if run.data.playlist:
            print("Top Pick: '%s'" % run.data.playlist[0])
        print('\n')

결과는 아래처럼 나온다.

Playlist generated on 2019-12-28T11:19:21.fZ
Playlist for movies in genre 'Comedy'
Top Pick: 'Bucky Larson: Born to Be a Star'


Playlist generated on 2019-12-28T11:08:06.fZ
Playlist for movies in genre 'Comedy'
Top Pick: 'Real Women Have Curves'


Playlist generated on 2019-12-28T11:06:48.fZ
Playlist for movies in genre 'Sci-Fi'
Top Pick: 'RoboCop 3'

Full code with comments

공식 문서에서 제공한 전체 코드도 한번 살펴보면 도움이 될 것.

from metaflow import FlowSpec, step, IncludeFile, Parameter, conda, conda_base


def get_python_version():
    """
    A convenience function to get the python version used to run this
    tutorial. This ensures that the conda environment is created with an
    available version of python.

    """
    import platform
    versions = {'2' : '2.7.15',
                '3' : '3.7.3'}
    return versions[platform.python_version_tuple()[0]]


# Use the specified version of python for this flow.
@conda_base(python=get_python_version())
class PlayListFlow(FlowSpec):
    """
    The next version of our playlist generator that adds a 'hint' parameter to
    choose a bonus movie closest to the 'hint'.

    The flow performs the following steps:

    1) Load the genre specific statistics from the MovieStatsFlow.
    2) In parallel branches:
       - A) Build a playlist from the top films in the requested genre.
       - B) Choose a bonus movie that has the closest string edit distance to
         the user supplied hint.
    3) Join the two to create a movie playlist and display it.

    """
    genre = Parameter('genre',
                      help="Filter movies for a particular genre.",
                      default='Sci-Fi')

    hint = Parameter('hint',
                     help="Give a hint to the bonus movie algorithm.",
                     default='Metaflow Release')

    recommendations = Parameter('recommendations',
                                help="The number of movies recommended for "
                                "the playlist.",
                                default=5)

    @conda(libraries={'pandas' : '0.24.2'})
    @step
    def start(self):
        """
        Use the Metaflow client to retrieve the latest successful run from our
        MovieStatsFlow and assign them as data artifacts in this flow.

        This step uses 'conda' to isolate the environment. This step will
        always use pandas==0.24.2 regardless of what is installed on the
        system.

        """
        # Load the analysis from the MovieStatsFlow.
        from metaflow import Flow, get_metadata

        # Print metadata provider
        print("Using metadata provider: %s" % get_metadata())

        # Load the analysis from the MovieStatsFlow.
        run = Flow('MovieStatsFlow').latest_successful_run
        print("Using analysis from '%s'" % str(run))

        # Get the dataframe from the start step before we sliced into into
        # genre specific dataframes.
        self.dataframe = run['start'].task.data.dataframe

        # Also grab the summary statistics.
        self.genre_stats = run.data.genre_stats

        # Compute our two recomendation types in parallel.
        self.next(self.bonus_movie, self.genre_movies)

    @conda(libraries={'editdistance': '0.5.3', 'pandas' : '0.24.2'})
    @step
    def bonus_movie(self):
        """
        Use the user supplied 'hint' argument to choose a bonus movie that has
        the closest string edit distance to the hint.

        This step uses 'conda' to isolate the environment. Note that the
        package 'editdistance' need not be installed in your python
        environment.

        """
        import pandas
        import editdistance

        # Define a helper function to compute the similarity between two
        # strings.
        def _edit_distance(movie_title):
            return editdistance.eval(self.hint, movie_title)


        # Compute the distance and take the argmin to find the closest title.
        distance = self.dataframe['movie_title'].apply(_edit_distance)
        index = distance.idxmin()
        self.bonus = (self.dataframe['movie_title'].values[index],
                      self.dataframe['genres'].values[index])

        self.next(self.join)

    @conda(libraries={'pandas' : '0.24.2'})
    @step
    def genre_movies(self):
        """
        Select the top performing movies from the use specified genre.

        This step uses 'conda' to isolate the environment. This step will
        always use pandas==0.24.2 regardless of what is installed on the
        system.

        """
        import pandas
        from random import shuffle

        # For the genre of interest, generate a potential playlist using only
        # highest gross box office titles (i.e. those in the last quartile).
        genre = self.genre.lower()
        if genre not in self.genre_stats:
            self.movies = []

        else:
            df = self.genre_stats[genre]['dataframe']
            quartiles = self.genre_stats[genre]['quartiles']
            selector = df['gross'] >= quartiles[-1]
            self.movies = list(df[selector]['movie_title'])

        # Shuffle the content.
        shuffle(self.movies)

        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results,

        """
        self.playlist = inputs.genre_movies.movies
        self.bonus = inputs.bonus_movie.bonus

        self.next(self.end)

    @step
    def end(self):
        """
        This step simply prints out the playlist.

        """
        # Print the playist.
        print("Playlist for movies in genre '%s'" % self.genre)
        for pick, movie in enumerate(self.playlist, start=1):
            print("Pick %d: '%s'" % (pick, movie))
            if pick >= self.recommendations:
                break

        print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))


if __name__ == '__main__':
    PlayListFlow()

Conculsion

Metaflow를 이해하는데 좀 더 도움이 되었길 바란다. 다음에는 AWS에 적용하는 법 및 Client API에 대해서도 좀 더 자세히 다루도록 하겠다.