Refactor the code of osccore-qa-testing-tools to comply with PEP 8.
Related-prod: PRODX-42195
Change-Id: Id05e7584e0d024127ce1bd5042cfe681a1b52e2d
diff --git a/parcing_testrail_results/config.py b/parcing_testrail_results/config.py
index ede2f55..06ae694 100644
--- a/parcing_testrail_results/config.py
+++ b/parcing_testrail_results/config.py
@@ -1,11 +1,11 @@
import os
-TESTRAIL_USER = os.environ.get('TESTRAIL_USER')
-TESTRAIL_PASSWORD = os.environ.get('TESTRAIL_PASSWORD')
+TESTRAIL_USER = os.environ.get("TESTRAIL_USER")
+TESTRAIL_PASSWORD = os.environ.get("TESTRAIL_PASSWORD")
-TESTRAIL_URL = 'https://mirantis.testrail.com'
-TESTRAIL_TOKEN = '0YGnO1TC5NCCQFwgxmsW'
+TESTRAIL_URL = "https://mirantis.testrail.com"
+TESTRAIL_TOKEN = "0YGnO1TC5NCCQFwgxmsW"
TESTRAIL_COOKIES = "9adbe251-4ef1-474c-8ca6-9aaa1fbc5e76"
-LOGGIGNG_FOLDER = '/tmp/'
-LOGGIGNG_UTILS = 'testrail.log'
+LOGGIGNG_FOLDER = "/tmp/"
+LOGGIGNG_UTILS = "testrail.log"
diff --git a/parcing_testrail_results/html_testrail.py b/parcing_testrail_results/html_testrail.py
index 182299d..39eac00 100644
--- a/parcing_testrail_results/html_testrail.py
+++ b/parcing_testrail_results/html_testrail.py
@@ -1,25 +1,30 @@
-import click
-import config
import logging
import re
-import requests
-from bs4 import BeautifulSoup
from difflib import SequenceMatcher
-from testrail import *
+import click
+import config
+import requests
+from bs4 import BeautifulSoup
+from testrail import APIClient
client = APIClient(config.TESTRAIL_URL)
client.user = config.TESTRAIL_USER
client.password = config.TESTRAIL_PASSWORD
-logging.basicConfig(format='[%(asctime)s][%(name)s][%(levelname)s] %(message)s',
- datefmt='%d-%m-%Y %H:%M:%S',
- handlers=[
- logging.FileHandler('{}{}'.format(config.LOGGIGNG_FOLDER, config.LOGGIGNG_UTILS)),
- logging.StreamHandler()],
- level=logging.INFO)
-logger = logging.getLogger('testrail')
+logging.basicConfig(
+ format="[%(asctime)s][%(name)s][%(levelname)s] %(message)s",
+ datefmt="%d-%m-%Y %H:%M:%S",
+ handlers=[
+ logging.FileHandler(
+ "{}{}".format(config.LOGGIGNG_FOLDER, config.LOGGIGNG_UTILS)
+ ),
+ logging.StreamHandler(),
+ ],
+ level=logging.INFO,
+)
+logger = logging.getLogger("testrail")
class GetTestHistory:
@@ -29,48 +34,65 @@
def get_html(self):
token = config.TESTRAIL_TOKEN
- post_url = "https://mirantis.testrail.com/index.php?/tests/ajax_render_history"
+ post_url = (
+ "https://mirantis.testrail.com/index.php?/tests"
+ "/ajax_render_history"
+ )
headers = {
"Accept": "text/plain, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br",
- "Accept-Language": "en-US,en;q=0.9,es-AR;q=0.8,es;q=0.7,fr-DZ;q=0.6,fr;q=0.5,de-BE;q=0.4,de;q=0.3,ru-UA;q=0.2,ru;q=0.1,uk;q=0.1",
+ "Accept-Language": "en-US,en;q=0.9,es-AR;q=0.8,es;q=0.7,fr-DZ;"
+ "q=0.6,fr;q=0.5,de-BE;q=0.4,de;q=0.3,ru-UA;"
+ "q=0.2,ru;q=0.1,uk;q=0.1",
"Connection": "keep-alive",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"Host": "mirantis.testrail.com",
"Origin": "https://mirantis.testrail.com",
- "Proxy-Authorization": "Basic VVZQTnYxLXAybjJlbXhldzB6Z2RkcndwM25vZ2JiaHJ0Zm9ib3pjJmpvaG5kb2VAdXZwbi5tZTpvN3I3cDA4Mml6cHNoZHp6eDBjeHNsZGVudmUzYmNyZg ==",
- "Referer": "https://mirantis.testrail.com/index.php?/tests/view/{}".format(self.test_id),
- "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/73.0.3683.86 Chrome/73.0.3683.86 Safari/537.36",
- "X-Requested-With": "XMLHttpRequest"
+ "Proxy-Authorization": "Basic VVZQTnYxLXAybjJlbXhldzB6Z2RkcndwM25v"
+ "Z2JiaHJ0Zm9ib3pjJmpvaG5kb2VAdXZwbi5tZTpvN3I3cDA4Mml6cHNoZHp6eDBj"
+ "eHNsZGVudmUzYmNyZg ==",
+ "Referer": "https://mirantis.testrail.com/index.php?/"
+ "tests/view/{}".format(self.test_id),
+ "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
+ "(KHTML, like Gecko) Ubuntu Chromium/73.0.3683.86 "
+ "Chrome/73.0.3683.86 Safari/537.36",
+ "X-Requested-With": "XMLHttpRequest",
}
cookies = {"tr_session": config.TESTRAIL_COOKIES}
- r = requests.post(post_url, data='test_id={}&limit=50&_token={}'.format(self.test_id, token),
- headers=headers,
- cookies=cookies)
+ r = requests.post(
+ post_url,
+ data=f"test_id={self.test_id}&limit=50&_token={token}",
+ headers=headers,
+ cookies=cookies,
+ )
html_page = r.text
return html_page
def get_old_test_results(self):
- logger.info('Getting old test results from html page')
+ logger.info("Getting old test results from html page")
html_page = self.get_html()
- soup = BeautifulSoup(html_page, 'html.parser')
+ soup = BeautifulSoup(html_page, "html.parser")
page_div = soup.div
tests_history = []
- for tag in page_div.find_all('td', {'class': 'id'}):
+ for tag in page_div.find_all("td", {"class": "id"}):
tag_parent = tag.parent
- test_status = tag_parent.find_all('span', {'class', 'status'})[0].string
- test_id = tag_parent.find_all('a', {'class', 'link-noline'})[0].string[1:]
+ test_status = tag_parent.find_all("span", {"class", "status"})[
+ 0
+ ].string
+ test_id = tag_parent.find_all("a", {"class", "link-noline"})[
+ 0
+ ].string[1:]
- test_data = {'test_status': test_status, 'test_id': test_id}
+ test_data = {"test_status": test_status, "test_id": test_id}
- if test_status == 'TestFailed':
+ if test_status == "TestFailed":
tests_history.append(test_data)
return tests_history
@@ -81,30 +103,30 @@
self.plan_id = plan_id
def get_plan(self):
- logger.info('Getting plan: {}'.format(self.plan_id))
- return client.send_get('get_plan/{}'.format(self.plan_id))
+ logger.info(f"Getting plan: {self.plan_id}")
+ return client.send_get(f"get_plan/{self.plan_id}")
def get_suites(self):
- logger.info('Getting suites')
+ logger.info("Getting suites")
plan = self.get_plan()
all_suites_ids = []
- for suite in plan['entries']:
- siute_id = suite['runs'][0]['id']
+ for suite in plan["entries"]:
+ siute_id = suite["runs"][0]["id"]
all_suites_ids.append(siute_id)
- logger.info('Suite: {}'.format(siute_id))
+ logger.info(f"Suite: {siute_id}")
return all_suites_ids
def get_test(self, test_id):
- logger.info('Getting test: {}'.format(test_id))
- return client.send_get('get_test/{}'.format(test_id))
+ logger.info(f"Getting test: {test_id}")
+ return client.send_get(f"get_test/{test_id}")
def get_tests_results_by_suite(self, suite_id):
- logger.info('Getting tests results by suite (suite_id): {}'.format(suite_id))
- return client.send_get('get_tests/{}'.format(suite_id))
+ logger.info(f"Getting tests results by suite (suite_id): {suite_id}")
+ return client.send_get(f"get_tests/{suite_id}")
def get_all_tests_results(self):
- logger.info('Getting all tests results')
+ logger.info("Getting all tests results")
all_suites = self.get_suites()
all_tests = []
@@ -115,7 +137,7 @@
return all_tests
def get_all_failed_tests(self, test_status=5):
- logger.info('Getting failed tests')
+ logger.info("Getting failed tests")
# test['status_id'] == 5 failed
# test['status_id'] == 9 test failed
# test['status_id'] == 10 infra failed
@@ -124,15 +146,19 @@
failed_tests = []
for tests_in_suite in all_tests_in_all_suites:
for test in tests_in_suite:
- if test['status_id'] == test_status:
- failed_tests.append(test)
+ if test["status_id"] == test_status:
+ failed_tests.append(test)
return failed_tests
def get_test_result(self, test_id):
- logger.info('Getting test result: {}'.format(test_id))
+ logger.info(f"Getting test result: {test_id}")
test = self.get_test(test_id)
- return client.send_get('get_results_for_case/{}/{}'.format(test['run_id'], test['case_id']))
+ return client.send_get(
+ "get_results_for_case/{}/{}".format(
+ test["run_id"], test["case_id"]
+ )
+ )
def update_test_results(self, test_id, defects):
"""
@@ -144,29 +170,35 @@
:param defects: defect to update
:return:
"""
- logger.info('Updating test results test_id: {} with defect: {}'.format(test_id, defects))
+ logger.info(
+ f"Updating test results test_id: {test_id} with defect: {defects}"
+ )
test = self.get_test(test_id)
- return client.send_post('add_result_for_case/{}/{}'.format(test['run_id'], test['case_id']),
- {'status_id': 9, 'comment': 'Updated by R2D2', 'defects': defects})
+ return client.send_post(
+ "add_result_for_case/{}/{}".format(
+ test["run_id"], test["case_id"]
+ ),
+ {"status_id": 9, "comment": "Updated by R2D2", "defects": defects},
+ )
def get_current_test_comment(current_test_results):
- logger.info('Getting current test comment')
+ logger.info("Getting current test comment")
for test_results in current_test_results:
- if 'comment' in test_results:
- if test_results['comment']:
- if len(test_results['comment']) > 50:
- return test_results['comment']
+ if "comment" in test_results:
+ if test_results["comment"]:
+ if len(test_results["comment"]) > 50:
+ return test_results["comment"]
def get_old_tests_comments_ids(old_test_results, failed_tests):
- logger.info('Getting old tests comments ids')
+ logger.info("Getting old tests comments ids")
old_tests_comments_ids = []
for test in old_test_results:
- test_result = failed_tests.get_test_result(test['test_id'])
+ test_result = failed_tests.get_test_result(test["test_id"])
old_tests_comments_ids.append(test_result)
return old_tests_comments_ids
@@ -177,78 +209,114 @@
:param test_comment: string
:return: string
"""
- logger.info('Updating current test comment')
+ logger.info("Updating current test comment")
format_date = r"\d{4}-\d\d-\d\dT\d\d:\d\d:\d\dZ"
- format_uuid_a = r"[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}"
+ format_uuid_a = (
+ r"[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}"
+ r"\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}"
+ )
format_uuid_b = r"[0-9a-fA-F]{32}"
format_space = r" "
format_new_line = r"\n"
- for item in format_date, format_uuid_a, format_uuid_b, format_space, format_new_line:
- test_comment = re.sub(item, '', test_comment)
+ for item in (
+ format_date,
+ format_uuid_a,
+ format_uuid_b,
+ format_space,
+ format_new_line,
+ ):
+ test_comment = re.sub(item, "", test_comment)
return test_comment
def update_old_comments(defects_and_comments_from_old_tests):
- logger.info('Updating old test comment')
+ logger.info("Updating old test comment")
for item in defects_and_comments_from_old_tests:
- item['old_test_comment'] = update_test_comment(item['old_test_comment'])
+ item["old_test_comment"] = update_test_comment(
+ item["old_test_comment"]
+ )
return defects_and_comments_from_old_tests
def get_defects_and_comments_from_old_tests(old_tests_comments_ids):
- logger.info('Getting defects and comments from old tests')
+ logger.info("Getting defects and comments from old tests")
data_from_old_tests = []
- old_test_comment = ''
- old_test_defect = ''
+ old_test_comment = ""
+ old_test_defect = ""
for old_test_list in old_tests_comments_ids:
for old_test in old_test_list:
- if old_test['comment']:
- if len(old_test['comment']) > 50:
- old_test_comment = old_test['comment']
+ if old_test["comment"]:
+ if len(old_test["comment"]) > 50:
+ old_test_comment = old_test["comment"]
- if old_test['defects']:
- old_test_defect = old_test['defects']
+ if old_test["defects"]:
+ old_test_defect = old_test["defects"]
if old_test_comment and old_test_defect:
- data_from_old_tests.append({'old_test_comment': old_test_comment, 'old_test_defect': old_test_defect})
+ data_from_old_tests.append(
+ {
+ "old_test_comment": old_test_comment,
+ "old_test_defect": old_test_defect,
+ }
+ )
return data_from_old_tests
-def compare_comments(current_test_comment, defects_and_comments_from_old_tests, desired_ratio=0.7, test_id=''):
- logger.info('Comparing comments')
+def compare_comments(
+ current_test_comment,
+ defects_and_comments_from_old_tests,
+ desired_ratio=0.7,
+ test_id="",
+):
+ logger.info("Comparing comments")
if not desired_ratio:
desired_ratio = 0.75
- defect_for_update = ''
+ defect_for_update = ""
for item in defects_and_comments_from_old_tests:
- m = SequenceMatcher(None, current_test_comment, item['old_test_comment'])
+ m = SequenceMatcher(
+ None, current_test_comment, item["old_test_comment"]
+ )
my_ratio = m.ratio()
- logger.info('Low ratio: {}, Desired ratio {} Test https://mirantis.testrail.com/index.php?/tests/view/{} '
- 'will NOT be updated with issue {}'.format(my_ratio,
- desired_ratio,
- test_id,
- item['old_test_defect']))
+ logger.info(
+ "Low ratio: {}, Desired ratio {} Test "
+ "https://mirantis.testrail.com/index.php?/tests/view/{} "
+ "will NOT be updated with issue {}".format(
+ my_ratio, desired_ratio, test_id, item["old_test_defect"]
+ )
+ )
if my_ratio > desired_ratio:
- logger.info('!!!!! Desired ratio {}, Test Ratio: {} Jira issue: {}'.format(desired_ratio,
- my_ratio,
- item['old_test_defect']))
+ logger.info(
+ "!!!!! Desired ratio {}, Test Ratio: {} Jira issue: {}".format(
+ desired_ratio, my_ratio, item["old_test_defect"]
+ )
+ )
- defect_for_update = item['old_test_defect']
+ defect_for_update = item["old_test_defect"]
return defect_for_update
@click.command()
-@click.option('--run_id', default=1, type=click.STRING, help='Testrail run_id. For example, '
- 'https://mirantis.testrail.com/index.php?/runs/view/63288 '
- 'So run_id will be 63288')
-@click.option('--ratio', type=click.FLOAT, help='The ratio to comapare current console output and old one.')
+@click.option(
+ "--run_id",
+ default=1,
+ type=click.STRING,
+ help="Testrail run_id. For example, "
+ "https://mirantis.testrail.com/index.php?/runs/view/63288 "
+ "So run_id will be 63288",
+)
+@click.option(
+ "--ratio",
+ type=click.FLOAT,
+ help="The ratio to compare current console output and old one.",
+)
def get_failed_tests_history(run_id, ratio):
failed_tests = GetFailedTests(run_id)
@@ -256,30 +324,43 @@
for test in all_failed_tests:
- test_history = GetTestHistory(test['id'])
+ test_history = GetTestHistory(test["id"])
old_test_results = test_history.get_old_test_results()
- curr_tst_res = failed_tests.get_test_result(test['id'])
+ curr_tst_res = failed_tests.get_test_result(test["id"])
current_test_comment = get_current_test_comment(curr_tst_res)
current_test_comment = update_test_comment(current_test_comment)
- old_tests_comments_ids = get_old_tests_comments_ids(old_test_results, failed_tests)
+ old_tests_comments_ids = get_old_tests_comments_ids(
+ old_test_results, failed_tests
+ )
- defects_and_comments_from_old_tests = get_defects_and_comments_from_old_tests(old_tests_comments_ids)
- defects_and_comments_from_old_tests = update_old_comments(defects_and_comments_from_old_tests)
+ defects_and_comments_from_old_tests = (
+ get_defects_and_comments_from_old_tests(old_tests_comments_ids)
+ )
+ defects_and_comments_from_old_tests = update_old_comments(
+ defects_and_comments_from_old_tests
+ )
if defects_and_comments_from_old_tests:
- defect_for_update = compare_comments(current_test_comment,
- defects_and_comments_from_old_tests,
- desired_ratio=ratio,
- test_id=test['id'])
+ defect_for_update = compare_comments(
+ current_test_comment,
+ defects_and_comments_from_old_tests,
+ desired_ratio=ratio,
+ test_id=test["id"],
+ )
if defect_for_update:
- logger.info('!!!!! Updating test-case: https://mirantis.testrail.com/index.php?/tests/view/{} '
- 'with Jira issue {}'.format(test['id'], defect_for_update))
- failed_tests.update_test_results(test_id=test['id'], defects=defect_for_update)
+ logger.info(
+ "!!!!! Updating test-case: https://mirantis.testrail.com"
+ "/index.php?/tests/view/{} "
+ "with Jira issue {}".format(test["id"], defect_for_update)
+ )
+ failed_tests.update_test_results(
+ test_id=test["id"], defects=defect_for_update
+ )
-if __name__ == '__main__':
+if __name__ == "__main__":
get_failed_tests_history()
diff --git a/parcing_testrail_results/testrail.py b/parcing_testrail_results/testrail.py
index 7ed5900..42dfbce 100644
--- a/parcing_testrail_results/testrail.py
+++ b/parcing_testrail_results/testrail.py
@@ -10,88 +10,93 @@
# Copyright Gurock Software GmbH. See license.md for details.
#
-import urllib.request, urllib.error
-import json, base64
+import base64
+import json
import time
+import urllib.error
+import urllib.request
+
class APIClient:
- def __init__(self, base_url):
- self.user = ''
- self.password = ''
- if not base_url.endswith('/'):
- base_url += '/'
- self.__url = base_url + 'index.php?/api/v2/'
+ def __init__(self, base_url):
+ self.user = ""
+ self.password = ""
+ if not base_url.endswith("/"):
+ base_url += "/"
+ self.__url = base_url + "index.php?/api/v2/"
- #
- # Send Get
- #
- # Issues a GET request (read) against the API and returns the result
- # (as Python dict).
- #
- # Arguments:
- #
- # uri The API method to call including parameters
- # (e.g. get_case/1)
- #
- def send_get(self, uri):
- try:
- return self.__send_request('GET', uri, None)
- except APIError:
- print("Got an API Exception. Waiting 30 sec.")
- time.sleep(30)
- return self.__send_request('GET', uri, None)
+ #
+ # Send Get
+ #
+ # Issues a GET request (read) against the API and returns the result
+ # (as Python dict).
+ #
+ # Arguments:
+ #
+ # uri The API method to call including parameters
+ # (e.g. get_case/1)
+ #
+ def send_get(self, uri):
+ try:
+ return self.__send_request("GET", uri, None)
+ except APIError:
+ print("Got an API Exception. Waiting 30 sec.")
+ time.sleep(30)
+ return self.__send_request("GET", uri, None)
- #
- # Send POST
- #
- # Issues a POST request (write) against the API and returns the result
- # (as Python dict).
- #
- # Arguments:
- #
- # uri The API method to call including parameters
- # (e.g. add_case/1)
- # data The data to submit as part of the request (as
- # Python dict, strings must be UTF-8 encoded)
- #
- def send_post(self, uri, data):
- return self.__send_request('POST', uri, data)
+ #
+ # Send POST
+ #
+ # Issues a POST request (write) against the API and returns the result
+ # (as Python dict).
+ #
+ # Arguments:
+ #
+ # uri The API method to call including parameters
+ # (e.g. add_case/1)
+ # data The data to submit as part of the request (as
+ # Python dict, strings must be UTF-8 encoded)
+ #
+ def send_post(self, uri, data):
+ return self.__send_request("POST", uri, data)
- def __send_request(self, method, uri, data):
- url = self.__url + uri
- request = urllib.request.Request(url)
- if (method == 'POST'):
- request.data = bytes(json.dumps(data), 'utf-8')
- auth = str(
- base64.b64encode(
- bytes('%s:%s' % (self.user, self.password), 'utf-8')
- ),
- 'ascii'
- ).strip()
- request.add_header('Authorization', 'Basic %s' % auth)
- request.add_header('Content-Type', 'application/json')
+ def __send_request(self, method, uri, data):
+ url = self.__url + uri
+ request = urllib.request.Request(url)
+ if method == "POST":
+ request.data = bytes(json.dumps(data), "utf-8")
+ auth = str(
+ base64.b64encode(
+ bytes("%s:%s" % (self.user, self.password), "utf-8")
+ ),
+ "ascii",
+ ).strip()
+ request.add_header("Authorization", "Basic %s" % auth)
+ request.add_header("Content-Type", "application/json")
- e = None
- try:
- response = urllib.request.urlopen(request).read()
- except urllib.error.HTTPError as ex:
- response = ex.read()
- e = ex
+ e = None
+ try:
+ response = urllib.request.urlopen(request).read()
+ except urllib.error.HTTPError as ex:
+ response = ex.read()
+ e = ex
- if response:
- result = json.loads(response.decode())
- else:
- result = {}
+ if response:
+ result = json.loads(response.decode())
+ else:
+ result = {}
- if e != None:
- if result and 'error' in result:
- error = '"' + result['error'] + '"'
- else:
- error = 'No additional error message received'
- raise APIError('TestRail API returned HTTP %s (%s)' %
- (e.code, error))
+ if e is not None:
+ if result and "error" in result:
+ error = '"' + result["error"] + '"'
+ else:
+ error = "No additional error message received"
+ raise APIError(
+ "TestRail API returned HTTP %s (%s)" % (e.code, error)
+ )
- return result
+ return result
+
class APIError(Exception):
- pass
\ No newline at end of file
+ pass