|  | #!/usr/bin/env python | 
|  | import datetime | 
|  | import sys | 
|  | import logging | 
|  | from collections import defaultdict, OrderedDict | 
|  |  | 
|  | import jira | 
|  | import argparse | 
|  |  | 
|  | from testrail import TestRail | 
|  | from testrail.test import Test | 
|  | # from testrail_api import APIClient | 
|  | from functools32 import lru_cache | 
|  |  | 
|  | logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO) | 
|  | LOG = logging.getLogger(__name__) | 
|  |  | 
|  |  | 
|  | def run_cli(): | 
|  | cli = argparse.ArgumentParser( | 
|  | prog="Report generator", | 
|  | description="Command line tool for generate summary report") | 
|  |  | 
|  | commands = cli.add_subparsers( | 
|  | title="Operation commands", | 
|  | dest="command") | 
|  |  | 
|  | cli_process = commands.add_parser( | 
|  | "create-report", | 
|  | help="Create summary report", | 
|  | description="Create summary report") | 
|  | cli_process_link = commands.add_parser( | 
|  | "mark-fails", | 
|  | help="Extract linked bugs from previous reports", | 
|  | description="Extract linked bugs from previous reports" | 
|  | " and mark current") | 
|  | cli_process.add_argument( | 
|  | "-T", "--testrail-host", dest="testrail_host", | 
|  | required=True, | 
|  | help="TestRail hostname") | 
|  | cli_process.add_argument( | 
|  | "-U", "--testrail-user", dest="testrail_user", | 
|  | required=True, | 
|  | help="TestRail user email") | 
|  | cli_process.add_argument( | 
|  | "-K", "--testrail-user-key", dest="testrail_user_key", | 
|  | required=True, | 
|  | help="TestRail user key") | 
|  | cli_process.add_argument( | 
|  | "-R", "--testrail-plan", dest="testrail_plan", | 
|  | required=True, | 
|  | help="TestRail test plan for analize") | 
|  | cli_process.add_argument( | 
|  | "-P", "--testrail-project", dest="testrail_project", | 
|  | required=True, | 
|  | help="TestRail project name") | 
|  | cli_process.add_argument( | 
|  | "--testrail-only-run", dest="testrail_only_run", | 
|  | help="Analize only one run in selected plan") | 
|  | cli_process.add_argument( | 
|  | "--out-type", dest="out_type", choices=["text", "html", 'md', 'none'], | 
|  | default='none', | 
|  | help="Select output format for report table. " | 
|  | "By default print nothing (none).") | 
|  | cli_process.add_argument( | 
|  | "--sort-by", dest="sort_by", default='fails', | 
|  | choices=["fails", "blocks", 'project', 'priority', 'status'], | 
|  | help="Select sorting column. By deafult table sort by fails") | 
|  | cli_process.add_argument( | 
|  | "--push-to-testrail", dest="push_report_flag", action="store_true", | 
|  | default=False, | 
|  | help="Save report in plan description") | 
|  | cli_process.add_argument( | 
|  | "-j", "--jira-host", dest="jira_host", | 
|  | required=True, | 
|  | help="JIRA hostname") | 
|  | cli_process.add_argument( | 
|  | "-u", "--jira-user", dest="jira_user_id", | 
|  | required=True, | 
|  | help="JIRA username") | 
|  | cli_process.add_argument( | 
|  | "-p", "--jira-password", dest="jira_user_password", | 
|  | required=True, | 
|  | help="JIRA user password") | 
|  |  | 
|  | # link fail bugs parameters | 
|  | cli_process_link.add_argument( | 
|  | "-T", "--testrail-host", dest="testrail_host", | 
|  | required=True, | 
|  | help="TestRail hostname") | 
|  | cli_process_link.add_argument( | 
|  | "-U", "--testrail-user", dest="testrail_user", | 
|  | required=True, | 
|  | help="TestRail user email") | 
|  | cli_process_link.add_argument( | 
|  | "-K", "--testrail-user-key", dest="testrail_user_key", | 
|  | required=True, | 
|  | help="TestRail user key") | 
|  | cli_process_link.add_argument( | 
|  | "-R", "--testrail-plan", dest="testrail_plan", | 
|  | required=True, | 
|  | help="TestRail test plan for analize") | 
|  | cli_process_link.add_argument( | 
|  | "-M", "--testrail-marked-plan", dest="testrail_marked_plan", | 
|  | required=False, | 
|  | help="TestRail test plan for parse") | 
|  | cli_process_link.add_argument( | 
|  | "-P", "--testrail-project", dest="testrail_project", | 
|  | required=True, | 
|  | help="TestRail project name") | 
|  | cli_process_link.add_argument( | 
|  | "--testrail-only-run", dest="testrail_only_run", | 
|  | help="Name to update only specified run in selected plan") | 
|  | cli_process_link.add_argument( | 
|  | "--push-to-testrail", dest="update_report_flag", action="store_true", | 
|  | default=False, | 
|  | help="Save report in plan description") | 
|  |  | 
|  | if len(sys.argv) == 1: | 
|  | cli.print_help() | 
|  | sys.exit(1) | 
|  |  | 
|  | return cli.parse_args() | 
|  |  | 
|  |  | 
|  | def get_runs(t_client, plan_name, run_name): | 
|  | LOG.info("Get runs from plan - {}".format(plan_name)) | 
|  | ret = [] | 
|  | plan = t_client.plan(plan_name) | 
|  | if plan: | 
|  | for e in plan.entries: | 
|  | for r in e.runs: | 
|  | LOG.info("Run {} #{}".format(r.name, r.id)) | 
|  | if run_name is not None and r.name != run_name: | 
|  | continue | 
|  | ret.append(r) | 
|  | else: | 
|  | LOG.warning("Plan {} is empty".format(plan_name)) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | def get_all_results(t_client, list_of_runs): | 
|  | ret = [] | 
|  | for run in list_of_runs: | 
|  | ret.extend(get_results(t_client, run)) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | def get_all_failed_results(t_client, list_of_runs, result_type): | 
|  | """ | 
|  | returned result format: | 
|  | [[run(id,name), result(id,status,defects...), test(id,name..)], | 
|  | [run(id,name), result(id,status,defects...), test(id,name..)], | 
|  | ...] | 
|  | """ | 
|  | ret = [] | 
|  | for run in list_of_runs: | 
|  | ret.extend(get_failed_results(t_client, run, result_type)) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | @lru_cache() | 
|  | def fetch_test(api, test_id, run_id): | 
|  | return Test(api.test_with_id(test_id, run_id)) | 
|  |  | 
|  |  | 
|  | def get_results(t_client, run): | 
|  | LOG.info("Get results for run - {}".format(run.name)) | 
|  | results = t_client.results(run) | 
|  | ret = [(run.id, r) for r in results | 
|  | if r.raw_data()['status_id'] is not None and | 
|  | r.raw_data()['defects'] is not None] | 
|  | for r in ret: | 
|  | run_id, result = r | 
|  | test = fetch_test(result.api, result.raw_data()['test_id'], run_id) | 
|  | LOG.info("Test {} - {} - {}".format(test.title, result.status.name, | 
|  | ','.join(result.defects))) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | def get_failed_results(t_client, run, result_type): | 
|  | """ | 
|  | returned result format: | 
|  | [run(id,name), | 
|  | result(id,status,defects...), | 
|  | test(id,name..)] | 
|  | """ | 
|  | LOG.info("Get results for run - {}".format(run.name)) | 
|  | results = t_client.results(run) | 
|  | results_with_test = [] | 
|  | if result_type == '5': | 
|  | ret = [(run, r) for r in results | 
|  | if r.raw_data()['status_id'] is int(result_type) and | 
|  | r.raw_data()['defects'] is None] | 
|  | else: | 
|  | ret = [(run, r) for r in results | 
|  | if r.raw_data()['status_id'] is not None and | 
|  | r.raw_data()['defects'] is not None] | 
|  | for r in ret: | 
|  | run, result = r | 
|  | test = fetch_test(result.api, result.raw_data()['test_id'], run.id) | 
|  | LOG.info("Test {} - {} - {} - {}" | 
|  | .format(test.title, result.status.name, | 
|  | result.raw_data()['status_id'], | 
|  | ','.join(result.defects))) | 
|  | results_with_test.append([run, result, test]) | 
|  | return results_with_test | 
|  |  | 
|  |  | 
|  | def mark_failed_results(t_cl, marked_res, failed_res, t_h): | 
|  | """ | 
|  | Extract list tests with defect and compare it with tests to be marked, | 
|  | and add defects and result from marked tests | 
|  | Returned result format: | 
|  | [[target_tests_to_update_with_defect, target_run_id], | 
|  | [target_tests_to_update_with_defect, target_run_id], | 
|  | ...] | 
|  | """ | 
|  | LOG.info("Extract marked tests and attach to failed") | 
|  |  | 
|  | def generate_result(t_c, tst, m_r, m_t): | 
|  | link_comment = "{url}/index.php?/tests/view/{uid}".format( | 
|  | url=t_h, uid=m_t.id) | 
|  | tmp_result = t_c.result() | 
|  | tmp_result.test = tst | 
|  | tmp_result.status = m_r.status | 
|  | tmp_result.comment = "Result taked from: " + link_comment | 
|  | tmp_result.defects = [str(m_r.defects[0])] | 
|  | return tmp_result | 
|  |  | 
|  | # def check_if_marked(): | 
|  | #     if ret.count() | 
|  |  | 
|  | ret = [] | 
|  | for run, result, test in failed_res: | 
|  | for m_run, m_result, m_test in marked_res: | 
|  | if run.name == m_run.name \ | 
|  | and test.title == m_test.title: | 
|  | LOG.info(" MARKED FOUND: Run:{} test: .. {}-{}" | 
|  | .format(run.id, test.title[-72:], | 
|  | m_result.defects[0])) | 
|  | ret.append([generate_result(t_cl, test, m_result, | 
|  | m_test), run.id]) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | @lru_cache() | 
|  | def get_defect_info(j_client, defect): | 
|  | LOG.info("Get info about issue {}".format(defect)) | 
|  | try: | 
|  | issue = j_client.issue(defect) | 
|  | except jira.exceptions.JIRAError as e: | 
|  | if e.status_code == 404: | 
|  | LOG.error("Defect {} wasn't found in Jira".format(defect)) | 
|  | return { | 
|  | 'id': defect, | 
|  | 'title': "Title for #{} not found".format(defect), | 
|  | 'project': "Not found", | 
|  | 'priority': "Not found", | 
|  | 'status': "Not found", | 
|  | 'url': "Not found" | 
|  | } | 
|  | else: | 
|  | raise | 
|  | return { | 
|  | 'id': issue.key, | 
|  | 'title': issue.fields.summary, | 
|  | 'project': issue.fields.project.key, | 
|  | 'priority': issue.fields.priority.name, | 
|  | 'status': issue.fields.status.name, | 
|  | 'url': issue.permalink() | 
|  | } | 
|  |  | 
|  |  | 
|  | def get_defects_table(jira_client, list_of_results, sort_by): | 
|  | LOG.info("Collect report table") | 
|  | table = defaultdict(dict) | 
|  | for run_id, result in list_of_results: | 
|  | for defect in result.defects: | 
|  | if defect not in table: | 
|  | info = get_defect_info(jira_client, defect) | 
|  |  | 
|  | table[defect].update(info) | 
|  | table[defect]['results'] = set([(run_id, result)]) | 
|  | if result.status.name.lower() == 'blocked': | 
|  | table[defect]['blocks'] = 1 | 
|  | table[defect]['fails'] = 0 | 
|  | else: | 
|  | table[defect]['fails'] = 1 | 
|  | table[defect]['blocks'] = 0 | 
|  | else: | 
|  | table[defect]['results'].add((run_id, result)) | 
|  | if result.status.name.lower() == 'blocked': | 
|  | table[defect]['blocks'] += 1 | 
|  | else: | 
|  | table[defect]['fails'] += 1 | 
|  | return OrderedDict( | 
|  | sorted(table.items(), key=lambda i: i[1][sort_by], reverse=True)) | 
|  |  | 
|  |  | 
|  | def get_text_table(table): | 
|  | LOG.info("Generation text table") | 
|  | lines = [] | 
|  | line = ("{fails:^5} | {blocks:^5} | {project:^10} | {priority:^15} | " | 
|  | "{status:^15} | {bug:^100} | {tests} ") | 
|  |  | 
|  | def title_uid(r): | 
|  | run_id, result = r | 
|  | test = fetch_test(result.api, result.raw_data()['test_id'], run_id) | 
|  | return { | 
|  | "title": test.title, | 
|  | "uid": test.id} | 
|  |  | 
|  | def list_of_defect_tests(results): | 
|  | ret = ["[{title} #{uid}]".format(**title_uid(r)) for | 
|  | r in results] | 
|  | return ' '.join(ret) | 
|  |  | 
|  | lines.append(line.format(fails='FAILS', blocks='BLOCKS', project="PROJECT", | 
|  | priority="PRIORITY", status="STATUS", bug="BUG", | 
|  | tests="TESTS")) | 
|  | for k in table: | 
|  | one = table[k] | 
|  | data = { | 
|  | "fails": one['fails'], | 
|  | "blocks": one['blocks'], | 
|  | "project": one['project'], | 
|  | "priority": one['priority'], | 
|  | "status": one['status'], | 
|  | "bug": "{uid} {title}".format(uid=one['id'], title=one['title']), | 
|  | "tests": list_of_defect_tests(one['results']) | 
|  | } | 
|  | lines.append(line.format(**data)) | 
|  | return '\n'.join(lines) | 
|  |  | 
|  |  | 
|  | def get_md_table(table): | 
|  | LOG.info("Generation MD table") | 
|  | lines = [] | 
|  | line = ("||{fails} | {blocks} | {project} | {priority} | " | 
|  | "{status} | {bug} | {tests} ") | 
|  |  | 
|  | def title_uid_link(r): | 
|  | run_id, result = r | 
|  | test = fetch_test(result.api, result.raw_data()['test_id'], run_id) | 
|  |  | 
|  | return { | 
|  | "title": test.title.replace('[', '{').replace(']', '}'), | 
|  | "uid": test.id, | 
|  | "link": "{url}/index.php?/tests/view/{uid}".format( | 
|  | url=test.api._conf()['url'], uid=test.id) | 
|  | } | 
|  |  | 
|  | def list_of_defect_tests(results): | 
|  | ret = ["<[{title} #{uid}]({link})>".format(**title_uid_link(r)) | 
|  | for r in results] | 
|  | return ' '.join(ret) | 
|  |  | 
|  | lines.append(line.format(fails='|:FAILS', blocks=':BLOCKS', | 
|  | project=":PROJECT", priority=":PRIORITY", | 
|  | status=":STATUS", bug=":BUG", tests=":TESTS")) | 
|  | for k in table: | 
|  | one = table[k] | 
|  | data = { | 
|  | "fails": one['fails'], | 
|  | "blocks": one['blocks'], | 
|  | "project": one['project'], | 
|  | "priority": one['priority'], | 
|  | "status": one['status'], | 
|  | "bug": "[{uid} {title}]({url})".format( | 
|  | uid=one['id'], | 
|  | title=one['title'].replace('[', '{').replace(']', '}'), | 
|  | url=one['url']), | 
|  | "tests": list_of_defect_tests(one['results']) | 
|  | } | 
|  | lines.append(line.format(**data)) | 
|  | return '\n'.join(lines) | 
|  |  | 
|  |  | 
|  | def get_html_table(table): | 
|  | LOG.info("Generation HTML table") | 
|  | html = "<table>{lines}</table>" | 
|  | lines = [] | 
|  | line = ("<tr><th>{fails:^5}</th><th>{blocks:^5}</th><th>{project:^10}</th>" | 
|  | "<th>{priority:^15}</th>" | 
|  | "<th>{status:^15}</th><th>{bug:^100}</th><th>{tests}</th></tr>") | 
|  | lines.append(line.format(fails='FAILS', blocks='BLOCKS', project="PROJECT", | 
|  | priority="PRIORITY", status="STATUS", bug="BUG", | 
|  | tests="TESTS")) | 
|  |  | 
|  | def title_uid_link(r): | 
|  | run_id, result = r | 
|  | test = fetch_test(result.api, result.raw_data()['test_id'], run_id) | 
|  |  | 
|  | return { | 
|  | "title": test.title, | 
|  | "uid": test.id, | 
|  | "link": "{url}/index.php?/tests/view/{uid}".format( | 
|  | url=test.api._conf()['url'], uid=test.id) | 
|  | } | 
|  |  | 
|  | def list_of_defect_tests(results): | 
|  | ret = ["<a href='{link}'>{title} #{uid}</a>".format( | 
|  | **title_uid_link(r)) for r in results] | 
|  | return ' '.join(ret) | 
|  |  | 
|  | for k in table: | 
|  | one = table[k] | 
|  | data = { | 
|  | "fails": one['fails'], | 
|  | "blocks": one['blocks'], | 
|  | "project": one['project'], | 
|  | "priority": one['priority'], | 
|  | "status": one['status'], | 
|  | "bug": "<a href='{url}'>{uid} {title}</a>".format( | 
|  | uid=one['id'], title=one['title'], url=one['url']), | 
|  | "tests": list_of_defect_tests(one['results']) | 
|  | } | 
|  | lines.append(line.format(**data)) | 
|  | return html.format(lines=''.join(lines)) | 
|  |  | 
|  |  | 
|  | def out_table(out_type, table): | 
|  | if out_type == 'none': | 
|  | return | 
|  | elif out_type == 'html': | 
|  | print(get_html_table(table)) | 
|  | elif out_type == 'md': | 
|  | print(get_md_table(table)) | 
|  | else: | 
|  | print(get_text_table(table)) | 
|  |  | 
|  |  | 
|  | def push_report(t_client, plan_name, table): | 
|  | LOG.info("Push report table into plan - {}".format(plan_name)) | 
|  | text = "Bugs Statistics (generated on {date})\n" \ | 
|  | "=======================================================\n" \ | 
|  | "{table}".format( | 
|  | date=datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"), | 
|  | table=get_md_table(table)) | 
|  | plan = t_client.plan(plan_name) | 
|  | if plan: | 
|  | plan.description = text | 
|  | plan.api._post( | 
|  | 'update_plan/{}'.format(plan.id), | 
|  | { | 
|  | 'name': plan.name, | 
|  | 'description': plan.description, | 
|  | 'milestone_id': plan.milestone.id | 
|  | }) | 
|  |  | 
|  |  | 
|  | def update_report(t_client, plan_name, tests_table): | 
|  | LOG.info("Update report table into plan - {}".format(plan_name) | 
|  | + "\n===\nList tests to udate:") | 
|  | plan = t_client.plan(plan_name) | 
|  | if plan: | 
|  | for r_test, run in tests_table: | 
|  | t_client.add(r_test) | 
|  | print(r_test.test.title) | 
|  | LOG.info("\n===\nUpdate plan finished - {}".format(plan_name)) | 
|  |  | 
|  |  | 
|  | def create_report(**kwargs): | 
|  | j_host = kwargs.get('jira_host') | 
|  | j_user = kwargs.get('jira_user_id') | 
|  | j_user_pwd = kwargs.get('jira_user_password') | 
|  | t_host = kwargs.get('testrail_host') | 
|  | t_user = kwargs.get('testrail_user') | 
|  | t_user_key = kwargs.get('testrail_user_key') | 
|  | t_plan = kwargs.get('testrail_plan') | 
|  | t_project = kwargs.get('testrail_project') | 
|  | t_a_run = kwargs.get('testrail_only_run') | 
|  | o_type = kwargs.get('out_type') | 
|  | push_report_flag = kwargs.get('push_report_flag') | 
|  | sort_by = kwargs.get('sort_by') | 
|  |  | 
|  | t_client = TestRail(email=t_user, key=t_user_key, url=t_host) | 
|  | t_client.set_project_id(t_client.project(t_project).id) | 
|  |  | 
|  | j_client = jira.JIRA(j_host, basic_auth=(j_user, j_user_pwd)) | 
|  |  | 
|  | runs = get_runs(t_client, t_plan, t_a_run) | 
|  | results = get_all_results(t_client, runs) | 
|  | table = get_defects_table(j_client, results, sort_by) | 
|  | out_table(o_type, table) | 
|  | if push_report_flag: | 
|  | push_report(t_client, t_plan, table) | 
|  |  | 
|  |  | 
|  | def mark_fails(**kwargs): | 
|  | testrail_host = kwargs.get('testrail_host') | 
|  | testrail_user = kwargs.get('testrail_user') | 
|  | testrail_user_key = kwargs.get('testrail_user_key') | 
|  | testrail_plan = kwargs.get('testrail_plan') | 
|  | testrail_m_plan = kwargs.get('testrail_marked_plan') | 
|  | testrail_project = kwargs.get('testrail_project') | 
|  | testrail_active_run = kwargs.get('testrail_only_run') | 
|  | if testrail_active_run == '': | 
|  | testrail_active_run = None | 
|  | update_report_flag = kwargs.get('update_report_flag') | 
|  |  | 
|  | testrail_client = TestRail(email=testrail_user, key=testrail_user_key, | 
|  | url=testrail_host) | 
|  | testrail_client.set_project_id(testrail_client.project( | 
|  | testrail_project).id) | 
|  |  | 
|  | # Get list runs with marked results | 
|  | marked_runs = get_runs(testrail_client, testrail_m_plan, | 
|  | testrail_active_run) | 
|  |  | 
|  | # Get list runs to update | 
|  | runs = get_runs(testrail_client, testrail_plan, testrail_active_run) | 
|  |  | 
|  | # Get list (failed, prod_failed, test_failed,skipped..) tests with defects | 
|  | marked_results = get_all_failed_results(testrail_client, marked_runs, | 
|  | '2,3,4,5,6,7,8,9') | 
|  |  | 
|  | # Get list (failed) tests without defects to mark | 
|  | failed_results = get_all_failed_results(testrail_client, | 
|  | runs, '5')  # 5-failed | 
|  |  | 
|  | # Generate list tests to update based on compare (defected | 
|  | # results for tests with failed and not defected) | 
|  | tests_to_update = mark_failed_results(testrail_client, marked_results, | 
|  | failed_results, testrail_host) | 
|  |  | 
|  | if update_report_flag: | 
|  | update_report(testrail_client, testrail_plan, tests_to_update) | 
|  |  | 
|  |  | 
|  | COMMAND_MAP = { | 
|  | 'create-report': create_report, | 
|  | 'mark-fails': mark_fails | 
|  | } | 
|  |  | 
|  |  | 
|  | def main(): | 
|  | args = run_cli() | 
|  | COMMAND_MAP[args.command](**vars(args)) | 
|  |  | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | main() |