About

이전 블로그에서는 기본 개념과 구조에 대해서 알아봤다. 이번 블로그에서는 Linear, Branch, Foreach 와 같이 가장 기본이 되는 Transition들과 CLI로 Parameter를 지정하는 법 등을 살펴보도록 하자.

참고로 이 블로그에서 소개하는 예제 코드 및 설명 대부분은 Metaflow 공식 문서을 번역한 것을 미리 알린다.

Metaflow에서 Step 이란?

Metaflow에서는 @step로 데코레이팅된 각각의 함수들을 분리불가한 영역으로 취급한다. 즉, 한 스텝이 전체가 성공하거나 실패하거나 하는 방식이다. 또한, Metaflow는 각 step에서 만들어지는 모든 객체 변수들이 지속될 수 있도록 snapshot으로 저장한다. 그래서 설사 어떤 step의 다음 step에서 오류가 발생해도 그 전 단계까지는 다시 실행할 필요없게 저장된 것을 사용한다는 의미다.

공식 문서에서 제시한 모범사례(Best practice) 를 간단히 살펴보면 아래와 같다.

  • step을 너무 잘게 쪼개면 checkpointing에 의한 오버헤드가 발생하므로 피하라.
  • 하나의 step에서 한시간 이상의 시간이 걸리는 작업은 없도록 하라.
  • 코드의 가독성을 위해 아래의 명령어를 실행하여 출력된 정보로 step의 나뉜 정도가 합리적인지 확인하라.

    python myflow.py show

Linear

가장 기본적인 Transition은 linear(선형) 구조다. 글자그대로 각 스텝에서 스텝으로 이동하는 방식이다.

from metaflow import FlowSpec, step

class LinearFlow(FlowSpec):

    @step
    def start(self):
        self.my_var = 'hello world'
        self.next(self.a)

    @step
    def a(self):
        print('the data artifact is: %s' % self.my_var)
        self.next(self.end)

    @step
    def end(self):
        print('the data artifact is still: %s' % self.my_var)

if __name__ == '__main__':
    LinearFlow()

이 코드에서 주목해야하는 점은 artifact라는 부분이다. 위의 코드에서는 my_var에 해당하는 부분인데 이 값은 local 환경에서 개발 중인 사람은 local에 AWS와 연결해서 사용하는 경우에는 지정한 S3에 그 값이 저장되어 다른 step에서도 접근이 가능하게 된다.

이미 이전 블로그에서 설명한 부분이지만, Metaflow를 사용하게 되면 모든 코드와 데이터 스냅샷이 자동 저장된다. 그로인해 분산 컴퓨팅 플랫폼에서 작동할 수 있게 된다.

어쨋든 한가지 주의할 점은 artifact가 branch 구조를 사용할 때는 다르게 행동한다는 것이다. 아래에서 살펴보자.

Branch

Branch 방식을 이용하면 병렬 스텝을 설정할 수 있다. 코드는 아래와 같다.

from metaflow import FlowSpec, step

class BranchFlow(FlowSpec):
  
    @step
    def start(self):
        self.next(self.a, self.b)
    
    @step
    def a(self):
        self.x = 1
        self.next(self.join)
    
    @step
    def b(self):
        self.x = 2
        self.next(self.join)
    
    @step
    def join(self, inputs):
        print('a is %s' % inputs.a.x)
        print('b is %s' % inputs.b.x)
        print('total is %d' % sum(input.x for input in inputs))
        self.next(self.end)
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    BranchFlow()

모든 Branch들은 반드시 join을 이용하여 합쳐야한다. join의 경우에는 반드시 inputs argument를 추가적으로 받아야하는 것을 잊지말자. 재밌는 점은 self.xdef adef b에서 다 쓰였지만 같은 변수는 아니라는 점이다.

자세히는 data가 flow에서 어떻게 흐르는지를 공부해야한다.

Data flow through graph

아래의 예제를 통해서 살펴보자.

from metaflow import FlowSpec, step

class MergeArtifactsFlow(FlowSpec):

    @step
    def start(self):
        self.pass_down = 'a'
        self.next(self.a, self.b)

    @step
    def a(self):
        self.common = 5
        self.x = 1
        self.y = 3
        self.from_a = 6
        self.next(self.join)

    @step
    def b(self):
        self.common = 5
        self.x = 2
        self.y = 4
        self.next(self.join)

    @step
    def join(self, inputs):
        self.x = inputs.a.x
        self.merge_artifacts(inputs, exclude=['y'])
        print('x is %s' % self.x)
        print('pass_down is %s' % self.pass_down)
        print('common is %d' % self.common)
        print('from_a is %d' % self.from_a)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MergeArtifactsFlow()

먼저 결과부터 살펴보면 아래와 같다.

2019-12-15 23:36:13.186 [1/join/4 (pid 2)] x is 1
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] pass_down is a
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] common is 5
2019-12-15 23:36:13.220 [1/join/4 (pid 2)] from_a is 6
  • pass_down의 경우는 ab에서 수정된적이 없으므로 그대로 전파된다.
  • common 각 branch에서 서로 같은 값을 갖고 있으므로 역시 그대로 전파된다. 이는 Metaflow가 content based deduplication, 즉 중복되는 데이터는 제거해서 저장하기 때문에 가능한 일이다. deduplication 현상은 같은 flow(start에서 end로 끝나는 하나의 class)내에서만 적용된다.
  • xmerge_artifacts가 호출되기전에 코드상에서 이미 처리를 하였다. 이러한 패턴을 이용하면 서로 다른 값을 사용하여 변수가 모호한 경우를 해결할 수 있다. self.x = inputs.a.x를 주석처리하고 실행하면 결과는 아래처럼 나온다.
2019-12-15 23:36:39.998 [1/join/4 (pid 2)] <flow MergeArtifactsFlow step join> failed:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Unhandled artifacts in merge:
2019-12-15 23:36:39.999 [1/join/4 (pid 2)] Step join cannot merge the following artifacts due to them having conflicting values:
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] [x].
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] To remedy this issue, be sure to explictly set those artifacts (using self.<artifact_name> = ...) prior to calling merge_artifacts.
2019-12-15 23:36:40.033 [1/join/4 (pid 2)] 
2019-12-15 23:36:40.035 [1/join/4 (pid 2)] Task failed.
  • yexclude 리스트에 포함되어 있으므로 전파되지 않는다. merge_artifacts의 기본 설정은 모든 artifacts들을 다 전파하는 것임을 기억하자. 만약 self.y를 호출하게 되면 AttributeError: Flow MergeArtifactsFlow has no attribute 'y'와 같은 에러를 확인할 수 있다.
  • from_aa에서만 사용되어 하나로 특정할 수 있으므로 전파된다. 즉 하나의 branch에서만 존재한 변수들은 merge_artifacts를 사용하면 다 전파된다는 것이다.

Foreach

이번 블로그에서 살펴볼 마지막 Transition은 Foreach다.

코드를 살펴보자.

from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):

   @step
   def start(self):
       self.titles = ['Stranger Things',
                      'House of Cards',
                      'Narcos']
       self.next(self.a, foreach='titles')
   
   @step
   def a(self):
       self.title = '%s processed' % self.input
       self.next(self.join)
   
   @step
   def join(self, inputs):
       self.results = [input.title for input in inputs]
       self.next(self.end)
   
   @step
   def end(self):
       print('\n'.join(self.results))

if __name__ == '__main__':
   ForeachFlow()

foreach는 초기화 단계(start 함수)에서 self.next()를 호출할 때 foreach라는 argument를 추가적으로 넣어야한다. 그 값은 객체 변수로 선언된 list의 변수명을 string으로 넣어주면 된다.

위의 예제를 실행하게되면 len(self.titles)이 3이므로 총 3개의 a 스텝이 병렬적으로 실행되게 된다. 역시 branch와 마찬가지로 join 스텝에서는 inputs argument을 통해서 각 노드에 저장된 input.title 값에 접근할 수 있다.

Conclusion

위에서 배운 linear, foreach와 branch 등은 자유롭게 섞어가면 사용할 수 있다. 이번 블로그에서는 기본 Transition들을 살펴보았는데 다음에는 AWS와 연결해서 사용하는 법을 작성해보도록 하겠다.