from nova.virt.libvirt import host
+class StringMatcher(object):
+ def __eq__(self, other):
+ return isinstance(other, six.string_types)
+
+
class FakeVirtDomain(object):
def __init__(self, id=-1, name=None):
self.assertEqual(self.connect_calls, 1)
self.assertEqual(self.register_calls, 1)
+ @mock.patch.object(host.Host, "_connect")
+ def test_conn_event(self, mock_conn):
+ handler = mock.MagicMock()
+ h = host.Host("qemu:///system", conn_event_handler=handler)
+
+ h.get_connection()
+ h._dispatch_conn_event()
+
+ handler.assert_called_once_with(True, None)
+
+ @mock.patch.object(host.Host, "_connect")
+ def test_conn_event_fail(self, mock_conn):
+ handler = mock.MagicMock()
+ h = host.Host("qemu:///system", conn_event_handler=handler)
+ mock_conn.side_effect = fakelibvirt.libvirtError('test')
+
+ self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
+ h._dispatch_conn_event()
+
+ handler.assert_called_once_with(False, StringMatcher())
+
+ # Attempt to get a second connection, and assert that we don't add
+ # queue a second callback. Note that we can't call
+ # _dispatch_conn_event() and assert no additional call to the handler
+ # here as above. This is because we haven't added an event, so it would
+ # block. We mock the helper method which queues an event for callback
+ # instead.
+ with mock.patch.object(h, '_queue_conn_event_handler') as mock_queue:
+ self.assertRaises(exception.HypervisorUnavailable,
+ h.get_connection)
+ mock_queue.assert_not_called()
+
+ @mock.patch.object(host.Host, "_test_connection")
+ @mock.patch.object(host.Host, "_connect")
+ def test_conn_event_up_down(self, mock_conn, mock_test_conn):
+ handler = mock.MagicMock()
+ h = host.Host("qemu:///system", conn_event_handler=handler)
+ mock_conn.side_effect = (mock.MagicMock(),
+ fakelibvirt.libvirtError('test'))
+ mock_test_conn.return_value = False
+
+ h.get_connection()
+ self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
+ h._dispatch_conn_event()
+ h._dispatch_conn_event()
+
+ handler.assert_has_calls([
+ mock.call(True, None),
+ mock.call(False, StringMatcher())
+ ])
+
+ @mock.patch.object(host.Host, "_connect")
+ def test_conn_event_thread(self, mock_conn):
+ event = eventlet.event.Event()
+ h = host.Host("qemu:///system", conn_event_handler=event.send)
+ h.initialize()
+
+ h.get_connection()
+ event.wait()
+ # This test will timeout if it fails. Success is implicit in a
+ # timely return from wait(), indicating that the connection event
+ # handler was called.
+
@mock.patch.object(fakelibvirt.virConnect, "getLibVersion")
@mock.patch.object(fakelibvirt.virConnect, "getVersion")
@mock.patch.object(fakelibvirt.virConnect, "getType")
self._uri = uri
self._read_only = read_only
+ self._initial_connection = True
self._conn_event_handler = conn_event_handler
+ self._conn_event_handler_queue = six.moves.queue.Queue()
self._lifecycle_event_handler = lifecycle_event_handler
self._skip_list_all_domains = False
self._caps = None
while True:
self._dispatch_events()
+ def _conn_event_thread(self):
+ """Dispatches async connection events"""
+ # NOTE(mdbooth): This thread doesn't need to jump through the same
+ # hoops as _dispatch_thread because it doesn't interact directly
+ # with the libvirt native thread.
+ while True:
+ self._dispatch_conn_event()
+
+ def _dispatch_conn_event(self):
+ # NOTE(mdbooth): Splitting out this loop looks redundant, but it
+ # means we can easily dispatch events synchronously from tests and
+ # it isn't completely awful.
+ handler = self._conn_event_handler_queue.get()
+ try:
+ handler()
+ except Exception:
+ LOG.exception(_LE('Exception handling connection event'))
+ finally:
+ self._conn_event_handler_queue.task_done()
+
@staticmethod
def _event_lifecycle_callback(conn, dom, event, detail, opaque):
"""Receives lifecycle events from libvirt.
reason = str(last_close_event['reason'])
msg = _("Connection to libvirt lost: %s") % reason
self._wrapped_conn = None
- if self._conn_event_handler is not None:
- self._conn_event_handler(False, msg)
+ self._queue_conn_event_handler(False, msg)
def _event_emit_delayed(self, event):
"""Emit events - possibly delayed."""
def _get_new_connection(self):
# call with _wrapped_conn_lock held
LOG.debug('Connecting to libvirt: %s', self._uri)
- wrapped_conn = None
-
- try:
- wrapped_conn = self._connect(self._uri, self._read_only)
- finally:
- # Enabling the compute service, in case it was disabled
- # since the connection was successful.
- disable_reason = None
- if not wrapped_conn:
- disable_reason = 'Failed to connect to libvirt'
-
- if self._conn_event_handler is not None:
- self._conn_event_handler(bool(wrapped_conn), disable_reason)
- self._wrapped_conn = wrapped_conn
+ # This will raise an exception on failure
+ wrapped_conn = self._connect(self._uri, self._read_only)
try:
LOG.debug("Registering for lifecycle events %s", self)
return wrapped_conn
+ def _queue_conn_event_handler(self, *args, **kwargs):
+ if self._conn_event_handler is None:
+ return
+
+ def handler():
+ return self._conn_event_handler(*args, **kwargs)
+
+ self._conn_event_handler_queue.put(handler)
+
def _get_connection(self):
# multiple concurrent connections are protected by _wrapped_conn_lock
with self._wrapped_conn_lock:
- wrapped_conn = self._wrapped_conn
- if not wrapped_conn or not self._test_connection(wrapped_conn):
- wrapped_conn = self._get_new_connection()
+ # Drop the existing connection if it is not usable
+ if (self._wrapped_conn is not None and
+ not self._test_connection(self._wrapped_conn)):
+ self._wrapped_conn = None
+ # Connection was previously up, and went down
+ self._queue_conn_event_handler(
+ False, _('Connection to libvirt lost'))
- return wrapped_conn
+ if self._wrapped_conn is None:
+ try:
+ # This will raise if it fails to get a connection
+ self._wrapped_conn = self._get_new_connection()
+ except Exception as ex:
+ with excutils.save_and_reraise_exception():
+ # If we previously had a connection and it went down,
+ # we generated a down event for that above.
+ # We also want to generate a down event for an initial
+ # failure, which won't be handled above.
+ if self._initial_connection:
+ self._queue_conn_event_handler(
+ False,
+ _('Failed to connect to libvirt: %(msg)s') %
+ {'msg': ex})
+ finally:
+ self._initial_connection = False
+
+ self._queue_conn_event_handler(True, None)
+
+ return self._wrapped_conn
def get_connection(self):
"""Returns a connection to the hypervisor
libvirt.registerErrorHandler(self._libvirt_error_handler, None)
libvirt.virEventRegisterDefaultImpl()
self._init_events()
+
+ LOG.debug("Starting connection event dispatch thread")
+ utils.spawn(self._conn_event_thread)
+
self._initialized = True
def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,