Dagster
ตัวนี้เป็น data pipeline ตัวนึงนะครับ ถ้าตัวที่มีชื่อสุดก็คงจะยังเป็น Airflow แต่ว่ามันก็มีข้อดีข้อเสียของมันอยู่ ทั้งความใหญ่ของระบบ เวลาที่ต้องใช้ในการเรียนรู้เพื่อใช้งาน และบางอย่างก็ยุ่งยาก เช่นการอัพเดต Dag แต่ก็อาจจะมีการปรับปรุงให้ดีขึ้นได้ ทีนี้กเลยหาตัวที่มีนาดเล็กเพื่อใช้ให้เหมะกับงานของเราเอง ซึ่งที่คัดๆ มาตอนนี้ก็คิดว่าน่าจะเหลือแค่ Dagster กับ Prefect ที่น่าสนใจ
- pip install dagster ก่อนอื่นก็ต้องลองติดตั้งเพื่อทำการเทสกันดูนะครับ ตัวเทสอย่าลืมทำใน env แยกกันนะครับ (สามารถดู Doc ของเค้าได้นะครับ)
- pip install dagit ตัวนี้เอาไว้รันไฟล์ที่เราเขียนนะครับ
- dagster project scaffold --name dagster_lib สร้างโปรเจคของเราในชื่อ dagster_lib
- cd dagster_lib
- pip install -e ".[dev]" เริ่มติดตั้ง
- dagit เริ่มสั่งให้เซิฟเวอร์ทำงานนะครับ ซึ่ง default คือ http://127.0.0.1:3000 นะครับ แล้วกดปิดด้วย Ctrl+c นะครับ
- ที่นี้มาลองสร้างไฟล์ pipeline_01.py กันครับ โดยโค้ดข้างในผมก็อป Doc เค้ามา
import os from dagster import job, op, get_dagster_logger @op def get_file_sizes(): files = [f for f in os.listdir(".") if os.path.isfile(f)] for f in files: get_dagster_logger().info(f"Size of {f} is {os.path.getsize(f)}") @job def file_sizes_job(): get_file_sizes()
- dagit -f pipeline_01.py มันจะทำงานแล้วเปิด server dagit ค้างไว้นะครับ ดังนั้นเราสามารถเข้าไปดูได้ผ่าน server ในเครื่องเราเองที่ http://127.0.0.1:3000 นะครับ
- dagster job execute -f pipeline_01.py อันนี้แบบผ่าน Dagster CLI รันเสร็จจบโค้ดไม่ต้องเพิ่มอะไร
- python3 pipeline_01.py อันนี้แบบผ่าน python api นะครับต้องแก้โค้ดเพิ่มส่วนล่างขึ้นให้เหมือนตอนเขียนปกติ
if __name__ == "__main__": result = file_sizes_job.execute_in_process()
ความคิดเห็น
แสดงความคิดเห็น