본문 바로가기

MLOps/Development

Dag를 알고 있는 녀석은?(K8sExecutor & K8sPodOperator)

K8sExecutor & K8sPodOperator 가 실행하는 Dag

Airflow 사용하면서 궁금 했던 점을 정리하였음


다음과 같은 분들이 읽어 주시면 감사하겠습니다.

  • Airflow & K8s를 사용해 엔지니어링을 시작하시는 분들
  • 피드백을 남겨주실 고수분들
  • 아무나

1. K8sExecutor &K8sPodOperator 동작 과정 

을 요약하자면

  1. 수행해야 할 시점이 된 태스크를 스케줄러가 찾는다,
  2. Executor는 동적으로 Airflow 워커를 POD 형태로 실행한다.
  3. 해당 워커 POD는 개발자가 직접 정의한 컨테이너 이미지를 POD 형태로 또다시 실행한다

아래가 요약 과정을 그림으로 나타낸 것 이다.

그림 출처 : 라인 테크블로그

설명 끝..

동작 과정만 놓고 보면 간단한데, 여기서 궁금 한 점은

K8sExecutor & K8sPodOperator 가 실행하는 Dag가 뭐지?

였다.

 

"뭐긴 뭐냐 airflow.cfg 내부에 'dag_folder'안에 있는 Dag 지 ㅋㅋ"

 

맞다.

바로 그 Dag 정보를 누가 사용하는지 궁금했고 정확히는 Dag 파일 관리를 위한 Dag sync 작업이나 Volume 마운트 과정이 필요한 이유를 좀 더 찾고 싶었음.


2. 코드 파헤치기

스케줄러가 Executor를 trigger 해서  동작 시작

class KubernetesExecutor(BaseExecutor):
  ...
  ...
  ...
	def start(self) -> None:
        """Starts the executor."""
        	self.log.info("Start Kubernetes executor")

동작하게 되면, AirflowKubernetesScheduelr 초기화

def start(self) -> None:
		...
		...
			...
		self.kube_scheduler = AirflowKubernetesScheduler(
		            kube_config=self.kube_config,
		            result_queue=self.result_queue,
		            kube_client=self.kube_client,
		            scheduler_job_id=self.scheduler_job_id,
		        )

AirflowKubernetesScheduler 주요 역할은 Pod를 생성한다는 점

생성에 사용되는 이미지 airflow.cfg 내부의 worker repository 값을 사용

class AirflowKubernetesScheduler:
    def run_next(self, next_job: KubernetesJobType) -> None:
        """Receives the next job to run, builds the pod, and creates it."""

        pod = PodGenerator.construct_pod(
            namespace=self.namespace,
            scheduler_job_id=self.scheduler_job_id,
            pod_id=create_pod_id(dag_id, task_id),
            dag_id=dag_id,
            task_id=task_id,
            kube_image=self.kube_config.kube_image,
            try_number=try_number,
            map_index=map_index,
            date=None,
            run_id=run_id,
            args=command,
            pod_override_object=kube_executor_config,
            base_worker_pod=base_worker_pod,
            with_mutation_hook=True,
        )

그러면 dag_id, task_id값은 어디서 가져오고 있는 건가

  • run_next 인자 값으로 들어오는 next_job == task에서 가져오고
  • task는 task_queue에서 가져오고 있음
self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()

...
task = self.task_queue.get_nowait()

self.kube_scheduler.run_next(task)

task_queue는 multiprocessing.Manager()에서 왔다.

  • Manager() 기능의 주요 역할은 데이터 동기화 및 일관성을 보장하면서 서로 다른 프로세스 간에 데이터 공유를 활성화할 때 사용함
self._manager = multiprocessing.Manager()

잠깐 예시

  • 10개 프로세스로 하나의 shared_dict에 값을 업데이트하는 예시임
from multiprocessing import Process, Manager

def update_dict(d, key, value):
    d[key] = value

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()

        # Creating multiple processes to update the shared_dict
        processes = [
            Process(target=update_dict, args=(shared_dict, i, i * 2))
            for i in range(10)
        ]
        for process in processes:
            process.start()

        for process in processes:
            process.join()

        # Printing the updated shared_dict
        print("Shared Dictionary:", dict(shared_dict))

결론적으로 앞서 봤던 task_queue다른 프로세스에서 접근 가능한 Queue 개체고

Queue에 있는 제일 첫 번째 task를 run_next에 넘겨서 Pod를 Generator 하는 것

당연히 Queue는 FIFO

self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
task = self.task_queue.get_nowait()

self.kube_scheduler.run_next(task)

그럼 누가 Queue에 task 넣었나라는 건데,

  • 여러 종류의 Executor들은 기본적으로 BaseExecutor를 상속받아 쓰는데 거기서 task를 trigger 하는 부분임
  • 결론적으로 webserver 혹은 cli task를 Trigger 하는 경우, 해당 Task를 Queue에 aysnc 하게 넣음
class BaseExecutor(LoggingMixin):
	...
	...
	def trigger_tasks(self, open_slots: int) -> None:
		...
		if task_tuples:
            self._process_tasks(task_tuples)

      def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
          for key, command, queue, executor_config in task_tuples:
              del self.queued_tasks[key]
              self.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
              self.running.add(key)

결국 해당 Task의 Dag_id랑, Task_id, CFG의 image를 가지고 Pod를 Generator

  pod = PodGenerator.construct_pod(
      namespace=self.namespace,
      scheduler_job_id=self.scheduler_job_id,
      pod_id=create_pod_id(dag_id, task_id),
      dag_id=dag_id,
      task_id=task_id,
      kube_image=self.kube_config.kube_image,
      try_number=try_number,
      map_index=map_index,
      date=None,
      run_id=run_id,
      args=command,
      pod_override_object=kube_executor_config,
      base_worker_pod=base_worker_pod,
      with_mutation_hook=True,

다시 말하지만,

  • 여기서 kube image는 cfg의 "worker_container_repository”와 "worker_container_tag” 값을 사용
class KubeConfig:
    """Configuration for Kubernetes."""
	def __init__(self):
				...
        self.worker_container_repository = conf.get(self.kubernetes_section, "worker_container_repository")
        self.worker_container_tag = conf.get(self.kubernetes_section, "worker_container_tag")
        if self.worker_container_repository and self.worker_container_tag:
          self.kube_image = f"{self.worker_container_repository}:{self.worker_container_tag}"
        else:
          self.kube_image = None

만약 Pod가 잘 생성되었다면,

  • 이제 Worker Pod가 생성된 것
  • 여기서 KubernetesPodOperator를 보면 특별한 것은 없고 흔히 Pod 생성에 필요한 정보를 필요로 한다.
class KubernetesPodOperator(BaseOperator):
	...
	...
	def __init__(
	        self,
	        *,
        kubernetes_conn_id: str | None = None,  # 'kubernetes_default',
        namespace: str | None = None,
        image: str | None = None,
        name: str | None = None,
        random_name_suffix: bool = True,
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        ports: list[k8s.V1ContainerPort] | None = None,
        volume_mounts: list[k8s.V1VolumeMount] | None = None,
                ...
                ...

Example을 보면서 깨달아야 하는 것은

  • dag_id를 물고 있다는 것인데 사실 당연한 소리다
with DAG(
    dag_id="example_kubernetes_operator",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
) as dag:
    k = KubernetesPodOperator(
        namespace="default",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        secrets=[secret_file, secret_env, secret_all_keys],
        ports=[port],
        volumes=[volume],
        volume_mounts=[volume_mount],
        env_from=configmaps,
        name="airflow-test-pod",
        task_id="task",
        affinity=affinity,
        is_delete_operator_pod=True,
        hostnetwork=False,
        tolerations=tolerations,
        init_containers=[init_container],
        priority_class_name="medium",
    )

스케줄러에 의해 생성된 Worker Pod도 Dag id를 알고 있고

그렇게 생성된 Worker Pod는 dags folder에서 Dag id를 찾아서

PodOperator가 실행되는 것이다.


핵심은, Worker Pod 도 K8sPodOpeartor도 동일한 Dag에 접근이 가능해야 한다는 것이다.

그렇기 때문에 통상적으로 2가지 방법을 사용한다.

  1. 볼륨 마운트
  2. S3나 Github를 통한 Dag sync

3. 마무리

K8sExecutor & K8sPodOpeartor를 빨리 사용하려다 일단 이미지 내부에 Dag를 작성하다 에러 참교육 당하고 코드를 뒤젹거리면서 확인한 사실을 적은 글인데, 뭔가 글이 눈에 안들온다.. 

 

이상 삽질하다 지능이슈 발생해서 오픈 소스 보러 간 개발자의 포스팅 끝