Dagster

            ตัวนี้เป็น data pipeline ตัวนึงนะครับ ถ้าตัวที่มีชื่อสุดก็คงจะยังเป็น Airflow แต่ว่ามันก็มีข้อดีข้อเสียของมันอยู่ ทั้งความใหญ่ของระบบ เวลาที่ต้องใช้ในการเรียนรู้เพื่อใช้งาน และบางอย่างก็ยุ่งยาก เช่นการอัพเดต Dag แต่ก็อาจจะมีการปรับปรุงให้ดีขึ้นได้ ทีนี้กเลยหาตัวที่มีนาดเล็กเพื่อใช้ให้เหมะกับงานของเราเอง ซึ่งที่คัดๆ มาตอนนี้ก็คิดว่าน่าจะเหลือแค่ Dagster กับ Prefect ที่น่าสนใจ

  1. pip install dagster ก่อนอื่นก็ต้องลองติดตั้งเพื่อทำการเทสกันดูนะครับ ตัวเทสอย่าลืมทำใน env แยกกันนะครับ (สามารถดู Doc ของเค้าได้นะครับ)
  2. pip install dagit ตัวนี้เอาไว้รันไฟล์ที่เราเขียนนะครับ
  3. dagster project scaffold --name dagster_lib สร้างโปรเจคของเราในชื่อ dagster_lib
  4. cd dagster_lib 
  5. pip install -e ".[dev]" เริ่มติดตั้ง
  6. dagit เริ่มสั่งให้เซิฟเวอร์ทำงานนะครับ ซึ่ง default คือ http://127.0.0.1:3000 นะครับ แล้วกดปิดด้วย Ctrl+c นะครับ 
  7. ที่นี้มาลองสร้างไฟล์ pipeline_01.py กันครับ โดยโค้ดข้างในผมก็อป Doc เค้ามา
    import os
    from dagster import job, op, get_dagster_logger
    
    @op
    def get_file_sizes():
         files = [f for f in os.listdir(".") if os.path.isfile(f)]
         for f in files:
             get_dagster_logger().info(f"Size of {f} is {os.path.getsize(f)}")
    
    @job
    def file_sizes_job():
         get_file_sizes()
    
  8. dagit -f pipeline_01.py มันจะทำงานแล้วเปิด server dagit ค้างไว้นะครับ ดังนั้นเราสามารถเข้าไปดูได้ผ่าน server ในเครื่องเราเองที่ http://127.0.0.1:3000 นะครับ 
  9. dagster job execute -f pipeline_01.py อันนี้แบบผ่าน Dagster CLI รันเสร็จจบโค้ดไม่ต้องเพิ่มอะไร
  10. python3 pipeline_01.py อันนี้แบบผ่าน python api นะครับต้องแก้โค้ดเพิ่มส่วนล่างขึ้นให้เหมือนตอนเขียนปกติ 

    if __name__ == "__main__":
         result = file_sizes_job.execute_in_process()
    


ความคิดเห็น

โพสต์ยอดนิยมจากบล็อกนี้

โปรโมชั่นเน็ต TOT

โน๊ตบุ๊ค acer switch sa5-271 แบตบวม T^T

Blog นี้สร้างเพื่อ?