Created
December 18, 2024 01:47
-
-
Save SachinDas246/c6fa6654206a8dcf1422006c9ceb5e8e to your computer and use it in GitHub Desktop.
udp receiver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import socket | |
| import threading | |
| from PySide6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QTextEdit, QWidget) | |
| from PySide6.QtCore import Signal, QObject, Qt | |
class ReceiverSignals(QObject):
    """Qt signal container shared between receiver threads and the GUI.

    Both signals carry a single ``str`` payload.
    """

    # Carries a formatted description of one received datagram.
    packet_received = Signal(str)

    # Carries a textual error description.
    error_occurred = Signal(str)
class UDPReceiverThread(threading.Thread):
    """Background thread that listens for UDP datagrams on one address.

    Every received datagram is decoded to text and reported through
    ``signals.packet_received``. Socket-level failures (for example the
    address already being in use) are reported through
    ``signals.error_occurred`` instead of terminating the thread with an
    unhandled exception, and the socket is always closed on exit.
    """

    def __init__(self, ip, port, signals):
        """Store the listening parameters; the socket is opened in ``run``.

        Args:
            ip: Local address to bind to.
            port: Local UDP port to bind to.
            signals: Object exposing ``packet_received`` and
                ``error_occurred`` attributes, each providing ``emit(str)``.
        """
        super().__init__()
        self.ip = ip
        self.port = port
        self.signals = signals
        self.is_running = True
        self.udp_socket = None
        # Seconds recvfrom() may block before ``is_running`` is re-checked.
        self.timeout = 1

    def run(self):
        """Bind the socket and forward datagrams until asked to stop.

        Emits ``packet_received`` once per datagram. On a socket error
        (bind failure, receive failure, out-of-range port) emits
        ``error_occurred`` once and returns.
        """
        try:
            self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.udp_socket.bind((self.ip, self.port))
            self.udp_socket.settimeout(self.timeout)
            while self.is_running:
                try:
                    data, addr = self.udp_socket.recvfrom(1024)
                except socket.timeout:
                    # No traffic within the timeout; loop to re-check the flag.
                    continue
                # Datagrams are arbitrary bytes; never let invalid UTF-8
                # kill the listener.
                text = data.decode(errors="replace")
                packet_info = f"{text} from {addr} on {self.ip}:{self.port}"
                self.signals.packet_received.emit(packet_info)
        except (OSError, OverflowError) as exc:
            # OverflowError is what bind() raises for a port outside 0-65535.
            self.signals.error_occurred.emit(
                f"Error on {self.ip}:{self.port}: {exc}"
            )
        finally:
            # Release the port on every exit path, including failures.
            if self.udp_socket is not None:
                self.udp_socket.close()
                self.udp_socket = None

    def stop_listening(self):
        """Ask the receive loop to exit after its current wait finishes."""
        self.is_running = False
class UDPReceiverGUI(QMainWindow):
    """Main GUI window for the UDP receiver.

    Lets the user enter a local IP and a comma-separated list of ports,
    starts one ``UDPReceiverThread`` per port, and shows every message the
    threads report in a read-only text area.
    """

    def __init__(self):
        """Build the widgets and wire the shared signals to the display."""
        super().__init__()
        self.setWindowTitle("UDP Packet Receiver")
        self.setGeometry(100, 100, 600, 500)

        # Central widget and main layout
        central_widget = QWidget()
        main_layout = QVBoxLayout()
        central_widget.setLayout(main_layout)
        self.setCentralWidget(central_widget)

        # IP address input
        ip_layout = QHBoxLayout()
        ip_layout.addWidget(QLabel("Receiver IP:"))
        self.ip_input = QLineEdit()
        self.ip_input.setText("192.168.1.6")
        ip_layout.addWidget(self.ip_input)
        main_layout.addLayout(ip_layout)

        # Port input
        port_layout = QHBoxLayout()
        port_layout.addWidget(QLabel("Ports (comma seprated):"))
        self.port_input = QLineEdit()
        self.port_input.setText("6000,8000")
        port_layout.addWidget(self.port_input)
        main_layout.addLayout(port_layout)

        # Control buttons
        button_layout = QHBoxLayout()
        self.start_button = QPushButton("Start Receiving")
        self.start_button.clicked.connect(self.start_receiving)
        self.stop_button = QPushButton("Stop Receiving")
        self.stop_button.clicked.connect(self.stop_receiving)
        self.stop_button.setEnabled(False)
        button_layout.addWidget(self.start_button)
        button_layout.addWidget(self.stop_button)
        main_layout.addLayout(button_layout)

        # Packet display area
        self.packet_display = QTextEdit()
        self.packet_display.setReadOnly(True)
        main_layout.addWidget(self.packet_display)

        # Thread-related setup: one signals object is shared by all threads,
        # and both signals feed the same text area.
        self.receiver_threads = []
        self.signals = ReceiverSignals()
        self.signals.packet_received.connect(self.update_packet_display)
        self.signals.error_occurred.connect(self.update_packet_display)

    def _parse_ports(self):
        """Parse the comma-separated port field into a list of ints.

        Returns:
            The ports in the order they were typed.

        Raises:
            ValueError: If an entry is not an integer or lies outside
                0-65535.
        """
        ports = [int(token) for token in self.port_input.text().split(',')]
        for port in ports:
            if not 0 <= port <= 65535:
                raise ValueError(f"port {port} is out of range (0-65535)")
        return ports

    def start_receiving(self):
        """Start one receiver thread per requested port.

        Any failure is shown in the display area; threads that were already
        started are stopped again so the window returns to its idle state.
        """
        ip = self.ip_input.text()
        # Clear previous display
        self.packet_display.clear()
        try:
            self.ports = self._parse_ports()
            for port in self.ports:
                th = UDPReceiverThread(ip, port, self.signals)
                th.start()
                self.receiver_threads.append(th)
        except Exception as e:
            # UI boundary: report the problem rather than let the slot raise.
            self.update_packet_display(f"Error starting receiver: {str(e)}")
            # Do not leave half-started listeners running behind a disabled
            # Stop button.
            self.stop_receiving()
            return
        self.packet_display.append("Listening on " + self.port_input.text())
        # Update button states
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)

    def stop_receiving(self):
        """Stop all receiver threads and wait for them to release their ports."""
        threads, self.receiver_threads = self.receiver_threads, []
        # Signal every thread first so their shutdown waits overlap.
        for th in threads:
            th.stop_listening()
        # Wait until the sockets are closed; otherwise an immediate restart
        # can fail to bind because the old socket still holds the port.
        for th in threads:
            th.join(timeout=th.timeout + 1)
        # Reset button states
        self.start_button.setEnabled(True)
        self.stop_button.setEnabled(False)

    def update_packet_display(self, message):
        """Append ``message`` as a new line in the packet display area."""
        self.packet_display.append(message)
def main():
    """Create the Qt application, show the receiver window, and run it.

    The window's ``stop_receiving`` is hooked to ``aboutToQuit`` so the
    receiver threads are told to stop when the application exits.
    """
    application = QApplication(sys.argv)
    window = UDPReceiverGUI()
    window.show()
    application.aboutToQuit.connect(window.stop_receiving)
    exit_code = application.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment