blob: 350f9da7890b12159dfbd2ea388a752b972ed94e [file] [log] [blame]
import datetime
import json
import os
from django.shortcuts import HttpResponse, redirect, render
from django_celery_beat.models import CrontabSchedule, PeriodicTasks
from . import forms, models
from .celery_tasks import test_rail_api
from .celery_tasks.tasks import (
get_test_passability_in_suite,
process_run,
update_plot_data,
)
from .utils import get_dict_diff, short_names_for_dict
def index(request):
runs = models.TestRailTestRun.objects.all()
return render(request, "control/index.html", {"runs": runs})
def redirect_to_index(request):
return redirect("index")
def single_run(request, run_id):
run = models.TestRailTestRun.objects.get(pk=run_id)
if request.method == "POST":
form = forms.TestRunForm(request.POST, instance=run)
if form.is_valid():
form.save()
return redirect("single_run", run_id)
else:
form = forms.TestRunForm(instance=run)
return render(
request,
"control/update_run.html",
{"form": form, "run_id": run_id, "checked_tests": run.checked_tests},
)
def create_run(request):
if request.method == "POST":
form = forms.TestRunForm(request.POST)
if form.is_valid():
obj = form.save()
return redirect("single_run", obj.id)
else:
form = forms.TestRunForm()
form.fields["created_after"].initial = (
datetime.date.today() + datetime.timedelta(days=-3 * 30)
)
form.fields["created_before"].initial = datetime.date.today()
return render(request, "control/update_run.html", {"form": form})
def list_reports(request):
reports = models.TestRailReport.objects.order_by("-created_at").all()
return render(request, "control/reports.html", {"reports": reports})
def single_report(request, report_id):
report: models.TestRailReport = models.TestRailReport.objects.get(
pk=report_id
)
data = report.path.read().decode("utf-8")
if (
request.method == "POST"
and request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest"
):
return HttpResponse(
json.dumps({"data": data, "finished": report.finished}),
content_type="application/json",
)
test_results = [
models.TestResult.objects.filter(result_id=test_id)[0]
for test_id in report.test_results
]
return render(
request,
"control/report.html",
{"report_obj": report, "report": data, "test_results": test_results},
)
def update_test_result(request, report_id, result_id, action_needed):
result = models.TestResult.objects.get(result_id=result_id)
result.action_needed = bool(action_needed)
result.save()
print(result.__dict__)
return redirect("single_report", report_id)
def delete_report(request, report_id):
report: models.TestRailReport = models.TestRailReport.objects.get(
pk=report_id
)
try:
os.remove(report.path.path)
except FileNotFoundError:
pass
report.delete()
return redirect("list_reports")
def submit_run(request, run_id):
run = models.TestRailTestRun.objects.get(pk=run_id)
is_testplan = test_rail_api.is_testplan(run.run_id)
if is_testplan:
testrail_run = test_rail_api.get_plan_by_id(run.run_id)
else:
testrail_run = test_rail_api.get_run_by_id(run.run_id)
if not run.run_name:
if is_testplan:
_name = f"Plan {testrail_run['name']}"
else:
parent_plan_id = testrail_run["plan_id"]
parent_plan_name = test_rail_api.get_plan_by_id(parent_plan_id)[
"name"
]
_name = f"Run {testrail_run['name']} from {parent_plan_name}"
run.run_name = _name
run.save()
report_name = "{}-run_id-{}-date-{}".format(
run.run_name,
run.run_id,
datetime.datetime.isoformat(datetime.datetime.now()),
)
path = os.path.join(models.fs.location, report_name)
with open(path, "w"):
pass
report = models.TestRailReport(report_name=report_name, path=path)
report.save()
process_run.delay(run_id, report.id, path, is_testplan)
return redirect("single_report", report.id)
def delete_run(request, run_id):
run = models.TestRailTestRun.objects.get(pk=run_id)
run.delete()
return redirect("index")
def show_help(request):
return render(request, "control/help.html")
def update_jenkins_plot(request):
try:
models.ActionLog.objects.get(name="update_plot_started")
return HttpResponse("Update in progress", status=403)
except models.ActionLog.DoesNotExist:
pass
update = models.ActionLog(
name="update_plot_started", date=datetime.datetime.now()
)
update.save()
update_plot_data.delay()
return HttpResponse("Started Update", status=200)
def jenkins_plot(request):
try:
update_date = models.ActionLog.objects.get(
name="update_jenkins_plot"
).date
except models.ActionLog.DoesNotExist:
update_date = None
try:
models.ActionLog.objects.get(name="update_plot_started")
update_started = True
except models.ActionLog.DoesNotExist:
update_started = False
job_names_path = os.path.join(models.fs.location, "job_names.txt")
job_names = []
if os.path.exists(job_names_path):
try:
with open(job_names_path, "r") as f:
job_names = json.load(f)
except Exception:
pass
return render(
request,
"control/jenkins_plot.html",
{
"update_date": update_date,
"update_started": update_started,
"job_names": enumerate(job_names, 1),
},
)
def submit_suites(request):
form = forms.DiffPassRatesForm(request.POST)
if not form.is_valid():
print(f"{form.errors=}")
return
report1 = models.SuitePassRate(suite_id=request.POST["first-suite_id"])
report1.save()
report2 = models.SuitePassRate(suite_id=request.POST["second-suite_id"])
report2.save()
diff_model = models.DiffOfSuitesPassRates(
report1=report1,
report2=report2,
limit=form.cleaned_data["limit"],
test_keyword=form.cleaned_data["test_keyword"],
)
diff_model.save()
get_test_passability_in_suite.delay(diff_model.id, report1.id)
get_test_passability_in_suite.delay(diff_model.id, report2.id)
return redirect("report_comparing_suites", diff_model.id)
def compare_suites(request):
if request.method == "POST":
return submit_suites(request)
diff_form = forms.DiffPassRatesForm()
report1_form = forms.SuitePassRateForm(prefix="first")
report2_form = forms.SuitePassRateForm(prefix="second")
return render(
request,
"control/compare_suites.html",
{
"diff_form": diff_form,
"report1_form": report1_form,
"report2_form": report2_form,
"finished": None,
},
)
def list_of_comparing_reports(request):
list_of_reports = models.DiffOfSuitesPassRates.objects.all()
return render(
request,
"control/list_comparing_suites.html",
{"reports": list_of_reports},
)
def report_comparing_suites(request, report_id):
report = models.DiffOfSuitesPassRates.objects.get(pk=report_id)
passrate1 = short_names_for_dict(
json.loads(report.report1.passrate_by_tests)
)
passrate2 = short_names_for_dict(
json.loads(report.report2.passrate_by_tests)
)
diff_table = get_dict_diff(
dict1=passrate1, dict2=passrate2, compare_by_key="rate"
)
diff_form = forms.DiffPassRatesForm(instance=report)
report1_form = forms.SuitePassRateForm(
instance=report.report1, prefix="first"
)
report2_form = forms.SuitePassRateForm(
instance=report.report2, prefix="second"
)
return render(
request,
"control/compare_suites.html",
{
"diff_form": diff_form,
"report1_form": report1_form,
"report2_form": report2_form,
"report1": report.report1,
"report2": report.report2,
"is_finished": report.report1.finished and report.report2.finished,
"diff_table": diff_table,
},
)
def schedulers(request):
return render(
request,
"control/schedulers.html",
{"schedulers": models.CronPeriodicTask.objects.all()},
)
def scheduler(request, pk=None):
if request.method == "POST":
return save_scheduler(request)
if pk:
task_pk = models.CronPeriodicTask.objects.get(pk=pk)
form = forms.PeriodicTaskForm(instance=task_pk)
else:
form = forms.PeriodicTaskForm()
return render(
request,
"control/scheduler.html",
{"form": form, "pk": pk, "TASKS": models.TASKS},
)
def save_scheduler(request, pk=None):
minute, hour, day_of_month, month_of_year, day_of_week = request.POST.get(
"cron", "* * * * *"
).split(" ")
if pk is None:
sch = CrontabSchedule.objects.create(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
)
task = models.CronPeriodicTask.objects.create(
crontab=sch,
cron=request.POST.get("cron"),
name=request.POST.get("name"),
task_name=request.POST.get("task_name"),
enabled=request.POST.get("enabled") == "on",
testplan_id_arg=request.POST.get("testplan_id_arg"),
)
else:
task = models.CronPeriodicTask.objects.get(pk=pk)
task.task = request.POST.get("task_name")
task.args = json.dumps((request.POST.get("testplan_id_arg"),))
form = forms.PeriodicTaskForm(request.POST, instance=task)
CrontabSchedule.objects.filter(id=task.crontab.id).update(
minute=minute,
hour=hour,
day_of_month=day_of_month,
month_of_year=month_of_year,
day_of_week=day_of_week,
)
if not form.is_valid():
print(f"{form.errors=}")
return
form.save()
PeriodicTasks.update_changed()
return render(
request,
"control/scheduler.html",
{"form": form, "pk": task.id, "TASKS": models.TASKS},
)
def delete_scheduler(request, pk):
task = models.CronPeriodicTask.objects.get(pk=pk)
task.delete()
return redirect("schedulers")