본문 바로가기

Big Data

Apache Beam 시작하기

출처: https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb#scrollTo=PoRd7hlnoOu5

Apache Beam

Apache Beam은 "병렬 처리"를 위한 라이브러리다. 아래와 같은 특성을 갖는다.

  • 구글이 만들어 공개
  • Java와 Python 등 다양한 언어를 지원
  • Flink, Spark, GCP Dataflow 등 여러 병럴 처리 프레임워크 상에서 동작
  • API 추상화 수준이 올라갔으므로 성능 최적화는 Beam이 알아서 (반대로 말하면 그 이상 최적화는 힘듦)
  • Streaming, Batch를 동일한 코드로 처리
  • Functional Programming 패러다임을 사용, Map() 함수에 익숙하면 금방 익힐 수 있음

embarrassingly parallel 문제, 즉 두 분산 작업 간의 통신이 필요 없는 Workload에 적합한 라이브러리이다.

개인적인 생각으로 Kubernetes나 Bazel이나 Flutter, Beam까지 구글은 "인터페이스 대통합"을 참 좋아하는 것 같다.

Tour of Beam - Getting Started

 

Beam을 사용하면 데이터 처리 (Data Processing)를 위한 Pipeline을 정의할 수 있다. 이렇게 프로그래밍 된 Pipeline을 여러 Runner를 통해 실행하게 된다. ETL (Extract 수집/추출 - Transform 가공 - Load 적재)이라고도 불리는 이 Pipeline을 Beam에서는 어떻게 정의할 수 있을까?

Pipeline이란 일련의 Data Transformation 작업을 의미한다. 공장에서 컨베이어 벨트에 여러 사람들이 서서 앞 사람이 작업한 내용을 뒷 사람이 이어받아 작업을 수행하는 것에 비유할 수 있다.

주요 자료구조

Beam에서 Data들은 PCollection에 저장되어 있다 (Parallel Collection). PCollection은 쉽게 와 유사하다고 볼 수 있는데 여러 Data가 순서를 보장하지 않고 담겨 있는 구조체이다. 순서를 보장하지 않기 때문에 Beam에서는 Data를 병렬적으로 처리할 수 있다.

이렇게 저장된 Data는 PTransform에 의해 처리된다 (Parallel Transform). PTransform은 Function과 유사하다고 볼 수 있다. PCollection에 담긴 각 Data를 Input으로 처리, 변경하여 Output으로 출력하는 형태이다.

Pipeline은 여러 PTransform를 일렬로 이어서 정의하고 이루어지는데 각 PTransform을 Step이라 한다. 결과는 PCollection에 저장된다.

Step 정의: Pipe Operator 사용

outputs = pipeline | step1 | step2 | step3​

Step Label 정의: Right shift Operator 사용

outputs = (
    pipeline
    | "Step 1" >> step1
    | "Step 2" >> step2
    | "Step 3" >> step3
)

Data Transformation

PTransform은 Beam Pipeline의 각 Step을 담당하는 주요 작업으로 PCollection 안의 Data를 병렬로 처리할 수 있도록 한다.

Map: one-to-one

Beam 문서에서 Map Function을 One-to-one이라고 표현한 것을 보고 딱 와닿는 표현 방법이라고 생각했다.

PCollection 안의 1개의 Data를 Input으로 받아 1개의 Output을 다음 PCollection에 담는다.

코드로 표현하면 다음과 같다.

import apache_beam as beam

inputs = [0, 1, 2, 3]

with beam.Pipeline() as pipeline:
    outputs = pipeline | beam.Create(inputs) | beam.Map(lambda x: x * 2)
    outputs | beam.Map(print)

FlatMap: one-to-many

Map으로는 PCollection에 담긴 Element의 수를 변경할 수 없다.

FlatMap은 Data 처리 후 나온 List를 "Flatten"하는 Transformation이다.

기억할 점은, Iterable 객체 모두에 적용 가능하다는 점이다. (Python이라면 Generator를 통해 Lazy Loading 구현이 가능하다.)

inputs = [0, 1, 2, 3]

with beam.Pipeline() as pipeline:
    outputs = pipeline | beam.Create(inputs) | beam.FlatMap(lambda x: [x for _ in range(x)])
    outputs | beam.Map(print)

Filter: one-to-zero

FlatMap에서도 Empty List를 반환함으로써 one-to-zero가 가능하지만 Filter는 좀 더 직관적인 기능을 제공한다. Filter에 넘겨지는 Function의 결과가 True/False이냐에 따라 다음 PCollection에 Element를 유지하거나 제거한다.

inputs = [0, 1, 2, 3]

with beam.Pipeline() as pipeline:
    outputs = pipeline | beam.Create(inputs) | beam.Filter(lambda x: x % 2 == 0)
    outputs | beam.Map(print)

Combine: Many-to-one

일반적으로 reduce에 해당하는 Transformation으로 foldLeft, foldRight 등의 Method와도 유사하다.

Beam에는 이러한 Many-to-one을 처리하는 작업, 즉 Aggregation 작업을 처리하기 위한 다양한 Transformation을 지원한다.
(참고: https://beam.apache.org/documentation/transforms/python/overview/#aggregation)

위 그림에 나온 예제는 CombineGlobally라는 Transformation으로 처리할 수 있다.

inputs = [0, 1, 2, 3]

with beam.Pipeline() as pipeline:
    outputs = pipeline | beam.Create(inputs) | beam.CombineGlobally(sum)
    outputs | beam.Map(print)

한 가지 Aggregation을 추가로 살펴보면 GroupBy()를 들 수 있다.

Key 별로 데이터를 모아서 Tuple로 이루어진 PCollection을 제공한다. -> (Key, Iterable<Value>)

inputs = [
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
    ('key1', 'value4'),
    ('key2', 'value5'),
    ('key3', 'value6'),
]

with beam.Pipeline() as pipeline:
    outputs = pipeline | beam.Create(inputs) | beam.GroupByKey()
    outputs | beam.Map(print)
    
# ('key1', ['value1', 'value4'])
# ('key2', ['value2', 'value5'])
# ('key3', ['value3', 'value6'])