Search

Spark 쓰면서 겪은 이야기들

생성일
3/7/2022, 1:42:18 AM
태그
Apache Spark
PySpark
Hadoop
Big Data
작성자

Ticketplace는 이렇게 Batch process를 만들어 쓰고 있습니다

EMR Version 6.1.0, Spark Version 3.0.0 을 이용하고 있습니다.

배경

초기 Youha를 만들 때, 데이터 긁어오기 (Youtube기반이다 보니까 Youtube API를 써서 Youtube channel과 video들의 정보를 긁어옵니다), 정보의 집계/통계 등의 일을 본 서비스 외에 백그라운드에서 돌려야 하는 일을 모두 AWS Lambda 로 처리했습니다. 뭐 그냥 바로 바로 돌아가니 뭐 문제는 없을 수 있지만 다음의 문제들을 고민할 수 밖에 없었습니다.
AWS Lambda는 최대 15분 이상 돌아갈 수 없습니다. 만약 데이터가 크거나 뭔가 오래 걸리는 일이 벌어지면 중간에 어디만큼 했는지 다 기록을 해놔야 합니다.
데이터의 크기가 점점 커져갈텐데, 이런 구조에 대한 배려없이 단순 코딩만으로는 한계가 닥칠 것입니다. 예를 들어 10만건 이상의 데이터에 대해서 모아서 한번에 집계를 하려면 Big data framework없이는 스스로 이를 나누고 쪼개서 돌려야 하는데 쉽지가 않습니다.
각 Batch로 돌아가는 일들의 순서를 관리할 수 없습니다.
이러한 문제들을 처리하기 위해서 새로운 구조의 Batch process를 처리하는 시스템을 만들어야 했습니다.

새로운 Batch process의 구조

새로운 Batch proccess의 구조는 다음과 같은 구조로 바꿨습니다.
Apache airflow로 전체 Batch process의 Schedule과 순서를 제어합니다.
Apache airflow의 DAG로 선언된 Job들은 Amazon EMR에서 실행합니다.
이를 위해 EMR에 각 Job들을 CLI명령으로 만들어 둡니다.
ETL(Extract / Trasform / Load)의 마지막으로는 Back-end의 OLTP DB (현재 MongoDB를 주로 쓰고 있습니다.)나 혹은 S3에 정리한 결과를 저장해 놓습니다.
이 S3에 담은 데이터는 대부분 실제 서비스에서 바로바로 제공해야 하는 데이터들입니다.
OLTP DB에 들어가는 데이터들은 실제 화면에서 이를 이용해서 Sort / Filtering 해야하는데 필요한 값들이어서 DB에 기록합니다.

Airflow

Airflow는 Job들의 순서와 의존성을 DAG구조로 이어서 배치 프로세스들을 관리하는 시스템입니다. 다른 어떤 것보다 제일 직관적으로 관리하기가 나아서 이를 이용하기로 했습니다. 당시 이를 만드는 과정에서 EKS의 관리가 현재 팀의 역량에는 너무 어렵다는 결론을 내려서 ECS를 이용해서 Airflow를 올렸습니다. (문제는 열심히 만들어 놨더니 며칠 지나서 AWS에 갑자기 관리형 Airflow가 나왔네요..어흑)

EMR (Elastic Map Reduce)

현재 Ticketplace에서는 Spark를 돌리기 위해서 AWS의 EMR을 이용해서 Spark cluster들을 관리하고 있습니다. 별도의 관리형 서비스들이 있긴 하지만 현재로서는 제일 저렴한 서비스여서 이를 활용하기로 했습니다.
Cluster의 효율적인 관리를 위해서 Yarn cluster 관리자를 이용했습니다. Yarn은 클러스터의 자원을 ‘컨테이너’ 단위로 쪼갤 수 있고 이를 각 Spark job들에게 동시에 배분할 수 있습니다. 그리고 하나의 Cluster를 이용하기로 했는데 이는 지속적으로 계속 Spark job들이 돌면서 일을 처리해야 하기 때문에 매번 새롭게 EMR cluster를 만드는 것보다는 낫다고 판단했습니다. (참고)
EMR cluaster안에 실제 일을 처리하는 코드는 어떻게 작성할까요? EMR cluster는 Docker image를 지정해서 이 Docker image안에 들어 있는 Spark job을 실행하는 기능을 6.0.0부터 가지고 있습니다. 간단하게 설명하면 Docker registry와 여기서 가져다 쓸 Docker image이름을 지정하면 사용할 수 있습니다.

Airflow→EMR cluster까지 Code로 설명하기

위에서 설명한대로 Airflow에서 EMR cluster까지 흘러가는 명령의 흐름을 예를 들어서 설명해보겠습니다. 예를 들어 보면 Airflow의 DAG는 EMRSSHOperator를 이용해서 이렇게 구성되어 있습니다.
args = { ...... } # EMR cluster의 Executor 수, Core 수, Memory 수를 설정합니다. default_spark_config_args = ( ..... ) # 내부에서 만든 함수, ECR registry의 URI를 가져옵니다. ecr_uri = ecr_helper.get_ecr_uri() background_job_for_youtube_channel_dag = DAG( dag_id='background_job_for_youtube_channel', default_args=args, schedule_interval='## ## * * *', catchup=False, ) # 여기, EMRSSHOperator로 Spark job을 보냅니다. gather_trends_videos_and_channels = EMRSSHOperator( dag=background_job_for_youtube_channel_dag, task_id="gather_trends_videos_and_channels", docker_image_name=ecr_uri, spark_program_args="gather-trends-videos-and-channels", spark_config_args=default_spark_config_args, ) ........ run_in_everyday_task >> \ gather_trends_videos_and_channels >> ......
Python
DAG 예제
여기서 EMRSSHOperator로 보낸 내용은 이러한 command를 Spark cluster에서 실행한 것과 같은 것입니다.
$ spark-submit \ --master yarn \ --deploy-mode cluster \ ......... \ run_task.py gather_trends_videos_and_channels .......
Shell
이렇게 EMR cluster에 지정된 Docker image안에 있는 run_task.py가 지정된 명령을 실행하게 됩니다. run_task.py는 이런 코드입니다. 이 내용은 ‘Argument로 주어진 shell command를 실행한다'는 뜻입니다.
(참고로 여기서부터는 EMR에서 실행되는 Docker image안입니다.)
import sys root_path = '/pyspark' if root_path not in sys.path: sys.path.insert(0, root_path) # 미리 모아놓은 CLI들, Click을 이용해 구현 from youha.entry_point import cli if __name__ == '__main__': cli()
Python
run_task.py
위의 코드에서 참조하고 있는 from youha.entry_point import cli 는 아래와 같습니다. click이라는 python library로 구성된 CLI command들의 집합입니다.
import click from youha.channel.cli import channel_commands from youha.contents.cli import contents_command from youha.proposal.cli import proposal_command from youha.user.cli import user_commands from youha.dashbaord.cli import dashboard_commands from youha.analytics.cli import analytics_commands cli = click.CommandCollection( sources=[channel_commands, user_commands, contents_command, proposal_command, dashboard_commands, analytics_commands])
Python
youha/entry_point.py
위에서 부른 gather_trends_videos_and_channels는 그럼 어떻게 구현되어 있을까요? 이렇게 구현되어 있습니다.
import click from pyspark import SparkConf from pyspark.sql import SparkSession from youha.channel.etl import gather_trends_videos_and_channels_task from youha.common.helper import cli_helper from youha.common.log import log @click.command() @click.option('-r', '--region_code', type=click.STRING, default="KR", show_default=True) def gather_trends_videos_and_channels(region_code): log("starting task: gather_trends_videos_and_channels") conf = cli_helper.set_spark_s3a_config(SparkConf()) conf.setAppName("gather_trends_videos_and_channels_task") spark = (SparkSession .builder .config(conf=conf) .getOrCreate()) log("created spark session") try: gather_trends_videos_and_channels_task( spark=spark, region_code=region_code) except Exception as e: print(e) raise e finally: spark.stop()
Python
youha/channel/cli/gather_trends_videos_and_channels.py
이렇게 되면 각 Job들의 순서를 바꾸기 전에는 DAG는 손댈 이유가 없습니다. 계속해서 Spark job을 CLI로 감싸놓은 명령들을 품고 있는 Docker image를 배포하고 이를 사용하는 EMR cluster를 배포하면 됩니다.

새롭게 알게 된 것들

Spark의 Executor / Node / 가용 메모리 최적화

초기에 Spark설정에서 우선 ‘돌아간다'에 의의를 두고 있었는데 너무 느리거나 뭔가 자원이 많은데도 시작을 못하거나 하는 경우도 있었습니다. 결론적으로 말하면 Executor, Node, 메모리 할당을 잘못하고 있었더라고요. 이를 위해서 다음의 규칙을 이용했습니다. (참고)
예를 들어 아래와 같은 구성이라 가정하면,
**Cluster Config:** 10 Nodes 16 cores per Node 64GB RAM per Node
HDFS의 Throughput을 좋게 하기 위해 executor당 5개를 할당합니다. => -executor-cores = 5 
HDFS Client에 너무 많은 Thread가 돌게 되면 이를 감당할 수 없습니다. 실제 관찰해보면 executor당 5개 이하로 잡는 것이 좋습니다. 그래서 이 숫자는 5나 혹은 그보다 작은 값을 추천합니다.
1개 core는 Hadoop / Yarn daemon용으로 남겨놓습니다. => Num cores available per node = 16-1 = 15
그래서 전체 Cluster안에 가능한 core수는 = 15 x 10 = 150
전체 가능한 executor수 = 전체 core수 / executor당 전체 core수 = 150/5 = 30
위의 값에서 ApplicationManager용으로 하나를 더 빼면 => -num-executors = 29
Node당 executor의 숫자 = 30/10 = 3
한 Node위의 executor당 할당 가능한 Memory는 = 64GB/3 = 21GB
Heap 관리시 꽉차는 경우(overhead)를 감안해서 조금 깍아야 한다 = 7% of 21GB = 3GB.
그래서 실제  -executor-memory = 21 - 3 = 18GB
이 구성은 상황마다 조금씩 다르지만 이 부분을 Tuning하고나서 Performance의 차이가 매우 달라지는 것을 확인했습니다. 여러 Spark job이 돌아갈 때는 특히 이를 어떻게 Tuning했느냐에 따라서 성능차이가 많이 납니다.

동시에 여러 DAG가 돌지 않게 해주세요

실제 EMR의 힘(?)만 믿고, 매시간 Batch process가 돌아가는 구조를 만들었는데 동시에 4개 이상의 일이 같은 시간에 돌아가는 것은 무리더라고요. 물론 무한한 EMR자원을 무한하게 쓰면 되겠지만 저희의 지갑은 무한하지 않지요. ㅎㅎㅎ ‘무리’라고 표현은 했지만 동시에 무거운 Batch process가 돌다보면 정작 빨리 끝날 수 있는 일들도 자원이 없어서 실행이 느려지곤 합니다. 그래서 Schedule을 조절해서 동시에 4개 이상의 일이 겹쳐서 돌아가지는 않게 하길 권합니다.
그리고 전날 돌아야 하는 일이 안끝나서 다음날 도는 일에 영향을 미치기도 합니다. 이럴 때는 해당 batch job을 과감히 Fail처리해야 할 수도 있습니다. Case by case가 많지만 대응하는 수 밖에 없더라고요...T_T

Spark job을 테스트 가능하게 하자

저희 Youha 의 기존 Spark 코드는 하나의 Job에서 Extract, Transform, Load 중 2개 이상을 같이 사용하게 만들어져있었습니다.
한 함수에서 2가지 이상의 동작을 하여 테스트를 하기 어렵고 테스트케이스가 준비되어있지 않았습니다. 지속해서 개발을 하기 위해서 테스트 케이스가 필요하게 되었고, 테스트 케이스를 만들기 위해서 규칙을 만들어 나가게 되었습니다.
1.
BaseTask 추상클래스를 만들고 Extract, Transform, Load 단계를 추상메소드로 구성한 후 run 함수에서 Extract →Transform → Load 단계별로 실행될 수 있도록 구현 후, 단계별 실행 후 리턴값이 있는 경우 이후 단계에서 사용할 수 있도록 정의 해주었습니다.
a.
Extract: 데이터를 수집하는 함수로 구성합니다. 예를 들어 S3, API를 사용하여 데이터를 가져오는 단계입니다. 리턴 값의 타입은 Dataframe으로 강제합니다.
b.
Transform: Extract 단계에서 처리하고 리턴된 Dataframe 을 Load 하기위한 변환단계로, Spark Transformation 함수들을 사용하여 Dataframe을 조작하는 단계입니다. 이때 단계에서도 역시 리턴 값의 타입은 Dataframe 으로 강제합니다.
c.
Load: Transform 단계에서 처리하고 반환된 Dataframe을 S3, API를 통해서 데이터를 생성하는 단계입니다.
2.
Transform 단계의 로직만 테스트 케이스를 구성합니다.
a.
테스트 케이스 구현하기위해 pytest 모듈을 사용합니다. 모듈을 임포트 한 후 fixture을 사용하여 SparkSession 오브젝트를 생성하고 yield 를 사용하여 테스트 케이스에서 사용할 수 있도록 구현 후 테스트가 마무리되면 SparkSession 을 중지시킬 수 있도록 구현합니다.
b.
Extract / Load 의 경우 S3, API 등을 사용하기 때문에 동일한 타입의 Dataframe 을 생성합니다.
c.
Transform 단계를 진행 한 후 변환된 Dataframe 을 생성합니다
d.
Transform 함수에 Extract 에서 생성한 Dataframe 넘겨준 후 반환된 Dataframe 과 Transform 에서 생성된 Dataframe 을 비교하여 예상했던 값이 제대로 나왔는지 확인합니다.
Extract, Transform 단계에서 Dataframe 을 리턴하는 이유 Spark 는 기본적으로 Lazy-Loading 으로 실행됩니다. 이는 계산을 바로바로 하지 않고 논리적으로만 처리를 어떻게 할 지 정해둔 후 Spark Action 함수가 (ex. collect(), count()) 불려질때 비로소 계산을 시작합니다. 위 단계에서 Action 함수를 사용할 때 마다 비용이 늘어나게되고, 데이터 처리의 시간 또한 오래 걸리게됩니다. Action 함수는 Load 단계에서만 사용할 수 있도록 하기위해서 Dataframe 으로 처리 될 수 있도록 하였습니다.
위와 같은 규칙을 만들고 테스트케이스를 구성하며, 이후 단계별 변경이 일어나는 경우 바로 알 수 있고, 빠르게 코드를 수정 할 수 있는 장점을 얻었습니다.
# BaseTask class 코드 class BaseTask(metaclass=ABCMeta): @abstractmethod def extract(self, **kwargs) -> Union[Dict[str, DataFrame], None]: pass @abstractmethod def transform(self, **kwargs: DataFrame) -> Union[Dict[str, DataFrame], None]: pass @abstractmethod def load(self, **kwargs: DataFrame) -> None: pass def run(self) -> None: input_date = self.extract() if not input_date: return output_data = self.transform(**input_date) if not output_data: return self.load(**output_data)
Python
BaseTask class

DataFrame 에 NullType 이 들어가지 않게 하자

ETL 규칙을 만든 후 데이터를 테스트하는 중 Dataframe Scheme을 확인하는 중에 NullType 인 필드들을 확인하였습니다. Scheme을 선언하지 않고 바로 MongoDB 에서 데이터를 가져오다 보니 데이터 샘플링을 할 때 전체가 다 None 값으로 들어가 있어 NullType 으로 유추를 하고 넣어둔 것으로 보입니다.
이를 해결하기 위해서 저희 Youha 에서는 데이터를 읽어올 때 NullType 으로 지정되어있는 StructField Type이 NullType 인 경우 StringType 으로 변경해주는 작업을 추가해주었습니다.
당연히 위 작업을 통해서 전체적인 Scheme 를 정확히 잡을 수 없는 점을 확인하고 이 문제를 해결하고자 아래 방법을 사용했습니다.

MongoDB 를 사용하더라도 Scheme 를 선언하자

저희 Youha 에서는 MongoDB를 사용하고 있으며, Spark에서 MongoDB 데이터를 읽어올 때 spark.read.options 의 format을 mongo 로 사용하고 있었습니다.
모든 DB들이 그렇듯, 서비스가 개발되어 갈수록 새로운 필드가 추가되어야 하는 상황들이 생겼습니다. 새로운 필드는 spark job 에서 다른 값을 생성할 때 참조가 되므로 테스트를 위해서 특정 document 들만 필드를 추가시켜두고 Dataframe으로 데이터를 가져오는 spark job을 실행시켰습니다.
그런데 이상한 문제가 발생했습니다. 특정 컬럼들을 읽고 join 을 하는 곳에서 추가된 컬럼을 찾지 못한다는 에러가 발생한것입니다. MongoDB documents 의 필드에는 추가 한 값이 존재하는데 왜 이런 현상이 발생했을까요?
이를 해결하기 위해 mongodb, pyspark 문서를 찾는 중, spark 에서 DB의 스키마를 결정할 때, Spark는 1000개의 document를 기준으로 스키마를 추론을 해서 데이터를 읽어오는 속도를 높인다는 것을 알았습니다. 즉, 저희가 추가한 필드를 가지고 있는 Document 는 많지 않았기 때문에 샘플링해서는 새로운 필드가 있다는 것을 알지 못했다는 것입니다. 그래서 mongodb 의 스키마와 동일하게 Dataframe scheme 을 생성해서 사용하는 방식을 쓰고 있습니다.
statistics_scheme = StructType([ StructField(name="fieldNameA", dataType=IntergerType(), nullable=True), StructField(name="fieldNameB", dataType=StringType(), nullable=False), StructField(name="fieldNameC", dataType=FloatType(), nullable=True), StructField(name="fieldNameD", dataType=BooleanType(), nullable=False), ])
Python
만들어 주었던 스키마의 예시
“아니, NoSQL을 쓰는데 왜 스키마를 정해?”라고 하실 수도 있습니다. 이것은 Spark의 동작 형태가 그러해서 저희로서는 어쩔 수 없이 택한 방법이었습니다. 이 Scheme를 사용하는 방식으로 MongoDB와 연결해서 Dataframe을 만들어서 빠짐없이 처리할 수 있었습니다.

저자소개

유진호는 티켓플레이스의 CTO입니다. 전체 아키텍쳐 설계와 데이터 구조의 설계, 그리고 근처 맛집 수집(?)용 크롤러에 관심이 많습니다.
정해민은 DevOps, SRE에 관심이 많은 Software engineer입니다. 티켓플레이스에서 Batch processing의 Infrastructure와 Batch job code들을 개발, 유지보수하고 있습니다.

Reference