"""Scripted websocket client tests run against a server from ``pico.start_server``.

Each ``test_*`` function builds one or more scripted clients and returns a
coroutine that starts the server, runs the clients concurrently and waits
for them to finish.  Running this file as a script executes every
``test_*`` function in definition order.
"""

import asyncio
import itertools
import sys

import websockets

from pico import start_server, send_json, recv_json


class Script:
    """Ordered list of client actions, built with ``+`` and ``-``.

    ``script + message`` records "send this message";
    ``script - message`` records "expect to receive this message".
    Both operators mutate the script in place and return it, so actions
    can be chained: ``Script() + {...} - {...}``.
    """

    def __init__(self):
        # List of ('send' | 'recv', message) tuples in execution order.
        self.events = []

    def __add__(self, event):
        """Record *event* as a message to send and return this script."""
        self.events.append(('send', event))
        return self

    def __sub__(self, event):
        """Record *event* as a message expected from the server and return this script."""
        self.events.append(('recv', event))
        return self


async def _make_client(path, timeout, script):
    """Connect to *path* and play *script* against the server.

    Args:
        path: websocket URL to connect to.
        timeout: seconds to sleep before connecting.  Despite the name this
            is a start delay; the tests use it to stagger client logins.
        script: a ``Script`` whose events are executed in order.

    For every 'recv' event the next message is read, its 'ts' field is
    dropped, and the remainder is compared with the expected message;
    mismatches are printed as a '-' (expected) / '+' (received) pair.
    Messages of kind 'state' are merged into a local ``state`` dict.

    Raises:
        SystemExit: with exit status 1 if any mismatch or unexpected
            message was seen.
        KeyError: if a received message has no 'ts' or 'kind' field.
    """
    state = {}
    error = False
    await asyncio.sleep(timeout)
    async with websockets.connect(path) as websocket:
        for kind, message in script.events:
            if kind == 'recv':
                expected = message
                received = await recv_json(websocket)
                # The 'ts' value is not part of the expected messages.
                received.pop('ts')
                if received['kind'] == 'state':
                    state.update(received)
                if expected != received:
                    error = True
                    print('-', expected)
                    print('+', received)
                    print()
            elif kind == 'send':
                await send_json(websocket, **message)

        # Once an 'online' list has been seen, stay connected until this
        # client is first in it.  Anything received while waiting was not
        # part of the script and therefore counts as an error.
        while 'online' in state:
            if state['online'][0] == state['username']:
                break
            message = await recv_json(websocket)
            message.pop('ts')
            error = True
            print('+', message)
            print()
            state.update(message)

    if error:
        print('error')
        # Use sys.exit with a non-zero status: the bare ``exit()`` helper is
        # only injected by the ``site`` module and reported success (0).
        sys.exit(1)


async def _test(*clients):
    """Start the test server and run *clients* concurrently.

    Args:
        *clients: client coroutines, e.g. from ``_make_client``.

    Returns:
        The list of client results once everything finished within two
        seconds, or ``None`` if the two-second limit was hit.
    """
    try:
        gather = asyncio.gather(
            start_server('localhost', 8642, 'TestServer'),
            *clients,
        )
        server, *results = await asyncio.wait_for(gather, timeout=2)
        server.close()
    except asyncio.TimeoutError:
        # NOTE(review): a timeout is swallowed here, so a client that hangs
        # waiting for a message is not reported as a failure, and
        # ``server.close()`` is skipped on this path -- confirm intended.
        return
    return results


def test_happy_path():
    """Single user logs in and receives their own post."""
    client = _make_client('ws://localhost:8642/x', 0.1, Script()
        + {'kind': 'login', 'value': 'TestUser'}
        - {'kind': 'state', 'username': 'TestUser'}
        - {'kind': 'state', 'online': ['TestUser']}
        + {'kind': 'post', 'value': 'Hello World!'}
        - {'kind': 'post', 'value': 'Hello World!', 'source': 'TestUser'}
    )
    return _test(client)


def test_name_taken():
    """A second login with an already used name gets a 'Username taken' error."""
    client1 = _make_client('ws://localhost:8642/x', 0.10, Script()
        + {'kind': 'login', 'value': 'A'}
        - {'kind': 'state', 'username': 'A'}
        - {'kind': 'state', 'online': ['A']}
        - {'kind': 'state', 'online': ['A', 'B']}
    )
    client2 = _make_client('ws://localhost:8642/x', 0.11, Script()
        + {'kind': 'login', 'value': 'A'}
        - {'kind': 'error', 'value': 'Username taken'}
    )
    client3 = _make_client('ws://localhost:8642/x', 0.12, Script()
        + {'kind': 'login', 'value': 'B'}
        - {'kind': 'state', 'username': 'B'}
        - {'kind': 'state', 'online': ['A', 'B']}
        - {'kind': 'state', 'online': ['B']}
    )
    return _test(client1, client2, client3)


def test_interaction():
    """Two users on the same path see each other's posts and presence updates."""
    client1 = _make_client('ws://localhost:8642/x', 0.10, Script()
        + {'kind': 'login', 'value': 'Alice'}
        - {'kind': 'state', 'username': 'Alice'}
        - {'kind': 'state', 'online': ['Alice']}
        - {'kind': 'state', 'online': ['Alice', 'Bob']}
        + {'kind': 'post', 'value': 'Hey Bob!'}
        - {'kind': 'post', 'value': 'Hey Bob!', 'source': 'Alice'}
        - {'kind': 'post', 'value': 'Howdy!', 'source': 'Bob'}
    )
    client2 = _make_client('ws://localhost:8642/x', 0.11, Script()
        + {'kind': 'login', 'value': 'Bob'}
        - {'kind': 'state', 'username': 'Bob'}
        - {'kind': 'state', 'online': ['Alice', 'Bob']}
        - {'kind': 'post', 'value': 'Hey Bob!', 'source': 'Alice'}
        + {'kind': 'post', 'value': 'Howdy!'}
        - {'kind': 'post', 'value': 'Howdy!', 'source': 'Bob'}
        - {'kind': 'state', 'online': ['Bob']}
    )
    return _test(client1, client2)


def test_rooms():
    """The same username on two different paths ('/x' and '/y') does not collide."""
    client1 = _make_client('ws://localhost:8642/x', 0.10, Script()
        + {'kind': 'login', 'value': 'Dandy'}
        - {'kind': 'state', 'username': 'Dandy'}
        - {'kind': 'state', 'online': ['Dandy']}
        + {'kind': 'post', 'value': 'Hi', 'source': 'Dandy'}
        - {'kind': 'post', 'value': 'Hi', 'source': 'Dandy'}
    )
    client2 = _make_client('ws://localhost:8642/y', 0.10, Script()
        + {'kind': 'login', 'value': 'Dandy'}
        - {'kind': 'state', 'username': 'Dandy'}
        - {'kind': 'state', 'online': ['Dandy']}
        + {'kind': 'post', 'value': 'Hi', 'source': 'Dandy'}
        - {'kind': 'post', 'value': 'Hi', 'source': 'Dandy'}
    )
    return _test(client1, client2)


def test_private_message():
    """Posts carrying a 'target' reach only the sender and the targeted user."""
    client1 = _make_client('ws://localhost:8642/x', 0.10, Script()
        + {'kind': 'login', 'value': 'Norman'}
        - {'kind': 'state', 'username': 'Norman'}
        - {'kind': 'state', 'online': ['Norman']}
        - {'kind': 'state', 'online': ['Norman', 'Ray']}
        - {'kind': 'state', 'online': ['Norman', 'Ray', 'Emma']}
        + {'kind': 'post', 'value': '1', 'target': 'Ray'}
        - {'kind': 'post', 'value': '1', 'source': 'Norman'}
        - {'kind': 'post', 'value': '3', 'source': 'Emma'}
    )
    client2 = _make_client('ws://localhost:8642/x', 0.11, Script()
        + {'kind': 'login', 'value': 'Ray'}
        - {'kind': 'state', 'username': 'Ray'}
        - {'kind': 'state', 'online': ['Norman', 'Ray']}
        - {'kind': 'state', 'online': ['Norman', 'Ray', 'Emma']}
        - {'kind': 'post', 'value': '1', 'source': 'Norman'}
        + {'kind': 'post', 'value': '2', 'target': 'Emma'}
        - {'kind': 'post', 'value': '2', 'source': 'Ray'}
        - {'kind': 'state', 'online': ['Ray', 'Emma']}
    )
    client3 = _make_client('ws://localhost:8642/x', 0.12, Script()
        + {'kind': 'login', 'value': 'Emma'}
        - {'kind': 'state', 'username': 'Emma'}
        - {'kind': 'state', 'online': ['Norman', 'Ray', 'Emma']}
        - {'kind': 'post', 'value': '2', 'source': 'Ray'}
        + {'kind': 'post', 'value': '3', 'target': 'Norman'}
        - {'kind': 'post', 'value': '3', 'source': 'Emma'}
        - {'kind': 'state', 'online': ['Ray', 'Emma']}
        - {'kind': 'state', 'online': ['Emma']}
    )
    return _test(client1, client2, client3)


def test_query_is_irrelevant():
    """Clients whose URLs differ only in the query string still interact."""
    client1 = _make_client('ws://localhost:8642/x?a=0', 0.10, Script()
        + {'kind': 'login', 'value': 'Alice'}
        - {'kind': 'state', 'username': 'Alice'}
        - {'kind': 'state', 'online': ['Alice']}
        - {'kind': 'state', 'online': ['Alice', 'Bob']}
        + {'kind': 'post', 'value': 'Hey Bob!'}
        - {'kind': 'post', 'value': 'Hey Bob!', 'source': 'Alice'}
        - {'kind': 'post', 'value': 'Howdy!', 'source': 'Bob'}
    )
    client2 = _make_client('ws://localhost:8642/x?v=0', 0.11, Script()
        + {'kind': 'login', 'value': 'Bob'}
        - {'kind': 'state', 'username': 'Bob'}
        - {'kind': 'state', 'online': ['Alice', 'Bob']}
        - {'kind': 'post', 'value': 'Hey Bob!', 'source': 'Alice'}
        + {'kind': 'post', 'value': 'Howdy!'}
        - {'kind': 'post', 'value': 'Howdy!', 'source': 'Bob'}
        - {'kind': 'state', 'online': ['Bob']}
    )
    return _test(client1, client2)


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Snapshot the namespace first; the loop variables below are added
        # to it while iterating.
        for fn_name, fn in list(globals().items()):
            if fn_name.startswith('test_'):
                loop.run_until_complete(fn())
    finally:
        loop.close()