From e251fcd8f17e59703d776e9ef126d68246602c38 Mon Sep 17 00:00:00 2001 From: Danil Kolesnikov Date: Wed, 12 Feb 2025 23:37:26 +0300 Subject: [PATCH] test utils --- dumper.py | 21 +++++++++++++++++++++ generator.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 dumper.py create mode 100644 generator.py diff --git a/dumper.py b/dumper.py new file mode 100644 index 0000000..d8f4787 --- /dev/null +++ b/dumper.py @@ -0,0 +1,21 @@ +import socket + +# Create a UDP socket +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +# Bind the socket to the address and port +server_address = ('', 12345) # Bind to all interfaces on port 12345 +sock.bind(server_address) + +print("UDP server is up and listening") + +while True: + print("\nWaiting for a message") + data, address = sock.recvfrom(4096) + + print(f"Received {len(data)} bytes from {address}") + print(f"Message: {data.decode()}") + + # Send the received data back to the client + sent = sock.sendto(data, address) + print(f"Sent {sent} bytes back to {address}") diff --git a/generator.py b/generator.py new file mode 100644 index 0000000..2f777bc --- /dev/null +++ b/generator.py @@ -0,0 +1,28 @@ +import asyncio +import socket +import time + +async def send_random_packets(): + # Create a UDP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + while True: + # Send a random packet to UDP port 5005 + data = b'Random Data'+str(time.time()).encode() + sock.sendto(data, ('localhost', 5005)) + + print("Sent packet to UDP port 5005") + + # Wait for a response packet from UDP port 5005 + sock.recv(4096) + print("received back", data) + + + # Wait for 1 second before sending the next packet + await asyncio.sleep(1) + + # Close the socket + sock.close() + +# Run the send_random_packets coroutine in the event loop +asyncio.run(send_random_packets())