1. Workflow(DAG) 작성 및 실행
이제부터 직접 python을 통해 workflow를 DAG형태로 만들어보고 해당 workflow를 airflow에서 실행하고 이해해봅시다. airflow안에서 yolov5 model으로 inference하는 것을 목적으로 하겠습니다. 따라오시죠!
1.1 실행 환경 준비
실행 환경을 다음과 같이 셋팅합니다.
- airflow webserver 실행 (localhost의 8080포트로 연결)
airflow webserver -p 8080
- airflow scheduler 실행
airflow scheduler
- DAG file을 생성할 저장소 생성 (맥북 기준)
cd ~/airflow/
mkdir dags
참고로 airflow.cfg 는 Airflow 관련 설정에 대한 파일, airflow.db은 sqlite database파일이며 이는 airflow설치 시 자동으로 사용되는 db이다.
1.2 workflow 정의
이전 글에서 workflow 즉, DAG를 실행시키고 싶은 지에 대한 정의는 python 파일을 통해 가능하다고 말씀드렸죠. 그래서 이번에 python을 통해 다음과 같은 workflow를 만들 수 있도록 해보겠습니다.
해당 workflow의 목적은 다음과 같습니다.
- 목적: 랜덤하게 사람 이미지를 다운로드하고 해당 이미지를 yolov5 model로 inference하고 해당 결과를 저장
- make_image_store: 이미지가 저장될 장소를 만듬
- download_person_image: unsplash으로부터 사람(person) 사진을 이미지 저장소에 저장
- unsplash에서는 무료로 사진을 다운받을 수 있음
- Inference_using_yolov5: yolov5 model을 다운받아 위의 이미지로 inference하고 결과를 이미지 저장소에 저장함
- 해당 workflow는 한번만 실행되도록 schedule_interval을 None으로 설정함
2. workflow 작성 in Python
2.1 DAG 정의
위에서 만든 ~/airflow/dags폴더안에 workflow가 정의된 python파일을 만들면 webserver에서 자동으로 해당 DAG를 등록해줍니다. (만들고 webserver에 등록되는(보여지는) 데 30초정도 걸려요!) 그럼 이제 해당 폴더안에 yolov5_inference.py라는 파일을 만들겠습니다. 그리고 다음과 같이 작성해봅시다.
#yolov5_inference.py
from datetime import datetime
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# DAG 정의
dag = DAG(
dag_id="yolov5_inference", # dag의 고유 이름 (webserver에 표시될 dag 이름)
description="Download person picture and inference it using yolov5", # dag 설명
start_date=datetime(2022, 1, 1), #해당 pipeline 실행 시작 시간
tags=["yolov5"],
schedule_interval=None, #해당 pipeline 실행 주기 (None으로 한번만 실행하도록 함)
catchup=False # 이전에 실행되지 않았던 dag를 실행할지 말지 결정
)
위에서 중요하게 볼 요소는 4가지입니다.
- dag_id: dag의 고유 이름
- start_date: dag의 실행 시작 날짜
- schedule_interval: dag의 실행 주기
- None일 경우 한번만 실행됨
- catchup: 이전에 실행되지 않았던 dag를 실행할지 말지 결정
- False 값은 실행을 안함을 명시
start_date와 schedule_interval의 이해
다음과 같이 start_date를 2022년1월1일 0시, schedule_interval이 @daily(하루마다 실행)이고 실제 dag를 trigger run하는 시간도 2022년1월1일 0시이라면 실제 dag가 실행되는 시간은 (start_date + schedule_interval)2022년1월2일 0시가 됩니다. 그리고 schedule_interval이 daily이므로 2022년1월3일 0시에는 2번째 dag run이 실행됩니다.
아래는 schedule interval의 가능한 다른 옵션과 그에 대한 의미는 다음과 같습니다.
catchup 이해
만약 2022년1월1일부터 하루마다 주기로(@daily) dag run이 실행되고 있다가 2,3일에 서버가 다운되거나 dag의 코딩오류와 같은 특정 이유로 dag가 run이 되지않았다고 하였다고 해봅시다. 그리고 1월 4일에 해당 dag를 다시 실행시킬려고 할때 catchup를 True로 할경우 이전에 실행되지 않았던(2일,3일) dag run을 실행하면서 catchup하는 것입니다.
2,3일에 실행되지 않았던 dag run때문에 2,3일에 대한 workflow 결과물이 없습니다. 이런 문제점을 해결하기 위해 4일에 catchup을 True로 해놓으면 실행되지 않았던 2,3일에 대한 dag run을 하면서 해당 날짜들에 결과물도 얻을 수 있는 것입니다.
2.2 DAG의 task 정의 (make_image_store+ download_person_picture)
이제 task들을 만들어 볼텐데요. task는 operator를 wrapping하고 있으며 실제로 operator를 통해 task가 이루어진다고 생각하시면 됩니다. 먼저 이미지를 저장할 저장소를 만들어 주기 위해 make_image_store의 task는 pythonOperator를 사용하게됩니다. pythonOperator는 python함수를 실행시킬 수 있는 operator입니다. 그리고 이미지를 저장소를 만들어주었다면 해당 저장소에 이미지를 다운로드하는 것은 Bash 명령어를 사용하게 되므로 BashOperator을 사용하게 됩니다. (다음 코드는 위의 DAG정의 뒤에 붙여주시면 됩니다.)
IMAGE_DIR = '/tmp/images' # image 저장 장소 지정 (원하는 곳으로 지정)
def _make_img_store():
Path(IMAGE_DIR).mkdir(exist_ok=True, parents=True) # image가 저장될 장소 만듬
# task 정의
make_image_store = PythonOperator(
task_id="make_image_store",
python_callable=_make_img_store,
dag=dag
)
# task 정의
download_person_picture = BashOperator(
task_id="download_person_picture",
bash_command=f"curl -L https://source.unsplash.com/random?person --output {Path(IMAGE_DIR)/'image.png'}", # download person pic
dag=dag,
)
make_image_store >> download_person_picture
- make_image_store: pythonOperator를 사용하여 python 함수 실행(make_img_store)
- role: /tmp/images 디렉토리를 생성함
- download_person_picture: BashOperator를 사용하여 curl 명령어 사용
- role: unsplash를 이용하여 random한 사람이미지를 다운로드하여 /tmp/images에 저장
- make_image_store >> download_person_picture: task간의 dependency를 나타냄
- make_image_store가 실행되고 나서야 download_person_picture이 실행됨
위처럼 코딩해놓았다면 webserver에 접속해보면 dag_id(yolov5_inference)이름으로 dag가 등록되어있음을 확인가능합니다.
생성한 yolov5_inference dag에서 tree 메뉴와 graph메뉴를 눌러보면 다음과 같은 화면을 확인가능하다.
뿐만아니라 code 메뉴를 통해 우리가 만든 코드는 확인가능하다.
DAG run
webserver 오른쪽에 보면 실행버튼이 있을텐데 눌러서 이제 직접 dag run을 해보자. 정상적으로 실행되었다면 다음과 같은 화면으로 변경된다. 저는 2월 14일에 dag run을 시켰으므로 해당 시간으로 기록되고 schedule_interval이 None이므로 dag run을 한 시간에 한번만 실행되고 종료된다. (dependency에 따라 make_image_store가 실행되고 download_person_picture이 실행된다.)
정상적으로 실행됨을 확인 했으니 실제로 /tmp/images에 이미지가 잘 저장되었는지 확인해보자.
현재까지 dag를 정의하고 task를 정의하고 task들이 실행됨을 확인하였다. 뒤에서 yolov5 task를 추가하고 새로 dag run을 위해 기존의 dag run의 record를 삭제(초기화)해보자. (cmd창에서하세용)
airflow dags delete yolov5_inference -y #yolov5_inference는 위에서 정의한 dag_id
2.3 DAG의 task 정의 (inference_using_yolov5)
yolov5s model로 다운로드한 random한 person 이미지를 inference하기 위해 다음과 같은 shell script(yolov5_inference.sh)를 ~/airflow/dags/에 작성합니다.
#yolov5_inference.sh
git clone https://github.com/ultralytics/yolov5.git #yolov5 repo을 clone함
cd yolov5 # clone한 repo 폴더로 접속
curl -L https://github.com/ultralytics/yolov5/releases/download/v6.0/v61_yolov5s.pt --output v61_yolov5s.pt # yolov5s model(.pt)를 다운로드함
pip install -r requirements.txt # inference(detect)를 위한 library설치
python3 detect.py --weights v61_yolov5s.pt --source $1 --project /tmp/images --name result # 첫번째 인자로받은($1) image를 입력으로 yolov5s model로 inference
위의 shell script 명령어의 의미를 주석으로 적어놓았고 자세한 특징점은 다음과 같다.
v6_yolov5s.pt
로 저장된 model은 coco dataset으로 학습됨- coco dataset에는 person detection을 위한 class도 포함
python3 detect.py ...
의 $1의 값은 shell script을 실행하는 인자로 받음을 명시- 인자로 받은 path는 image path가 될것이며 이는 detection model의 입력 source로 들어감
- detect하는 데 사용되는 모델은 다운받은 v61_yolov5s.pt가 됨
- 해당 명령어의 --project와 --name의 value값을 detection 결과 이미지 저장 디렉토리로 지정함
- 저장 장소: /tmp/images/result/
위와 같이 shell script를 작성했다면 이제 dag의 task를 추가 정의해보자.
# task 정의
inference_using_yolov5 = BashOperator(
task_id="inference_using_yolov5",
bash_command=f"sudo sh ~/airflow/dags/yolov5_inference.sh {Path(IMAGE_DIR)/'image.png'}", # inference person image using yolov5
dag=dag,
)
make_image_store >> download_person_picture >> inference_using_yolov5
보시면 아시겠지만 yolov5_inference.sh를 실행시키기 위해 BashOperator를 사용하는 것이고 인자값으로 다운로드한 random한 image의 path가 들어간다. 그럼 이제 다시 webserver에 접속해 해당 task가 추가되었는지 확인한다.
잘 생성되었음을 확인하고 해당 dag trigger run을 해보자!
dag run이 정상적으로 확인되었음을 확인가능하다. 그렇다면 /tmp/images/result 폴더에 가서 detection이 잘 되었는지 확인한다. (새롭게 run시킨것이기 때문에 위에서 보여드린 여자분 사진으로 detection을 진행되지 않았습니다.)
추가로 webserver에서 다음과 같이 task를 누르고 log를 누르면 task에 대한 log도 확인가능하며 이는 만약 dag run을 하면서 fail될때 유용하게 사용될 것입니다.
이렇게 해서 우리가 원했던 workflow pipeline을 작성하고 실행하는 법을 알아보았습니다. 해당 예제 코드는 airflow_tutorial에서 사용가능합니다.
'AI Engineering > MLOps' 카테고리의 다른 글
[BentoML] ML model serving 방법 (feat. YOLOv8) (0) | 2023.05.22 |
---|---|
Airflow (1) - Airflow 이해 및 설치 (0) | 2022.03.16 |
Docker/Kubernetes - (12) Kubernetes Ingress (0) | 2022.03.15 |
Docker/Kubernetes - (11) Kubernetes 리소스의 관리와 설정 (0) | 2022.03.15 |
Docker/Kubernetes - (10) Kubernetes 이해 및 사용 (0) | 2022.03.15 |