Skip to content

Instantly share code, notes, and snippets.

@ugovaretto
Created January 29, 2026 07:54
Show Gist options
  • Select an option

  • Save ugovaretto/009c15d6efb9396e4acc7ee13204b69a to your computer and use it in GitHub Desktop.

Select an option

Save ugovaretto/009c15d6efb9396e4acc7ee13204b69a to your computer and use it in GitHub Desktop.
concurrent python async vs sync vs multithreading
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
async def do_async_work(task_id: int, duration: float = 0.1) -> str:
await asyncio.sleep(duration)
return f"Task {task_id} completed"
async def run_asyncio(tasks: int=5):
task_list = [do_async_work(i, 0.1) for i in range(tasks)]
results = await asyncio.gather(*task_list)
return list (results)
def async_main():
start_time = time.perf_counter()
results = asyncio.run(run_asyncio(tasks=5))
elapsed_time = time.perf_counter() - start_time
print("Asyncio Results:")
for result in results:
print(f" {result}")
print(f"\nTotal time: {elapsed_time: .2f} seconds")
print("Note: Tasks ran concurrently using asyncio")
################################################################################
def do_work(task_id: int, duration: float = 0.1) -> str:
time.sleep(duration)
return f"Task {task_id} completed"
def run_threading(max_workers: int = 5) -> list[str]:
results: list[str] = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(do_work, i, 0.1) for i in range(max_workers)]
for future in as_completed(futures):
result = future.result()
results.append(result)
return results
def multi_threading_main():
start_time = time.perf_counter()
results = run_threading(5)
elapsed_time = time.perf_counter() - start_time
print("Multithreading Results:")
for result in results:
print(f" {result}")
print(f"\nTotal time: {elapsed_time: .2f} seconds")
print("Note: Tasks ran concurrently using threads")
################################################################################
def run_sync(tasks: int = 5) -> list[str]:
results: list[str] = []
for i in range(tasks):
result = do_work(i, duration=0.1)
results.append(result)
return results
def sync_main():
start_time = time.perf_counter()
results = run_sync(5)
elapsed_time = time.perf_counter() - start_time
print("Sync Results:")
for result in results:
print(f" {result}")
print(f"\nTotal time: {elapsed_time: .2f} seconds")
print("Note: Tasks ran sequentially")
def print_help():
print(f"{sys.argv[0]} [sync|async|multi-threading]")
if __name__ == "__main__":
if len(sys.argv) == 2:
match sys.argv[1]:
case "async":
async_main()
case "multi-threading":
multi_threading_main()
case "sync":
sync_main()
case _ :
print_help()
else:
sync_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment