"""UDP relay that byte-shifts datagrams between local clients and one upstream.

Datagrams received on (RX_IP, RX_PORT) are passed through TX_ALGO and sent to
(TX_IP, TX_PORT) from a per-client socket; datagrams arriving on that
per-client socket are passed through RX_ALGO and sent back to the client.
"""
import asyncio
import socket


def shift_seq(seq, byte_shift):
    """Return a new bytearray with each byte of *seq* shifted modulo 256.

    Args:
        seq: bytes-like object (or iterable of ints in 0..255) to transform.
        byte_shift: integer added to every byte; may be negative.

    Returns:
        A bytearray of the same length as *seq*.
    """
    # Masking with 0xff wraps the sum into 0..255, also for negative shifts.
    return bytearray((value + byte_shift) & 0xff for value in seq)


def encrypt(data):
    """Shift every byte of *data* up by 15 (modulo 256)."""
    return shift_seq(data, 15)


def decrypt(data):
    """Shift every byte of *data* down by 15 (modulo 256); inverse of encrypt."""
    return shift_seq(data, -15)


# Relay configuration. Defined at module level (rather than only under the
# ``__main__`` guard) so that main() also works when this module is imported.
RX_IP = "127.0.0.1"
RX_PORT = 5005
RX_MTU = 1500
RX_ALGO = encrypt

TX_IP = "194.135.105.21"
TX_PORT = 5005
TX_ALGO = decrypt


class Duplex:
    """Pump datagrams received on *s2* out through *s1*.

    Args:
        s1: socket used to send the forwarded datagrams.
        s1_remote: destination address for datagrams sent through *s1*.
        s1_transform: callable applied to each datagram before it is sent.
        s2: socket the datagrams are read from.
        s2_remote: accepted for interface compatibility; unused here.
        s2_transform: accepted for interface compatibility; unused here.
        loop: running asyncio event loop used for the socket operations.
        mtu: maximum number of bytes read per receive call.

    Attributes:
        task: the asyncio task running the s2 -> s1 pump.
    """

    def __init__(self, s1, s1_remote, s1_transform,
                 s2, s2_remote, s2_transform, loop, mtu):
        self.mtu = mtu
        self.loop = loop
        # Only the s2 -> s1 direction is started here. The s1 -> s2 direction
        # is handled by main(), which reads s1 with recvfrom because it needs
        # the sender address of every datagram.
        # The task is stored so that a strong reference exists and so that
        # the owner can stop the pump via close().
        self.task = loop.create_task(
            self.stream(s2, s1, s1_remote, s1_transform))

    def close(self):
        """Cancel the forwarding task.

        The sockets are not closed here; they belong to the caller.
        """
        self.task.cancel()

    async def stream(self, rx, tx, tx_addr, transform):
        """Forever read datagrams from *rx* and send them via *tx* to *tx_addr*.

        Each datagram is passed through *transform* before being sent. The
        coroutine only ends by cancellation or by a socket error.
        """
        print("Starting stream", rx.getsockname(), '>>', tx_addr)
        while True:
            data = await self.loop.sock_recv(rx, self.mtu)
            await self.loop.sock_sendto(tx, transform(data), tx_addr)


def _open_upstream(loop, rx, client_addr):
    """Create the per-client upstream socket and its reply pump.

    Returns a ``(socket, Duplex)`` pair; the socket is closed again if any
    step of the setup fails.
    """
    tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        tx.setblocking(False)
        # Explicit wildcard address / ephemeral port. Binding to the result
        # of getsockname() on a not-yet-bound socket is not portable.
        tx.bind(("0.0.0.0", 0))
        duplex = Duplex(rx, client_addr, RX_ALGO,
                        tx, (TX_IP, TX_PORT), TX_ALGO, loop, RX_MTU)
    except Exception:
        tx.close()
        raise
    return tx, duplex


async def main():
    """Run the relay until cancelled.

    Listens on (RX_IP, RX_PORT). For every distinct client address a
    dedicated upstream socket and a Duplex (for the replies) are created;
    each client datagram is transformed with TX_ALGO and sent upstream.
    """
    loop = asyncio.get_running_loop()
    upstream = (TX_IP, TX_PORT)
    # client address -> (upstream socket, Duplex pumping replies back)
    # NOTE(review): entries are only removed when a send fails, so the table
    # grows with the number of distinct client addresses seen.
    connections = {}

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as rx:
        rx.setblocking(False)
        rx.bind((RX_IP, RX_PORT))
        print("App started")
        try:
            while True:
                data, addr = await loop.sock_recvfrom(rx, RX_MTU)

                entry = connections.get(addr)
                if entry is None:
                    entry = _open_upstream(loop, rx, addr)
                    connections[addr] = entry
                tx, duplex = entry

                payload = TX_ALGO(data)
                try:
                    # Same asyncio send primitive as Duplex.stream, so a
                    # momentarily unwritable socket is awaited instead of
                    # being treated as a failure.
                    await loop.sock_sendto(tx, payload, upstream)
                except OSError:
                    print("reconnect", addr)
                    # Drop the entry and release its resources; the next
                    # datagram from this client sets up a fresh socket.
                    del connections[addr]
                    duplex.close()
                    tx.close()
        finally:
            # Stop every reply pump and release its socket on shutdown.
            for tx, duplex in connections.values():
                duplex.close()
                tx.close()


if __name__ == '__main__':
    asyncio.run(main())