]> xenbits.xensource.com Git - osstest/openstack-nova.git/commitdiff
libvirt: Call host connection callbacks asynchronously
author Matthew Booth <mbooth@redhat.com>
Thu, 3 Nov 2016 13:08:13 +0000 (13:08 +0000)
committer Matthew Booth <mbooth@redhat.com>
Wed, 23 Nov 2016 17:53:44 +0000 (17:53 +0000)
Host.get_connection() does 2 types of job when initialising a new
connection:

1. Register event handling callbacks on the connection itself.
2. Call _set_host_enabled in the driver to enable/disable the
   compute service.

The first is essential to run before the connection is used for
anything. The second needs only to run eventually once the connection
is established.

This patch creates a new helper thread which runs connection up/down
callbacks asynchronously to the caller. This means that the caller of
get_connection() returns as soon as the connection is usable, and the
driver callback happens concurrently. This is a minor improvement
currently, but will be more useful when we add additional work to the
connection up/down event. It also means that the callback runs without
holding _wrapped_conn_lock, which it doesn't need. This means the
callback itself can call get_connection() without deadlocking, and use
the connection which was just initialised.

In updating the callback logic, we also fix a minor bug: previously we
would not generate down/up events if we detected a failed connection
and succeeded creating a new one on the first attempt. This is
currently only of limited importance, as we are only marking the
service down or up for the scheduler, but could be more significant
when we add additional work to the callback.

Change-Id: Idf0f20d711f015e9f1331d5f65397aca2d67951a

nova/tests/unit/virt/libvirt/test_driver.py
nova/tests/unit/virt/libvirt/test_host.py
nova/tests/unit/virt/test_virt_drivers.py
nova/virt/libvirt/host.py

index ea21da60ff7ab6db75ee036647cc83d19b7f8262..1396c26206d17347dfcbe43ebc7c61bf6d397d30 100644 (file)
@@ -10982,6 +10982,7 @@ class LibvirtConnTestCase(test.NoDBTestCase):
 
             drvr.init_host("wibble")
             drvr.get_num_instances()
+            drvr._host._dispatch_conn_event()
             self.assertFalse(service_mock.disabled)
             self.assertIsNone(service_mock.disabled_reason)
 
index a1627ac1c0be32af01cef061c19216b8f981bb96..456560dc81af4c7c4b412447bb446c36cae1b9f4 100644 (file)
@@ -33,6 +33,11 @@ from nova.virt.libvirt import guest as libvirt_guest
 from nova.virt.libvirt import host
 
 
+class StringMatcher(object):
+    def __eq__(self, other):
+        return isinstance(other, six.string_types)
+
+
 class FakeVirtDomain(object):
 
     def __init__(self, id=-1, name=None):
@@ -301,6 +306,69 @@ class HostTestCase(test.NoDBTestCase):
         self.assertEqual(self.connect_calls, 1)
         self.assertEqual(self.register_calls, 1)
 
+    @mock.patch.object(host.Host, "_connect")
+    def test_conn_event(self, mock_conn):
+        handler = mock.MagicMock()
+        h = host.Host("qemu:///system", conn_event_handler=handler)
+
+        h.get_connection()
+        h._dispatch_conn_event()
+
+        handler.assert_called_once_with(True, None)
+
+    @mock.patch.object(host.Host, "_connect")
+    def test_conn_event_fail(self, mock_conn):
+        handler = mock.MagicMock()
+        h = host.Host("qemu:///system", conn_event_handler=handler)
+        mock_conn.side_effect = fakelibvirt.libvirtError('test')
+
+        self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
+        h._dispatch_conn_event()
+
+        handler.assert_called_once_with(False, StringMatcher())
+
+        # Attempt to get a second connection, and assert that we don't
+        # queue a second callback. Note that we can't call
+        # _dispatch_conn_event() and assert no additional call to the handler
+        # here as above. This is because we haven't added an event, so it would
+        # block. We mock the helper method which queues an event for callback
+        # instead.
+        with mock.patch.object(h, '_queue_conn_event_handler') as mock_queue:
+            self.assertRaises(exception.HypervisorUnavailable,
+                              h.get_connection)
+            mock_queue.assert_not_called()
+
+    @mock.patch.object(host.Host, "_test_connection")
+    @mock.patch.object(host.Host, "_connect")
+    def test_conn_event_up_down(self, mock_conn, mock_test_conn):
+        handler = mock.MagicMock()
+        h = host.Host("qemu:///system", conn_event_handler=handler)
+        mock_conn.side_effect = (mock.MagicMock(),
+                                 fakelibvirt.libvirtError('test'))
+        mock_test_conn.return_value = False
+
+        h.get_connection()
+        self.assertRaises(exception.HypervisorUnavailable, h.get_connection)
+        h._dispatch_conn_event()
+        h._dispatch_conn_event()
+
+        handler.assert_has_calls([
+            mock.call(True, None),
+            mock.call(False, StringMatcher())
+        ])
+
+    @mock.patch.object(host.Host, "_connect")
+    def test_conn_event_thread(self, mock_conn):
+        event = eventlet.event.Event()
+        h = host.Host("qemu:///system", conn_event_handler=event.send)
+        h.initialize()
+
+        h.get_connection()
+        event.wait()
+        # This test will timeout if it fails. Success is implicit in a
+        # timely return from wait(), indicating that the connection event
+        # handler was called.
+
     @mock.patch.object(fakelibvirt.virConnect, "getLibVersion")
     @mock.patch.object(fakelibvirt.virConnect, "getVersion")
     @mock.patch.object(fakelibvirt.virConnect, "getType")
index 3c2f36e2bd959ed1a37540f374543c239be01402..0be898b5dc0821d388d151770da81b69dbf50c53 100644 (file)
@@ -104,6 +104,10 @@ class _FakeDriverBackendTestCase(object):
             'nova.virt.libvirt.driver.connector',
             fake_os_brick_connector))
 
+        self.useFixture(fixtures.MonkeyPatch(
+            'nova.virt.libvirt.host.Host._conn_event_thread',
+            lambda *args: None))
+
         self.flags(rescue_image_id="2",
                    rescue_kernel_id="3",
                    rescue_ramdisk_id=None,
index 1e737902c1351a2e7b80abdb6a5c5c841220a57f..5ccdc20855def808d4fe173e5c5c4eeb67093648 100644 (file)
@@ -86,7 +86,9 @@ class Host(object):
 
         self._uri = uri
         self._read_only = read_only
+        self._initial_connection = True
         self._conn_event_handler = conn_event_handler
+        self._conn_event_handler_queue = six.moves.queue.Queue()
         self._lifecycle_event_handler = lifecycle_event_handler
         self._skip_list_all_domains = False
         self._caps = None
@@ -130,6 +132,26 @@ class Host(object):
         while True:
             self._dispatch_events()
 
+    def _conn_event_thread(self):
+        """Dispatches async connection events"""
+        # NOTE(mdbooth): This thread doesn't need to jump through the same
+        # hoops as _dispatch_thread because it doesn't interact directly
+        # with the libvirt native thread.
+        while True:
+            self._dispatch_conn_event()
+
+    def _dispatch_conn_event(self):
+        # NOTE(mdbooth): Splitting out this loop looks redundant, but it
+        # means we can easily dispatch events synchronously from tests and
+        # it isn't completely awful.
+        handler = self._conn_event_handler_queue.get()
+        try:
+            handler()
+        except Exception:
+            LOG.exception(_LE('Exception handling connection event'))
+        finally:
+            self._conn_event_handler_queue.task_done()
+
     @staticmethod
     def _event_lifecycle_callback(conn, dom, event, detail, opaque):
         """Receives lifecycle events from libvirt.
@@ -261,8 +283,7 @@ class Host(object):
                 reason = str(last_close_event['reason'])
                 msg = _("Connection to libvirt lost: %s") % reason
                 self._wrapped_conn = None
-                if self._conn_event_handler is not None:
-                    self._conn_event_handler(False, msg)
+                self._queue_conn_event_handler(False, msg)
 
     def _event_emit_delayed(self, event):
         """Emit events - possibly delayed."""
@@ -344,21 +365,9 @@ class Host(object):
     def _get_new_connection(self):
         # call with _wrapped_conn_lock held
         LOG.debug('Connecting to libvirt: %s', self._uri)
-        wrapped_conn = None
-
-        try:
-            wrapped_conn = self._connect(self._uri, self._read_only)
-        finally:
-            # Enabling the compute service, in case it was disabled
-            # since the connection was successful.
-            disable_reason = None
-            if not wrapped_conn:
-                disable_reason = 'Failed to connect to libvirt'
-
-            if self._conn_event_handler is not None:
-                self._conn_event_handler(bool(wrapped_conn), disable_reason)
 
-        self._wrapped_conn = wrapped_conn
+        # This will raise an exception on failure
+        wrapped_conn = self._connect(self._uri, self._read_only)
 
         try:
             LOG.debug("Registering for lifecycle events %s", self)
@@ -390,14 +399,47 @@ class Host(object):
 
         return wrapped_conn
 
+    def _queue_conn_event_handler(self, *args, **kwargs):
+        if self._conn_event_handler is None:
+            return
+
+        def handler():
+            return self._conn_event_handler(*args, **kwargs)
+
+        self._conn_event_handler_queue.put(handler)
+
     def _get_connection(self):
         # multiple concurrent connections are protected by _wrapped_conn_lock
         with self._wrapped_conn_lock:
-            wrapped_conn = self._wrapped_conn
-            if not wrapped_conn or not self._test_connection(wrapped_conn):
-                wrapped_conn = self._get_new_connection()
+            # Drop the existing connection if it is not usable
+            if (self._wrapped_conn is not None and
+                    not self._test_connection(self._wrapped_conn)):
+                self._wrapped_conn = None
+                # Connection was previously up, and went down
+                self._queue_conn_event_handler(
+                    False, _('Connection to libvirt lost'))
 
-        return wrapped_conn
+            if self._wrapped_conn is None:
+                try:
+                    # This will raise if it fails to get a connection
+                    self._wrapped_conn = self._get_new_connection()
+                except Exception as ex:
+                    with excutils.save_and_reraise_exception():
+                        # If we previously had a connection and it went down,
+                        # we generated a down event for that above.
+                        # We also want to generate a down event for an initial
+                        # failure, which won't be handled above.
+                        if self._initial_connection:
+                            self._queue_conn_event_handler(
+                                False,
+                                _('Failed to connect to libvirt: %(msg)s') %
+                                {'msg': ex})
+                finally:
+                    self._initial_connection = False
+
+                self._queue_conn_event_handler(True, None)
+
+        return self._wrapped_conn
 
     def get_connection(self):
         """Returns a connection to the hypervisor
@@ -433,6 +475,10 @@ class Host(object):
         libvirt.registerErrorHandler(self._libvirt_error_handler, None)
         libvirt.virEventRegisterDefaultImpl()
         self._init_events()
+
+        LOG.debug("Starting connection event dispatch thread")
+        utils.spawn(self._conn_event_thread)
+
         self._initialized = True
 
     def _version_check(self, lv_ver=None, hv_ver=None, hv_type=None,