DRF_Template/backend/accounts/signals.py

103 lines
5.4 KiB
Python

from django.db.models.signals import post_migrate
from django.dispatch import receiver
from config.settings import SEED_DATA, ROOT_DIR, get_secret
from django_celery_beat.models import PeriodicTask, CrontabSchedule
from .models import CustomUser
import os
import json
# Function to fill in users table with test data on dev/staging
@receiver(post_migrate)
def create_users(sender, **kwargs):
if sender.name == "accounts":
with open(os.path.join(ROOT_DIR, 'seed_data.json'), "r") as f:
seed_data = json.loads(f.read())
for user in seed_data['users']:
USER = CustomUser.objects.filter(
email=user['email']).first()
if not USER:
if user['password'] == 'USE_REGULAR':
password = get_secret('SEED_DATA_PASSWORD')
elif user['password'] == 'USE_ADMIN':
password = get_secret('SEED_DATA_ADMIN_PASSWORD')
else:
password = user['password']
if (user['is_superuser'] == True):
# Admin users are created regardless of SEED_DATA value
USER = CustomUser.objects.create_superuser(
username=user['username'],
email=user['email'],
password=password,
)
print('Created Superuser:', user['email'])
else:
# Only create non-admin users if SEED_DATA=True
if SEED_DATA:
USER = CustomUser.objects.create_user(
username=user['email'],
email=user['email'],
password=password,
)
print('Created User:', user['email'])
USER.first_name = user['first_name']
USER.last_name = user['last_name']
USER.is_active = True
USER.save()
@receiver(post_migrate)
def create_celery_beat_schedules(sender, **kwargs):
if sender.name == "django_celery_beat":
with open(os.path.join(ROOT_DIR, 'seed_data.json'), "r") as f:
seed_data = json.loads(f.read())
# Creating Schedules
for schedule in seed_data['schedules']:
if schedule['type'] == 'crontab':
# Check if Schedule already exists
SCHEDULE = CrontabSchedule.objects.filter(minute=schedule['minute'],
hour=schedule['hour'],
day_of_week=schedule['day_of_week'],
day_of_month=schedule['day_of_month'],
month_of_year=schedule['month_of_year'],
timezone=schedule['timezone']
).first()
# If it does not exist, create a new Schedule
if not SCHEDULE:
SCHEDULE = CrontabSchedule.objects.create(
minute=schedule['minute'],
hour=schedule['hour'],
day_of_week=schedule['day_of_week'],
day_of_month=schedule['day_of_month'],
month_of_year=schedule['month_of_year'],
timezone=schedule['timezone']
)
print(
f'Created Crontab Schedule for Hour:{SCHEDULE.hour},Minute:{SCHEDULE.minute}')
else:
print(
f'Crontab Schedule for Hour:{SCHEDULE.hour},Minute:{SCHEDULE.minute} already exists')
for task in seed_data['scheduled_tasks']:
TASK = PeriodicTask.objects.filter(name=task['name']).first()
if not TASK:
if task['schedule']['type'] == 'crontab':
SCHEDULE = CrontabSchedule.objects.filter(minute=task['schedule']['minute'],
hour=task['schedule']['hour'],
day_of_week=task['schedule']['day_of_week'],
day_of_month=task['schedule']['day_of_month'],
month_of_year=task['schedule']['month_of_year'],
timezone=task['schedule']['timezone']
).first()
TASK = PeriodicTask.objects.create(
crontab=SCHEDULE,
name=task['name'],
task=task['task'],
enabled=task['enabled']
)
print(f'Created Periodic Task: {TASK.name}')
else:
raise Exception('Schedule for Periodic Task not found')
else:
print(f'Periodic Task: {TASK.name} already exists')