Skip to content

Instantly share code, notes, and snippets.

@isaiah1112
Created December 22, 2025 21:53
Show Gist options
  • Select an option

  • Save isaiah1112/e9a04070e1d16d6957441aeb20ae5811 to your computer and use it in GitHub Desktop.

Select an option

Save isaiah1112/e9a04070e1d16d6957441aeb20ae5811 to your computer and use it in GitHub Desktop.
Python Examples
from typing import Iterator, Optional
def backoff(start: int, stop: int, factor: Optional[int] = 5) -> Iterator[int]:
    """ Generator for a backoff policy that increments a given start value by factor until
    stop is reached.

    Every value strictly before stop is yielded, followed by one final value that is equal to
    or past stop (the last value overshoots stop when the distance between start and stop is
    not a multiple of factor). A negative factor counts downwards from start.

    Because this is a generator, the arguments are only validated when the first value is
    requested.

    :param start: Integer to start at
    :type start: int
    :param stop: Integer to stop at or after
    :type stop: int
    :param factor: Non-zero integer to increase start by on every step
    :type factor: int
    :returns: Iterator over the generated integers
    :rtype: Iterator[int]
    :raises: ValueError if factor is 0 or if stop lies in the opposite direction of factor
    """
    if factor == 0:
        # A zero step never reaches stop. Raise explicitly instead of using `assert`,
        # which is stripped when Python runs with -O.
        raise ValueError('factor must not be 0')
    n = start
    if factor < 0:
        if stop > start:
            raise ValueError(str(stop) + ' >= ' + str(start))
        while n > stop:
            yield n
            n += factor
    else:
        if stop < start:
            raise ValueError(str(stop) + ' <= ' + str(start))
        while n < stop:
            yield n
            n += factor
    # Final value: the first one that is at or past stop
    yield n
""" A quick implementation to test and see if a number is a Lychrel Number.
More info on Lychrel Numbers can be found at: https://en.wikipedia.org/wiki/Lychrel_number
"""
from __future__ import print_function
import sys
print('Attempt to determine if a number is a Lychrel Number')
# raw_input only exists on Python 2; on Python 3 the same function is called input
try:
    read_line = raw_input
except NameError:
    read_line = input
start = read_line('Please enter a positive integer: ')
try:
    start = int(start)
except ValueError:
    print('You did not enter an integer... silly human!')
    sys.exit(1)
if start <= 10:
    print('I lied, you must pick a positive integer greater than 10...')
    print('All single digit numbers are palindromes!')
    sys.exit(1)
# Lychrel candidates (196 for example) never produce a palindrome, so the
# reverse-and-add search has to be capped or the script would run forever.
max_iterations = 1000
counter = 0
num = start
while counter < max_iterations:
    rev = int(str(num)[::-1])
    if num == rev:
        print(str(start) + ' is not a Lychrel number. Sorry!')
        print('It took ' + str(counter) + ' iterations to find a palindrome!')
        break
    total = num + rev
    print(str(num) + ' + ' + str(rev) + ' = ' + str(total))
    num = total
    counter += 1
else:
    # The loop ended without hitting `break`: no palindrome within the cap
    print(str(start) + ' did not produce a palindrome in ' + str(max_iterations) + ' iterations.')
    print('It may be a Lychrel number!')
sys.exit(0)
""" A Python module for creating thread pools.
"""
# Imports
from __future__ import print_function
from builtins import range
from queue import Queue
from types import FunctionType
import sys
import threading
import time
# Private variables
__author__ = 'Jesse Almanrode'
class ThreadPool(object):
    """ Create a pool of threads that can process an iterable

    :param size: Maximum number of threads to spawn (0 runs the work in the calling thread)
    :param blocking: When True, map/map_extended wait for the work to finish and return the results.
        When False they return immediately; the results are collected by join() or when a
        `with` statement using the pool exits.
    :return: <ThreadPool> Object
    """
    def __init__(self, size=10, blocking=True):
        self.size = int(size)
        self.results = None
        self.blocking = blocking
        self._inqueue = None
        self._outqueue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.blocking is False:
            self.__join()
        # Never return the results from here: a truthy return value from __exit__
        # would suppress any exception raised inside the `with` body.
        return None

    def map_extended(self, target, iterable, args=(), kwargs=None):
        """ Run function on every item in iterable. If extra args or kwargs are passed they are passed to function.

        Results are collected in the order the threads finish, which is not necessarily the
        order of iterable.

        :param target: Function to run
        :param iterable: List, tuple or Queue of items to process
        :param args: Extra args to pass
        :param kwargs: Extra kwargs to pass
        :return: List of results from target(for i in iterable) when blocking, otherwise None
        :raises: TypeError if any argument is of an unsupported type
        """
        if kwargs is None:
            kwargs = {}
        if not isinstance(target, FunctionType):
            raise TypeError('target must be of type: ' + str(FunctionType))
        if not isinstance(iterable, (list, tuple, Queue)):
            raise TypeError('iterable must be of type: ' + str(list) + ', ' + str(tuple) + ', or ' + str(Queue))
        if not isinstance(args, (tuple, list)):
            raise TypeError('args must be of type: ' + str(list) + ' or ' + str(tuple))
        if not isinstance(kwargs, dict):
            raise TypeError('kwargs must be of instance: ' + str(dict))
        if isinstance(iterable, Queue):
            self._inqueue = iterable
        else:
            self._inqueue = Queue()
            for item in iterable:
                self._inqueue.put(item)
        # Unbounded so workers can never block on put (and a Queue input has no len())
        self._outqueue = Queue()
        self.results = list()
        if self.size == 0:
            self._worker(target, self._inqueue, self._outqueue, *args, **kwargs)
        else:
            # Do not spawn more threads than there are tasks, but leave self.size untouched
            # so the pool keeps its configured size when it is reused.
            thread_count = min(self.size, self._inqueue.qsize())
            worker_args = [target, self._inqueue, self._outqueue]
            worker_args.extend(args)
            for _ in range(thread_count):
                thread = threading.Thread(target=self._worker, args=worker_args, kwargs=kwargs)
                thread.daemon = True
                thread.start()
        if self.blocking:
            return self.__join()

    def join(self):
        """ Wait for non-blocking threads to complete.

        :return: List of results from target(for i in iterable)
        """
        if self._inqueue is None:
            # Nothing has been mapped yet, so there is nothing to wait for
            return self.results
        # Blocks until task_done() has been called for every queued item
        self._inqueue.join()
        while self._outqueue.empty() is False:
            self.results.append(self._outqueue.get())
        return self.results

    def map(self, target, iterable):
        """ Run function on every item in iterable in a pool of threads

        :param target: Function to run
        :param iterable: List, tuple or Queue of items to process
        :return: List of results from target(for i in iterable) when blocking, otherwise None
        """
        return self.__map_extended(target, iterable)

    @staticmethod
    def _worker(target, inqueue, outqueue, *args, **kwargs):
        """ Private worker to remove requirement for working with Queues in defined functions

        :param target: Function to run on every task
        :param inqueue: Queue of tasks to consume
        :param outqueue: Queue that receives the return values of target
        :return: outqueue
        """
        from queue import Empty
        while True:
            # get_nowait avoids blocking forever when another worker takes the last
            # task between an empty() check and the get() call.
            try:
                task = inqueue.get_nowait()
            except Empty:
                break
            try:
                outqueue.put(target(task, *args, **kwargs))
            finally:
                # Always mark the task done, even if target raised, so join() cannot hang
                inqueue.task_done()
        return outqueue

    # Private aliases so map() and __exit__ keep working if a subclass overrides the public methods
    __map_extended = map_extended
    __join = join
# coding=utf-8
import time
import sys
class Stopwatch:
    """ Class that can be used to time operations in a Python script. The easiest way to implement it is to use the
    `with` statement which will automatically start the timer and stop it when the statement exits.
    """
    def __init__(self) -> None:
        self.start_time = None  # time.time() value recorded by start()
        self.stop_time = None  # time.time() value recorded by stop()
        self.delta = None  # stop_time - start_time as a string with 4 decimal places

    def __enter__(self) -> 'Stopwatch':
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returns None, so exceptions raised inside the `with` body still propagate
        self.stop()

    def __repr__(self) -> str:
        state = {'start_time': self.start_time, 'stop_time': self.stop_time, 'delta': self.delta}
        return str(state)

    def start(self) -> None:
        """ Record the current time as the start of the measurement. """
        self.start_time = time.time()

    def stop(self) -> None:
        """ Record the current time as the end of the measurement and update delta.

        start() must have been called first; otherwise the subtraction fails because
        start_time is still None.
        """
        self.stop_time = time.time()
        self.delta = format(float(self.stop_time - self.start_time), '.4f')
#!/usr/bin/env python
# coding=utf-8
""" I know this is kind of a wacky way to show off ThreadPools in Python but
it was a good way to show off how the map_extended function of my ThreadPool class works!
"""
# This should work in python2 and python3
from __future__ import print_function
from builtins import range # why oh why did python2 call it xrange?
from math import sqrt
from random import shuffle
from pooler import ThreadPool
import threading
import time
import sys
def square(num, root=False):
    """ Square a number, or take its square root instead when root is set
    :param num: Number to operate on
    :param root: When true, return the square root of num rather than its square
    :return: num ** 2, or sqrt(num) when root is true
    """
    return sqrt(num) if root else num ** 2
def main():
    """ Demo entry point: square the integers 1-9 in a thread pool, then take the roots again
    """
    values = list(range(1, 10))
    shuffle(values)  # Randomise the order in which the items are queued
    print('Order of integers: ', values, sep='\n')

    # A pool of 5 threads that hands every item of the list to square()
    with ThreadPool(5) as pool:
        pool.map(square, values)
    # The pool keeps the collected return values in its results attribute
    squared = pool.results
    print('Results from threading:', squared, sep='\n')

    # The same pool is reused, this time forwarding an extra keyword argument
    # (root=True) to square() for every item.
    roots = pool.map_extended(square, squared, kwargs={'root': True})
    print('We are back to where we started:', roots, sep='\n')
    sys.exit(0)


if __name__ == '__main__':
    main()
from functools import partial
import multiprocessing.pool
class ThreadPool(multiprocessing.pool.ThreadPool):
    """ multiprocessing.pool.ThreadPool with an additional map_extended method
    """
    def map_extended(self, func, iterable, **kwargs):
        """ Like map, but every call to func also receives the given keyword arguments.
        Extra positional arguments are not supported.
        :param func: Callable invoked as func(item, **kwargs) for each item
        :param iterable: Items to hand to func
        :param kwargs: Keyword arguments forwarded to every call of func
        :return: Whatever self.map returns for the keyword-bound function
        """
        bound_func = partial(func, **kwargs)
        return self.map(bound_func, iterable)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment