mirror of
https://github.com/lemeow125/DRF_Template.git
synced 2024-11-17 04:09:25 +08:00
110 lines
5.1 KiB
Python
110 lines
5.1 KiB
Python
import json
|
|
import os
|
|
|
|
from config.settings import ROOT_DIR, SEED_DATA, TIME_ZONE, get_secret
|
|
from django.db.models.signals import post_migrate
|
|
from django.dispatch import receiver
|
|
from django_celery_beat.models import CrontabSchedule, PeriodicTask
|
|
|
|
from .models import CustomUser
|
|
|
|
# Function to fill in users table with test data on dev/staging
|
|
|
|
|
|
@receiver(post_migrate)
|
|
def create_users(sender, **kwargs):
|
|
if sender.name == "accounts":
|
|
with open(os.path.join(ROOT_DIR, "seed_data.json"), "r") as f:
|
|
seed_data = json.loads(f.read())
|
|
for user in seed_data["users"]:
|
|
USER = CustomUser.objects.filter(email=user["email"]).first()
|
|
if not USER:
|
|
if user["password"] == "USE_REGULAR":
|
|
password = get_secret("SEED_DATA_PASSWORD")
|
|
elif user["password"] == "USE_ADMIN":
|
|
password = get_secret("SEED_DATA_ADMIN_PASSWORD")
|
|
else:
|
|
password = user["password"]
|
|
if user["is_superuser"] == True:
|
|
# Admin users are created regardless of SEED_DATA value
|
|
USER = CustomUser.objects.create_superuser(
|
|
username=user["username"],
|
|
email=user["email"],
|
|
password=password,
|
|
)
|
|
print("Created Superuser:", user["email"])
|
|
else:
|
|
# Only create non-admin users if SEED_DATA=True
|
|
if SEED_DATA:
|
|
USER = CustomUser.objects.create_user(
|
|
username=user["email"],
|
|
email=user["email"],
|
|
password=password,
|
|
)
|
|
print("Created User:", user["email"])
|
|
|
|
USER.first_name = user["first_name"]
|
|
USER.last_name = user["last_name"]
|
|
USER.is_active = True
|
|
USER.save()
|
|
|
|
|
|
@receiver(post_migrate)
|
|
def create_celery_beat_schedules(sender, **kwargs):
|
|
if sender.name == "django_celery_beat":
|
|
with open(os.path.join(ROOT_DIR, "seed_data.json"), "r") as f:
|
|
seed_data = json.loads(f.read())
|
|
# Creating Schedules
|
|
for schedule in seed_data["schedules"]:
|
|
if schedule["type"] == "crontab":
|
|
# Check if Schedule already exists
|
|
SCHEDULE = CrontabSchedule.objects.filter(
|
|
minute=schedule["minute"],
|
|
hour=schedule["hour"],
|
|
day_of_week=schedule["day_of_week"],
|
|
day_of_month=schedule["day_of_month"],
|
|
month_of_year=schedule["month_of_year"],
|
|
timezone=TIME_ZONE,
|
|
).first()
|
|
# If it does not exist, create a new Schedule
|
|
if not SCHEDULE:
|
|
SCHEDULE = CrontabSchedule.objects.create(
|
|
minute=schedule["minute"],
|
|
hour=schedule["hour"],
|
|
day_of_week=schedule["day_of_week"],
|
|
day_of_month=schedule["day_of_month"],
|
|
month_of_year=schedule["month_of_year"],
|
|
timezone=TIME_ZONE,
|
|
)
|
|
print(
|
|
f"Created Crontab Schedule for Hour:{
|
|
SCHEDULE.hour},Minute:{SCHEDULE.minute}"
|
|
)
|
|
else:
|
|
print(
|
|
f"Crontab Schedule for Hour:{SCHEDULE.hour},Minute:{
|
|
SCHEDULE.minute} already exists"
|
|
)
|
|
for task in seed_data["scheduled_tasks"]:
|
|
TASK = PeriodicTask.objects.filter(name=task["name"]).first()
|
|
if not TASK:
|
|
if task["schedule"]["type"] == "crontab":
|
|
SCHEDULE = CrontabSchedule.objects.filter(
|
|
minute=task["schedule"]["minute"],
|
|
hour=task["schedule"]["hour"],
|
|
day_of_week=task["schedule"]["day_of_week"],
|
|
day_of_month=task["schedule"]["day_of_month"],
|
|
month_of_year=task["schedule"]["month_of_year"],
|
|
timezone=TIME_ZONE,
|
|
).first()
|
|
TASK = PeriodicTask.objects.create(
|
|
crontab=SCHEDULE,
|
|
name=task["name"],
|
|
task=task["task"],
|
|
enabled=task["enabled"],
|
|
)
|
|
print(f"Created Periodic Task: {TASK.name}")
|
|
else:
|
|
raise Exception("Schedule for Periodic Task not found")
|
|
else:
|
|
print(f"Periodic Task: {TASK.name} already exists")
|