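Refactor the code of osccore-qa-testing-tools to comply with PEP 8.

Besides re-quoting string literals and re-wrapping long lines, this change
sorts the imports, replaces the wildcard `from testrail import *` in base.py
with an explicit `from testrail import APIClient`, and turns the bare
`except:` clauses in testrail.py into `except Exception:`.

Related-prod: PRODX-42195
Change-Id: Id05e7584e0d024127ce1bd5042cfe681a1b52e2d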
diff --git a/testrail_upload_suites/base.py b/testrail_upload_suites/base.py
index 76b6936..ae71166 100644
--- a/testrail_upload_suites/base.py
+++ b/testrail_upload_suites/base.py
@@ -1,5 +1,5 @@
-from testrail import *
import config
+from testrail import APIClient
class Base:
@@ -10,40 +10,40 @@
self.project = self._get_project(config.PROJECT)
def _get_project(self, project_name):
- projects_uri = 'get_projects'
- projects = self.client.send_get(uri=projects_uri)['projects']
+ projects_uri = "get_projects"
+ projects = self.client.send_get(uri=projects_uri)["projects"]
for project in projects:
- if project['name'] == project_name:
+ if project["name"] == project_name:
return project
return None
def send_post_add_result(self, some_id, bug, status_id, add_result):
- add_result['status_id'] = status_id
- add_result['custom_launchpad_bug'] = bug
- send_add_result = 'add_result/' + str(some_id)
+ add_result["status_id"] = status_id
+ add_result["custom_launchpad_bug"] = bug
+ send_add_result = "add_result/" + str(some_id)
return self.client.send_post(send_add_result, add_result)
def get_plans(self, project_id): # !
- return self.client.send_get('get_plans/{0}'.format(project_id))
+ return self.client.send_get("get_plans/{0}".format(project_id))
def get_plan(self, plan_id): # !
- return self.client.send_get('get_plan/{0}'.format(plan_id))
+ return self.client.send_get("get_plan/{0}".format(plan_id))
def is_test_plan_exist(self, test_plan_name):
- runs = self.get_plans(self.project['id'])
- if True in map(lambda item: item['name'] == test_plan_name, runs):
+ runs = self.get_plans(self.project["id"])
+ if True in map(lambda item: item["name"] == test_plan_name, runs):
return True
return False
def get_tests(self, plan_id): # !
- return self.client.send_get('get_tests/{0}'.format(plan_id))
+ return self.client.send_get("get_tests/{0}".format(plan_id))
def get_test_runs(self, plan_id, pattern=None):
plans_runs = self.get_plan(plan_id) # !get_plans
runs = []
- for run in plans_runs['entries']:
+ for run in plans_runs["entries"]:
if pattern:
- if pattern in run['name']:
+ if pattern in run["name"]:
runs.append(run)
else:
runs.append(run)
@@ -52,8 +52,8 @@
def get_tempest_runs(self, plan_id):
runs = self.get_plan(plan_id) # !get_plans
tempest_runs = []
- for run in runs['entries']:
- if 'Tempest' in run['name']:
+ for run in runs["entries"]:
+ if "Tempest" in run["name"]:
tempest_runs.append(run)
return tempest_runs
@@ -61,121 +61,132 @@
all_tests = self.get_tests(run_id)
test_ids = []
for test in all_tests:
- if test['status_id'] == 5:
- test_ids.append(test['id'])
+ if test["status_id"] == 5:
+ test_ids.append(test["id"])
return test_ids
def get_test_result(self, test_id):
- return self.client.send_get('get_results/{0}'.format(test_id))
+ return self.client.send_get("get_results/{0}".format(test_id))
def get_test_results_for_run(self, run_id):
- return self.client.send_get('get_results_for_run/{0}'.format(run_id))
+ return self.client.send_get("get_results_for_run/{0}".format(run_id))
def get_results_for_case(self, run_id, case_id):
- return self.client.send_get('get_results_for_case/{0}/{1}'.
- format(run_id, case_id))
+ return self.client.send_get(
+ "get_results_for_case/{0}/{1}".format(run_id, case_id)
+ )
def get_test(self, test_id):
- return self.client.send_get('get_test/{0}'.format(test_id))
+ return self.client.send_get("get_test/{0}".format(test_id))
def get_runs(self, run_id):
- return self.client.send_get('get_runs/{0}'.format(run_id))
+ return self.client.send_get("get_runs/{0}".format(run_id))
def get_run(self, run_id):
- return self.client.send_get('get_run/{0}'.format(run_id))
+ return self.client.send_get("get_run/{0}".format(run_id))
def get_milestones(self):
- milestones_uri = 'get_milestones/{project_id}'.format(
- project_id=self.project['id'])
- return self.client.send_get(uri=milestones_uri)['milestones']
+ milestones_uri = "get_milestones/{project_id}".format(
+ project_id=self.project["id"]
+ )
+ return self.client.send_get(uri=milestones_uri)["milestones"]
def get_milestone(self, milestone_id):
- milestone_uri = 'get_milestone/{milestone_id}'.format(
- milestone_id=milestone_id)
+ milestone_uri = "get_milestone/{milestone_id}".format(
+ milestone_id=milestone_id
+ )
return self.client.send_get(uri=milestone_uri)
def get_milestone_by_name(self, name):
for milestone in self.get_milestones():
- if milestone['name'] == name:
- return self.get_milestone(milestone_id=milestone['id'])
+ if milestone["name"] == name:
+ return self.get_milestone(milestone_id=milestone["id"])
def add_plan(self, name, description, milestone_id, entries):
- add_plan_uri = 'add_plan/{project_id}'.format(
- project_id=self.project['id'])
+ add_plan_uri = "add_plan/{project_id}".format(
+ project_id=self.project["id"]
+ )
new_plan = {
- 'name': name,
- 'description': description,
- 'milestone_id': milestone_id,
- 'entries': entries # entries=[]
+ "name": name,
+ "description": description,
+ "milestone_id": milestone_id,
+ "entries": entries, # entries=[]
}
return self.client.send_post(add_plan_uri, new_plan)
def add_plan_entry(self, project_id, new_run):
- add_plan_uri = 'add_plan_entry/{project_id}'.format(
- project_id=project_id)
+ add_plan_uri = "add_plan_entry/{project_id}".format(
+ project_id=project_id
+ )
return self.client.send_post(add_plan_uri, new_run)
def get_suites(self):
- suites_uri = 'get_suites/{project_id}'.format(
- project_id=self.project['id'])
+ suites_uri = "get_suites/{project_id}".format(
+ project_id=self.project["id"]
+ )
return self.client.send_get(uri=suites_uri)
def get_suite(self, suite_id):
- suite_uri = 'get_suite/{suite_id}'.format(suite_id=suite_id)
+ suite_uri = "get_suite/{suite_id}".format(suite_id=suite_id)
return self.client.send_get(uri=suite_uri)
def get_suite_by_name(self, name):
for suite in self.get_suites():
- if suite['name'] == name:
- return self.get_suite(suite_id=suite['id'])
+ if suite["name"] == name:
+ return self.get_suite(suite_id=suite["id"])
def get_plan_by_name(self, name):
for plan in self.get_plans(13):
- if plan['name'] == name:
- return self.get_plan(plan['id'])
+ if plan["name"] == name:
+ return self.get_plan(plan["id"])
def add_result(self, test_id, result_to_add):
- return self.client.send_post('add_result/{0}'.format(test_id['id']),
- result_to_add)
+ return self.client.send_post(
+ "add_result/{0}".format(test_id["id"]), result_to_add
+ )
def add_suite(self, name, description=None):
- return self.client.send_post('add_suite/' + str(self.project['id']),
- dict(name=name, description=description))
+ return self.client.send_post(
+ "add_suite/" + str(self.project["id"]),
+ dict(name=name, description=description),
+ )
def get_sections(self, suite_id):
- sections_uri = 'get_sections/{project_id}&suite_id={suite_id}'.format(
- project_id=self.project['id'],
- suite_id=suite_id
+ sections_uri = "get_sections/{project_id}&suite_id={suite_id}".format(
+ project_id=self.project["id"], suite_id=suite_id
)
return self.client.send_get(sections_uri)
def get_section(self, section_id):
- section_uri = 'get_section/{section_id}'.format(section_id=section_id)
+ section_uri = "get_section/{section_id}".format(section_id=section_id)
return self.client.send_get(section_uri)
def get_section_by_name(self, suite_id, section_name):
for section in self.get_sections(suite_id=suite_id):
- if section['name'] == section_name:
- return self.get_section(section_id=section['id'])
+ if section["name"] == section_name:
+ return self.get_section(section_id=section["id"])
def add_section(self, suite_id, name, parent_id=None):
- return self.client.send_post('add_section/' + str(self.project['id']),
- dict(suite_id=suite_id, name=name,
- parent_id=parent_id))
+ return self.client.send_post(
+ "add_section/" + str(self.project["id"]),
+ dict(suite_id=suite_id, name=name, parent_id=parent_id),
+ )
def delete_section(self, section_id):
# Not working bug in testrail
section = self.get_section(section_id)
- print('SECTION', section)
+ print("SECTION", section)
try:
- deleted = self.client.send_post('delete_section/{}'.format(section_id), section)
- print('DELETED', deleted)
+ deleted = self.client.send_post(
+ "delete_section/{}".format(section_id), section
+ )
+ print("DELETED", deleted)
except Exception:
pass
return
def add_case(self, section_id, case):
- add_case_uri = 'add_case/{section_id}'.format(section_id=section_id)
+ add_case_uri = "add_case/{section_id}".format(section_id=section_id)
return self.client.send_post(add_case_uri, case)
@staticmethod
@@ -183,39 +194,43 @@
results = {"results": []}
for test in tests:
- results["results"].append({
- "test_id": test['id'],
- "status_id": status_id,
- "comment": 'Deploy failed',
- })
+ results["results"].append(
+ {
+ "test_id": test["id"],
+ "status_id": status_id,
+ "comment": "Deploy failed",
+ }
+ )
return results
@staticmethod
def get_result_by_name():
result = config.RESULT
- if result == 'Blocked':
+ if result == "Blocked":
return 2
- elif result == 'Passed':
+ elif result == "Passed":
return 1
- elif result == 'Failed':
+ elif result == "Failed":
return 5
- elif result == 'ProdFailed':
+ elif result == "ProdFailed":
return 8
- elif result == 'Skipped':
+ elif result == "Skipped":
return 6
@staticmethod
def get_id_of_tempest_runs(tempest_runs):
tempest_runs_ids = {} # []
for i in tempest_runs:
- for item in i['runs']:
- tempest_runs_ids.update({item['id']: item['name']})
+ for item in i["runs"]:
+ tempest_runs_ids.update({item["id"]: item["name"]})
return tempest_runs_ids
@staticmethod
def get_last_tempest_run(get_plans):
for plans in get_plans:
# print dict
- if (plans.get(u'passed_count') > 1000 or plans.get(
- u'blocked_count') > 1000)and '9.1' in plans.get(u'name'):
- return plans.get(u'id')
+ if (
+ plans.get("passed_count") > 1000
+ or plans.get("blocked_count") > 1000
+ ) and "9.1" in plans.get("name"):
+ return plans.get("id")
diff --git a/testrail_upload_suites/config.py b/testrail_upload_suites/config.py
index be5fdb4..9a4aa10 100644
--- a/testrail_upload_suites/config.py
+++ b/testrail_upload_suites/config.py
@@ -1,14 +1,14 @@
import os
-URL = os.environ.get('TESTRAIL_URL')
-USER = os.environ.get('TESTRAIL_USER')
-PROJECT = os.environ.get('TESTRAIL_PROJECT')
-PASSWORD = os.environ.get('TESTRAIL_PASSWORD')
+URL = os.environ.get("TESTRAIL_URL")
+USER = os.environ.get("TESTRAIL_USER")
+PROJECT = os.environ.get("TESTRAIL_PROJECT")
+PASSWORD = os.environ.get("TESTRAIL_PASSWORD")
-MILESTONE = os.environ.get('TESTRAIL_MILESTONE')
-SUITE = os.environ.get('TESTRAIL_SUITE')
-PLAN_NAME = os.environ.get('TESTRAIL_PLAN_NAME')
-RESULT = os.environ.get('TESTRAIL_RESULT')
+MILESTONE = os.environ.get("TESTRAIL_MILESTONE")
+SUITE = os.environ.get("TESTRAIL_SUITE")
+PLAN_NAME = os.environ.get("TESTRAIL_PLAN_NAME")
+RESULT = os.environ.get("TESTRAIL_RESULT")
# Use test IDs for titles of TestRail test cases like
@@ -23,22 +23,22 @@
UPLOAD_THREADS_COUNT = 4
SECTIONS_MAP = {
- "Telemetry": ["telemetry_tempest_plugin."],
- "Glance": ["image."],
- "Keystone": ["identity."],
- "Neutron": ["network."],
- "Nova": ["compute."],
- "Swift": ["object_storage."],
- "Scenario": ["tempest.scenario."],
- "Manila": ["manila_tempest_tests."],
- "Ironic": ["ironic_tempest_plugin."],
- "Heat": ["heat_tempest_plugin."],
- "Designate": ["designate_tempest_plugin."],
- "Barbican": ["barbican_tempest_plugin."],
- "Horizon": ["tempest_horizon."]
+ "Telemetry": ["telemetry_tempest_plugin."],
+ "Glance": ["image."],
+ "Keystone": ["identity."],
+ "Neutron": ["network."],
+ "Nova": ["compute."],
+ "Swift": ["object_storage."],
+ "Scenario": ["tempest.scenario."],
+ "Manila": ["manila_tempest_tests."],
+ "Ironic": ["ironic_tempest_plugin."],
+ "Heat": ["heat_tempest_plugin."],
+ "Designate": ["designate_tempest_plugin."],
+ "Barbican": ["barbican_tempest_plugin."],
+ "Horizon": ["tempest_horizon."],
}
# Logging
-LOGGER = 'upload_suite'
-LOG_FOLDER = '/tmp/'
-LOG_FILENAME = 'upload_suite.log'
+LOGGER = "upload_suite"
+LOG_FOLDER = "/tmp/"
+LOG_FILENAME = "upload_suite.log"
diff --git a/testrail_upload_suites/testrail.py b/testrail_upload_suites/testrail.py
index a2c6523..e56c633 100644
--- a/testrail_upload_suites/testrail.py
+++ b/testrail_upload_suites/testrail.py
@@ -11,18 +11,19 @@
# Copyright Gurock Software GmbH. See license.md for details.
#
-import requests
-import json
import base64
+import json
+
+import requests
class APIClient:
def __init__(self, base_url):
- self.user = ''
- self.password = ''
- if not base_url.endswith('/'):
- base_url += '/'
- self.__url = base_url + 'index.php?/api/v2/'
+ self.user = ""
+ self.password = ""
+ if not base_url.endswith("/"):
+ base_url += "/"
+ self.__url = base_url + "index.php?/api/v2/"
#
# Send Get
@@ -39,7 +40,7 @@
# Used only for 'get_attachment/:attachment_id'
#
def send_get(self, uri, filepath=None):
- return self.__send_request('GET', uri, filepath)
+ return self.__send_request("GET", uri, filepath)
#
# Send POST
@@ -57,48 +58,51 @@
# to the file
#
def send_post(self, uri, data):
- return self.__send_request('POST', uri, data)
+ return self.__send_request("POST", uri, data)
def __send_request(self, method, uri, data):
url = self.__url + uri
auth = str(
base64.b64encode(
- bytes('%s:%s' % (self.user, self.password), 'utf-8')
+ bytes("%s:%s" % (self.user, self.password), "utf-8")
),
- 'ascii'
+ "ascii",
).strip()
- headers = {'Authorization': 'Basic ' + auth}
+ headers = {"Authorization": "Basic " + auth}
- if method == 'POST':
- if uri[:14] == 'add_attachment': # add_attachment API method
- files = {'attachment': (open(data, 'rb'))}
+ if method == "POST":
+ if uri[:14] == "add_attachment": # add_attachment API method
+ files = {"attachment": (open(data, "rb"))}
response = requests.post(url, headers=headers, files=files)
- files['attachment'].close()
+ files["attachment"].close()
else:
- headers['Content-Type'] = 'application/json'
- payload = bytes(json.dumps(data), 'utf-8')
+ headers["Content-Type"] = "application/json"
+ payload = bytes(json.dumps(data), "utf-8")
response = requests.post(url, headers=headers, data=payload)
else:
- headers['Content-Type'] = 'application/json'
+ headers["Content-Type"] = "application/json"
response = requests.get(url, headers=headers)
if response.status_code > 201:
try:
error = response.json()
- except: # response.content not formatted as JSON
+ except Exception: # response.content not formatted as JSON
error = str(response.content)
- raise APIError('TestRail API returned HTTP %s (%s)' % (response.status_code, error))
+ raise APIError(
+ "TestRail API returned HTTP %s (%s)"
+ % (response.status_code, error)
+ )
else:
- if uri[:15] == 'get_attachment/': # Expecting file, not JSON
+ if uri[:15] == "get_attachment/": # Expecting file, not JSON
try:
- open(data, 'wb').write(response.content)
- return (data)
- except:
- return ("Error saving attachment.")
+ open(data, "wb").write(response.content)
+ return data
+ except Exception:
+ return "Error saving attachment."
else:
return response.json()
class APIError(Exception):
- pass
\ No newline at end of file
+ pass
diff --git a/testrail_upload_suites/upload_suite.py b/testrail_upload_suites/upload_suite.py
index 3cabf05..ade9cd5 100644
--- a/testrail_upload_suites/upload_suite.py
+++ b/testrail_upload_suites/upload_suite.py
@@ -14,22 +14,25 @@
# License for the specific language governing permissions and limitations
# under the License.
-import os
import logging
+import os
import sys
+import config
from base import Base
from testrail import APIError
-import config
-
-
logging.basicConfig(
- format='[%(asctime)s][%(name)s][%(levelname)s] %(message)s',
- datefmt='%d-%m-%Y %H:%M:%S',
- handlers=[logging.FileHandler('{}{}'.format(
- config.LOG_FOLDER, config.LOG_FILENAME)), logging.StreamHandler()],
- level=logging.INFO)
+ format="[%(asctime)s][%(name)s][%(levelname)s] %(message)s",
+ datefmt="%d-%m-%Y %H:%M:%S",
+ handlers=[
+ logging.FileHandler(
+ "{}{}".format(config.LOG_FOLDER, config.LOG_FILENAME)
+ ),
+ logging.StreamHandler(),
+ ],
+ level=logging.INFO,
+)
logger = logging.getLogger(config.LOGGER)
@@ -49,8 +52,9 @@
return tags
-def create_tr_test_cases(test_cases, milestone_id, type_id=1, priority_id=4,
- qa_team=4):
+def create_tr_test_cases(
+ test_cases, milestone_id, type_id=1, priority_id=4, qa_team=4
+):
tr_test_cases = []
for test_case_name in test_cases:
@@ -68,8 +72,11 @@
test_case = {
"milestone_id": milestone_id,
"section": section,
- "title": (("%s.%s" % (test_class, test_name)) if config.USE_TEST_IDs
- else test_name),
+ "title": (
+ ("%s.%s" % (test_class, test_name))
+ if config.USE_TEST_IDs
+ else test_name
+ ),
"type_id": type_id,
"priority_id": priority_id,
"custom_qa_team": qa_team,
@@ -78,7 +85,7 @@
"custom_test_group": test_class,
"custom_test_case_description": test_name,
"custom_test_case_steps": [{"Run test": "passed"}],
- "custom_report_label": report_label
+ "custom_report_label": report_label,
}
tr_test_cases.append(test_case)
@@ -109,8 +116,9 @@
logger.info("Reading tests file '%s'..." % tests_file_path)
with open(tests_file_path) as f:
test_cases = [test for test in f.read().split("\n") if test]
- logger.info("Tests file '%s' has been successfully read."
- % tests_file_path)
+ logger.info(
+ "Tests file '%s' has been successfully read." % tests_file_path
+ )
else:
raise Exception("Tests file '%s' doesn't exist!" % tests_file_path)
@@ -125,8 +133,10 @@
suite = call.get_suite_by_name(config.SUITE)
if not suite:
- logger.info("Tests suite '%s' not found. "
- "Creating tests suite..." % config.SUITE)
+ logger.info(
+ "Tests suite '%s' not found. "
+ "Creating tests suite..." % config.SUITE
+ )
suite = call.add_suite(config.SUITE)
logger.info("Tests suite has benn successfully created.")
@@ -134,10 +144,12 @@
logger.info("Creating test cases for TestRail...")
tr_test_cases = create_tr_test_cases(
- test_cases, milestone["id"],
+ test_cases,
+ milestone["id"],
type_id=config.TEST_CASE_TYPE_ID,
priority_id=config.TEST_CASE_PRIORITY_ID,
- qa_team=config.QA_TEAM)
+ qa_team=config.QA_TEAM,
+ )
logger.info("Test cases have been successfully created.")
sections_map = {}