blob: a2054243a2c8acf82928fa1f7279bc647503b0b7 [file] [log] [blame]
#!/usr/bin/env python
import datetime
import sys
import logging
from collections import defaultdict, OrderedDict
import jira
import argparse
from testrail import TestRail
from testrail.test import Test
# from testrail_api import APIClient
from functools32 import lru_cache
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
LOG = logging.getLogger(__name__)
def run_cli():
cli = argparse.ArgumentParser(
prog="Report generator",
description="Command line tool for generate summary report")
commands = cli.add_subparsers(
title="Operation commands",
dest="command")
cli_process = commands.add_parser(
"create-report",
help="Create summary report",
description="Create summary report")
cli_process_link = commands.add_parser(
"mark-fails",
help="Extract linked bugs from previous reports",
description="Extract linked bugs from previous reports"
" and mark current")
cli_process.add_argument(
"-T", "--testrail-host", dest="testrail_host",
required=True,
help="TestRail hostname")
cli_process.add_argument(
"-U", "--testrail-user", dest="testrail_user",
required=True,
help="TestRail user email")
cli_process.add_argument(
"-K", "--testrail-user-key", dest="testrail_user_key",
required=True,
help="TestRail user key")
cli_process.add_argument(
"-R", "--testrail-plan", dest="testrail_plan",
required=True,
help="TestRail test plan for analize")
cli_process.add_argument(
"-P", "--testrail-project", dest="testrail_project",
required=True,
help="TestRail project name")
cli_process.add_argument(
"--testrail-only-run", dest="testrail_only_run",
help="Analize only one run in selected plan")
cli_process.add_argument(
"--out-type", dest="out_type", choices=["text", "html", 'md', 'none'],
default='none',
help="Select output format for report table. "
"By default print nothing (none).")
cli_process.add_argument(
"--sort-by", dest="sort_by", default='fails',
choices=["fails", "blocks", 'project', 'priority', 'status'],
help="Select sorting column. By deafult table sort by fails")
cli_process.add_argument(
"--push-to-testrail", dest="push_report_flag", action="store_true",
default=False,
help="Save report in plan description")
cli_process.add_argument(
"-j", "--jira-host", dest="jira_host",
required=True,
help="JIRA hostname")
cli_process.add_argument(
"-u", "--jira-user", dest="jira_user_id",
required=True,
help="JIRA username")
cli_process.add_argument(
"-p", "--jira-password", dest="jira_user_password",
required=True,
help="JIRA user password")
# link fail bugs parameters
cli_process_link.add_argument(
"-T", "--testrail-host", dest="testrail_host",
required=True,
help="TestRail hostname")
cli_process_link.add_argument(
"-U", "--testrail-user", dest="testrail_user",
required=True,
help="TestRail user email")
cli_process_link.add_argument(
"-K", "--testrail-user-key", dest="testrail_user_key",
required=True,
help="TestRail user key")
cli_process_link.add_argument(
"-R", "--testrail-plan", dest="testrail_plan",
required=True,
help="TestRail test plan for analize")
cli_process_link.add_argument(
"-M", "--testrail-marked-plan", dest="testrail_marked_plan",
required=False,
help="TestRail test plan for parse")
cli_process_link.add_argument(
"-P", "--testrail-project", dest="testrail_project",
required=True,
help="TestRail project name")
cli_process_link.add_argument(
"--testrail-only-run", dest="testrail_only_run",
help="Name to update only specified run in selected plan")
cli_process_link.add_argument(
"--push-to-testrail", dest="update_report_flag", action="store_true",
default=False,
help="Save report in plan description")
if len(sys.argv) == 1:
cli.print_help()
sys.exit(1)
return cli.parse_args()
def get_runs(t_client, plan_name, run_name):
LOG.info("Get runs from plan - {}".format(plan_name))
ret = []
plan = t_client.plan(plan_name)
if plan:
for e in plan.entries:
for r in e.runs:
LOG.info("Run {} #{}".format(r.name, r.id))
if run_name is not None and r.name != run_name:
continue
ret.append(r)
else:
LOG.warning("Plan {} is empty".format(plan_name))
return ret
def get_all_results(t_client, list_of_runs):
ret = []
for run in list_of_runs:
ret.extend(get_results(t_client, run))
return ret
def get_all_failed_results(t_client, list_of_runs, result_type):
"""
returned result format:
[[run(id,name), result(id,status,defects...), test(id,name..)],
[run(id,name), result(id,status,defects...), test(id,name..)],
...]
"""
ret = []
for run in list_of_runs:
ret.extend(get_failed_results(t_client, run, result_type))
return ret
@lru_cache()
def fetch_test(api, test_id, run_id):
return Test(api.test_with_id(test_id, run_id))
def get_results(t_client, run):
LOG.info("Get results for run - {}".format(run.name))
results = t_client.results(run)
ret = [(run.id, r) for r in results
if r.raw_data()['status_id'] is not None and
r.raw_data()['defects'] is not None]
for r in ret:
run_id, result = r
test = fetch_test(result.api, result.raw_data()['test_id'], run_id)
LOG.info("Test {} - {} - {}".format(test.title, result.status.name,
','.join(result.defects)))
return ret
def get_failed_results(t_client, run, result_type):
"""
returned result format:
[run(id,name),
result(id,status,defects...),
test(id,name..)]
"""
LOG.info("Get results for run - {}".format(run.name))
results = t_client.results(run)
results_with_test = []
if result_type == '5':
ret = [(run, r) for r in results
if r.raw_data()['status_id'] is int(result_type) and
r.raw_data()['defects'] is None]
else:
ret = [(run, r) for r in results
if r.raw_data()['status_id'] is not None and
r.raw_data()['defects'] is not None]
for r in ret:
run, result = r
test = fetch_test(result.api, result.raw_data()['test_id'], run.id)
LOG.info("Test {} - {} - {} - {}"
.format(test.title, result.status.name,
result.raw_data()['status_id'],
','.join(result.defects)))
results_with_test.append([run, result, test])
return results_with_test
def mark_failed_results(t_cl, marked_res, failed_res, t_h):
"""
Extract list tests with defect and compare it with tests to be marked,
and add defects and result from marked tests
Returned result format:
[[target_tests_to_update_with_defect, target_run_id],
[target_tests_to_update_with_defect, target_run_id],
...]
"""
LOG.info("Extract marked tests and attach to failed")
def generate_result(t_c, tst, m_r, m_t):
link_comment = "{url}/index.php?/tests/view/{uid}".format(
url=t_h, uid=m_t.id)
tmp_result = t_c.result()
tmp_result.test = tst
tmp_result.status = m_r.status
tmp_result.comment = "Result taked from: " + link_comment
tmp_result.defects = [str(m_r.defects[0])]
return tmp_result
# def check_if_marked():
# if ret.count()
ret = []
for run, result, test in failed_res:
for m_run, m_result, m_test in marked_res:
if run.name == m_run.name \
and test.title == m_test.title:
LOG.info(" MARKED FOUND: Run:{} test: .. {}-{}"
.format(run.id, test.title[-72:],
m_result.defects[0]))
ret.append([generate_result(t_cl, test, m_result,
m_test), run.id])
return ret
@lru_cache()
def get_defect_info(j_client, defect):
LOG.info("Get info about issue {}".format(defect))
try:
issue = j_client.issue(defect)
except jira.exceptions.JIRAError as e:
if e.status_code == 404:
LOG.error("Defect {} wasn't found in Jira".format(defect))
return {
'id': defect,
'title': "Title for #{} not found".format(defect),
'project': "Not found",
'priority': "Not found",
'status': "Not found",
'url': "Not found"
}
else:
raise
return {
'id': issue.key,
'title': issue.fields.summary,
'project': issue.fields.project.key,
'priority': issue.fields.priority.name,
'status': issue.fields.status.name,
'url': issue.permalink()
}
def get_defects_table(jira_client, list_of_results, sort_by):
LOG.info("Collect report table")
table = defaultdict(dict)
for run_id, result in list_of_results:
for defect in result.defects:
if defect not in table:
info = get_defect_info(jira_client, defect)
table[defect].update(info)
table[defect]['results'] = set([(run_id, result)])
if result.status.name.lower() == 'blocked':
table[defect]['blocks'] = 1
table[defect]['fails'] = 0
else:
table[defect]['fails'] = 1
table[defect]['blocks'] = 0
else:
table[defect]['results'].add((run_id, result))
if result.status.name.lower() == 'blocked':
table[defect]['blocks'] += 1
else:
table[defect]['fails'] += 1
return OrderedDict(
sorted(table.items(), key=lambda i: i[1][sort_by], reverse=True))
def get_text_table(table):
LOG.info("Generation text table")
lines = []
line = ("{fails:^5} | {blocks:^5} | {project:^10} | {priority:^15} | "
"{status:^15} | {bug:^100} | {tests} ")
def title_uid(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()['test_id'], run_id)
return {
"title": test.title,
"uid": test.id}
def list_of_defect_tests(results):
ret = ["[{title} #{uid}]".format(**title_uid(r)) for
r in results]
return ' '.join(ret)
lines.append(line.format(fails='FAILS', blocks='BLOCKS', project="PROJECT",
priority="PRIORITY", status="STATUS", bug="BUG",
tests="TESTS"))
for k in table:
one = table[k]
data = {
"fails": one['fails'],
"blocks": one['blocks'],
"project": one['project'],
"priority": one['priority'],
"status": one['status'],
"bug": "{uid} {title}".format(uid=one['id'], title=one['title']),
"tests": list_of_defect_tests(one['results'])
}
lines.append(line.format(**data))
return '\n'.join(lines)
def get_md_table(table):
LOG.info("Generation MD table")
lines = []
line = ("||{fails} | {blocks} | {project} | {priority} | "
"{status} | {bug} | {tests} ")
def title_uid_link(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()['test_id'], run_id)
return {
"title": test.title.replace('[', '{').replace(']', '}'),
"uid": test.id,
"link": "{url}/index.php?/tests/view/{uid}".format(
url=test.api._conf()['url'], uid=test.id)
}
def list_of_defect_tests(results):
ret = ["<[{title} #{uid}]({link})>".format(**title_uid_link(r))
for r in results]
return ' '.join(ret)
lines.append(line.format(fails='|:FAILS', blocks=':BLOCKS',
project=":PROJECT", priority=":PRIORITY",
status=":STATUS", bug=":BUG", tests=":TESTS"))
for k in table:
one = table[k]
data = {
"fails": one['fails'],
"blocks": one['blocks'],
"project": one['project'],
"priority": one['priority'],
"status": one['status'],
"bug": "[{uid} {title}]({url})".format(
uid=one['id'],
title=one['title'].replace('[', '{').replace(']', '}'),
url=one['url']),
"tests": list_of_defect_tests(one['results'])
}
lines.append(line.format(**data))
return '\n'.join(lines)
def get_html_table(table):
LOG.info("Generation HTML table")
html = "<table>{lines}</table>"
lines = []
line = ("<tr><th>{fails:^5}</th><th>{blocks:^5}</th><th>{project:^10}</th>"
"<th>{priority:^15}</th>"
"<th>{status:^15}</th><th>{bug:^100}</th><th>{tests}</th></tr>")
lines.append(line.format(fails='FAILS', blocks='BLOCKS', project="PROJECT",
priority="PRIORITY", status="STATUS", bug="BUG",
tests="TESTS"))
def title_uid_link(r):
run_id, result = r
test = fetch_test(result.api, result.raw_data()['test_id'], run_id)
return {
"title": test.title,
"uid": test.id,
"link": "{url}/index.php?/tests/view/{uid}".format(
url=test.api._conf()['url'], uid=test.id)
}
def list_of_defect_tests(results):
ret = ["<a href='{link}'>{title} #{uid}</a>".format(
**title_uid_link(r)) for r in results]
return ' '.join(ret)
for k in table:
one = table[k]
data = {
"fails": one['fails'],
"blocks": one['blocks'],
"project": one['project'],
"priority": one['priority'],
"status": one['status'],
"bug": "<a href='{url}'>{uid} {title}</a>".format(
uid=one['id'], title=one['title'], url=one['url']),
"tests": list_of_defect_tests(one['results'])
}
lines.append(line.format(**data))
return html.format(lines=''.join(lines))
def out_table(out_type, table):
if out_type == 'none':
return
elif out_type == 'html':
print(get_html_table(table))
elif out_type == 'md':
print(get_md_table(table))
else:
print(get_text_table(table))
def push_report(t_client, plan_name, table):
LOG.info("Push report table into plan - {}".format(plan_name))
text = "Bugs Statistics (generated on {date})\n" \
"=======================================================\n" \
"{table}".format(
date=datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"),
table=get_md_table(table))
plan = t_client.plan(plan_name)
if plan:
plan.description = text
plan.api._post(
'update_plan/{}'.format(plan.id),
{
'name': plan.name,
'description': plan.description,
'milestone_id': plan.milestone.id
})
def update_report(t_client, plan_name, tests_table):
LOG.info("Update report table into plan - {}".format(plan_name) +
"\n===\nList tests to udate:")
plan = t_client.plan(plan_name)
if plan:
for r_test, run in tests_table:
t_client.add(r_test)
print(r_test.test.title)
LOG.info("\n===\nUpdate plan finished - {}".format(plan_name))
def create_report(**kwargs):
j_host = kwargs.get('jira_host')
j_user = kwargs.get('jira_user_id')
j_user_pwd = kwargs.get('jira_user_password')
t_host = kwargs.get('testrail_host')
t_user = kwargs.get('testrail_user')
t_user_key = kwargs.get('testrail_user_key')
t_plan = kwargs.get('testrail_plan')
t_project = kwargs.get('testrail_project')
t_a_run = kwargs.get('testrail_only_run')
o_type = kwargs.get('out_type')
push_report_flag = kwargs.get('push_report_flag')
sort_by = kwargs.get('sort_by')
t_client = TestRail(email=t_user, key=t_user_key, url=t_host)
t_client.set_project_id(t_client.project(t_project).id)
j_client = jira.JIRA(j_host, basic_auth=(j_user, j_user_pwd))
runs = get_runs(t_client, t_plan, t_a_run)
results = get_all_results(t_client, runs)
table = get_defects_table(j_client, results, sort_by)
out_table(o_type, table)
if push_report_flag:
push_report(t_client, t_plan, table)
def mark_fails(**kwargs):
testrail_host = kwargs.get('testrail_host')
testrail_user = kwargs.get('testrail_user')
testrail_user_key = kwargs.get('testrail_user_key')
testrail_plan = kwargs.get('testrail_plan')
testrail_m_plan = kwargs.get('testrail_marked_plan')
testrail_project = kwargs.get('testrail_project')
testrail_active_run = kwargs.get('testrail_only_run')
if testrail_active_run == '':
testrail_active_run = None
update_report_flag = kwargs.get('update_report_flag')
testrail_client = TestRail(email=testrail_user, key=testrail_user_key,
url=testrail_host)
testrail_client.set_project_id(testrail_client.project(
testrail_project).id)
# Get list runs with marked results
marked_runs = get_runs(testrail_client, testrail_m_plan,
testrail_active_run)
# Get list runs to update
runs = get_runs(testrail_client, testrail_plan, testrail_active_run)
# Get list (failed, prod_failed, test_failed,skipped..) tests with defects
marked_results = get_all_failed_results(testrail_client, marked_runs,
'2,3,4,5,6,7,8,9')
# Get list (failed) tests without defects to mark
failed_results = get_all_failed_results(testrail_client,
runs, '5') # 5-failed
# Generate list tests to update based on compare (defected
# results for tests with failed and not defected)
tests_to_update = mark_failed_results(testrail_client, marked_results,
failed_results, testrail_host)
if update_report_flag:
update_report(testrail_client, testrail_plan, tests_to_update)
COMMAND_MAP = {
'create-report': create_report,
'mark-fails': mark_fails
}
def main():
args = run_cli()
COMMAND_MAP[args.command](**vars(args))
if __name__ == '__main__':
main()