blob: 0cbeb41ef70438972129bdbf5d6032b6b800eb68 [file] [log] [blame]
# Copyright 2016-2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ssl
import struct
import urllib.parse as urlparse
import urllib3
import websocket
from tempest.api.compute import base
from tempest.common import compute
from tempest import config
from tempest.lib import decorators
CONF = config.CONF
class ConsoleTestBase(base.BaseV2ComputeTest):
create_default_network = True
def setUp(self):
super(ConsoleTestBase, self).setUp()
self._websocket = None
def tearDown(self):
super(ConsoleTestBase, self).tearDown()
if self._websocket is not None:
self._websocket.close()
# NOTE(zhufl): Because server_check_teardown will raise Exception
# which will prevent other cleanup steps from being executed, so
# server_check_teardown should be called after super's tearDown.
self.server_check_teardown()
@classmethod
def setup_clients(cls):
super(ConsoleTestBase, cls).setup_clients()
cls.client = cls.servers_client
@classmethod
def resource_setup(cls):
super(ConsoleTestBase, cls).resource_setup()
cls.server = cls.create_test_server(wait_until="ACTIVE")
cls.use_get_remote_console = False
if not cls.is_requested_microversion_compatible("2.5"):
cls.use_get_remote_console = True
@property
def cert_params(self):
ssl_opt = {}
if CONF.identity.disable_ssl_certificate_validation:
ssl_opt["cert_reqs"] = ssl.CERT_NONE
else:
ssl_opt["ca_certs"] = CONF.identity.ca_certificates_file
return ssl_opt
def _validate_html(self, url, js_title):
"""Verify we can connect to console and get back the javascript."""
resp = urllib3.PoolManager(**self.cert_params).request("GET", url)
# Make sure that the GET request was accepted by the console proxy
self.assertEqual(
resp.status,
200,
"Got a Bad HTTP Response on the "
"initial call: " + str(resp.status),
)
# Do some basic validation to make sure it is an expected HTML document
resp_data = resp.data.decode()
# This is needed in the case of example: <html lang="en">
self.assertRegex(
resp_data, "<html.*>", "Not a valid html document in the response."
)
self.assertIn(
"</html>", resp_data, "Not a valid html document in the response."
)
# Just try to make sure we got JavaScript back for console, since we
# won't actually use it since not inside of a browser
self.assertIn(
js_title,
resp_data,
"Not a valid console javascript html document.",
)
self.assertIn(
"<script",
resp_data,
"Not a valid console javascript html document.",
)
def _validate_websocket_upgrade(self):
"""Verify that the websocket upgrade was successful.
Parses response and ensures that required response
fields are present and accurate.
(https://tools.ietf.org/html/rfc7231#section-6.2.2)
"""
self.assertTrue(
self._websocket.response.startswith(
b"HTTP/1.1 101 Switching Protocols"
),
"Incorrect HTTP return status code: {}".format(
str(self._websocket.response)
),
)
_required_header = "upgrade: websocket"
_response = str(self._websocket.response).lower()
self.assertIn(
_required_header,
_response,
"Did not get the expected WebSocket HTTP Response.",
)
def _get_console_body(self, type, protocol, get_console):
if self.use_get_remote_console:
return self.client.get_remote_console(
self.server["id"], type=type, protocol=protocol
)["remote_console"]
return getattr(self.client, get_console)(self.server["id"], type=type)[
"console"
]
def _test_console_bad_token(self, type, protocol, get_console):
body = self._get_console_body(type, protocol, get_console)
self.assertEqual(type, body["type"])
# Do the WebSockify HTTP Request to console proxy with a bad token
parts = urlparse.urlparse(body["url"])
qparams = urlparse.parse_qs(parts.query)
if "path" in qparams:
qparams["path"] = urlparse.unquote(qparams["path"][0]).replace(
"token=", "token=bad"
)
elif "token" in qparams:
qparams["token"] = "bad" + qparams["token"][0]
new_query = urlparse.urlencode(qparams)
new_parts = urlparse.ParseResult(
parts.scheme,
parts.netloc,
parts.path,
parts.params,
new_query,
parts.fragment,
)
url = urlparse.urlunparse(new_parts)
self._websocket = compute.create_websocket(url)
# Make sure the console proxy rejected the connection and closed it
data = self._websocket.receive_frame()
self.assertTrue(
data is None or not data,
"The console proxy actually sent us some data, but we "
"expected it to close the connection.",
)
class NoVNCConsoleTestJSON(ConsoleTestBase):
"""Test novnc console"""
@classmethod
def skip_checks(cls):
super(NoVNCConsoleTestJSON, cls).skip_checks()
if not CONF.compute_feature_enabled.vnc_console:
raise cls.skipException("VNC Console feature is disabled.")
def _validate_rfb_negotiation(self):
"""Verify we can connect to novnc and do the websocket connection."""
# Turn the Socket into a WebSocket to do the communication
data = self._websocket.receive_frame()
self.assertFalse(
data is None or not data,
"Token must be invalid because the connection closed.",
)
# Parse the RFB version from the data to make sure it is valid
# and belong to the known supported RFB versions.
version = float(
"%d.%d" % (int(data[4:7], base=10), int(data[8:11], base=10))
)
# Add the max RFB versions supported
supported_versions = [3.3, 3.8]
self.assertIn(
version, supported_versions, "Bad RFB Version: " + str(version)
)
# Send our RFB version to the server
self._websocket.send_frame(data)
# Get the sever authentication type and make sure None is supported
data = self._websocket.receive_frame()
self.assertIsNotNone(data, "Expected authentication type None.")
data_length = len(data)
if version == 3.3:
# For RFB 3.3: in the security handshake, rather than a two-way
# negotiation, the server decides the security type and sends a
# single word(4 bytes).
self.assertEqual(
data_length, 4, "Expected authentication type None."
)
self.assertIn(
1,
[int(data[i]) for i in (0, 3)],
"Expected authentication type None.",
)
else:
self.assertGreaterEqual(
len(data), 2, "Expected authentication type None."
)
self.assertIn(
1,
[int(data[i + 1]) for i in range(int(data[0]))],
"Expected authentication type None.",
)
# Send to the server that we only support authentication
# type None
self._websocket.send_frame(bytes((1,)))
# The server should send 4 bytes of 0's if security
# handshake succeeded
data = self._websocket.receive_frame()
self.assertEqual(
len(data), 4, "Server did not think security was successful."
)
self.assertEqual(
[int(i) for i in data],
[0, 0, 0, 0],
"Server did not think security was successful.",
)
# Say to leave the desktop as shared as part of client initialization
self._websocket.send_frame(bytes((1,)))
# Get the server initialization packet back and make sure it is the
# right structure where bytes 20-24 is the name length and
# 24-N is the name
data = self._websocket.receive_frame()
data_length = len(data) if data is not None else 0
self.assertFalse(
data_length <= 24 or
data_length != (struct.unpack(">L", data[20:24])[0] + 24),
"Server initialization was not the right format.",
)
# Since the rest of the data on the screen is arbitrary, we will
# close the socket and end our validation of the data at this point
# Assert that the latest check was false, meaning that the server
# initialization was the right format
self.assertFalse(
data_length <= 24 or
data_length != (struct.unpack(">L", data[20:24])[0] + 24)
)
@decorators.idempotent_id("c640fdff-8ab4-45a4-a5d8-7e6146cbd0dc")
def test_novnc(self):
"""Test accessing novnc console of server"""
body = self._get_console_body("novnc", "vnc", "get_vnc_console")
self.assertEqual("novnc", body["type"])
# Do the initial HTTP Request to novncproxy to get the JavaScript
self._validate_html(body["url"], "noVNC")
# Do the WebSockify HTTP Request to novncproxy to do the RFB connection
self._websocket = compute.create_websocket(body["url"])
# Validate that we successfully connected and upgraded to Web Sockets
self._validate_websocket_upgrade()
# Validate the RFB Negotiation to determine if a valid VNC session
self._validate_rfb_negotiation()
@decorators.idempotent_id("f9c79937-addc-4aaa-9e0e-841eef02aeb7")
def test_novnc_bad_token(self):
"""Test accessing novnc console with bad token
Do the WebSockify HTTP Request to novnc proxy with a bad token,
the novnc proxy should reject the connection and closed it.
"""
self._test_console_bad_token("novnc", "vnc", "get_vnc_console")
class SpiceConsoleTestJSON(ConsoleTestBase):
"""Test spice console"""
@classmethod
def skip_checks(cls):
super(SpiceConsoleTestJSON, cls).skip_checks()
if not CONF.compute_feature_enabled.spice_console:
raise cls.skipException("SPICE Console feature is disabled.")
def _validate_websocket_connection(self, body):
# Protocol Magic number UINT8[4] { 0x52, 0x45, 0x44, 0x51} // "REDQ"
spice_magic = b"REDQ"
scheme = {"https": "wss", "http": "ws"}
q = urlparse.urlparse(body["url"])
ws = websocket.WebSocket(sslopt=self.cert_params)
ws.connect(
f"{scheme[q.scheme]}://{q.netloc}/websockify", cookie=q.query,
subprotocols=["binary"]
)
ws.send_binary(b"\r\n\r\n")
opcode, data = ws.recv_data()
self.assertEqual(opcode, websocket.ABNF.OPCODE_BINARY)
self.assertTrue(data.startswith(spice_magic))
@decorators.idempotent_id("0914a681-72dd-4fad-8457-b45195373d3d")
def test_spice(self):
"""Test accessing spice console of server"""
body = self._get_console_body(
"spice-html5", "spice", "get_spice_console"
)
self.assertEqual("spice-html5", body["type"])
# Do the initial HTTP Request to spiceproxy to get the JavaScript
self._validate_html(body["url"], "Spice Javascript client")
# Validate that we successfully connected to Web Sockets
self._validate_websocket_connection(body)
@decorators.idempotent_id("6f4b0690-d078-4a28-a2ce-33dafdfca7ac")
def test_spice_bad_token(self):
"""Test accessing spice console with bad token
Do the WebSockify HTTP Request to spice proxy with a bad token,
the spice proxy should reject the connection and closed it.
"""
self._test_console_bad_token(
"spice-html5", "spice", "get_spice_console"
)