"""Scripted end-to-end tests for the chat server exposed by ``pico``.

Each ``test_*`` function builds one or more scripted websocket clients and
returns a coroutine that starts a server on ``localhost:8642`` and runs the
clients against it.  Running this file as a script executes every
``test_*`` function in definition order.
"""
import asyncio
import itertools
import sys

import websockets

from pico import start_server, send_json, recv_json


class Script:
    """Ordered list of scripted client events, built with ``+`` and ``-``.

    ``script + message`` records a message to send and ``script - message``
    records a message expected from the server.  Both operators mutate the
    script in place and return it, so they can be chained.
    """

    def __init__(self):
        # List of ``(kind, message)`` tuples; kind is 'send' or 'recv'.
        self.events = []

    def __add__(self, event):
        """Record ``event`` as a message to send; return ``self``."""
        self.events.append(('send', event))
        return self

    def __sub__(self, event):
        """Record ``event`` as a message expected from the server."""
        self.events.append(('recv', event))
        return self


async def _make_client(path, timeout, script):
    """Play ``script`` against the websocket server at ``path``.

    Args:
        path: Websocket URL to connect to.
        timeout: Seconds to sleep *before* connecting.  Despite the name it
            is a start delay, used by callers to stagger clients.
        script: ``Script`` whose events are replayed in order.

    For every 'recv' event one message is read, its ``'ts'`` key is removed
    and the remainder is compared with the expected message; a mismatch is
    printed as a ``-``/``+`` pair.  After the script is exhausted the client
    keeps reading until it is the first entry of the last known user list;
    every message read in that phase is unscripted and counts as an error.

    Raises:
        SystemExit: with status 1 if any mismatch or unscripted message was
            seen, so that the failure is visible to the calling shell.
        KeyError: if a received message has no ``'ts'`` key, or if the
            script never supplied ``'users'``/``'username'``.
    """
    state = {}
    failed = False
    await asyncio.sleep(timeout)
    async with websockets.connect(path) as websocket:
        for kind, message in script.events:
            if kind == 'recv':
                expected = message
                received = await recv_json(websocket)
                # Drop the 'ts' field before comparing; the scripts never
                # list it.
                received.pop('ts')
                # State tracks the *expected* message, not the received one.
                state.update(expected)
                if expected != received:
                    failed = True
                    print('-', expected)
                    print('+', received)
                    print()
            elif kind == 'send':
                await send_json(websocket, **message)

        # Stay connected until this client heads the user list.
        # NOTE(review): presumably this makes clients disconnect in join
        # order so the scripted 'left' updates are deterministic - confirm.
        while state['users'][0] != state['username']:
            message = await recv_json(websocket)
            message.pop('ts')
            # Anything arriving after the script ended was not expected.
            failed = True
            print('+', message)
            print()
            state.update(message)

    if failed:
        print('error')
        # ``exit()`` is only injected by the ``site`` module and exits with
        # status 0; use ``sys.exit`` with a non-zero status instead.
        sys.exit(1)


def _close_server(server_task):
    """Close the server produced by ``server_task`` if it started cleanly."""
    if not server_task.done() or server_task.cancelled():
        return
    if server_task.exception() is None:
        server_task.result().close()


async def _test(*clients):
    """Start the test server and run ``clients`` against it concurrently.

    The server and all clients share a two second budget.  A timeout ends
    the test silently, as before.
    NOTE(review): a timed-out test is therefore not reported as a failure -
    confirm that this is intended.

    The server is closed on every exit path, including timeouts and client
    exceptions, so a failed test no longer leaves port 8642 bound for the
    next one.
    """
    # Keep a handle on the server start-up so its result stays reachable
    # even when the surrounding gather is cancelled by the timeout.
    server_task = asyncio.ensure_future(
        start_server('localhost', 8642, 'TestServer'))
    try:
        await asyncio.wait_for(
            asyncio.gather(server_task, *clients),
            timeout=2,
        )
    except asyncio.TimeoutError:
        return
    finally:
        _close_server(server_task)


def test_happy_path():
    """One client logs in and posts a single message."""
    client = _make_client('ws://localhost:8642/', 0.1, Script()
        + {'action': 'login', 'username': 'TestUser'}
        - {'kind': 'update', 'username': 'TestUser'}
        - {'kind': 'update', 'users': ['TestUser'], 'info': 'TestUser joined'}
        + {'action': 'post', 'text': 'Hello World!'}
        - {'kind': 'post', 'source': 'TestUser', 'text': 'Hello World!'}
    )
    return _test(client)


def test_post_before_login():
    """Invalid requests yield error messages until a valid login is sent."""
    client = _make_client('ws://localhost:8642/', 0.1, Script()
        + {}
        - {'kind': 'error', 'info': 'Message without action is invalid'}
        + {'action': 'post', 'text': ''}
        - {'kind': 'error', 'info': 'Not logged in'}
        + {'action': 'login', 'username': ''}
        - {'kind': 'error', 'info': 'Invalid username'}
        + {'action': 'login', 'username': 'Joe'}
        - {'kind': 'update', 'username': 'Joe'}
        - {'kind': 'update', 'users': ['Joe'], 'info': 'Joe joined'}
    )
    return _test(client)


def test_interact():
    """Two clients exchange posts; the second one sees the first leave."""
    client1 = _make_client('ws://localhost:8642/', 0.10, Script()
        + {'action': 'login', 'username': 'Alice'}
        - {'kind': 'update', 'username': 'Alice'}
        - {'kind': 'update', 'users': ['Alice'], 'info': 'Alice joined'}
        - {'kind': 'update', 'users': ['Alice', 'Bob'], 'info': 'Bob joined'}
        + {'action': 'post', 'text': 'Hey Bob!'}
        - {'kind': 'post', 'source': 'Alice', 'text': 'Hey Bob!'}
        - {'kind': 'post', 'source': 'Bob', 'text': 'Howdy!'}
    )
    client2 = _make_client('ws://localhost:8642/', 0.11, Script()
        + {'action': 'login', 'username': 'Bob'}
        - {'kind': 'update', 'username': 'Bob'}
        - {'kind': 'update', 'users': ['Alice', 'Bob'], 'info': 'Bob joined'}
        - {'kind': 'post', 'source': 'Alice', 'text': 'Hey Bob!'}
        + {'action': 'post', 'text': 'Howdy!'}
        - {'kind': 'post', 'source': 'Bob', 'text': 'Howdy!'}
        + {'action': 'post', 'text': ''}
        - {'kind': 'update', 'users': ['Bob'], 'info': 'Alice left'}
    )
    return _test(client1, client2)


def test_party():
    """Three clients join one after another and leave in join order."""
    client1 = _make_client('ws://localhost:8642/', 0.10, Script()
        + {'action': 'login', 'username': 'Norman'}
        - {'kind': 'update', 'username': 'Norman'}
        - {'kind': 'update', 'users': ['Norman'], 'info': 'Norman joined'}
        - {'kind': 'update', 'users': ['Norman', 'Ray'], 'info': 'Ray joined'}
        - {'kind': 'update', 'users': ['Norman', 'Ray', 'Emma'], 'info': 'Emma joined'}
        + {'action': 'post', 'text': 'なに?'}
        - {'kind': 'post', 'source': 'Norman', 'text': 'なに?'}
    )
    client2 = _make_client('ws://localhost:8642/', 0.11, Script()
        + {'action': 'login', 'username': 'Ray'}
        - {'kind': 'update', 'username': 'Ray'}
        - {'kind': 'update', 'users': ['Norman', 'Ray'], 'info': 'Ray joined'}
        - {'kind': 'update', 'users': ['Norman', 'Ray', 'Emma'], 'info': 'Emma joined'}
        - {'kind': 'post', 'source': 'Norman', 'text': 'なに?'}
        - {'kind': 'update', 'users': ['Ray', 'Emma'], 'info': 'Norman left'}
    )
    client3 = _make_client('ws://localhost:8642/', 0.12, Script()
        + {'action': 'login', 'username': 'Emma'}
        - {'kind': 'update', 'username': 'Emma'}
        - {'kind': 'update', 'users': ['Norman', 'Ray', 'Emma'], 'info': 'Emma joined'}
        - {'kind': 'post', 'source': 'Norman', 'text': 'なに?'}
        - {'kind': 'update', 'users': ['Ray', 'Emma'], 'info': 'Norman left'}
        - {'kind': 'update', 'users': ['Emma'], 'info': 'Ray left'}
    )
    return _test(client1, client2, client3)


def test_rooms():
    """Clients on different URL paths use the same username independently."""
    client1 = _make_client('ws://localhost:8642/A', 0.10, Script()
        + {'action': 'login', 'username': 'Dandy'}
        - {'kind': 'update', 'username': 'Dandy'}
        - {'kind': 'update', 'users': ['Dandy'], 'info': 'Dandy joined'}
        + {'action': 'post', 'text': 'Hi'}
        - {'kind': 'post', 'source': 'Dandy', 'text': 'Hi'}
    )
    client2 = _make_client('ws://localhost:8642/B', 0.10, Script()
        + {'action': 'login', 'username': 'Dandy'}
        - {'kind': 'update', 'username': 'Dandy'}
        - {'kind': 'update', 'users': ['Dandy'], 'info': 'Dandy joined'}
        + {'action': 'post', 'text': 'Howdy'}
        - {'kind': 'post', 'source': 'Dandy', 'text': 'Howdy'}
    )
    return _test(client1, client2)


if __name__ == '__main__':
    # ``asyncio.get_event_loop()`` without a running loop is deprecated, so
    # create and install the loop explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Snapshot the namespace: the loop variables below are themselves
        # module globals and would otherwise grow the dict mid-iteration.
        for fn_name, fn in list(globals().items()):
            if fn_name.startswith('test_'):
                loop.run_until_complete(fn())
    finally:
        loop.close()