blob: fe1322bb0c7a5c8c9e5b49b36e19df43f4935f16 [file] [log] [blame]
#!/usr/bin/env python
import datetime
import sys
import logging
from collections import defaultdict, OrderedDict
import jira
import ipdb
import argparse
from testrail import TestRail
from testrail.test import Test
from functools import lru_cache
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
LOG = logging.getLogger(__name__)
def run_cli():
cli = argparse.ArgumentParser(
prog="Report generator",
description="Command line tool for generate summary report",
)
commands = cli.add_subparsers(title="Operation commands", dest="command")
cli_process = commands.add_parser(
"create-report",
help="Create summary report",
description="Create summary report",
)
cli_process_link = commands.add_parser(
"mark-fails",
help="Extract linked bugs from previous reports",
description="Extract linked bugs from previous reports"
" and mark current",
)
cli_process.add_argument(
"-T",
"--testrail-host",
dest="testrail_host",
required=True,
help="TestRail hostname",
)
cli_process.add_argument(
"-U",
"--testrail-user",
dest="testrail_user",
required=True,
help="TestRail user email",
)
cli_process.add_argument(
"-K",
"--testrail-user-key",
dest="testrail_user_key",
required=True,
help="TestRail user key",
)
cli_process.add_argument(
"-R",
"--testrail-plan",
dest="testrail_plan",
required=True,
help="TestRail test plan for analize",
)
cli_process.add_argument(
"-P",
"--testrail-project",
dest="testrail_project",
required=True,
help="TestRail project name",
)
cli_process.add_argument(
"--testrail-only-run",
dest="testrail_only_run",
help="Analize only one run in selected plan",
)
cli_process.add_argument(
"--out-type",
dest="out_type",
choices=["text", "html", "md", "none"],
default="none",
help="Select output format for report table. "
"By default print nothing (none).",
)
cli_process.add_argument(
"--sort-by",
dest="sort_by",
default="fails",
choices=["fails", "blocks", "project", "priority", "status"],
help="Select sorting column. By deafult table sort by fails",
)
cli_process.add_argument(
"--push-to-testrail",
dest="push_report_flag",
action="store_true",
default=False,
help="Save report in plan description",
)
cli_process.add_argument(
"-j", "--jira-host", dest="jira_host", required=True,
help="JIRA hostname"
)
cli_process.add_argument(
"-u", "--jira-user", dest="jira_user_id", required=True,
help="JIRA username"
)
cli_process.add_argument(
"-p",
"--jira-password",
dest="jira_user_password",
required=True,
help="JIRA user password",
)
# link fail bugs parameters
cli_process_link.add_argument(
"-T",
"--testrail-host",
dest="testrail_host",
required=True,
help="TestRail hostname",
)
cli_process_link.add_argument(
"-U",
"--testrail-user",
dest="testrail_user",
required=True,
help="TestRail user email",
)
cli_process_link.add_argument(
"-K",
"--testrail-user-key",
dest="testrail_user_key",
required=True,
help="TestRail user key",
)
cli_process_link.add_argument(
"-R",
"--testrail-plan",
dest="testrail_plan",
required=True,
help="TestRail test plan for analize",
)
cli_process_link.add_argument(
"-M",
"--testrail-marked-plan",
dest="testrail_marked_plan",
required=False,
help="TestRail test plan for parse",
)
cli_process_link.add_argument(
"-P",
"--testrail-project",
dest="testrail_project",
required=True,
help="TestRail project name",
)
cli_process_link.add_argument(
"--testrail-only-run",
dest="testrail_only_run",
help="Name to update only specified run in selected plan",
)
cli_process_link.add_argument(
"--push-to-testrail",
dest="update_report_flag",
action="store_true",
default=False,
help="Save report in plan description",
)
if len(sys.argv) == 1:
cli.print_help()
sys.exit(1)
return cli.parse_args()
def get_runs(t_client, plan_name, run_name):
LOG.info("Get runs from plan - {}".format(plan_name))
ret = []
plan = t_client.plan(plan_name)
if plan:
for e in plan.entries:
for r in e.runs:
LOG.info("Run {} #{}".format(r.name, r.id))
if run_name is not None and r.name != run_name:
continue
ret.append(r)
else:
LOG.warning("Plan {} is empty".format(plan_name))
return ret
def get_all_results(t_client, list_of_runs):
ret = []
for run in list_of_runs:
ret.extend(get_results(t_client, run))
return ret
def get_all_failed_results(t_client, list_of_runs, result_type):
"""
returned result format:
[[run(id,name), result(id,status,defects...), test(id,name..)],
[run(id,name), result(id,status,defects...), test(id,name..)],
...]
"""
ret = []
for run in list_of_runs:
ret.extend(get_failed_results(t_client, run, result_type))
return ret
@lru_cache()
def fetch_test(api, test_id, run_id):
return Test(api.test_with_id(test_id, run_id))
def get_results(t_client, run):
LOG.info("Get results for run - {}".format(run.name))
results = t_client.results(run)
ret = [
(run.id, r)
for r in results
if r.raw_data()["status_id"] is not None
and r.raw_data()["defects"] is not None
]
for r in ret:
run_id, result = r
test = fetch_test(result.api, result.raw_data()["test_id"], run_id)
LOG.info(
"Test {} - {} - {}".format(
test.title, result.status.name, ",".join(result.defects)
)
)
return ret
def get_failed_results(t_client, run, result_type):
"""
returned result format:
[run(id,name),
result(id,status,defects...),
test(id,name..)]
"""
LOG.info("Get results for run - {}".format(run.name))
results = t_client.results(run)
results_with_test = []
if result_type == "5":
ret = [
(run, r)
for r in results
if r.raw_data()["status_id"] is int(result_type)
and r.raw_data()["defects"] is None
]
else:
ret = [
(run, r)
for r in results
if r.raw_data()["status_id"] is not None
and r.raw_data()["defects"] is not None
]
for r in ret:
run, result = r
test = fetch_test(result.api, result.raw_data()["test_id"], run.id)
LOG.info(
"Test {} - {} - {} - {}".format(
test.title,
result.status.name,
result.raw_data()["status_id"],
",".join(result.defects),
)
)
results_with_test.append([run, result, test])
return results_with_test
def mark_failed_results(t_cl, marked_res, failed_res, t_h):
"""
Extract list tests with defect and compare it with tests to be marked,
and add defects and result from marked tests
Returned result format:
[[target_tests_to_update_with_defect, target_run_id],
[target_tests_to_update_with_defect, target_run_id],
...]
"""
LOG.info("Extract marked tests and attach to failed")
def generate_result(t_c, tst, m_r, m_t):
link_comment = "{url}/index.php?/tests/view/{uid}".format(
url=t_h,
uid=m_t.id)
tmp_result = t_c.result()
tmp_result.test = tst
tmp_result.status = m_r.status
tmp_result.comment = "Result taked from: " + link_comment
tmp_result.defects = [str(m_r.defects[0])]
return tmp_result
# def check_if_marked():
# if ret.count()
ret = []
for run, result, test in failed_res:
for m_run, m_result, m_test in marked_res:
if run.name == m_run.name and test.title == m_test.title:
LOG.info(
" MARKED FOUND: Run:{} test: .. {}-{}".format(
run.id, test.title[-72:], m_result.defects[0]
)
)
ret.append([generate_result(t_cl,
test,
m_result,
m_test),
run.id])
return ret
@lru_cache()
def get_defect_info(j_client, defect):
LOG.info("Get info about issue {}".format(defect))
try:
issue = j_client.issue(defect)
except jira.exceptions.JIRAError as e:
if e.status_code == 404:
LOG.error("Defect {} wasn't found in Jira".format(defect))
return {
"id": defect,
"title": "Title for #{} not found".format(defect),
"project": "Not found",
"priority": "Not found",
"status": "Not found",
"url": "Not found",
}
else:
raise
return {
"id": issue.key,
"title": issue.fields.summary,
"project": issue.fields.project.key,
"priority": issue.fields.priority.name,
"status": issue.fields.status.name,
"url": issue.permalink(),
}
def get_defects_table(jira_client, list_of_results, sort_by):
LOG.info("Collect report table")
table = defaultdict(dict)
for run_id, result in list_of_results:
for defect in result.defects:
if defect not in table:
info = get_defect_info(jira_client, defect)
table[defect].update(info)
table[defect]["results"] = set([(run_id, result)])
if result.status.name.lower() == "blocked":
table[defect]["blocks"] = 1
table[defect]["fails"] = 0
else:
table[defect]["fails"] = 1
table[defect]["blocks"] = 0
else:
table[defect]["results"].add((run_id, result))
if result.status.name.lower() == "blocked":
table[defect]["blocks"] += 1
else:
table[defect]["fails"] += 1
return OrderedDict(sorted(table.items(),
key=lambda i: i[1][sort_by],
reverse=True))
def get_text_table(table):
LOG.info("Generation text table")
lines = []
line = (
"{fails:^5} | {blocks:^5} | {project:^10} | {priority:^15} | "
"{status:^15} | {bug:^100} | {tests} "
)
def title_uid(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()["test_id"], run_id)
return {"title": test.title, "uid": test.id}
def list_of_defect_tests(results):
ret = ["[{title} #{uid}]".format(**title_uid(r)) for r in results]
return " ".join(ret)
lines.append(
line.format(
fails="FAILS",
blocks="BLOCKS",
project="PROJECT",
priority="PRIORITY",
status="STATUS",
bug="BUG",
tests="TESTS",
)
)
for k in table:
one = table[k]
data = {
"fails": one["fails"],
"blocks": one["blocks"],
"project": one["project"],
"priority": one["priority"],
"status": one["status"],
"bug": "{uid} {title}".format(uid=one["id"], title=one["title"]),
"tests": list_of_defect_tests(one["results"]),
}
lines.append(line.format(**data))
return "\n".join(lines)
def get_md_table(table):
LOG.info("Generation MD table")
lines = []
line = (
"||{fails} | {blocks} | {priority} | "
"{status} | <div style='width:200px'>{bug}</div> | {tests} |"
)
def title_uid_link(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()["test_id"], run_id)
return {
"title": test.title.replace("[", "{").replace("]", "}"),
"uid": test.id,
"link": "{url}/index.php?/tests/view/{uid}".format(
url=test.api._conf()["url"], uid=test.id
),
}
def list_of_defect_tests(results):
ret = [
"<[{title} #{uid}]({link})>".format(**title_uid_link(r))
for r in results
]
return " ".join(ret)
lines.append(
line.format(
fails="|:FAILS",
blocks=":BLOCKS",
project=":PROJECT",
priority=":PRIORITY",
status=":STATUS",
bug=":BUG",
tests=":TESTS",
)
)
for k in table:
one = table[k]
data = {
"fails": one["fails"],
"blocks": one["blocks"],
"project": one["project"],
"priority": one["priority"],
"status": one["status"],
"bug": "[{uid} {title}]({url})".format(
uid=one["id"],
title=one["title"].replace("[", "{").replace("]", "}"),
url=one["url"],
),
"tests": list_of_defect_tests(one["results"]),
}
lines.append(line.format(**data))
return "\n".join(lines)
def get_html_table(table):
LOG.info("Generation HTML table")
html = "<table>{lines}</table>"
lines = []
line = (
"<tr><th>{fails:^5}</th><th>{blocks:^5}</th><th>{project:^10}</th>"
"<th>{priority:^15}</th>"
"<th>{status:^15}</th><th>{bug:^100}</th><th>{tests}</th></tr>"
)
lines.append(
line.format(
fails="FAILS",
blocks="BLOCKS",
project="PROJECT",
priority="PRIORITY",
status="STATUS",
bug="BUG",
tests="TESTS",
)
)
def title_uid_link(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()["test_id"], run_id)
return {
"title": test.title,
"uid": test.id,
"link": "{url}/index.php?/tests/view/{uid}".format(
url=test.api._conf()["url"], uid=test.id
),
}
def list_of_defect_tests(results):
ret = [
"<a href='{link}'>{title} #{uid}</a>".format(**title_uid_link(r))
for r in results
]
return " ".join(ret)
for k in table:
one = table[k]
data = {
"fails": one["fails"],
"blocks": one["blocks"],
"project": one["project"],
"priority": one["priority"],
"status": one["status"],
"bug": "<a href='{url}'>{uid} {title}</a>".format(
uid=one["id"], title=one["title"], url=one["url"]
),
"tests": list_of_defect_tests(one["results"]),
}
lines.append(line.format(**data))
return html.format(lines="".join(lines))
def out_table(out_type, table):
if out_type == "none":
return
elif out_type == "html":
print(get_html_table(table))
elif out_type == "md":
print(get_md_table(table))
else:
print(get_text_table(table))
def push_report(t_client, plan_name, table):
LOG.info("Push report table into plan - {}".format(plan_name))
text = (
"Bugs Statistics (generated on {date})\n"
"=======================================================\n"
"{table}".format(
date=datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"),
table=get_md_table(table),
)
)
plan = t_client.plan(plan_name)
if plan:
plan.description = text
plan.api._post(
"update_plan/{}".format(plan.id),
{
"name": plan.name,
"description": plan.description,
"milestone_id": plan.milestone.id,
},
)
def update_report(t_client, plan_name, tests_table):
LOG.info(
"Update report table into plan - {}".format(plan_name)
+ "\n===\nList tests to udate:"
)
plan = t_client.plan(plan_name)
if plan:
for r_test, run in tests_table:
t_client.add(r_test)
print(r_test.test.title)
LOG.info("\n===\nUpdate plan finished - {}".format(plan_name))
def create_report(**kwargs):
j_host = kwargs.get("jira_host")
j_user = kwargs.get("jira_user_id")
j_user_pwd = kwargs.get("jira_user_password")
t_host = kwargs.get("testrail_host")
t_user = kwargs.get("testrail_user")
t_user_key = kwargs.get("testrail_user_key")
t_plan = kwargs.get("testrail_plan")
t_project = kwargs.get("testrail_project")
t_a_run = kwargs.get("testrail_only_run")
o_type = kwargs.get("out_type")
push_report_flag = kwargs.get("push_report_flag")
sort_by = kwargs.get("sort_by")
t_client = TestRail(email=t_user, key=t_user_key, url=t_host)
t_client.set_project_id(t_client.project(t_project).id)
j_client = jira.JIRA(j_host, basic_auth=(j_user, j_user_pwd))
runs = get_runs(t_client, t_plan, t_a_run)
results = get_all_results(t_client, runs)
table = get_defects_table(j_client, results, sort_by)
out_table(o_type, table)
if push_report_flag:
push_report(t_client, t_plan, table)
def mark_fails(**kwargs):
testrail_host = kwargs.get("testrail_host")
testrail_user = kwargs.get("testrail_user")
testrail_user_key = kwargs.get("testrail_user_key")
testrail_plan = kwargs.get("testrail_plan")
testrail_m_plan = kwargs.get("testrail_marked_plan")
testrail_project = kwargs.get("testrail_project")
testrail_active_run = kwargs.get("testrail_only_run")
if testrail_active_run == "":
testrail_active_run = None
update_report_flag = kwargs.get("update_report_flag")
testrail_client = TestRail(
email=testrail_user, key=testrail_user_key, url=testrail_host
)
testrail_client.set_project_id(
testrail_client.project(testrail_project).id)
# Get list runs with marked results
marked_runs = get_runs(testrail_client, testrail_m_plan,
testrail_active_run)
# Get list runs to update
runs = get_runs(testrail_client, testrail_plan, testrail_active_run)
# Get list (failed, prod_failed, test_failed,skipped..) tests with defects
marked_results = get_all_failed_results(
testrail_client, marked_runs, "2,3,4,5,6,7,8,9"
)
# Get list (failed) tests without defects to mark
# 5-failed
failed_results = get_all_failed_results(testrail_client, runs, "5")
# Generate list tests to update based on compare (defected
# results for tests with failed and not defected)
tests_to_update = mark_failed_results(
testrail_client, marked_results, failed_results, testrail_host
)
if update_report_flag:
update_report(testrail_client, testrail_plan, tests_to_update)
COMMAND_MAP = {"create-report": create_report, "mark-fails": mark_fails}
def main():
args = run_cli()
COMMAND_MAP[args.command](**vars(args))
if __name__ == "__main__":
with ipdb.launch_ipdb_on_exception():
main()