import asyncio
from typing import (
+ Any,
Awaitable,
+ Dict,
List,
Optional,
TypeVar,
)
import qemu.qmp
-from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
from .error import AQMPError
-from .protocol import Runstate
+from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient
+#: QMPMessage is an entire QMP message of any kind.
+QMPMessage = Dict[str, Any]
+
+#: QMPReturnValue is the 'return' value of a command.
+QMPReturnValue = object
+
+#: QMPObject is any object in a QMP message.
+QMPObject = Dict[str, object]
+
+# QMPMessage can be outgoing commands or incoming events/returns.
+# QMPReturnValue is usually a dict/json object, but due to QAPI's
+# 'returns-whitelist', it can actually be anything.
+#
+# {'return': {}} is a QMPMessage,
+# {} is the QMPReturnValue.
+
+
# pylint: disable=missing-docstring
_U = TypeVar('_U')
_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
+InternetAddrT = Tuple[str, int]
+UnixAddrT = str
+SocketAddrT = Union[UnixAddrT, InternetAddrT]
+
class Runstate(Enum):
"""Protocol session runstate."""
@upper_half
@require(Runstate.IDLE)
- async def accept(self, address: Union[str, Tuple[str, int]],
+ async def accept(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Accept a connection and begin processing message queues.
@upper_half
@require(Runstate.IDLE)
- async def connect(self, address: Union[str, Tuple[str, int]],
+ async def connect(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Connect to the server and begin processing message queues.
@upper_half
async def _new_session(self,
- address: Union[str, Tuple[str, int]],
+ address: SocketAddrT,
ssl: Optional[SSLContext] = None,
accept: bool = False) -> None:
"""
@upper_half
async def _establish_connection(
self,
- address: Union[str, Tuple[str, int]],
+ address: SocketAddrT,
ssl: Optional[SSLContext] = None,
accept: bool = False
) -> None:
await self._do_connect(address, ssl)
@upper_half
- async def _do_accept(self, address: Union[str, Tuple[str, int]],
+ async def _do_accept(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Acting as the transport server, accept a single connection.
self.logger.debug("Connection accepted.")
@upper_half
- async def _do_connect(self, address: Union[str, Tuple[str, int]],
+ async def _do_connect(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Acting as the transport client, initiate a connection to a server.