Last active
January 10, 2023 16:56
-
-
Save snacsnoc/3911ed39cee3bba81d448cbc19266837 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from typing import Tuple | |
class DeviceManager:
    """Registry of block devices that serves fixed-size block reads.

    A device is identified by the path that is passed to ``open`` and is
    associated with the block size (in bytes) supplied at registration.
    """

    def __init__(self):
        # Maps device path -> block size in bytes.
        self.devices = {}

    def register_device(self, device_name, block_size):
        """Register a device with the given name and block size.

        Args:
            device_name: Path of the device file; used as the registry key.
            block_size: Number of bytes per block.

        Raises:
            ValueError: If ``block_size`` is not a positive integer. A zero
                size would make every read return ``b""`` and a negative
                size would make ``file.read`` return the rest of the file
                instead of a single block.
        """
        if not isinstance(block_size, int) or block_size <= 0:
            raise ValueError(
                f"block_size must be a positive integer, got {block_size!r}"
            )
        self.devices[device_name] = block_size

    def read_from_device(self, device_name, block_number):
        """Read one block from a registered device.

        Args:
            device_name: Path of a device previously passed to
                ``register_device``.
            block_number: Zero-based index of the block to read.

        Returns:
            The bytes read; shorter than the block size (possibly empty)
            when the block lies at or beyond the end of the file.

        Raises:
            ValueError: If the device is not registered or
                ``block_number`` is negative.
        """
        try:
            block_size = self.devices[device_name]
        except KeyError:
            raise ValueError(f"{device_name} is not a registered device") from None
        if block_number < 0:
            raise ValueError(
                f"block_number must be non-negative, got {block_number!r}"
            )
        with open(device_name, "rb") as f:
            f.seek(block_number * block_size)
            return f.read(block_size)
class FileSystem:
    """In-memory file store that hands out space from a fixed byte budget.

    Each file is a dict with the keys ``"name"``, ``"size"`` (the space
    reserved for it) and ``"data"`` (its current contents).
    """

    def __init__(self):
        # Maps file name -> file record.
        self.files = {}
        self.free_space = 1024  # Total size of the file system in bytes

    def create_file(self, name, size):
        """Create a new, empty file with the specified name and size.

        If a file called ``name`` already exists it is replaced, and the
        space it had reserved is returned to the pool first so that
        ``free_space`` stays consistent.

        Args:
            name: Name of the file.
            size: Number of bytes to reserve for the file.

        Returns:
            The new file record, or ``None`` when there is not enough
            free space (in which case nothing is modified).

        Raises:
            ValueError: If ``size`` is negative; reserving a negative
                amount would silently enlarge the file system.
        """
        if size < 0:
            raise ValueError(f"size must be non-negative, got {size!r}")
        previous = self.files.get(name)
        # Space held by a file that is about to be replaced counts as free.
        reclaimable = previous["size"] if previous is not None else 0
        if self.free_space + reclaimable < size:
            return None
        file = {"name": name, "size": size, "data": ""}
        self.files[name] = file
        self.free_space += reclaimable - size
        return file

    def delete_file(self, name):
        """Delete the specified file and release its reserved space.

        Returns:
            ``True`` if the file existed, ``False`` otherwise.
        """
        file = self.files.pop(name, None)
        if file is None:
            return False
        self.free_space += file["size"]
        return True

    def read_file(self, name):
        """Return the data of the specified file, or ``None`` if it is missing."""
        file = self.files.get(name)
        if file is None:
            return None
        return file["data"]

    def write_file(self, name, data):
        """Replace the data of the specified file.

        The length of ``data`` is not checked against the reserved size.

        Returns:
            ``True`` if the file exists and was written, ``False`` otherwise.
        """
        if name not in self.files:
            return False
        self.files[name]["data"] = data
        return True
class FileSystemDriver:
    """Facade that forwards every file operation to its own FileSystem."""

    def __init__(self):
        # Backing store that all methods below delegate to.
        self.file_system = FileSystem()

    def create_file(self, name, size):
        """Forward a create request; returns whatever the file system returns."""
        backend = self.file_system
        return backend.create_file(name, size)

    def delete_file(self, name):
        """Forward a delete request; returns whatever the file system returns."""
        backend = self.file_system
        return backend.delete_file(name)

    def read_file(self, name):
        """Forward a read request; returns whatever the file system returns."""
        backend = self.file_system
        return backend.read_file(name)

    def write_file(self, name, data):
        """Forward a write request; returns whatever the file system returns."""
        backend = self.file_system
        return backend.write_file(name, data)
class Microkernel:
    """Toy microkernel bundling several small services.

    Services provided:
      * a process registry with one message queue per process,
      * named message queues,
      * named pipes backed by ``os.pipe``,
      * a block cache / write buffer in front of registered block devices,
      * mount-point based routing of file operations to a file system driver,
      * per-process memory block bookkeeping.
    """

    def __init__(self):
        self.memory = {}  # block id -> memory block record
        self.process_list = []  # registered process ids
        self.message_queues = {}  # queue name -> list of pending messages
        self.file_system_driver = FileSystemDriver()
        self.device_manager = DeviceManager()  # block sizes + raw block reads
        self.mount_points = {}  # mount point -> file system driver
        self.cache = {}  # (device, block) -> clean data, oldest use first
        self.buffer = {}  # (device, block) -> dirty data awaiting flush
        self.semaphores = {}  # unused; kept so existing attribute access works
        self.pipes = {}  # pipe name -> (read fd, write fd)
        self.process_queues = {}  # process id -> list of pending messages
        self._next_memory_id = 1  # ids are never reused, even after a free

    # ------------------------------------------------------------------
    # Processes and per-process messaging
    # ------------------------------------------------------------------
    def add_process(self, process_id):
        """Register a process and make sure it has a message queue.

        Adding an already registered process is a no-op, so messages that
        are still queued for it are not lost.
        """
        if process_id not in self.process_list:
            self.process_list.append(process_id)
        self.process_queues.setdefault(process_id, [])

    def check_process(self, process_id):
        """Return ``True`` if the process is in the process list."""
        return process_id in self.process_list

    def send_message(self, process_id, message):
        """Append a message to the queue of the receiving process.

        Args:
            process_id: Id of the process that should receive the message.
            message: The message to deliver.

        Returns:
            ``True`` if the process has a queue, ``False`` otherwise.
        """
        queue = self.process_queues.get(process_id)
        if queue is None:
            return False
        queue.append(message)
        return True

    def receive_message(self, process_id):
        """Pop the oldest message from a process queue.

        Returns:
            The message, or ``None`` if the process is unknown or its
            queue is empty.
        """
        queue = self.process_queues.get(process_id)
        if not queue:
            return None
        return queue.pop(0)

    # ------------------------------------------------------------------
    # Pipes
    # ------------------------------------------------------------------
    def create_pipe(self, name: str) -> Tuple[int, int]:
        """Create a pipe and remember its descriptors under ``name``.

        Python's ``os`` module has no ``semget``/``semctl``/``semop``, so
        no semaphore is created here; reads on the pipe descriptor
        already wait for data by themselves.

        Returns:
            The ``(read_fd, write_fd)`` pair returned by ``os.pipe``.

        Raises:
            ValueError: If a pipe with this name already exists; creating
                another one would leak the open descriptors of the first.
        """
        if name in self.pipes:
            raise ValueError(f"pipe {name!r} already exists")
        read_end, write_end = os.pipe()
        self.pipes[name] = (read_end, write_end)
        return read_end, write_end

    def write_pipe(self, name: str, data: bytes):
        """Write ``data`` to the write end of the named pipe.

        Raises:
            KeyError: If no pipe with this name was created.
        """
        _, write_end = self.pipes[name]
        os.write(write_end, data)

    def read_pipe(self, name: str) -> bytes:
        """Read up to 1024 bytes from the read end of the named pipe.

        Raises:
            KeyError: If no pipe with this name was created.
        """
        read_end, _ = self.pipes[name]
        return os.read(read_end, 1024)

    # ------------------------------------------------------------------
    # Block devices, cache and write buffer
    # ------------------------------------------------------------------
    def read_from_device(self, device, block_num):
        """Read one block straight from the device, bypassing the cache."""
        return self.device_manager.read_from_device(device, block_num)

    def write_to_device(self, device, block_num, data):
        """Write one block straight to the device, bypassing the buffer.

        Raises:
            ValueError: If the device is not registered with the device
                manager, or ``data`` is longer than the device's block
                size (it would spill into the following block).
        """
        try:
            block_size = self.device_manager.devices[device]
        except KeyError:
            raise ValueError(f"{device} is not a registered device") from None
        if len(data) > block_size:
            raise ValueError(
                f"data is {len(data)} bytes but the block size is {block_size}"
            )
        with open(device, "r+b") as f:
            f.seek(block_num * block_size)
            f.write(data)

    def read_block(self, device, block_num):
        """Return the current contents of a block.

        Pending writes in the buffer take precedence, then the cache, and
        only then the device itself. A block read from the device is
        added to the cache.
        """
        key = (device, block_num)
        # A buffered write is the newest version; leave it in the buffer so
        # that it is still flushed later.
        if key in self.buffer:
            return self.buffer[key]
        if key in self.cache:
            # Re-insert so the dict order reflects recency of use.
            data = self.cache.pop(key)
            self.cache[key] = data
            return data
        data = self.read_from_device(device, block_num)
        self.cache[key] = data
        return data

    def write_block(self, device, block_num, data):
        """Queue a block write; it reaches the device on ``flush_buffer``."""
        key = (device, block_num)
        self.buffer[key] = data
        # The cached copy is now stale; reads are served from the buffer.
        self.cache.pop(key, None)

    def flush_buffer(self):
        """Write all dirty blocks in the buffer to their devices.

        Each block leaves the buffer only after it was written, so a
        failure part-way through keeps the remaining blocks queued.
        """
        for key in list(self.buffer):
            device, block_num = key
            self.write_to_device(device, block_num, self.buffer[key])
            del self.buffer[key]

    def evict_cache(self):
        """Remove the least recently used block from the cache.

        Does nothing when the cache is empty.
        """
        if not self.cache:
            return
        # read_block keeps the dict ordered from least to most recently used.
        lru_block = next(iter(self.cache))
        del self.cache[lru_block]

    # ------------------------------------------------------------------
    # Mount points and file operations
    # ------------------------------------------------------------------
    def mount_file_system(self, device, mount_point):
        """Mount the kernel's file system driver at ``mount_point``.

        ``device`` is accepted but not used: every mount point refers to
        the same ``self.file_system_driver`` instance.

        Returns:
            ``True`` on success, ``False`` if the mount point is taken.
        """
        if mount_point in self.mount_points:
            return False
        self.mount_points[mount_point] = self.file_system_driver
        return True

    def unmount_file_system(self, mount_point):
        """Unmount the file system at ``mount_point``.

        Returns:
            ``True`` if something was mounted there, ``False`` otherwise.
        """
        if mount_point not in self.mount_points:
            return False
        del self.mount_points[mount_point]
        return True

    def _resolve_path(self, path):
        """Map ``path`` to ``(driver, path_below_mount_point)``.

        The longest mount point that is a whole-component prefix of
        ``path`` wins. Returns ``None`` if no mount point matches.
        """
        for mount_point in sorted(self.mount_points, key=len, reverse=True):
            prefix = mount_point.rstrip("/")
            # Require a "/" after the prefix so "/mnt" does not match "/mntx".
            if path.startswith(prefix + "/"):
                return self.mount_points[mount_point], path[len(prefix):]
        return None

    def create_file(self, path, size):
        """Create a file of ``size`` bytes at ``path``.

        Returns:
            The driver's result, or ``None`` if ``path`` is not below a
            mounted file system.
        """
        resolved = self._resolve_path(path)
        if resolved is None:
            return None
        driver, relative_path = resolved
        return driver.create_file(relative_path, size)

    def delete_file(self, path):
        """Delete the file at ``path``.

        Returns:
            The driver's result, or ``None`` if ``path`` is not below a
            mounted file system.
        """
        resolved = self._resolve_path(path)
        if resolved is None:
            return None
        driver, relative_path = resolved
        return driver.delete_file(relative_path)

    def read_file(self, path):
        """Read the file at ``path``.

        Returns:
            The driver's result, or ``None`` if ``path`` is not below a
            mounted file system.
        """
        resolved = self._resolve_path(path)
        if resolved is None:
            return None
        driver, relative_path = resolved
        return driver.read_file(relative_path)

    def write_file(self, path, data):
        """Write ``data`` to the file at ``path``.

        Returns:
            The driver's result, or ``None`` if ``path`` is not below a
            mounted file system.
        """
        resolved = self._resolve_path(path)
        if resolved is None:
            return None
        driver, relative_path = resolved
        return driver.write_file(relative_path, data)

    # ------------------------------------------------------------------
    # Memory bookkeeping
    # ------------------------------------------------------------------
    def allocate_memory(self, size, process_id):
        """Record a memory block of ``size`` for ``process_id``.

        No capacity limit is enforced. Block ids come from a counter that
        only grows, so a new block can never take the id of a block that
        is still allocated.

        Returns:
            The new block record with the keys ``"id"``, ``"size"``,
            ``"allocated"`` and ``"process_id"``.
        """
        block_id = self._next_memory_id
        self._next_memory_id += 1
        memory_block = {
            "id": block_id,
            "size": size,
            "allocated": True,
            "process_id": process_id,
        }
        self.memory[block_id] = memory_block
        return memory_block

    def free_memory(self, id, process_id):
        """Free the block ``id`` if it is owned by ``process_id``.

        Returns:
            ``True`` if the block was freed, ``False`` if it does not
            exist or belongs to another process.
        """
        memory_block = self.memory.get(id)
        if memory_block is None or memory_block["process_id"] != process_id:
            return False
        del self.memory[id]
        return True

    # ------------------------------------------------------------------
    # Named message queues
    # ------------------------------------------------------------------
    def create_message_queue(self, queue_name):
        """Create a message queue unless one with this name already exists.

        An existing queue is left untouched so pending messages survive.
        """
        self.message_queues.setdefault(queue_name, [])

    def send_message_queue(self, queue_name, message):
        """Append a message to a named queue.

        Returns:
            ``True`` if the queue exists, ``False`` otherwise.
        """
        queue = self.message_queues.get(queue_name)
        if queue is None:
            return False
        queue.append(message)
        return True

    def receive_message_queue(self, queue_name):
        """Pop the oldest message from a named queue.

        Returns:
            The message, or ``None`` if the queue is unknown or empty.
        """
        queue = self.message_queues.get(queue_name)
        if not queue:
            return None
        return queue.pop(0)
microkernel = Microkernel()
microkernel.add_process("Process 1")
memory_block = microkernel.allocate_memory(1024, "Process 1")
# Free the memory block
print(microkernel.free_memory(memory_block["id"], "Process 1"))
# Freeing again fails: the block is already gone (and "Process 2" never
# owned it), so this prints False
print(microkernel.free_memory(memory_block["id"], "Process 2"))
# Create a named message queue and pass one message through it
microkernel.create_message_queue("Queue 1")
microkernel.send_message_queue("Queue 1", "queue hello!")
print(microkernel.receive_message_queue("Queue 1"))  # "queue hello!"
# File system calls
driver = FileSystemDriver()
file = driver.create_file("file.txt", 100)
driver.write_file("file.txt", "hello world from a file!")
print(driver.read_file("file.txt"))
driver.delete_file("file.txt")
# create a new instance of the microkernel
mk = Microkernel()
# create two new processes with IDs 1 and 2
mk.add_process(1)
mk.add_process(2)
# allocate 1024 bytes for process ID 1 (the signature is size, process_id)
mk_block = mk.allocate_memory(1024, 1)
# send message "Hello from process 1" to process 2; send_message takes the
# id of the RECEIVING process
mk.send_message(2, "Hello from process 1")
# receive the message as process 2
message = mk.receive_message(2)
print(message)
# expected: "Hello from process 1"
# send message "Hello from process 2" to process 1
mk.send_message(1, "Hello from process 2")
# receive the message as process 1
message = mk.receive_message(1)
print(message)
# expected: "Hello from process 2"
# attempt to receive message from an invalid process ID
message = mk.receive_message(3)
print(message)
# expected: None
# attempting to receive message from process 1 after it has been received
message = mk.receive_message(1)
print(message)
# expected: None
# free memory allocated for process ID 1
mk.free_memory(mk_block["id"], 1)
| # creating a pipe named "pipe1" | |
| # pipe1_read, pipe1_write = mk.create_pipe("pipe1") | |
| # sending data through the pipe | |
| # mk.write_pipe("pipe1", b"Hello from process 1!") | |
| # Reading from the pipe | |
| # data = mk.read_pipe("pipe1") | |
| # print(data.decode()) | |
| # Mounting a file system on the device '/dev/sda1' at the mount point '/mnt' | |
| # mk.mount_file_system("/dev/sda1", "/mnt") | |
| # create a file named test.txt | |
| # mk.create_file("/mnt/test.txt", 10) | |
| # writing to the file | |
| # mk.write_file("/mnt/test.txt", b"Hello, world!") | |
| # reading the file | |
| # data = mk.read_file("/mnt/test.txt") | |
| # print(data.decode()) | |
| # deleting the file | |
| # mk.delete_file("/mnt/test.txt") | |
| # unmount the file system | |
| # mk.unmount_file_system("/mnt") | |
| # read a block from device '/dev/sda1' | |
| # data = mk.read_block("/dev/sda1", 0) | |
| # print(data) | |
| # writing to a block on the device '/dev/sda1' | |
| # mk.write_block("/dev/sda1", 0, b"This is a block of data") | |
| # flush the buffer | |
| # mk.flush_buffer() | |
| # evict a block from the cache | |
| # mk.evict_cache() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment