About
이 글에서는 이전 블로그들을 작성할 때는 필자가 알지 못한 부분들을 보충 설명하고자 작성하게 되었다.
이번에 추가된 내용은
conda
데코레이터를 이용한 패키지 의존성 관리하는 법Parameter
클래스를 이용하여 cli 실행시 argument를 넣어주는 법- 간단한 Client API를 이용하여 작업한 data 및 state snapshot을 jupyter notebook에서 확인하는 법
Metaflow를 처음 공부하는 분들은 이 글을 읽기전에 아래 순서대로 먼저 확인하는 것을 추천한다.
모든 Metaflow 블로그들은 numpy
, pandas
사용에 능숙하다는 가정하에 작성되었음을 미리 밝힌다.
Snapshot
Metaflow를 이용하여 run
을 하면 로컬에서는 .metaflow
라는 데이터 스토어를 만들게 된다. 실행할 때 사용된 metadata 및 data snapshot들이 저장되는데, 만약 aws를 사용하도록 연결한 상태라면 S3에 저장이 되게 된다.
snapshot들은 client API를 이용해서 jupyter notebook에서 확인 가능하다. 여기서 알아두면 좋은 점 중에 하나는 모든 step
은 컨테이너로 이루어져있다는 점이다. 각 step
들이 서로 다른 pid(process ID)
를 갖는 것을 보면 알 수 있다. 이 특징에 의해서 각 step
마다 다른 의존성을 부여한다거나 어떤 step
은 로컬에서 어떤 step
은 클라우드에서 실행하는 것이 가능하게 되는 것이다.
Metaflow를 이용하면 좋은 점
Learn Metaflow in 10 mins에서 소개한 “Metaflow를 사용하면 가능한 시나리오들”에 필자의 이해를 좀 더 덧붙인 내용을 통해서 Metaflow에 대해 다시 살펴보자.
Scenario | Description |
---|---|
Collaboration | 다른 Data scientist들과 협업하기 쉽다. S3에 기본적으로 실행하는 시기의 code, data, dependencies 들을 Snapshot으로 보관하기 때문에 팀원 중 누구나 S3 접근 권한만 있다면 원하는 시점부터 협업이 가능하기 때문. |
Resuming a run | 역시 협업에서 설명한 이유와 마찬가지로 S3에 실행한 모든 내용이 저장되기 때문에 원하는 지점부터 다시 시작할 수 있다. |
Hybrid runs | Workflow 중에 일부 step 은 로컬 컴퓨터에서 연산하고 원하는 step 에 대해서는 cloud에서 연산하게끔 설정할 수 있따. |
Inspecting run metadata | 만약 3명의 data scientist들이 같이 hyperparameter를 튜닝하고 있다고 가정해보자. 이 경우에도 역시 S3에 학습할 때마다 저장된 데이터에 접근하여 최고 성적을 내는 parameter를 찾을 수 있다. |
Multiple versions of same package | 전처리는 sklearn 0.20 버젼으로 진행하고 모델링은 0.22 버젼으로 진행하는 것이 가능하다. |
자, 이제 밑에서는 코드와 함께 공부해보도록 하자. 아래 코드는 Metaflow 공식 문서에서 제공하는 Tutorial 중 하나이다.
Import Modules
from metaflow import FlowSpec, step, IncludeFile, Parameter, conda, conda_base
Class variables
코드는 step
별로 살펴보자.
def get_python_version():
import platform
versions = {'2' : '2.7.15',
'3' : '3.7.3'}
return versions[platform.python_version_tuple()[0]]
@conda_base(python=get_python_version())
class PlayListFlow(FlowSpec):
genre = Parameter('genre',
help="Filter movies for a particular genre.",
default='Sci-Fi')
hint = Parameter('hint',
help="Give a hint to the bonus movie algorithm.",
default='Metaflow Release')
recommendations = Parameter('recommendations',
help="The number of movies recommended for "
"the playlist.",
default=5)
@conda_base
데코레이터를 이용하여 python version을 설정할 수 있다.Parameter
클래스로 cli로 실행할 때 argument로 넣을 parameter들을 설정할 수 있다. 예:)python tutorial.py run --genre=Comedy
. 따로 넣어주지 않을 때는 default 값으로 설정된다.
start
참고로 이 tutorial은 Metaflow 공식 문서에서 소개한 tutorial 중 4번 째에 해당한다. 여기서는 2번 째 tutorial flow인 MovieStatsFlow
가 이미 실행되어서 결과값이 .metaflow
에 저장되어 있다는 가정하에 이뤄지고 있는 것임을 명심하자.
@conda(libraries={'pandas' : '0.24.2'})
@step
def start(self):
from metaflow import Flow, get_metadata
print("Using metadata provider: %s" % get_metadata())
run = Flow('MovieStatsFlow').latest_successful_run
print("Using analysis from '%s'" % str(run))
self.dataframe = run['start'].task.data.dataframe
self.genre_stats = run.data.genre_stats
self.next(self.bonus_movie, self.genre_movies)
- 반드시 첫 번째 step의 이름은
start
이어야 한다. -
Flow
,get_metadata
는 Client API에 속한다. 이 예제에서는MovieStatsFlow
에서 작업 완료한data
와start
에서 import했던 dataframe을 가져와서 쓰고 있다.# run에 사용된(self에 선언한 변수들) data run = Flow('MovieStatsFlow').latest_successful_run # start를 포함한 각 step마다 사용한 data run['function_name'].task.data # end에서 마지막으로 적용된 data run.data
self.next(self.bonus_movie, self.genre_movies)
를 통해서Branch
방식의 transition을 사용했다는 것을 알 수 있다. (이해가 안가는 분은 링크를 확인하시길)@conda
데코레이터로 이step
에서는pandas
버젼을 0.24.2로 진행하도록 설정했다.
Branch 1 - bonus_movie
@conda(libraries={'editdistance': '0.5.3', 'pandas' : '0.24.2'})
@step
def bonus_movie(self):
import pandas
import editdistance
def _edit_distance(movie_title):
return editdistance.eval(self.hint, movie_title)
distance = self.dataframe['movie_title'].apply(_edit_distance)
index = distance.idxmin()
self.bonus = (self.dataframe['movie_title'].values[index],
self.dataframe['genres'].values[index])
self.next(self.join)
- 위에서도 이미 설명했듯 각
step
들은 서로 다른 process이다. 이번step
에서는edit_distance
,pandas
를 사용한다고@conda
데코레이터에 명시되어있다. - 참고로
edit_distance
는 문자열간의 유사도를 측정하는 알고리즘이다. 이 예제에서는 class variablehint
(default는 ‘Metaflow Release’이며, 이 값과 가장 비슷한 이름의 영화가 가장 낮은 값을 반환하는 알고리즘)과 가장 가까운movie_title
을 찾아서bonus_movie
를 설정하는 법을 보여주고 있다.
Branch 2 - genre_movies
@conda(libraries={'pandas' : '0.24.2'})
@step
def genre_movies(self):
import pandas
from random import shuffle
genre = self.genre.lower()
if genre not in self.genre_stats:
self.movies = []
else:
df = self.genre_stats[genre]['dataframe']
quartiles = self.genre_stats[genre]['quartiles']
selector = df['gross'] >= quartiles[-1]
self.movies = list(df[selector]['movie_title'])
shuffle(self.movies)
self.next(self.join)
- 여기서
self.genre_stats
는MovieStatsFlow
를 통해 얻은 결과값이다.quartiles
는 장르별로 상위 75%, 50%, 25% 순으로 총수익 값이 들어가 있다.selector = df['gross'] >= quartiles[-1]
은 해당 장르에서 수익률이 상위 25%안에 속한 영화의 index값들만 남기겠다는 의미다. - 중요한 부분은 각
Flow
는 서로 접근하여 각자의 결과물(artifact
)을 사용할 수 있다는 점이다.
join
@step
def join(self, inputs):
self.playlist = inputs.genre_movies.movies
self.bonus = inputs.bonus_movie.bonus
self.next(self.end)
Branch
방식의 transition은 항상join
으로 합쳐줘야한다.inputs
를 통해서 이전에 진행된 함수(process)들에 접근할 수 있다.
end
@step
def end(self):
print("Playlist for movies in genre '%s'" % self.genre)
for pick, movie in enumerate(self.playlist, start=1):
print("Pick %d: '%s'" % (pick, movie))
if pick >= self.recommendations:
break
print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))
if __name__ == '__main__':
PlayListFlow()
- Class variables 중 하나였던
self.recommendations
(default는 5)개수만큼의 영화만 playlist에 남긴다.
Client API
Client API들은 Metaflow 공식 문서 Inspecting Flows and Results에서 자세히 확인이 가능하다. 이와 관련해서는 나중에 다른 블로그에서 자세히 설명하도록 하고 여기서는 간단한 코드만 살펴보도록 하자.
Client API는 jupyter notebook에서 사용하기 좋게 만들어졌으니 특별한 이유가 없다면 jupyter notebook을 사용하는 것을 추천한다.
from metaflow import Flow, get_metadata
print("Current metadata provider: %s" % get_metadata())
## runs를 통해서 지금까지 실행한 `PlayListFlow` 결과들을 다 가져올 수 있다.
## 이 방법을 사용하면 Hyperparameter 튜닝에도 도움이 될 것.
for run in Flow('PlayListFlow').runs():
if run.successful:
print("Playlist generated on %s" % run.finished_at)
print("Playlist for movies in genre '%s'" % run.data.genre)
if run.data.playlist:
print("Top Pick: '%s'" % run.data.playlist[0])
print('\n')
결과는 아래처럼 나온다.
Playlist generated on 2019-12-28T11:19:21.fZ
Playlist for movies in genre 'Comedy'
Top Pick: 'Bucky Larson: Born to Be a Star'
Playlist generated on 2019-12-28T11:08:06.fZ
Playlist for movies in genre 'Comedy'
Top Pick: 'Real Women Have Curves'
Playlist generated on 2019-12-28T11:06:48.fZ
Playlist for movies in genre 'Sci-Fi'
Top Pick: 'RoboCop 3'
Full code with comments
공식 문서에서 제공한 전체 코드도 한번 살펴보면 도움이 될 것.
from metaflow import FlowSpec, step, IncludeFile, Parameter, conda, conda_base
def get_python_version():
"""
A convenience function to get the python version used to run this
tutorial. This ensures that the conda environment is created with an
available version of python.
"""
import platform
versions = {'2' : '2.7.15',
'3' : '3.7.3'}
return versions[platform.python_version_tuple()[0]]
# Use the specified version of python for this flow.
@conda_base(python=get_python_version())
class PlayListFlow(FlowSpec):
"""
The next version of our playlist generator that adds a 'hint' parameter to
choose a bonus movie closest to the 'hint'.
The flow performs the following steps:
1) Load the genre specific statistics from the MovieStatsFlow.
2) In parallel branches:
- A) Build a playlist from the top films in the requested genre.
- B) Choose a bonus movie that has the closest string edit distance to
the user supplied hint.
3) Join the two to create a movie playlist and display it.
"""
genre = Parameter('genre',
help="Filter movies for a particular genre.",
default='Sci-Fi')
hint = Parameter('hint',
help="Give a hint to the bonus movie algorithm.",
default='Metaflow Release')
recommendations = Parameter('recommendations',
help="The number of movies recommended for "
"the playlist.",
default=5)
@conda(libraries={'pandas' : '0.24.2'})
@step
def start(self):
"""
Use the Metaflow client to retrieve the latest successful run from our
MovieStatsFlow and assign them as data artifacts in this flow.
This step uses 'conda' to isolate the environment. This step will
always use pandas==0.24.2 regardless of what is installed on the
system.
"""
# Load the analysis from the MovieStatsFlow.
from metaflow import Flow, get_metadata
# Print metadata provider
print("Using metadata provider: %s" % get_metadata())
# Load the analysis from the MovieStatsFlow.
run = Flow('MovieStatsFlow').latest_successful_run
print("Using analysis from '%s'" % str(run))
# Get the dataframe from the start step before we sliced into into
# genre specific dataframes.
self.dataframe = run['start'].task.data.dataframe
# Also grab the summary statistics.
self.genre_stats = run.data.genre_stats
# Compute our two recomendation types in parallel.
self.next(self.bonus_movie, self.genre_movies)
@conda(libraries={'editdistance': '0.5.3', 'pandas' : '0.24.2'})
@step
def bonus_movie(self):
"""
Use the user supplied 'hint' argument to choose a bonus movie that has
the closest string edit distance to the hint.
This step uses 'conda' to isolate the environment. Note that the
package 'editdistance' need not be installed in your python
environment.
"""
import pandas
import editdistance
# Define a helper function to compute the similarity between two
# strings.
def _edit_distance(movie_title):
return editdistance.eval(self.hint, movie_title)
# Compute the distance and take the argmin to find the closest title.
distance = self.dataframe['movie_title'].apply(_edit_distance)
index = distance.idxmin()
self.bonus = (self.dataframe['movie_title'].values[index],
self.dataframe['genres'].values[index])
self.next(self.join)
@conda(libraries={'pandas' : '0.24.2'})
@step
def genre_movies(self):
"""
Select the top performing movies from the use specified genre.
This step uses 'conda' to isolate the environment. This step will
always use pandas==0.24.2 regardless of what is installed on the
system.
"""
import pandas
from random import shuffle
# For the genre of interest, generate a potential playlist using only
# highest gross box office titles (i.e. those in the last quartile).
genre = self.genre.lower()
if genre not in self.genre_stats:
self.movies = []
else:
df = self.genre_stats[genre]['dataframe']
quartiles = self.genre_stats[genre]['quartiles']
selector = df['gross'] >= quartiles[-1]
self.movies = list(df[selector]['movie_title'])
# Shuffle the content.
shuffle(self.movies)
self.next(self.join)
@step
def join(self, inputs):
"""
Join our parallel branches and merge results,
"""
self.playlist = inputs.genre_movies.movies
self.bonus = inputs.bonus_movie.bonus
self.next(self.end)
@step
def end(self):
"""
This step simply prints out the playlist.
"""
# Print the playist.
print("Playlist for movies in genre '%s'" % self.genre)
for pick, movie in enumerate(self.playlist, start=1):
print("Pick %d: '%s'" % (pick, movie))
if pick >= self.recommendations:
break
print("Bonus Pick: '%s' from '%s'" % (self.bonus[0], self.bonus[1]))
if __name__ == '__main__':
PlayListFlow()
Conculsion
Metaflow를 이해하는데 좀 더 도움이 되었길 바란다. 다음에는 AWS에 적용하는 법 및 Client API에 대해서도 좀 더 자세히 다루도록 하겠다.