(original) (raw)
import asyncio import sys HOST = '127.0.0.1' PORT = 8888 # Packet big enough to get at least once blocking send or receive #PACKET_SIZE = 1024 * 1024 * 16 # 16 MiB PACKET_SIZE = 1024 * 16 # 16 KiB STRESS_DELAY = 0.10 # second = 10 ms NPACKET = 2000 STRESS = True TIMEOUT = 10.0 def create_packet(i): data = b"PACKET%i" % i data *= PACKET_SIZE // len(data) + 1 data = data[:PACKET_SIZE] return data async def stress_transport(transport, stop): delay = STRESS_DELAY while not stop.is_set(): transport.pause_reading() await asyncio.sleep(delay) transport.resume_reading() await asyncio.sleep(delay) print("stopped") async def client(): reader, writer = await asyncio.open_connection(HOST, PORT) transport = writer.transport if STRESS: stop = asyncio.Event() task = asyncio.create_task(stress_transport(transport, stop)) ok = True for i in range(NPACKET): print("Send packet #%i" % i) packet = create_packet(i) writer.write(packet) await writer.drain() try: packet2 = await asyncio.wait_for(reader.readexactly(len(packet)), TIMEOUT) except asyncio.TimeoutError: ok = False print("TIMEOUT ERROR!", TIMEOUT) break if packet != packet2: ok = False print("RECEIVED A CORRUPTED PACKET %i!!!" % i) print(len(packet), len(packet2)) break print("Got packet #%i unchanged" % i) print('Close the client socket') writer.close() if STRESS: stop.set() await task return ok async def handle_echo(reader, writer): for i in range(NPACKET): try: packet = await asyncio.wait_for(reader.readexactly(PACKET_SIZE), TIMEOUT) except asyncio.TimeoutError: print("Server: timeout!", TIMEOUT) break except Exception as exc: print("Server: receive error!", exc) break addr = writer.get_extra_info('peername') print("received %s - send %s" % (len(packet), len(packet))) writer.write(packet) await writer.drain() print("Close the server socket") writer.close() async def amain(): loop = asyncio.get_event_loop() print("Test event loop:", loop) server = await asyncio.start_server(handle_echo, HOST, PORT) ok = await client() server.close() await server.wait_closed() if ok: print("Ok! no bug") else: print("ERRRRR! BUG!") def main(): if sys.platform == 'win32' and 1: policy = asyncio.get_event_loop_policy() policy._loop_factory = asyncio.ProactorEventLoop asyncio.run(amain()) main()