K8sExecutor & K8sPodOperator 가 실행하는 Dag
Airflow 사용하면서 궁금 했던 점을 정리하였음
다음과 같은 분들이 읽어 주시면 감사하겠습니다.
- Airflow & K8s를 사용해 엔지니어링을 시작하시는 분들
- 피드백을 남겨주실 고수분들
- 아무나
1. K8sExecutor &K8sPodOperator 동작 과정
을 요약하자면
- 수행해야 할 시점이 된 태스크를 스케줄러가 찾는다,
- Executor는 동적으로 Airflow 워커를 POD 형태로 실행한다.
- 해당 워커 POD는 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행한다
아래가 요약 과정을 그림으로 나타낸 것 이다.
그림 출처 : 라인 테크블로그
설명 끝..
동작 과정만 놓고 보면 간단한데, 여기서 궁금 한 점은
K8sExecutor & K8sPodOperator 가 실행하는 Dag가 뭐지?
였다.
"뭐긴 뭐냐 airflow.cfg 내부에 'dag_folder'안에 있는 Dag 지 ㅋㅋ"
맞다.
바로 그 Dag 정보를 누가 사용하는지 궁금했고 정확히는 Dag 파일 관리를 위한 Dag sync 작업이나 Volume 마운트 과정이 필요한 이유를 좀 더 찾고 싶었음.
2. 코드 파헤치기
스케줄러가 Executor를 trigger 해서 동작 시작
class KubernetesExecutor(BaseExecutor):
...
...
...
def start(self) -> None:
"""Starts the executor."""
self.log.info("Start Kubernetes executor")
동작하게 되면, AirflowKubernetesScheduelr 초기화
def start(self) -> None:
...
...
...
self.kube_scheduler = AirflowKubernetesScheduler(
kube_config=self.kube_config,
result_queue=self.result_queue,
kube_client=self.kube_client,
scheduler_job_id=self.scheduler_job_id,
)
AirflowKubernetesScheduler 주요 역할은 Pod를 생성한다는 점
생성에 사용되는 이미지는 airflow.cfg 내부의 worker repository 값을 사용함
class AirflowKubernetesScheduler:
def run_next(self, next_job: KubernetesJobType) -> None:
"""Receives the next job to run, builds the pod, and creates it."""
pod = PodGenerator.construct_pod(
namespace=self.namespace,
scheduler_job_id=self.scheduler_job_id,
pod_id=create_pod_id(dag_id, task_id),
dag_id=dag_id,
task_id=task_id,
kube_image=self.kube_config.kube_image,
try_number=try_number,
map_index=map_index,
date=None,
run_id=run_id,
args=command,
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
with_mutation_hook=True,
)
그러면 dag_id, task_id값은 어디서 가져오고 있는 건가
- run_next 인자 값으로 들어오는 next_job == task에서 가져오고
- task는 task_queue에서 가져오고 있음
self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
...
task = self.task_queue.get_nowait()
self.kube_scheduler.run_next(task)
task_queue는 multiprocessing.Manager()에서 왔다.
- Manager() 기능의 주요 역할은 데이터 동기화 및 일관성을 보장하면서 서로 다른 프로세스 간에 데이터 공유를 활성화할 때 사용함
self._manager = multiprocessing.Manager()
잠깐 예시
- 10개 프로세스로 하나의 shared_dict에 값을 업데이트하는 예시임
from multiprocessing import Process, Manager
def update_dict(d, key, value):
d[key] = value
if __name__ == "__main__":
with Manager() as manager:
shared_dict = manager.dict()
# Creating multiple processes to update the shared_dict
processes = [
Process(target=update_dict, args=(shared_dict, i, i * 2))
for i in range(10)
]
for process in processes:
process.start()
for process in processes:
process.join()
# Printing the updated shared_dict
print("Shared Dictionary:", dict(shared_dict))
결론적으로 앞서 봤던 task_queue는 다른 프로세스에서 접근 가능한 Queue 개체고
Queue에 있는 제일 첫 번째 task를 run_next에 넘겨서 Pod를 Generator 하는 것
당연히 Queue는 FIFO
self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
task = self.task_queue.get_nowait()
self.kube_scheduler.run_next(task)
그럼 누가 Queue에 task 넣었나라는 건데,
- 여러 종류의 Executor들은 기본적으로 BaseExecutor를 상속받아 쓰는데 거기서 task를 trigger 하는 부분임
- 결론적으로 webserver 혹은 cli task를 Trigger 하는 경우, 해당 Task를 Queue에 aysnc 하게 넣음
class BaseExecutor(LoggingMixin):
...
...
def trigger_tasks(self, open_slots: int) -> None:
...
if task_tuples:
self._process_tasks(task_tuples)
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
del self.queued_tasks[key]
self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
self.running.add(key)
결국 해당 Task의 Dag_id랑, Task_id, CFG의 image를 가지고 Pod를 Generator
pod = PodGenerator.construct_pod(
namespace=self.namespace,
scheduler_job_id=self.scheduler_job_id,
pod_id=create_pod_id(dag_id, task_id),
dag_id=dag_id,
task_id=task_id,
kube_image=self.kube_config.kube_image,
try_number=try_number,
map_index=map_index,
date=None,
run_id=run_id,
args=command,
pod_override_object=kube_executor_config,
base_worker_pod=base_worker_pod,
with_mutation_hook=True,
다시 말하지만,
- 여기서 kube image는 cfg의 "worker_container_repository”와 "worker_container_tag” 값을 사용
class KubeConfig:
"""Configuration for Kubernetes."""
def __init__(self):
...
self.worker_container_repository = conf.get(self.kubernetes_section, "worker_container_repository")
self.worker_container_tag = conf.get(self.kubernetes_section, "worker_container_tag")
if self.worker_container_repository and self.worker_container_tag:
self.kube_image = f"{self.worker_container_repository}:{self.worker_container_tag}"
else:
self.kube_image = None
만약 Pod가 잘 생성되었다면,
- 이제 Worker Pod가 생성된 것
- 여기서 KubernetesPodOperator를 보면 특별한 것은 없고 흔히 Pod 생성에 필요한 정보를 필요로 한다.
class KubernetesPodOperator(BaseOperator):
...
...
def __init__(
self,
*,
kubernetes_conn_id: str | None = None, # 'kubernetes_default',
namespace: str | None = None,
image: str | None = None,
name: str | None = None,
random_name_suffix: bool = True,
cmds: list[str] | None = None,
arguments: list[str] | None = None,
ports: list[k8s.V1ContainerPort] | None = None,
volume_mounts: list[k8s.V1VolumeMount] | None = None,
...
...
Example을 보면서 깨달아야 하는 것은
- dag_id를 물고 있다는 것인데 사실 당연한 소리다
with DAG(
dag_id="example_kubernetes_operator",
schedule=None,
start_date=datetime(2021, 1, 1),
tags=["example"],
) as dag:
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env, secret_all_keys],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
env_from=configmaps,
name="airflow-test-pod",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations,
init_containers=[init_container],
priority_class_name="medium",
)
스케줄러에 의해 생성된 Worker Pod도 Dag id를 알고 있고
그렇게 생성된 Worker Pod는 dags folder에서 Dag id를 찾아서
PodOperator가 실행되는 것이다.
핵심은, Worker Pod 도 K8sPodOpeartor도 동일한 Dag에 접근이 가능해야 한다는 것이다.
그렇기 때문에 통상적으로 2가지 방법을 사용한다.
- 볼륨 마운트
- S3나 Github를 통한 Dag sync
3. 마무리
K8sExecutor & K8sPodOpeartor를 빨리 사용하려다 일단 이미지 내부에 Dag를 작성하다 에러 참교육 당하고 코드를 뒤젹거리면서 확인한 사실을 적은 글인데, 뭔가 글이 눈에 안들온다..
이상 삽질하다 지능이슈 발생해서 오픈 소스 보러 간 개발자의 포스팅 끝
'MLOps > Development' 카테고리의 다른 글
미증권 뉴스 스크랩핑(Node.js , Express, Puppeteer, Koyeb) (0) | 2023.04.03 |
---|---|
효율적인 대규모 크롤링 시스템 운영을 위한 Fargate on EKS 적용하기 - 3편 (0) | 2023.03.23 |
효율적인 대규모 크롤링 시스템 운영을 위한 Fargate on EKS 적용하기 - 2편 (2) | 2023.03.09 |
효율적인 대규모 크롤링 시스템 운영을 위한 Fargate on EKS 적용하기 - 1편 (2) | 2023.02.25 |
[티끌모아 빅데이터] 나의 방문 일지 - 제 1편 : 티끌의 시작 (0) | 2023.02.17 |