이전 블로그에서는 기본 개념과 구조에 대해서 알아봤다. 이번 블로그에서는 Linear, Branch, Foreach 와 같이 가장 기본이 되는 Transition들과 CLI로 Parameter를 지정하는 법 등을 살펴보도록 하자.
참고로 이 블로그에서 소개하는 예제 코드 및 설명 대부분은 Metaflow 공식 문서을 번역한 것을 미리 알린다.
Metaflow에서 Step 이란?
Metaflow에서는 @step로 데코레이팅된 각각의 함수들을 분리불가한 영역으로 취급한다. 즉, 한 스텝이 전체가 성공하거나 실패하거나 하는 방식이다. 또한, Metaflow는 각 step에서 만들어지는 모든 객체 변수들이 지속될 수 있도록 snapshot으로 저장한다. 그래서 설사 어떤 step의 다음 step에서 오류가 발생해도 그 전 단계까지는 다시 실행할 필요없게 저장된 것을 사용한다는 의미다.
공식 문서에서 제시한 모범사례(Best practice)
를 간단히 살펴보면 아래와 같다.
- step을 너무 잘게 쪼개면 checkpointing에 의한 오버헤드가 발생하므로 피하라.
- 하나의 step에서 한시간 이상의 시간이 걸리는 작업은 없도록 하라.
코드의 가독성을 위해 아래의 명령어를 실행하여 출력된 정보로 step의 나뉜 정도가 합리적인지 확인하라.
python myflow.py show
가장 기본적인 Transition은 linear(선형)
구조다. 글자그대로 각 스텝에서 스텝으로 이동하는 방식이다.
from metaflow import FlowSpec, step
class LinearFlow(FlowSpec):
def start(self):
self.my_var = 'hello world'
def a(self):
print('the data artifact is: %s' % self.my_var)
def end(self):
print('the data artifact is still: %s' % self.my_var)
if __name__ == '__main__':
이 코드에서 주목해야하는 점은 artifact
라는 부분이다. 위의 코드에서는 my_var
에 해당하는 부분인데 이 값은 local 환경에서 개발 중인 사람은 local에 AWS와 연결해서 사용하는 경우에는 지정한 S3에 그 값이 저장되어 다른 step에서도 접근이 가능하게 된다.
이미 이전 블로그에서 설명한 부분이지만, Metaflow를 사용하게 되면 모든 코드와 데이터 스냅샷이 자동 저장된다. 그로인해 분산 컴퓨팅 플랫폼에서 작동할 수 있게 된다.
어쨋든 한가지 주의할 점은 artifact
가 branch 구조를 사용할 때는 다르게 행동한다는 것이다. 아래에서 살펴보자.
Branch 방식을 이용하면 병렬 스텝을 설정할 수 있다. 코드는 아래와 같다.
from metaflow import FlowSpec, step
class BranchFlow(FlowSpec):
def start(self):
self.next(self.a, self.b)
def a(self):
self.x = 1
def b(self):
self.x = 2
def join(self, inputs):
print('a is %s' % inputs.a.x)
print('b is %s' % inputs.b.x)
print('total is %d' % sum(input.x for input in inputs))
def end(self):
if __name__ == '__main__':
모든 Branch들은 반드시 join
을 이용하여 합쳐야한다. join
의 경우에는 반드시 inputs
argument를 추가적으로 받아야하는 것을 잊지말자. 재밌는 점은 self.x
가 def a
와 def b
에서 다 쓰였지만 같은 변수는 아니라는 점이다.
자세히는 data가 flow에서 어떻게 흐르는지를 공부해야한다.
Data flow through graph
아래의 예제를 통해서 살펴보자.
from metaflow import FlowSpec, step
class MergeArtifactsFlow(FlowSpec):
def start(self):
self.pass_down = 'a'
self.next(self.a, self.b)
def a(self):
self.common = 5
self.x = 1
self.y = 3
self.from_a = 6
def b(self):
self.common = 5
self.x = 2
self.y = 4
def join(self, inputs):
self.x = inputs.a.x
self.merge_artifacts(inputs, exclude=['y'])
print('x is %s' % self.x)
print('pass_down is %s' % self.pass_down)
print('common is %d' % self.common)
print('from_a is %d' % self.from_a)
def end(self):
if __name__ == '__main__':
먼저 결과부터 살펴보면 아래와 같다.
2019-12-15 23:36:13.186 [1/join/4 (pid 2)] x is 1
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] pass_down is a
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] common is 5
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] from_a is 6
의 경우는a
에서 수정된적이 없으므로 그대로 전파된다.common
각 branch에서 서로 같은 값을 갖고 있으므로 역시 그대로 전파된다. 이는 Metaflow가 content based deduplication, 즉 중복되는 데이터는 제거해서 저장하기 때문에 가능한 일이다. deduplication 현상은 같은 flow(start에서 end로 끝나는 하나의 class)내에서만 적용된다.x
가 호출되기전에 코드상에서 이미 처리를 하였다. 이러한 패턴을 이용하면 서로 다른 값을 사용하여 변수가 모호한 경우를 해결할 수 있다. self.x = inputs.a.x를 주석처리하고 실행하면 결과는 아래처럼 나온다.
2019-12-15 23:36:39.998 [1/join/4 (pid 2)] <flow MergeArtifactsFlow step join> failed:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Unhandled artifacts in merge:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Step join cannot merge the following artifacts due to them having conflicting values:
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] [x].
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] To remedy this issue, be sure to explictly set those artifacts (using self.<artifact_name> = ...) prior to calling merge_artifacts.
2019-12-15 23:36:40.033 [1/join/4 (pid 2)]
2019-12-15 23:36:40.035 [1/join/4 (pid 2)] Task failed.
리스트에 포함되어 있으므로 전파되지 않는다.merge_artifacts
의 기본 설정은 모든artifacts
들을 다 전파하는 것임을 기억하자. 만약 self.y를 호출하게 되면AttributeError: Flow MergeArtifactsFlow has no attribute 'y'
와 같은 에러를 확인할 수 있다.from_a
에서만 사용되어 하나로 특정할 수 있으므로 전파된다. 즉 하나의 branch에서만 존재한 변수들은merge_artifacts
를 사용하면 다 전파된다는 것이다.
이번 블로그에서 살펴볼 마지막 Transition은 Foreach다.
코드를 살펴보자.
from metaflow import FlowSpec, step
class ForeachFlow(FlowSpec):
def start(self):
self.titles = ['Stranger Things',
'House of Cards',
self.next(self.a, foreach='titles')
def a(self):
self.title = '%s processed' % self.input
def join(self, inputs):
self.results = [input.title for input in inputs]
def end(self):
if __name__ == '__main__':
foreach는 초기화 단계(start
함수)에서 self.next()
를 호출할 때 foreach
라는 argument를 추가적으로 넣어야한다. 그 값은 객체 변수로 선언된 list의 변수명을 string으로 넣어주면 된다.
위의 예제를 실행하게되면 len(self.titles)이 3이므로 총 3개의 a
스텝이 병렬적으로 실행되게 된다. 역시 branch와 마찬가지로 join 스텝에서는 inputs
argument을 통해서 각 노드에 저장된 input.title
값에 접근할 수 있다.
위에서 배운 linear, foreach와 branch 등은 자유롭게 섞어가면 사용할 수 있다. 이번 블로그에서는 기본 Transition들을 살펴보았는데 다음에는 AWS와 연결해서 사용하는 법을 작성해보도록 하겠다.