Add basic Keystone CLI tests
New cli.output_parser helpers for parsing 'ascii-tables'
often used in CLIs output.
New basic tests for keystone cli, verifies only read-only
actions (return codes and basic structure of output).
Change-Id: I4fea08b14e32c62c47e347b401e3f5703836c184
diff --git a/cli/__init__.py b/cli/__init__.py
index 37aec93..6ffe229 100644
--- a/cli/__init__.py
+++ b/cli/__init__.py
@@ -21,6 +21,7 @@
from oslo.config import cfg
+import cli.output_parser
import tempest.test
@@ -51,6 +52,7 @@
super(ClientTestBase, cls).setUpClass()
def __init__(self, *args, **kwargs):
+ self.parser = cli.output_parser
super(ClientTestBase, self).__init__(*args, **kwargs)
def nova(self, action, flags='', params='', admin=True, fail_ok=False):
@@ -63,6 +65,11 @@
return self.cmd(
'nova-manage', action, flags, params, fail_ok)
+ def keystone(self, action, flags='', params='', admin=True, fail_ok=False):
+ """Executes keystone command for the given action."""
+ return self.cmd_with_auth(
+ 'keystone', action, flags, params, admin, fail_ok)
+
def cmd_with_auth(self, cmd, action, flags='', params='',
admin=True, fail_ok=False):
"""Executes given command with auth attributes appended."""
@@ -86,3 +93,9 @@
LOG.error("command output:\n%s" % e.output)
raise
return result
+
+ def assertTableStruct(self, items, field_names):
+ """Verify that all items has keys listed in field_names."""
+ for item in items:
+ for field in field_names:
+ self.assertIn(field, item)
diff --git a/cli/output_parser.py b/cli/output_parser.py
new file mode 100644
index 0000000..840839b
--- /dev/null
+++ b/cli/output_parser.py
@@ -0,0 +1,168 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Collection of utilities for parsing CLI clients output."""
+
+
+import logging
+import re
+
+
+LOG = logging.getLogger(__name__)
+
+
+delimiter_line = re.compile('^\+\-[\+\-]+\-\+$')
+
+
+def details_multiple(output_lines, with_label=False):
+ """Return list of dicts with item details from cli output tables.
+
+ If with_label is True, key '__label' is added to each items dict.
+ For more about 'label' see OutputParser.tables().
+ """
+ items = []
+ tables_ = tables(output_lines)
+ for table_ in tables_:
+ if 'Property' not in table_['headers'] \
+ or 'Value' not in table_['headers']:
+ raise Exception('Invalid structure of table with details')
+ item = {}
+ for value in table_['values']:
+ item[value[0]] = value[1]
+ if with_label:
+ item['__label'] = table_['label']
+ items.append(item)
+ return items
+
+
+def details(output_lines, with_label=False):
+ """Return dict with details of first item (table) found in output."""
+ items = details_multiple(output_lines, with_label)
+ return items[0]
+
+
+def listing(output_lines):
+ """Return list of dicts with basic item info parsed from cli output.
+ """
+
+ items = []
+ table_ = table(output_lines)
+ for row in table_['values']:
+ item = {}
+ for col_idx, col_key in enumerate(table_['headers']):
+ item[col_key] = row[col_idx]
+ items.append(item)
+ return items
+
+
+def tables(output_lines):
+ """Find all ascii-tables in output and parse them.
+
+ Return list of tables parsed from cli output as dicts.
+ (see OutputParser.table())
+
+ And, if found, label key (separated line preceding the table)
+ is added to each tables dict.
+ """
+ tables_ = []
+
+ table_ = []
+ label = None
+
+ start = False
+ header = False
+
+ if not isinstance(output_lines, list):
+ output_lines = output_lines.split('\n')
+
+ for line in output_lines:
+ if delimiter_line.match(line):
+ if not start:
+ start = True
+ elif not header:
+ # we are after head area
+ header = True
+ else:
+ # table ends here
+ start = header = None
+ table_.append(line)
+
+ parsed = table(table_)
+ parsed['label'] = label
+ tables_.append(parsed)
+
+ table_ = []
+ label = None
+ continue
+ if start:
+ table_.append(line)
+ else:
+ if label is None:
+ label = line
+ else:
+ LOG.warn('Invalid line between tables: %s' % line)
+ if len(table_) > 0:
+ LOG.warn('Missing end of table')
+
+ return tables_
+
+
+def table(output_lines):
+ """Parse single table from cli output.
+
+ Return dict with list of column names in 'headers' key and
+ rows in 'values' key.
+ """
+ table_ = {'headers': [], 'values': []}
+ columns = None
+
+ if not isinstance(output_lines, list):
+ output_lines = output_lines.split('\n')
+
+ for line in output_lines:
+ if delimiter_line.match(line):
+ columns = _table_columns(line)
+ continue
+ if '|' not in line:
+ LOG.warn('skipping invalid table line: %s' % line)
+ continue
+ row = []
+ for col in columns:
+ row.append(line[col[0]:col[1]].strip())
+ if table_['headers']:
+ table_['values'].append(row)
+ else:
+ table_['headers'] = row
+
+ return table_
+
+
+def _table_columns(first_table_row):
+ """Find column ranges in output line.
+
+ Return list of touples (start,end) for each column
+ detected by plus (+) characters in delimiter line.
+ """
+ positions = []
+ start = 1 # there is '+' at 0
+ while start < len(first_table_row):
+ end = first_table_row.find('+', start)
+ if end == -1:
+ break
+ positions.append((start, end))
+ start = end + 1
+ return positions
diff --git a/cli/simple_read_only/test_keystone.py b/cli/simple_read_only/test_keystone.py
new file mode 100644
index 0000000..4b14c3c
--- /dev/null
+++ b/cli/simple_read_only/test_keystone.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import re
+import subprocess
+
+import cli
+
+
+LOG = logging.getLogger(__name__)
+
+
+class SimpleReadOnlyKeystoneClientTest(cli.ClientTestBase):
+ """Basic, read-only tests for Keystone CLI client.
+
+ Checks return values and output of read-only commands.
+ These tests do not presume any content, nor do they create
+ their own. They only verify the structure of output if present.
+ """
+
+ def test_admin_fake_action(self):
+ self.assertRaises(subprocess.CalledProcessError,
+ self.keystone,
+ 'this-does-not-exist')
+
+ def test_admin_catalog_list(self):
+ out = self.keystone('catalog')
+ catalog = self.parser.details_multiple(out, with_label=True)
+ for svc in catalog:
+ self.assertTrue(svc['__label'].startswith('Service:'))
+
+ def test_admin_endpoint_list(self):
+ out = self.keystone('endpoint-list')
+ endpoints = self.parser.listing(out)
+ self.assertTableStruct(endpoints, [
+ 'id', 'region', 'publicurl', 'internalurl',
+ 'adminurl', 'service_id'])
+
+ def test_admin_endpoint_service_match(self):
+ endpoints = self.parser.listing(self.keystone('endpoint-list'))
+ services = self.parser.listing(self.keystone('service-list'))
+ svc_by_id = {}
+ for svc in services:
+ svc_by_id[svc['id']] = svc
+ for endpoint in endpoints:
+ self.assertIn(endpoint['service_id'], svc_by_id)
+
+ def test_admin_role_list(self):
+ roles = self.parser.listing(self.keystone('role-list'))
+ self.assertTableStruct(roles, ['id', 'name'])
+
+ def test_admin_service_list(self):
+ services = self.parser.listing(self.keystone('service-list'))
+ self.assertTableStruct(services, ['id', 'name', 'type', 'description'])
+
+ def test_admin_tenant_list(self):
+ tenants = self.parser.listing(self.keystone('tenant-list'))
+ self.assertTableStruct(tenants, ['id', 'name', 'enabled'])
+
+ def test_admin_user_list(self):
+ users = self.parser.listing(self.keystone('user-list'))
+ self.assertTableStruct(users, [
+ 'id', 'name', 'enabled', 'email'])
+
+ def test_admin_user_role_list(self):
+ user_roles = self.parser.listing(self.keystone('user-role-list'))
+ self.assertTableStruct(user_roles, [
+ 'id', 'name', 'user_id', 'tenant_id'])
+
+ def test_admin_discover(self):
+ discovered = self.keystone('discover')
+ self.assertIn('Keystone found at http', discovered)
+ self.assertIn('supports version', discovered)
+
+ def test_admin_help(self):
+ help_text = self.keystone('help')
+ lines = help_text.split('\n')
+ self.assertTrue(lines[0].startswith('usage: keystone'))
+
+ commands = []
+ cmds_start = lines.index('Positional arguments:')
+ cmds_end = lines.index('Optional arguments:')
+ command_pattern = re.compile('^ {4}([a-z0-9\-\_]+)')
+ for line in lines[cmds_start:cmds_end]:
+ match = command_pattern.match(line)
+ if match:
+ commands.append(match.group(1))
+ commands = set(commands)
+ wanted_commands = set(('catalog', 'endpoint-list', 'help',
+ 'token-get', 'discover', 'bootstrap'))
+ self.assertFalse(wanted_commands - commands)
+
+ def test_admin_bashcompletion(self):
+ self.keystone('bash-completion')