About
이전 블로그에서는 기본 개념과 구조에 대해서 알아봤다. 이번 블로그에서는 Linear, Branch, Foreach 와 같이 가장 기본이 되는 Transition들과 CLI로 Parameter를 지정하는 법 등을 살펴보도록 하자.
참고로 이 블로그에서 소개하는 예제 코드 및 설명 대부분은 Metaflow 공식 문서을 번역한 것을 미리 알린다.
Metaflow에서 Step 이란?
Metaflow에서는 @step로 데코레이팅된 각각의 함수들을 분리불가한 영역으로 취급한다. 즉, 한 스텝이 전체가 성공하거나 실패하거나 하는 방식이다. 또한, Metaflow는 각 step에서 만들어지는 모든 객체 변수들이 지속될 수 있도록 snapshot으로 저장한다. 그래서 설사 어떤 step의 다음 step에서 오류가 발생해도 그 전 단계까지는 다시 실행할 필요없게 저장된 것을 사용한다는 의미다.
공식 문서에서 제시한 모범사례(Best practice)
를 간단히 살펴보면 아래와 같다.
- step을 너무 잘게 쪼개면 checkpointing에 의한 오버헤드가 발생하므로 피하라.
- 하나의 step에서 한시간 이상의 시간이 걸리는 작업은 없도록 하라.
-
코드의 가독성을 위해 아래의 명령어를 실행하여 출력된 정보로 step의 나뉜 정도가 합리적인지 확인하라.
python myflow.py show
Linear
가장 기본적인 Transition은 linear(선형)
구조다. 글자그대로 각 스텝에서 스텝으로 이동하는 방식이다.
from metaflow import FlowSpec, step
class LinearFlow(FlowSpec):
@step
def start(self):
self.my_var = 'hello world'
self.next(self.a)
@step
def a(self):
print('the data artifact is: %s' % self.my_var)
self.next(self.end)
@step
def end(self):
print('the data artifact is still: %s' % self.my_var)
if __name__ == '__main__':
LinearFlow()
이 코드에서 주목해야하는 점은 artifact
라는 부분이다. 위의 코드에서는 my_var
에 해당하는 부분인데 이 값은 local 환경에서 개발 중인 사람은 local에 AWS와 연결해서 사용하는 경우에는 지정한 S3에 그 값이 저장되어 다른 step에서도 접근이 가능하게 된다.
이미 이전 블로그에서 설명한 부분이지만, Metaflow를 사용하게 되면 모든 코드와 데이터 스냅샷이 자동 저장된다. 그로인해 분산 컴퓨팅 플랫폼에서 작동할 수 있게 된다.
어쨋든 한가지 주의할 점은 artifact
가 branch 구조를 사용할 때는 다르게 행동한다는 것이다. 아래에서 살펴보자.
Branch
Branch 방식을 이용하면 병렬 스텝을 설정할 수 있다. 코드는 아래와 같다.
from metaflow import FlowSpec, step
class BranchFlow(FlowSpec):
@step
def start(self):
self.next(self.a, self.b)
@step
def a(self):
self.x = 1
self.next(self.join)
@step
def b(self):
self.x = 2
self.next(self.join)
@step
def join(self, inputs):
print('a is %s' % inputs.a.x)
print('b is %s' % inputs.b.x)
print('total is %d' % sum(input.x for input in inputs))
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
BranchFlow()
모든 Branch들은 반드시 join
을 이용하여 합쳐야한다. join
의 경우에는 반드시 inputs
argument를 추가적으로 받아야하는 것을 잊지말자. 재밌는 점은 self.x
가 def a
와 def b
에서 다 쓰였지만 같은 변수는 아니라는 점이다.
자세히는 data가 flow에서 어떻게 흐르는지를 공부해야한다.
Data flow through graph
아래의 예제를 통해서 살펴보자.
from metaflow import FlowSpec, step
class MergeArtifactsFlow(FlowSpec):
@step
def start(self):
self.pass_down = 'a'
self.next(self.a, self.b)
@step
def a(self):
self.common = 5
self.x = 1
self.y = 3
self.from_a = 6
self.next(self.join)
@step
def b(self):
self.common = 5
self.x = 2
self.y = 4
self.next(self.join)
@step
def join(self, inputs):
self.x = inputs.a.x
self.merge_artifacts(inputs, exclude=['y'])
print('x is %s' % self.x)
print('pass_down is %s' % self.pass_down)
print('common is %d' % self.common)
print('from_a is %d' % self.from_a)
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
MergeArtifactsFlow()
먼저 결과부터 살펴보면 아래와 같다.
2019-12-15 23:36:13.186 [1/join/4 (pid 2)] x is 1
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] pass_down is a
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] common is 5
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] from_a is 6
pass_down
의 경우는a
와b
에서 수정된적이 없으므로 그대로 전파된다.common
각 branch에서 서로 같은 값을 갖고 있으므로 역시 그대로 전파된다. 이는 Metaflow가 content based deduplication, 즉 중복되는 데이터는 제거해서 저장하기 때문에 가능한 일이다. deduplication 현상은 같은 flow(start에서 end로 끝나는 하나의 class)내에서만 적용된다.x
는merge_artifacts
가 호출되기전에 코드상에서 이미 처리를 하였다. 이러한 패턴을 이용하면 서로 다른 값을 사용하여 변수가 모호한 경우를 해결할 수 있다. self.x = inputs.a.x를 주석처리하고 실행하면 결과는 아래처럼 나온다.
2019-12-15 23:36:39.998 [1/join/4 (pid 2)] <flow MergeArtifactsFlow step join> failed:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Unhandled artifacts in merge:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Step join cannot merge the following artifacts due to them having conflicting values:
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] [x].
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] To remedy this issue, be sure to explictly set those artifacts (using self.<artifact_name> = ...) prior to calling merge_artifacts.
2019-12-15 23:36:40.033 [1/join/4 (pid 2)]
2019-12-15 23:36:40.035 [1/join/4 (pid 2)] Task failed.
y
는exclude
리스트에 포함되어 있으므로 전파되지 않는다.merge_artifacts
의 기본 설정은 모든artifacts
들을 다 전파하는 것임을 기억하자. 만약 self.y를 호출하게 되면AttributeError: Flow MergeArtifactsFlow has no attribute 'y'
와 같은 에러를 확인할 수 있다.from_a
은a
에서만 사용되어 하나로 특정할 수 있으므로 전파된다. 즉 하나의 branch에서만 존재한 변수들은merge_artifacts
를 사용하면 다 전파된다는 것이다.
Foreach
이번 블로그에서 살펴볼 마지막 Transition은 Foreach다.
코드를 살펴보자.
from metaflow import FlowSpec, step
class ForeachFlow(FlowSpec):
@step
def start(self):
self.titles = ['Stranger Things',
'House of Cards',
'Narcos']
self.next(self.a, foreach='titles')
@step
def a(self):
self.title = '%s processed' % self.input
self.next(self.join)
@step
def join(self, inputs):
self.results = [input.title for input in inputs]
self.next(self.end)
@step
def end(self):
print('\n'.join(self.results))
if __name__ == '__main__':
ForeachFlow()
foreach는 초기화 단계(start
함수)에서 self.next()
를 호출할 때 foreach
라는 argument를 추가적으로 넣어야한다. 그 값은 객체 변수로 선언된 list의 변수명을 string으로 넣어주면 된다.
위의 예제를 실행하게되면 len(self.titles)이 3이므로 총 3개의 a
스텝이 병렬적으로 실행되게 된다. 역시 branch와 마찬가지로 join 스텝에서는 inputs
argument을 통해서 각 노드에 저장된 input.title
값에 접근할 수 있다.
Conclusion
위에서 배운 linear, foreach와 branch 등은 자유롭게 섞어가면 사용할 수 있다. 이번 블로그에서는 기본 Transition들을 살펴보았는데 다음에는 AWS와 연결해서 사용하는 법을 작성해보도록 하겠다.