from django.db.models.signals import post_migrate from django.dispatch import receiver from config.settings import SEED_DATA, ROOT_DIR, get_secret from django_celery_beat.models import PeriodicTask, CrontabSchedule from .models import CustomUser import os import json # Function to fill in users table with test data on dev/staging @receiver(post_migrate) def create_users(sender, **kwargs): if sender.name == "accounts": with open(os.path.join(ROOT_DIR, 'seed_data.json'), "r") as f: seed_data = json.loads(f.read()) for user in seed_data['users']: USER = CustomUser.objects.filter( email=user['email']).first() if not USER: if user['password'] == 'USE_REGULAR': password = get_secret('SEED_DATA_PASSWORD') elif user['password'] == 'USE_ADMIN': password = get_secret('SEED_DATA_ADMIN_PASSWORD') else: password = user['password'] if (user['is_superuser'] == True): # Admin users are created regardless of SEED_DATA value USER = CustomUser.objects.create_superuser( username=user['username'], email=user['email'], password=password, ) print('Created Superuser:', user['email']) else: # Only create non-admin users if SEED_DATA=True if SEED_DATA: USER = CustomUser.objects.create_user( username=user['email'], email=user['email'], password=password, ) print('Created User:', user['email']) USER.first_name = user['first_name'] USER.last_name = user['last_name'] USER.is_active = True USER.save() @receiver(post_migrate) def create_celery_beat_schedules(sender, **kwargs): if sender.name == "django_celery_beat": with open(os.path.join(ROOT_DIR, 'seed_data.json'), "r") as f: seed_data = json.loads(f.read()) # Creating Schedules for schedule in seed_data['schedules']: if schedule['type'] == 'crontab': # Check if Schedule already exists SCHEDULE = CrontabSchedule.objects.filter(minute=schedule['minute'], hour=schedule['hour'], day_of_week=schedule['day_of_week'], day_of_month=schedule['day_of_month'], month_of_year=schedule['month_of_year'], timezone=schedule['timezone'] ).first() # If it does not exist, create a new Schedule if not SCHEDULE: SCHEDULE = CrontabSchedule.objects.create( minute=schedule['minute'], hour=schedule['hour'], day_of_week=schedule['day_of_week'], day_of_month=schedule['day_of_month'], month_of_year=schedule['month_of_year'], timezone=schedule['timezone'] ) print( f'Created Crontab Schedule for Hour:{SCHEDULE.hour},Minute:{SCHEDULE.minute}') else: print( f'Crontab Schedule for Hour:{SCHEDULE.hour},Minute:{SCHEDULE.minute} already exists') for task in seed_data['scheduled_tasks']: TASK = PeriodicTask.objects.filter(name=task['name']).first() if not TASK: if task['schedule']['type'] == 'crontab': SCHEDULE = CrontabSchedule.objects.filter(minute=task['schedule']['minute'], hour=task['schedule']['hour'], day_of_week=task['schedule']['day_of_week'], day_of_month=task['schedule']['day_of_month'], month_of_year=task['schedule']['month_of_year'], timezone=task['schedule']['timezone'] ).first() TASK = PeriodicTask.objects.create( crontab=SCHEDULE, name=task['name'], task=task['task'], enabled=task['enabled'] ) print(f'Created Periodic Task: {TASK.name}') else: raise Exception('Schedule for Periodic Task not found') else: print(f'Periodic Task: {TASK.name} already exists')