Skip to content

Instantly share code, notes, and snippets.

@snacsnoc
Last active January 10, 2023 16:56
Show Gist options
  • Select an option

  • Save snacsnoc/3911ed39cee3bba81d448cbc19266837 to your computer and use it in GitHub Desktop.

Select an option

Save snacsnoc/3911ed39cee3bba81d448cbc19266837 to your computer and use it in GitHub Desktop.
import os
from typing import Tuple
class DeviceManager:
def __init__(self):
self.devices = {}
def register_device(self, device_name, block_size):
"""Register a device with the given name and block size"""
self.devices[device_name] = block_size
def read_from_device(self, device_name, block_number):
"""Read a block from a registered device"""
if device_name in self.devices:
block_size = self.devices[device_name]
with open(device_name, "rb") as f:
f.seek(block_number * block_size)
return f.read(block_size)
else:
raise ValueError(f"{device_name} is not a registered device")
class FileSystem:
def __init__(self):
self.files = {}
self.free_space = 1024 # Total size of the file system in bytes
def create_file(self, name, size):
"""Creates a new file with the specified name and size"""
if self.free_space >= size:
file = {"name": name, "size": size, "data": ""}
self.files[name] = file
self.free_space -= size
return file
else:
return None
def delete_file(self, name):
"""Deletes the specified file"""
if name in self.files:
file = self.files[name]
del self.files[name]
self.free_space += file["size"]
return True
else:
return False
def read_file(self, name):
"""Reads the specified file"""
if name in self.files:
return self.files[name]["data"]
else:
return None
def write_file(self, name, data):
"""Writes data to the specified file"""
if name in self.files:
self.files[name]["data"] = data
return True
else:
return False
class FileSystemDriver:
def __init__(self):
self.file_system = FileSystem()
def create_file(self, name, size):
"""Creates a new file with the specified name and size"""
return self.file_system.create_file(name, size)
def delete_file(self, name):
"""Deletes the specified file"""
return self.file_system.delete_file(name)
def read_file(self, name):
"""Reads the specified file"""
return self.file_system.read_file(name)
def write_file(self, name, data):
"""Writes data to the specified file"""
return self.file_system.write_file(name, data)
class Microkernel:
def __init__(self):
self.memory = {}
self.process_list = []
self.message_queues = {}
self.file_system_driver = FileSystemDriver()
self.mount_points = {}
self.cache = {}
self.buffer = {}
self.semaphores = {}
self.process_queues = {}
def add_process(self, process_id):
"""Add a new process"""
self.process_queues[process_id] = []
def send_message(self, process_id, message):
"""Send a message to a process message queue"""
if process_id in self.process_queues:
self.process_queues[process_id].append(message)
return True
else:
return False
def receive_message(self, process_id):
"""Receive a message from a process message queue"""
if (
process_id in self.process_queues
and len(self.process_queues[process_id]) > 0
):
return self.process_queues[process_id].pop(0)
else:
return None
def create_pipe(self, name: str) -> Tuple[int, int]:
"""Creates a pipe with the specified name"""
read_end, write_end = os.pipe()
self.semaphores[name] = os.semget(os.IPC_PRIVATE, 1, os.IPC_CREAT | 0o666)
os.semctl(self.semaphores[name], 0, os.SETVAL, 0)
return read_end, write_end
def write_pipe(self, name: str, data: bytes):
"""Writes data to the specified pipe"""
semaphore = self.semaphores[name]
os.semop(semaphore, [(-1, 0, 1), (0, 0, 1)])
os.write(self.pipes[name], data)
os.semop(semaphore, [(1, 0, 1)])
def read_pipe(self, name: str) -> bytes:
"""Reads data from the specified pipe"""
semaphore = self.semaphores[name]
os.semop(semaphore, [(-1, 0, 0)])
data = os.read(self.pipes[name], 1024)
os.semop(semaphore, [(1, 0, 1)])
def read_block(self, device, block_num):
"""Reads a block of data from the specified device"""
# Check if the block is in the cache
if (device, block_num) in self.cache:
return self.cache[(device, block_num)]
# Check if the block is in the buffer
if (device, block_num) in self.buffer:
data = self.buffer.pop((device, block_num))
self.cache[(device, block_num)] = data
return data
# Block is not in cache or buffer, read from device
data = self.read_from_device(device, block_num)
self.cache[(device, block_num)] = data
return data
def write_block(self, device, block_num, data):
"""Writes a block of data to the specified device"""
# If block is in cache, update it
if (device, block_num) in self.cache:
self.cache[(device, block_num)] = data
# If block is in buffer, update it
elif (device, block_num) in self.buffer:
self.buffer[(device, block_num)] = data
# Block is not in cache or buffer, add it to buffer
else:
self.buffer[(device, block_num)] = data
def flush_buffer(self):
"""Writes all dirty blocks in the buffer to the device"""
for (device, block_num), data in self.buffer.items():
self.write_to_device(device, block_num, data)
self.buffer.clear()
def evict_cache(self):
"""Removes the least recently used block from the cache"""
lru_block = min(self.cache, key=lambda x: self.cache[x][1])
self.cache.pop(lru_block)
def mount_file_system(self, device, mount_point):
"""Mounts a file system on the specified device at the specified mount point"""
if mount_point in self.mount_points:
return False
else:
self.mount_points[mount_point] = self.file_system_driver
return True
def unmount_file_system(self, mount_point):
"""Unmounts a file system at the specified mount point"""
if mount_point in self.mount_points:
del self.mount_points[mount_point]
return True
else:
return False
def create_file(self, path, size):
"""Creates a new file at the specified path with the specified size"""
if path.startswith("/mnt"):
mount_point = "/mnt"
else:
return None
if mount_point in self.mount_points:
return self.mount_points[mount_point].create_file(
path[len(mount_point) :], size
)
else:
return None
def delete_file(self, path):
"""Deletes the file at the specified path"""
if path.startswith("/mnt"):
mount_point = "/mnt"
else:
return None
if mount_point in self.mount_points:
return self.mount_points[mount_point].delete_file(path[len(mount_point) :])
else:
return None
def read_file(self, path):
"""Reads the file at the specified path"""
if path.startswith("/mnt"):
mount_point = "/mnt"
else:
return None
if mount_point in self.mount_points:
return self.mount_points[mount_point].read_file(path[len(mount_point) :])
else:
return None
def write_file(self, path, data):
"""Writes data to the file at the specified path"""
if path.startswith("/mnt"):
mount_point = "/mnt"
else:
return None
if mount_point in self.mount_points:
return self.mount_points[mount_point].write_file(
path[len(mount_point) :], data
)
else:
return None
def add_process(self, process_id):
"""Add a new process"""
self.process_queues[process_id] = []
def create_message_queue(self, queue_name):
"""Create a new message queue"""
self.message_queues[queue_name] = []
def allocate_memory(self, size, process_id):
"""Allocates a block of memory of the specified size for a process"""
"""
total_allocated_memory = sum(
block["size"]
for block in self.memory.values()
if block["allocated"] and block["process_id"] == process_id
)
if total_allocated_memory + size > self.total_memory:
raise MemoryError("Not enough memory available")
"""
"""Allocates a block of memory of the specified size for a process"""
memory_block = {}
memory_block["id"] = len(self.memory) + 1
memory_block["size"] = size
memory_block["allocated"] = True
memory_block["process_id"] = process_id
self.memory[memory_block["id"]] = memory_block
return memory_block
def free_memory(self, id, process_id):
"""Frees a block of memory with the specified id if the process_id matches"""
if id in self.memory:
memory_block = self.memory[id]
if memory_block["process_id"] == process_id:
del self.memory[id]
return True
else:
return False
else:
return False
def add_process(self, process_id):
"""Adds a process to the process list"""
self.process_list.append(process_id)
def check_process(self, process_id):
"""Check if the process is in the process list"""
return process_id in self.process_list
def send_message_queue(self, queue_name, message):
"""Send a message to a message queue"""
if queue_name in self.message_queues:
self.message_queues[queue_name].append(message)
return True
else:
return False
def receive_message_queue(self, queue_name):
"""Receive a message from a message queue"""
if (
queue_name in self.message_queues
and len(self.message_queues[queue_name]) > 0
):
return self.message_queues[queue_name].pop(0)
else:
return None
microkernel = Microkernel()
microkernel.add_process("Process 1")
memory_block = microkernel.allocate_memory(1024, "Process 1")
# Free the memory block
print(microkernel.free_memory(memory_block["id"], "Process 1"))
# Try to free the memory block with an incorrect process ID
print(microkernel.free_memory(memory_block["id"], "Process 2")) # returns false
"create message queue:", microkernel.create_message_queue("Queue 1")
microkernel.send_message_queue("Queue 1", "queue hello!")
print(microkernel.receive_message_queue("Queue 1")) # "Hello World"
# File system calls
driver = FileSystemDriver()
file = driver.create_file("file.txt", 100)
driver.write_file("file.txt", "hello world from a file!")
print(driver.read_file("file.txt"))
driver.delete_file("file.txt")
# create a new instance of the microkernel
mk = Microkernel()
# create two new processes with IDs 1 and 2
mk.add_process(1)
mk.add_process(2)
# allocate memory for process ID 1
mk.allocate_memory(1, 1024)
# send message "Hello from process 1" from process 1 to process 2
mk.send_message(1, "Hello from process 1")
# receive message from process 2
message = mk.receive_message(2)
print(message)
# output: "Hello from process 1"
# send message "Hello from process 2" from process 2 to process 1
mk.send_message(2, "Hello from process 2")
# receive message from process 1
message = mk.receive_message(1)
print(message)
# output: "Hello from process 2"
# attempt to receive message from an invalid process ID
message = mk.receive_message(3)
print(message)
# output: None
# attempting to receive message from process 1 after it has been received
message = mk.receive_message(1)
print(message)
# free memory allocated for process ID 1
mk.free_memory(1, 1)
# creating a pipe named "pipe1"
# pipe1_read, pipe1_write = mk.create_pipe("pipe1")
# sending data through the pipe
# mk.write_pipe("pipe1", b"Hello from process 1!")
# Reading from the pipe
# data = mk.read_pipe("pipe1")
# print(data.decode())
# Mounting a file system on the device '/dev/sda1' at the mount point '/mnt'
# mk.mount_file_system("/dev/sda1", "/mnt")
# create a file named test.txt
# mk.create_file("/mnt/test.txt", 10)
# writing to the file
# mk.write_file("/mnt/test.txt", b"Hello, world!")
# reading the file
# data = mk.read_file("/mnt/test.txt")
# print(data.decode())
# deleting the file
# mk.delete_file("/mnt/test.txt")
# unmount the file system
# mk.unmount_file_system("/mnt")
# read a block from device '/dev/sda1'
# data = mk.read_block("/dev/sda1", 0)
# print(data)
# writing to a block on the device '/dev/sda1'
# mk.write_block("/dev/sda1", 0, b"This is a block of data")
# flush the buffer
# mk.flush_buffer()
# evict a block from the cache
# mk.evict_cache()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment