에라모르겠다(‘◇’)?

[Python] APScheduler 본문

Language/Python

[Python] APScheduler

도토리즈 2023. 6. 7. 19:54

■ 스케줄러 선택 

 

-BlockingScheduler: 스케줄러가 프로세스에서 실행 중인 유일한 것일 때 사용
-BackgroundScheduler: 아래의 프레임워크를 사용하지 않고 애플리케이션 내 백그라운드에서 스케줄러를 실행하려는 경우에 사용합니다.
-AsyncIOScheduler: 애플리케이션이 asyncio 모듈을 사용하는 경우 사용
-GeventScheduler: 애플리케이션이 gevent를 사용하는 경우 사용
-TornadoScheduler: Tornado 애플리케이션을 구축하는 경우 사용
-TwistedScheduler: Twisted 애플리케이션을 구축하는 경우 사용
-QtScheduler: Qt 애플리케이션을 구축하는 경우 사용

 

 

ex ) views.py 

 

■ 작업 예약 시 트리거

 

date: 특정 시점에 한 번만 작업을 실행하고 싶을 때 사용
interval: 정해진 시간 간격으로 작업을 실행하고 싶을 때 사용
cron: 하루 중 특정 시간에 주기적으로 작업을 실행하고 싶을 때 사용

 

 

- scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler
from rest_framework.decorators import api_view
from rest_framework.response import Response
from .models import * 
from .serializer import *
import requests
from pathlib import Path
from datetime import datetime, timedelta

scheduler = BackgroundScheduler()

@api_view(['POST'])
def scheduler_start(request) : 
    app_info_list = get_app_info() 
    print("app_info_list = " , app_info_list)

    for app_info in app_info_list : 
        print("app_info  = " , app_info)
        app_name = app_info['app_name']
        url = app_info['url']
        save_dir = app_info['save_dir']

        job = scheduler.get_job(app_name)
        print(job)
        if job is None : 
            scheduler.add_job(
                run_download,
                args=(app_name,url,save_dir),
                trigger= "interval",
                seconds = 1,
                id = app_name
            )
        else : 
            scheduler.resume_job(app_name)
    scheduler.start()

    return Response({"msg" : "scheduler started"})

                                   
def run_download(app_name, url, save_dir) :

    print("run_download")
    app_model = AppInfoModel.objects.filter(app_name = app_name).values()[0]
    start_date = app_model['start_date']
    status = app_model['status']
    print(start_date)
    
    today = datetime.today().date()

    if "-" in start_date:
        start_date_obj = datetime.strptime(start_date, "%Y-%m-%d").date()
    else:
        start_date_obj = datetime.strptime(start_date, "%Y%m%d").date()
	
    
    if status == "waiting" :
        try : 
            AppInfoModel.objects.filter(app_name = app_name).update(status="running")
            args = {
                "start_date" : start_date,
                "save_dir" : save_dir
            }
            response = requests.post(url,data=args)

            if response.status_code ==200 : 
            
                res = response.text
                res_json = json.loads(res)
                save_start_date = res_json['start_date']
                save_end_date = res_json['end_date']
                
                # log rdbms save 
                log = {}
                log['app_name'] = app_name
                log['status'] = "complete"
                log['start_date'] = save_start_date
                log['end_date'] = save_end_date

                log_serializer = AppDownloadLogSerializer(data=log)

                if log_serializer.is_valid():
                    log_serializer.save()
                    print("save")

                AppInfoModel.objects.filter(app_name = app_name).update(status="waiting" 
                                 ,start_date=get_next_day(save_end_date) )
            else :
                print("errored : ", response.status_code ) 
                AppInfoModel.objects.filter(app_name = app_name).update(status="errored")
        except Exception as e  :
            print (e)
            AppInfoModel.objects.filter(app_name = app_name).update(status="errored")
	
    elif status == "pause" : 
    	scheduler.pause_job(app_name)
        print("pause job")
        
        
    else : 
        print(app_name , "status ==" , status)
        return
        

def get_app_info() : 
    app_info_list = AppInfoModel.objects.values()
    return app_info_list


def get_latest_folder(directory):
    directory_path = Path(directory)

    folders = [f for f in directory_path.iterdir() if f.is_dir()]

    if not folders:
        return None

    latest_folder = max(folders, key=lambda f: f.name)

    return latest_folder.name


def get_next_day(folder_name):
    is_formatted = "-" in folder_name

    if is_formatted:
        date = datetime.strptime(folder_name, "%Y-%m-%d")
        next_date = date + timedelta(days=1)
        next_folder_name = next_date.strftime("%Y-%m-%d")
    else:
        date = datetime.strptime(folder_name, "%Y%m%d")
        next_date = date + timedelta(days=1)
        next_folder_name = next_date.strftime("%Y%m%d")

    return next_folder_name

- models.py

from django.db import models

# Create your models here.
class AppInfoModel(models.Model) :
    app_name = models.TextField(null=True, blank=True)
    url = models.TextField(null=True, blank=True)
    save_dir = models.TextField(null=True, blank=True)
    status = models.TextField(null=True, blank=True)
    start_date =  models.TextField(null=True, blank=True)

    
    class Meta:
        db_table = 'app_info'

class AppDownloadLogModel(models.Model) :
    app_name = models.TextField(null=True, blank=True)
    start_date = models.TextField(null=True, blank=True)
    end_date = models.TextField(null=True, blank=True)
    status = models.TextField(null=True, blank=True)
    reg_dtm = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'app_download_log'

 

- serializer.py 

from rest_framework import serializers
from .models import * 


class AppDownloadLogSerializer(serializers.ModelSerializer):
    class Meta:
        model = AppDownloadLogModel
        fields = '__all__'

 

 

 

 

 

 

 

 

 

[APS스케줄러] 공식 GUIDE

Comments