]> xenbits.xensource.com Git - qemu-xen.git/commitdiff
python/aqmp: add LineProtocol tests
authorJohn Snow <jsnow@redhat.com>
Wed, 15 Sep 2021 16:29:54 +0000 (12:29 -0400)
committerJohn Snow <jsnow@redhat.com>
Mon, 27 Sep 2021 16:10:29 +0000 (12:10 -0400)
Tests a real connect, a real accept, and really sending and receiving a
message over a UNIX socket.

Brings coverage of protocol.py up to ~93%.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210915162955.333025-27-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
python/tests/protocol.py

index f0682d29ce59cb28c6488fad8bb3a002f8304922..5cd7938be35ec61a1d412728f64e447eb142499b 100644 (file)
@@ -78,6 +78,25 @@ class NullProtocol(AsyncProtocol[None]):
         self._schedule_disconnect()
 
 
+class LineProtocol(AsyncProtocol[str]):
+    def __init__(self, name=None):
+        super().__init__(name)
+        self.rx_history = []
+
+    async def _do_recv(self) -> str:
+        raw = await self._readline()
+        msg = raw.decode()
+        self.rx_history.append(msg)
+        return msg
+
+    def _do_send(self, msg: str) -> None:
+        assert self._writer is not None
+        self._writer.write(msg.encode() + b'\n')
+
+    async def send_msg(self, msg: str) -> None:
+        await self._outgoing.put(msg)
+
+
 def run_as_task(coro, allow_cancellation=False):
     """
     Run a given coroutine as a task.
@@ -533,3 +552,32 @@ class FakeSession(TestBase):
              " Call disconnect() to return to IDLE state."),
             accept=False,
         )
+
+
+class SimpleSession(TestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.server = LineProtocol(type(self).__name__ + '-server')
+
+    async def _asyncSetUp(self):
+        await super()._asyncSetUp()
+        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+    async def _asyncTearDown(self):
+        await self.proto.disconnect()
+        try:
+            await self.server.disconnect()
+        except EOFError:
+            pass
+        await super()._asyncTearDown()
+
+    @TestBase.async_test
+    async def testSmoke(self):
+        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+            server_task = create_task(self.server.accept(sock))
+
+            # give the server a chance to start listening [...]
+            await asyncio.sleep(0)
+            await self.proto.connect(sock)