diff --git a/leaf.py b/leaf.py new file mode 100644 index 0000000..9180be7 --- /dev/null +++ b/leaf.py @@ -0,0 +1,87 @@ +import time +import asyncio +from asyncio.protocols import DatagramProtocol + +import websockets + +TX_HOST = '127.0.0.1' +TX_PORT = '80' +TX_ROUTE = '/' +RX_HOST = '0.0.0.0' +RX_PORT = 5005 + +class MyProto(DatagramProtocol): + def __init__(self): + super().__init__() + + def connection_made(self, transport): + self.transport = transport + self.tunnels = {} + self.last_packet_time = {} + + async def ws_to_udp_loop(self, first_packet, addr): + try: + ws = await websockets.connect("ws://"+TX_HOST+':'+TX_PORT+TX_ROUTE) + self.tunnels[addr] = ws + print("WebSocket for", addr, "created") + except Exception as e: + print("Unable to connect, check if remote server is available. Exception text:", e) + return + + try: + while True: + await ws.send(first_packet) + async for message in ws: + self.transport.sendto(message, addr) + self.last_packet_time[addr] = time.time() + except Exception as e: + print("Received exception on WebSocket for", addr,"-", e) + finally: + print("WebSocket for", addr, "closed") + self.tunnels.pop(addr, None) + self.last_packet_time.pop(addr, None) + await ws.close() + + + def datagram_received(self, data, addr): + loop = asyncio.get_running_loop() + ws = self.tunnels.get(addr, None) + + if ws is None: + loop.create_task(self.ws_to_udp_loop(data, addr)) + else: + loop.create_task(ws.send(data)) + self.last_packet_time[addr] = time.time() + + async def cleanup(self): + connections_for_removal = [] + for addr in self.last_packet_time.keys(): + time_passed = time.time() - self.last_packet_time[addr] + if time_passed > 3600: + print(f"Closing connection for {addr} (inactive for: {int(time_passed)} seconds)") + connections_for_removal.append(addr) + + for addr in connections_for_removal: + await self.tunnels[addr].close() + + + +async def main(): + addr = RX_HOST + proto = MyProto() + transport, protocol = await asyncio.get_event_loop().create_datagram_endpoint( + lambda: proto, + local_addr=(addr, RX_PORT) + ) + + print(f'Serving on {addr}:{RX_PORT}') + + try: + while True: + await asyncio.sleep(3600) + await proto.cleanup() + + finally: + transport.close() + +asyncio.run(main())