Skip to content

Instantly share code, notes, and snippets.

@SachinDas246
Created December 18, 2024 01:47
Show Gist options
  • Select an option

  • Save SachinDas246/c6fa6654206a8dcf1422006c9ceb5e8e to your computer and use it in GitHub Desktop.

Select an option

Save SachinDas246/c6fa6654206a8dcf1422006c9ceb5e8e to your computer and use it in GitHub Desktop.
udp receiver
import sys
import socket
import threading
from PySide6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QTextEdit, QWidget)
from PySide6.QtCore import Signal, QObject, Qt
class ReceiverSignals(QObject):
packet_received = Signal(str)
error_occurred = Signal(str)
class UDPReceiverThread(threading.Thread):
def __init__(self, ip, port, signals):
threading.Thread.__init__(self)
self.ip = ip
self.port = port
self.signals = signals
self.is_running = True
self.udp_socket = None
self.timeout = 1
def run(self):
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.bind((self.ip, self.port))
self.udp_socket.settimeout(self.timeout)
while self.is_running:
try:
data, addr = self.udp_socket.recvfrom(1024)
packet_info = f"{data.decode()} from {addr} on {self.ip}:{self.port}"
self.signals.packet_received.emit(packet_info)
except socket.timeout:
continue
self.udp_socket.close()
self.udp_socket = None
def stop_listening(self):
self.is_running = False
class UDPReceiverGUI(QMainWindow):
"""Main GUI Window for UDP Receiver"""
def __init__(self):
super().__init__()
self.setWindowTitle("UDP Packet Receiver")
self.setGeometry(100, 100, 600, 500)
# Central widget and main layout
central_widget = QWidget()
main_layout = QVBoxLayout()
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
# IP Address Input
ip_layout = QHBoxLayout()
ip_layout.addWidget(QLabel("Receiver IP:"))
self.ip_input = QLineEdit()
self.ip_input.setText("192.168.1.6")
ip_layout.addWidget(self.ip_input)
main_layout.addLayout(ip_layout)
# Port Input
port_layout = QHBoxLayout()
port_layout.addWidget(QLabel("Ports (comma seprated):"))
self.port_input = QLineEdit()
self.port_input.setText("6000,8000")
port_layout.addWidget(self.port_input)
main_layout.addLayout(port_layout)
# Control Buttons
button_layout = QHBoxLayout()
self.start_button = QPushButton("Start Receiving")
self.start_button.clicked.connect(self.start_receiving)
self.stop_button = QPushButton("Stop Receiving")
self.stop_button.clicked.connect(self.stop_receiving)
self.stop_button.setEnabled(False)
button_layout.addWidget(self.start_button)
button_layout.addWidget(self.stop_button)
main_layout.addLayout(button_layout)
# Packet Display Area
self.packet_display = QTextEdit()
self.packet_display.setReadOnly(True)
main_layout.addWidget(self.packet_display)
# Thread-related setup
self.receiver_threads = []
self.signals = ReceiverSignals()
self.signals.packet_received.connect(self.update_packet_display)
self.signals.error_occurred.connect(self.update_packet_display)
def start_receiving(self):
"""Start receiving UDP packets"""
try:
# Validate inputs
ip = self.ip_input.text()
# Clear previous display
self.packet_display.clear()
# Create and start receiver thread
self.ports = [int(x) for x in self.port_input.text().split(',')]
for port in self.ports:
th = UDPReceiverThread(ip, port, self.signals)
th.start()
self.receiver_threads.append(th)
self.packet_display.append("Listening on " + self.port_input.text() )
# Update button states
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
except Exception as e:
self.update_packet_display(f"Error starting receiver: {str(e)}")
def stop_receiving(self):
"""Stop receiving UDP packets"""
for th in self.receiver_threads:
th.stop_listening()
self.receiver_threads = []
# Reset button states
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
def update_packet_display(self, message):
"""Update packet display text area"""
self.packet_display.append(message)
def main():
"""Main application entry point"""
app = QApplication(sys.argv)
receiver_gui = UDPReceiverGUI()
receiver_gui.show()
app.aboutToQuit.connect(receiver_gui.stop_receiving)
sys.exit(app.exec())
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment