Add testrail_upload_suite tool
Change-Id: Id3055c83225d0dcd69015658f05bd9ce07e8d88d
Related-prod: PRODX-942
diff --git a/testrail_upload_suites/base.py b/testrail_upload_suites/base.py
new file mode 100644
index 0000000..c82c9b4
--- /dev/null
+++ b/testrail_upload_suites/base.py
@@ -0,0 +1,221 @@
+from testrail import *
+import config
+
+
+class Base:
+ def __init__(self):
+ self.client = APIClient(config.URL)
+ self.client.user = config.USER
+ self.client.password = config.PASSWORD
+ self.project = self._get_project(config.PROJECT)
+
+ def _get_project(self, project_name):
+ projects_uri = 'get_projects'
+ projects = self.client.send_get(uri=projects_uri)
+ for project in projects:
+ if project['name'] == project_name:
+ return project
+ return None
+
+ def send_post_add_result(self, some_id, bug, status_id, add_result):
+ add_result['status_id'] = status_id
+ add_result['custom_launchpad_bug'] = bug
+ send_add_result = 'add_result/' + str(some_id)
+ return self.client.send_post(send_add_result, add_result)
+
+ def get_plans(self, project_id): # !
+ return self.client.send_get('get_plans/{0}'.format(project_id))
+
+ def get_plan(self, plan_id): # !
+ return self.client.send_get('get_plan/{0}'.format(plan_id))
+
+ def is_test_plan_exist(self, test_plan_name):
+ runs = self.get_plans(self.project['id'])
+ if True in map(lambda item: item['name'] == test_plan_name, runs):
+ return True
+ return False
+
+ def get_tests(self, plan_id): # !
+ return self.client.send_get('get_tests/{0}'.format(plan_id))
+
+ def get_test_runs(self, plan_id, pattern=None):
+ plans_runs = self.get_plan(plan_id) # !get_plans
+ runs = []
+ for run in plans_runs['entries']:
+ if pattern:
+ if pattern in run['name']:
+ runs.append(run)
+ else:
+ runs.append(run)
+ return runs
+
+ def get_tempest_runs(self, plan_id):
+ runs = self.get_plan(plan_id) # !get_plans
+ tempest_runs = []
+ for run in runs['entries']:
+ if 'Tempest' in run['name']:
+ tempest_runs.append(run)
+ return tempest_runs
+
+ def get_id_of_failed_tests(self, run_id): # !
+ all_tests = self.get_tests(run_id)
+ test_ids = []
+ for test in all_tests:
+ if test['status_id'] == 5:
+ test_ids.append(test['id'])
+ return test_ids
+
+ def get_test_result(self, test_id):
+ return self.client.send_get('get_results/{0}'.format(test_id))
+
+ def get_test_results_for_run(self, run_id):
+ return self.client.send_get('get_results_for_run/{0}'.format(run_id))
+
+ def get_results_for_case(self, run_id, case_id):
+ return self.client.send_get('get_results_for_case/{0}/{1}'.
+ format(run_id, case_id))
+
+ def get_test(self, test_id):
+ return self.client.send_get('get_test/{0}'.format(test_id))
+
+ def get_runs(self, run_id):
+ return self.client.send_get('get_runs/{0}'.format(run_id))
+
+ def get_run(self, run_id):
+ return self.client.send_get('get_run/{0}'.format(run_id))
+
+ def get_milestones(self):
+ milestones_uri = 'get_milestones/{project_id}'.format(
+ project_id=self.project['id'])
+ return self.client.send_get(uri=milestones_uri)
+
+ def get_milestone(self, milestone_id):
+ milestone_uri = 'get_milestone/{milestone_id}'.format(
+ milestone_id=milestone_id)
+ return self.client.send_get(uri=milestone_uri)
+
+ def get_milestone_by_name(self, name):
+ for milestone in self.get_milestones():
+ if milestone['name'] == name:
+ return self.get_milestone(milestone_id=milestone['id'])
+
+ def add_plan(self, name, description, milestone_id, entries):
+ add_plan_uri = 'add_plan/{project_id}'.format(
+ project_id=self.project['id'])
+ new_plan = {
+ 'name': name,
+ 'description': description,
+ 'milestone_id': milestone_id,
+ 'entries': entries # entries=[]
+ }
+ return self.client.send_post(add_plan_uri, new_plan)
+
+ def add_plan_entry(self, project_id, new_run):
+ add_plan_uri = 'add_plan_entry/{project_id}'.format(
+ project_id=project_id)
+ return self.client.send_post(add_plan_uri, new_run)
+
+ def get_suites(self):
+ suites_uri = 'get_suites/{project_id}'.format(
+ project_id=self.project['id'])
+ return self.client.send_get(uri=suites_uri)
+
+ def get_suite(self, suite_id):
+ suite_uri = 'get_suite/{suite_id}'.format(suite_id=suite_id)
+ return self.client.send_get(uri=suite_uri)
+
+ def get_suite_by_name(self, name):
+ for suite in self.get_suites():
+ if suite['name'] == name:
+ return self.get_suite(suite_id=suite['id'])
+
+ def get_plan_by_name(self, name):
+ for plan in self.get_plans(13):
+ if plan['name'] == name:
+ return self.get_plan(plan['id'])
+
+ def add_result(self, test_id, result_to_add):
+ return self.client.send_post('add_result/{0}'.format(test_id['id']),
+ result_to_add)
+
+ def add_suite(self, name, description=None):
+ return self.client.send_post('add_suite/' + str(self.project['id']),
+ dict(name=name, description=description))
+
+ def get_sections(self, suite_id):
+ sections_uri = 'get_sections/{project_id}&suite_id={suite_id}'.format(
+ project_id=self.project['id'],
+ suite_id=suite_id
+ )
+ return self.client.send_get(sections_uri)
+
+ def get_section(self, section_id):
+ section_uri = 'get_section/{section_id}'.format(section_id=section_id)
+ return self.client.send_get(section_uri)
+
+ def get_section_by_name(self, suite_id, section_name):
+ for section in self.get_sections(suite_id=suite_id):
+ if section['name'] == section_name:
+ return self.get_section(section_id=section['id'])
+
+ def add_section(self, suite_id, name, parent_id=None):
+ return self.client.send_post('add_section/' + str(self.project['id']),
+ dict(suite_id=suite_id, name=name,
+ parent_id=parent_id))
+
+ def delete_section(self, section_id):
+ # Not working bug in testrail
+ section = self.get_section(section_id)
+ print('SECTION', section)
+ try:
+ deleted = self.client.send_post('delete_section/{}'.format(section_id), section)
+ print('DELETED', deleted)
+ except Exception:
+ pass
+ return
+
+ def add_case(self, section_id, case):
+ add_case_uri = 'add_case/{section_id}'.format(section_id=section_id)
+ return self.client.send_post(add_case_uri, case)
+
+ @staticmethod
+ def prepare_common_results(tests, status_id):
+ results = {"results": []}
+
+ for test in tests:
+ results["results"].append({
+ "test_id": test['id'],
+ "status_id": status_id,
+ "comment": 'Deploy failed',
+ })
+ return results
+
+ @staticmethod
+ def get_result_by_name():
+ result = config.RESULT
+ if result == 'Blocked':
+ return 2
+ elif result == 'Passed':
+ return 1
+ elif result == 'Failed':
+ return 5
+ elif result == 'ProdFailed':
+ return 8
+ elif result == 'Skipped':
+ return 6
+
+ @staticmethod
+ def get_id_of_tempest_runs(tempest_runs):
+ tempest_runs_ids = {} # []
+ for i in tempest_runs:
+ for item in i['runs']:
+ tempest_runs_ids.update({item['id']: item['name']})
+ return tempest_runs_ids
+
+ @staticmethod
+ def get_last_tempest_run(get_plans):
+ for plans in get_plans:
+ # print dict
+ if (plans.get(u'passed_count') > 1000 or plans.get(
+ u'blocked_count') > 1000)and '9.1' in plans.get(u'name'):
+ return plans.get(u'id')