]> xenbits.xensource.com Git - qemu-xen.git/commitdiff
python/aqmp: copy type definitions from qmp
authorJohn Snow <jsnow@redhat.com>
Mon, 10 Jan 2022 23:28:47 +0000 (18:28 -0500)
committerJohn Snow <jsnow@redhat.com>
Fri, 21 Jan 2022 21:01:31 +0000 (16:01 -0500)
Copy the remaining type definitions from QMP into the qemu.aqmp.legacy
module. Now, users that require the legacy interface don't need to
import anything else but qemu.aqmp.legacy wrapper.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Beraldo Leal <bleal@redhat.com>
python/qemu/aqmp/legacy.py
python/qemu/aqmp/protocol.py

index 2ccb136b02c1fcf586507400c056be659a84e331..9431fe933019b1e1c221ea3ab7bb426967d0f0cc 100644 (file)
@@ -6,7 +6,9 @@ This class pretends to be qemu.qmp.QEMUMonitorProtocol.
 
 import asyncio
 from typing import (
+    Any,
     Awaitable,
+    Dict,
     List,
     Optional,
     TypeVar,
@@ -14,13 +16,29 @@ from typing import (
 )
 
 import qemu.qmp
-from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
 
 from .error import AQMPError
-from .protocol import Runstate
+from .protocol import Runstate, SocketAddrT
 from .qmp_client import QMPClient
 
 
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
+#
+# {'return': {}} is a QMPMessage,
+# {} is the QMPReturnValue.
+
+
 # pylint: disable=missing-docstring
 
 
index c4fbe35a0e41c589059ec4fa37a816f4b76a3bd8..5b4f2f0d0a81a0d2902358e9b799d28108d188f3 100644 (file)
@@ -46,6 +46,10 @@ T = TypeVar('T')
 _U = TypeVar('_U')
 _TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
 
+InternetAddrT = Tuple[str, int]
+UnixAddrT = str
+SocketAddrT = Union[UnixAddrT, InternetAddrT]
+
 
 class Runstate(Enum):
     """Protocol session runstate."""
@@ -257,7 +261,7 @@ class AsyncProtocol(Generic[T]):
 
     @upper_half
     @require(Runstate.IDLE)
-    async def accept(self, address: Union[str, Tuple[str, int]],
+    async def accept(self, address: SocketAddrT,
                      ssl: Optional[SSLContext] = None) -> None:
         """
         Accept a connection and begin processing message queues.
@@ -275,7 +279,7 @@ class AsyncProtocol(Generic[T]):
 
     @upper_half
     @require(Runstate.IDLE)
-    async def connect(self, address: Union[str, Tuple[str, int]],
+    async def connect(self, address: SocketAddrT,
                       ssl: Optional[SSLContext] = None) -> None:
         """
         Connect to the server and begin processing message queues.
@@ -337,7 +341,7 @@ class AsyncProtocol(Generic[T]):
 
     @upper_half
     async def _new_session(self,
-                           address: Union[str, Tuple[str, int]],
+                           address: SocketAddrT,
                            ssl: Optional[SSLContext] = None,
                            accept: bool = False) -> None:
         """
@@ -397,7 +401,7 @@ class AsyncProtocol(Generic[T]):
     @upper_half
     async def _establish_connection(
             self,
-            address: Union[str, Tuple[str, int]],
+            address: SocketAddrT,
             ssl: Optional[SSLContext] = None,
             accept: bool = False
     ) -> None:
@@ -424,7 +428,7 @@ class AsyncProtocol(Generic[T]):
             await self._do_connect(address, ssl)
 
     @upper_half
-    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+    async def _do_accept(self, address: SocketAddrT,
                          ssl: Optional[SSLContext] = None) -> None:
         """
         Acting as the transport server, accept a single connection.
@@ -482,7 +486,7 @@ class AsyncProtocol(Generic[T]):
         self.logger.debug("Connection accepted.")
 
     @upper_half
-    async def _do_connect(self, address: Union[str, Tuple[str, int]],
+    async def _do_connect(self, address: SocketAddrT,
                           ssl: Optional[SSLContext] = None) -> None:
         """
         Acting as the transport client, initiate a connection to a server.