test utils

This commit is contained in:
Danil Kolesnikov
2025-02-12 23:37:26 +03:00
parent 2dc604628b
commit e251fcd8f1
2 changed files with 49 additions and 0 deletions

21
dumper.py Normal file
View File

@@ -0,0 +1,21 @@
import socket
# Create a UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind the socket to the address and port
server_address = ('', 12345) # Bind to all interfaces on port 12345
sock.bind(server_address)
print("UDP server is up and listening")
while True:
print("\nWaiting for a message")
data, address = sock.recvfrom(4096)
print(f"Received {len(data)} bytes from {address}")
print(f"Message: {data.decode()}")
# Send the received data back to the client
sent = sock.sendto(data, address)
print(f"Sent {sent} bytes back to {address}")

28
generator.py Normal file
View File

@@ -0,0 +1,28 @@
import asyncio
import socket
import time
async def send_random_packets():
# Create a UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
# Send a random packet to UDP port 5005
data = b'Random Data'+str(time.time()).encode()
sock.sendto(data, ('localhost', 5005))
print("Sent packet to UDP port 5005")
# Wait for a response packet from UDP port 5005
sock.recv(4096)
print("received back", data)
# Wait for 1 second before sending the next packet
await asyncio.sleep(1)
# Close the socket
sock.close()
# Run the send_random_packets coroutine in the event loop
asyncio.run(send_random_packets())