blob: 8d5f69215fa59133488fbfbf2d35f837526c7fb0 [file] [log] [blame]
import datetime
import json
import os
from .celery_tasks import test_rail_api
from django.shortcuts import render, redirect, HttpResponse
from django_celery_beat.models import CrontabSchedule, PeriodicTasks
from . import models
from . import forms
from .celery_tasks.tasks import process_run, update_plot_data, \
get_test_passability_in_suite
from .utils import short_names_for_dict, get_dict_diff
def index(request):
    """Render the index page with every stored ``TestRailTestRun``."""
    context = {"runs": models.TestRailTestRun.objects.all()}
    return render(request, "control/index.html", context)
def redirect_to_index(request):
    """Send the client to ``"index"`` via Django's ``redirect``."""
    target = "index"
    return redirect(target)
def single_run(request, run_id):
    """Show the edit form for one test run and save it on a valid POST.

    A POST whose form validates is saved and answered with a redirect back
    to ``single_run``. A GET, or a POST that fails validation, renders
    ``control/update_run.html`` with the (possibly bound) form.
    """
    run = models.TestRailTestRun.objects.get(pk=run_id)
    is_post = request.method == "POST"

    if is_post:
        form = forms.TestRunForm(request.POST, instance=run)
    else:
        form = forms.TestRunForm(instance=run)

    if is_post and form.is_valid():
        form.save()
        return redirect("single_run", run_id)

    context = {
        "form": form,
        "run_id": run_id,
        "checked_tests": run.checked_tests,
    }
    return render(request, "control/update_run.html", context)
def create_run(request):
    """Create a new test run from a submitted form, or show a blank form.

    On GET the blank form is pre-filled with a ``created_after`` /
    ``created_before`` window that ends today and starts 90 days earlier.
    On a valid POST the saved object is shown via ``single_run``.
    """
    template = "control/update_run.html"

    if request.method != "POST":
        form = forms.TestRunForm()
        window = datetime.timedelta(days=3 * 30)
        form.fields["created_after"].initial = datetime.date.today() - window
        form.fields["created_before"].initial = datetime.date.today()
        return render(request, template, {"form": form})

    form = forms.TestRunForm(request.POST)
    if form.is_valid():
        created = form.save()
        return redirect("single_run", created.id)
    # Invalid submission: fall through and re-render the bound form.
    return render(request, template, {"form": form})
def list_reports(request):
    """Render all ``TestRailReport`` rows ordered by ``-created_at``."""
    newest_first = models.TestRailReport.objects.order_by("-created_at").all()
    context = {"reports": newest_first}
    return render(request, "control/reports.html", context)
def single_report(request, report_id):
    """Return the content of one report, as JSON for AJAX or as a page.

    The report file is read and decoded as UTF-8. A POST carrying the
    ``X-Requested-With: XMLHttpRequest`` header gets a JSON body with the
    keys ``data`` and ``finished``; any other request gets the rendered
    ``control/report.html`` template.
    """
    report = models.TestRailReport.objects.get(pk=report_id)
    try:
        data = report.path.read().decode("utf-8")
    finally:
        # ``read()`` opens the underlying file implicitly; close it so the
        # handle is not leaked on every request (this view may be polled).
        report.path.close()

    is_ajax_post = (
        request.method == "POST"
        and request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest'
    )
    if is_ajax_post:
        return HttpResponse(
            json.dumps({"data": data, "finished": report.finished}),
            content_type="application/json")

    return render(request, "control/report.html",
                  {"report_id": report.id,
                   "report": data,
                   "finished": report.finished})
def delete_report(request, report_id):
    """Remove a report's file from disk, then delete its database row.

    A file that is already missing is ignored. Redirects to
    ``list_reports`` afterwards.
    """
    report = models.TestRailReport.objects.get(pk=report_id)
    file_path = report.path.path
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Nothing left on disk; the record is still deleted below.
        pass
    report.delete()
    return redirect("list_reports")
def submit_run(request, run_id):
    """Start processing of a test run and redirect to its new report.

    Steps:
      1. Look up the stored run and fetch the matching TestRail plan or run.
      2. If the stored run has no name yet, derive one from TestRail data
         and save it.
      3. Create an empty report file under ``models.fs.location`` and a
         ``TestRailReport`` row pointing at it.
      4. Queue ``process_run`` and redirect to ``single_report``.
    """
    run = models.TestRailTestRun.objects.get(pk=run_id)
    is_testplan = test_rail_api.is_testplan(run.run_id)
    if is_testplan:
        testrail_run = test_rail_api.get_plan_by_id(run.run_id)
    else:
        testrail_run = test_rail_api.get_run_by_id(run.run_id)

    if not run.run_name:
        if is_testplan:
            _name = f"Plan {testrail_run['name']}"
        else:
            parent_plan_id = testrail_run["plan_id"]
            parent_plan_name = \
                test_rail_api.get_plan_by_id(parent_plan_id)["name"]
            _name = f"Run {testrail_run['name']} from {parent_plan_name}"
        run.run_name = _name
        run.save()

    report_name = "{}-run_id-{}-date-{}".format(
        run.run_name,
        run.run_id,
        datetime.datetime.now().isoformat())

    # The run name comes from TestRail and may contain path separators,
    # which would make ``open`` fail or write outside the storage
    # directory. Only the on-disk name is sanitised; the stored
    # ``report_name`` keeps the original text.
    file_name = report_name
    for separator in (os.sep, os.altsep):
        if separator:
            file_name = file_name.replace(separator, "_")
    path = os.path.join(models.fs.location, file_name)

    # Create the (empty) report file before the task is queued.
    with open(path, "w"):
        pass

    report = models.TestRailReport(
        report_name=report_name,
        path=path)
    report.save()
    process_run.delay(run_id, report.id, path, is_testplan)
    return redirect("single_report", report.id)
def delete_run(request, run_id):
    """Delete the ``TestRailTestRun`` with this primary key, then go to index."""
    models.TestRailTestRun.objects.get(pk=run_id).delete()
    return redirect("index")
def show_help(request):
    """Render the static help page."""
    template_name = "control/help.html"
    return render(request, template_name)
def update_jenkins_plot(request):
    """Queue a plot-data update unless one is already marked as started.

    An ``ActionLog`` row named ``update_plot_started`` acts as the
    "in progress" marker. If it already exists the view answers 403;
    otherwise the marker is created, ``update_plot_data`` is queued and
    the view answers 200.
    """
    # ``get_or_create`` replaces the separate lookup-then-insert so the
    # check and the creation of the marker happen in a single call,
    # narrowing the window in which two requests could both start an update.
    _, created = models.ActionLog.objects.get_or_create(
        name="update_plot_started",
        defaults={"date": datetime.datetime.now()})
    if not created:
        return HttpResponse("Update in progress", status=403)

    update_plot_data.delay()
    return HttpResponse("Started Update", status=200)
def jenkins_plot(request):
    """Render the Jenkins plot page.

    Template context:
      * ``update_date`` - ``date`` of the ``update_jenkins_plot`` action
        log entry, or ``None`` when there is no such entry.
      * ``update_started`` - whether an ``update_plot_started`` entry exists.
      * ``job_names`` - 1-based enumeration of the JSON content of
        ``job_names.txt`` in ``models.fs.location``; empty when the file
        is missing, unreadable or not valid JSON.
    """
    try:
        update_date = models.ActionLog.objects.get(
            name="update_jenkins_plot").date
    except models.ActionLog.DoesNotExist:
        update_date = None

    update_started = models.ActionLog.objects.filter(
        name="update_plot_started").exists()

    job_names_path = os.path.join(models.fs.location, "job_names.txt")
    try:
        with open(job_names_path, "r") as f:
            job_names = json.load(f)
    except (OSError, ValueError):
        # Best effort: a missing/unreadable file (OSError) or malformed
        # content (JSON and decoding errors are ValueError subclasses)
        # yields an empty list. Unlike a bare ``except`` this no longer
        # swallows KeyboardInterrupt/SystemExit or programming errors.
        job_names = []

    return render(
        request, "control/jenkins_plot.html",
        {"update_date": update_date, "update_started": update_started,
         "job_names": enumerate(job_names, 1)})
def submit_suites(request):
    """Create a suite pass-rate comparison from POST data and queue its tasks.

    Two ``SuitePassRate`` rows are created from the ``first-suite_id`` and
    ``second-suite_id`` POST values, linked by a new
    ``DiffOfSuitesPassRates`` row, and ``get_test_passability_in_suite``
    is queued once per suite. On success the client is redirected to
    ``report_comparing_suites``.

    If ``DiffPassRatesForm`` does not validate, the comparison page is
    rendered again with the bound form (so its errors are available to the
    template) and HTTP status 400. Previously this path returned ``None``,
    which Django rejects as an invalid view result.
    """
    form = forms.DiffPassRatesForm(request.POST)
    if not form.is_valid():
        print(f"{form.errors=}")
        return render(
            request, "control/compare_suites.html",
            {
                "diff_form": form,
                # Unbound forms pre-filled with the submitted suite ids, so
                # the selection survives without triggering validation.
                "report1_form": forms.SuitePassRateForm(
                    prefix="first",
                    initial={"suite_id": request.POST.get("first-suite_id")}),
                "report2_form": forms.SuitePassRateForm(
                    prefix="second",
                    initial={"suite_id": request.POST.get("second-suite_id")}),
                "finished": None
            },
            status=400)

    report1 = models.SuitePassRate(suite_id=request.POST["first-suite_id"])
    report1.save()
    report2 = models.SuitePassRate(suite_id=request.POST["second-suite_id"])
    report2.save()

    diff_model = models.DiffOfSuitesPassRates(
        report1=report1,
        report2=report2,
        limit=form.cleaned_data["limit"],
        test_keyword=form.cleaned_data["test_keyword"]
    )
    diff_model.save()

    get_test_passability_in_suite.delay(diff_model.id, report1.id)
    get_test_passability_in_suite.delay(diff_model.id, report2.id)
    return redirect("report_comparing_suites", diff_model.id)
def compare_suites(request):
    """Show the blank suite-comparison page, or hand a POST to ``submit_suites``."""
    if request.method == "POST":
        return submit_suites(request)

    context = {
        "diff_form": forms.DiffPassRatesForm(),
        "report1_form": forms.SuitePassRateForm(prefix="first"),
        "report2_form": forms.SuitePassRateForm(prefix="second"),
        "finished": None,
    }
    return render(request, "control/compare_suites.html", context)
def list_of_comparing_reports(request):
    """Render every stored ``DiffOfSuitesPassRates`` comparison."""
    context = {"reports": models.DiffOfSuitesPassRates.objects.all()}
    return render(request, "control/list_comparing_suites.html", context)
def report_comparing_suites(request, report_id):
    """Render one stored comparison of two suites' pass rates.

    The ``passrate_by_tests`` JSON of both linked reports is decoded, passed
    through ``short_names_for_dict`` and diffed with ``get_dict_diff`` on the
    ``"rate"`` key. The page also receives forms bound to the stored
    instances and an ``is_finished`` flag combining both reports'
    ``finished`` values.
    """
    report = models.DiffOfSuitesPassRates.objects.get(pk=report_id)
    first = report.report1
    second = report.report2

    first_rates = short_names_for_dict(json.loads(first.passrate_by_tests))
    second_rates = short_names_for_dict(json.loads(second.passrate_by_tests))
    diff_table = get_dict_diff(
        dict1=first_rates, dict2=second_rates, compare_by_key="rate")

    context = {
        "diff_form": forms.DiffPassRatesForm(instance=report),
        "report1_form": forms.SuitePassRateForm(
            instance=first, prefix="first"),
        "report2_form": forms.SuitePassRateForm(
            instance=second, prefix="second"),
        "report1": first,
        "report2": second,
        "is_finished": first.finished and second.finished,
        "diff_table": diff_table,
    }
    return render(request, "control/compare_suites.html", context)
def schedulers(request):
    """Render the list of all ``CronPeriodicTask`` rows."""
    all_tasks = models.CronPeriodicTask.objects.all()
    return render(
        request, "control/schedulers.html", {"schedulers": all_tasks})
def scheduler(request, pk=None):
    """Show the form for a periodic task, or save it on POST.

    Args:
        request: the incoming Django request.
        pk: primary key of an existing ``CronPeriodicTask`` to edit, or
            ``None`` to work with a new one.

    A POST is delegated to ``save_scheduler``. ``pk`` is forwarded so that
    submitting the edit form updates the existing task; previously it was
    dropped and the submission always took the "create" path.
    """
    if request.method == "POST":
        return save_scheduler(request, pk)

    if pk:
        task = models.CronPeriodicTask.objects.get(pk=pk)
        form = forms.PeriodicTaskForm(instance=task)
    else:
        form = forms.PeriodicTaskForm()

    return render(
        request, "control/scheduler.html",
        {
            "form": form,
            "pk": pk,
            "TASK_CHOICES": models.TASK_CHOICES
        }
    )
def save_scheduler(request, pk=None):
    """Create or update a cron-scheduled periodic task from POST data.

    Args:
        request: the incoming Django request; reads the POST keys ``cron``,
            ``name`` and ``enabled`` plus whatever ``PeriodicTaskForm`` uses.
        pk: primary key of the ``CronPeriodicTask`` to update, or ``None``
            to create a new task together with its ``CrontabSchedule``.

    Returns:
        * 400 plain-text response when ``cron`` does not consist of exactly
          five whitespace-separated fields (this used to crash on
          unpacking).
        * Otherwise the rendered ``control/scheduler.html``; status 400
          when the form does not validate (``form.save()`` used to be
          called unconditionally and raise on invalid data), else 200.
    """
    # A missing ``cron`` key falls back to "every minute".
    cron = request.POST.get("cron", "* * * * *")
    cron_fields = cron.split()
    if len(cron_fields) != 5:
        return HttpResponse(
            "Cron expression must consist of exactly 5 fields", status=400)
    minute, hour, day_of_month, month_of_year, day_of_week = cron_fields
    schedule = {
        "minute": minute,
        "hour": hour,
        "day_of_month": day_of_month,
        "month_of_year": month_of_year,
        "day_of_week": day_of_week,
    }

    if pk is None:
        sch = CrontabSchedule.objects.create(**schedule)
        task = models.CronPeriodicTask.objects.create(
            crontab=sch,
            # Store the expression actually used for the schedule, including
            # the fallback, so both always agree.
            cron=cron,
            name=request.POST.get("name"),
            task="control.celery_tasks.tasks.check_today_testplan",
            enabled=request.POST.get("enabled") == 'on',
            # TODO(harhipova): uncomment when implemented tasks with arguments
            # args=args,
            # kwargs=kwargs,
        )
    else:
        task = models.CronPeriodicTask.objects.get(pk=pk)

    # The form is built for both branches: the final render needs it.
    form = forms.PeriodicTaskForm(request.POST, instance=task)
    form_ok = form.is_valid()
    if form_ok:
        if pk is not None:
            # Only touch the existing schedule once the rest of the
            # submission is known to be valid, avoiding a half-applied edit.
            CrontabSchedule.objects.filter(id=task.crontab.id).update(
                **schedule)
        form.save()

    # ``QuerySet.update`` does not send model signals, so the beat
    # scheduler is told explicitly that schedules may have changed.
    PeriodicTasks.update_changed()

    return render(
        request, "control/scheduler.html",
        {
            "form": form,
            "pk": task.id,
            "TASK_CHOICES": models.TASK_CHOICES
        },
        status=200 if form_ok else 400
    )
def delete_scheduler(request, pk):
    """Delete the ``CronPeriodicTask`` with this primary key, then list the rest."""
    models.CronPeriodicTask.objects.get(pk=pk).delete()
    return redirect("schedulers")