@filipeandre
Last active December 15, 2025 18:00
Force an AWS Lambda concurrency limit increase by driving the account to its concurrency limit
#!/usr/bin/env python3
import asyncio
import io
import json
import os
import time
import zipfile
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from aiobotocore.session import get_session
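# Third-party dependencies: boto3 and aiobotocore (pip install aiobotocore boto3; see the comment at the bottom).
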
# -----------------------------
# Settings
# -----------------------------
REGION = os.getenv("AWS_REGION") or "us-east-1"
RUN_ID = os.getenv("RUN_ID") or time.strftime("%Y%m%d-%H%M%S")
LAMBDA_NAME = os.getenv("LAMBDA_NAME") or f"lambda-hit-limit-{RUN_ID}"
ROLE_NAME = os.getenv("ROLE_NAME") or "lambda-hit-limit-role"
LAMBDA_RUNTIME = os.getenv("LAMBDA_RUNTIME") or "python3.12"
ARCHITECTURE = ["arm64"]
HANDLER = "lambda_function.lambda_handler"
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS") or "5")
# How long to run
DURATION_SECONDS = int(os.getenv("DURATION_SECONDS") or "180")
REPORT_EVERY_SECONDS = int(os.getenv("REPORT_EVERY_SECONDS") or "5")
# Invocation type: "RequestResponse" makes concurrency "real" (calls block ~sleep time)
INVOCATION_TYPE = os.getenv("INVOCATION_TYPE") or "RequestResponse"
# Overbook factor: how many parallel invoke attempts vs account unreserved limit.
# Example: limit=400, OVERBOOK=2 => attempt 800 in parallel; some run, some throttle (acceptable).
OVERBOOK = float(os.getenv("OVERBOOK") or "2.0")
# Client connection pool & cap
MAX_POOL_CONNECTIONS = int(os.getenv("MAX_POOL_CONNECTIONS") or "2000")
MAX_CLIENT_INFLIGHT = int(os.getenv("MAX_CLIENT_INFLIGHT") or "1200") # cap for open HTTP requests
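# Keep MAX_CLIENT_INFLIGHT below MAX_POOL_CONNECTIONS so in-flight requests never have to
# queue waiting for a pooled HTTP connection on the client side.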
sync_config = Config(
    region_name=REGION,
    max_pool_connections=MAX_POOL_CONNECTIONS,
    retries={"max_attempts": 0},
)
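# Retries are disabled on purpose: a throttle should surface as TooManyRequestsException
# and be counted below, not be silently retried by botocore.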
iam = boto3.client("iam", region_name=REGION)
lambda_sync = boto3.client("lambda", config=sync_config)
# -----------------------------
# IAM / Lambda creation
# -----------------------------
def ensure_role() -> str:
    assume_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }
    try:
        return iam.get_role(RoleName=ROLE_NAME)["Role"]["Arn"]
    except iam.exceptions.NoSuchEntityException:
        pass
    role = iam.create_role(
        RoleName=ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(assume_policy),
    )
    iam.attach_role_policy(
        RoleName=ROLE_NAME,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
    )
    # IAM propagation
    time.sleep(10)
    return role["Role"]["Arn"]

def build_lambda_zip() -> bytes:
    code = f"""\
import time
def lambda_handler(event, context):
    time.sleep({SLEEP_SECONDS})
    return {{"ok": True, "sleep": {SLEEP_SECONDS}}}
"""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
        z.writestr("lambda_function.py", code)
    return buf.getvalue()

def wait_for_active(function_name: str, timeout: int = 180) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        cfg = lambda_sync.get_function_configuration(FunctionName=function_name)
        st = cfg.get("State")
        if st == "Active":
            return
        if st == "Failed":
            raise RuntimeError(f"Lambda Failed: {cfg.get('StateReason')}")
        time.sleep(2)
    raise TimeoutError(f"Timed out waiting for {function_name} to become Active")

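# NOTE: recent botocore releases also ship a built-in waiter
# (lambda_sync.get_waiter("function_active_v2")) that could replace the polling loop above.
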
def ensure_lambda(role_arn: str) -> None:
    zip_bytes = build_lambda_zip()
    try:
        lambda_sync.get_function(FunctionName=LAMBDA_NAME)
        lambda_sync.update_function_code(
            FunctionName=LAMBDA_NAME,
            ZipFile=zip_bytes,
            Publish=True,
        )
        # Wait for the code update to finish before touching the configuration;
        # otherwise Lambda can reject the next call with ResourceConflictException.
        lambda_sync.get_waiter("function_updated_v2").wait(FunctionName=LAMBDA_NAME)
        lambda_sync.update_function_configuration(
            FunctionName=LAMBDA_NAME,
            Runtime=LAMBDA_RUNTIME,
            Architectures=ARCHITECTURE,
            Role=role_arn,
            Handler=HANDLER,
            Timeout=30,
            MemorySize=128,
        )
    except lambda_sync.exceptions.ResourceNotFoundException:
        lambda_sync.create_function(
            FunctionName=LAMBDA_NAME,
            Runtime=LAMBDA_RUNTIME,
            Architectures=ARCHITECTURE,
            Role=role_arn,
            Handler=HANDLER,
            Code={"ZipFile": zip_bytes},
            Timeout=30,
            MemorySize=128,
            Publish=True,
        )
    wait_for_active(LAMBDA_NAME)
    # IMPORTANT: do NOT reserve concurrency for this function. A reservation would cap it
    # below the account limit (and shrink the unreserved pool we want to saturate), so
    # remove any reservation a previous run may have left behind.
    try:
        lambda_sync.delete_function_concurrency(FunctionName=LAMBDA_NAME)
    except ClientError:
        pass

# -----------------------------
# Load test logic
# -----------------------------
class Counters:
    __slots__ = ("accepted", "throttled", "errors", "completed", "lock")

    def __init__(self):
        self.accepted = 0
        self.throttled = 0
        self.errors = 0
        self.completed = 0
        self.lock = asyncio.Lock()

    async def bump(self, field: str, n: int = 1):
        async with self.lock:
            setattr(self, field, getattr(self, field) + n)

    async def snapshot_and_reset(self):
        async with self.lock:
            a, t, e, c = self.accepted, self.throttled, self.errors, self.completed
            self.accepted = self.throttled = self.errors = self.completed = 0
            return a, t, e, c

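# Every update goes through the asyncio.Lock, so a reporting snapshot never interleaves with
# an in-flight bump(): snapshot_and_reset reads and zeroes all four counters in one critical section.
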
async def invoke_forever(lambda_client, sem: asyncio.Semaphore, counters: Counters, stop_at: float):
    while time.time() < stop_at:
        async with sem:
            try:
                resp = await lambda_client.invoke(
                    FunctionName=LAMBDA_NAME,
                    InvocationType=INVOCATION_TYPE,
                )
                # IMPORTANT: drain/close payload so the HTTP connection returns to the pool.
                # Without this, RequestResponse often plateaus far below the account concurrency limit.
                payload = resp.get("Payload")
                if payload is not None:
                    await payload.read()
                    payload.close()
                sc = int(resp.get("StatusCode", 0))
                if sc in (200, 202, 204):
                    await counters.bump("accepted")
                else:
                    await counters.bump("errors")
            except lambda_client.exceptions.TooManyRequestsException:
                # Throttling is acceptable: it means we hit a limit (concurrency and/or invoke rate).
                await counters.bump("throttled")
            except Exception:
                await counters.bump("errors")
            finally:
                await counters.bump("completed")

async def run_load(parallel_attempts: int):
    stop_at = time.time() + DURATION_SECONDS
    counters = Counters()
    # cap in-flight HTTP requests to protect the client
    in_flight = min(parallel_attempts, MAX_CLIENT_INFLIGHT)
    sem = asyncio.Semaphore(in_flight)
    session = get_session()
    async with session.create_client(
        "lambda",
        region_name=REGION,
        config=Config(
            max_pool_connections=MAX_POOL_CONNECTIONS,
            retries={"max_attempts": 0},
        ),
    ) as lambda_async:
        # Keep task count reasonable: tasks are just "producers"; semaphore controls in-flight.
        # Still, if parallel_attempts is huge, task scheduling overhead can dominate.
        tasks = [
            asyncio.create_task(invoke_forever(lambda_async, sem, counters, stop_at))
            for _ in range(parallel_attempts)
        ]
        last = time.time()
        while time.time() < stop_at:
            await asyncio.sleep(REPORT_EVERY_SECONDS)
            now = time.time()
            accepted, throttled, errors, completed = await counters.snapshot_and_reset()
            window = max(1e-6, now - last)
            accepted_per_sec = accepted / window
            # Est concurrency ~= throughput * avg runtime (sleep)
            est_conc = accepted_per_sec * SLEEP_SECONDS
            print(
                f"[{int(now)}] "
                f"accepted={accepted} throttled={throttled} errors={errors} completed={completed} "
                f"accepted/s={accepted_per_sec:.1f} est_conc≈{int(est_conc)} "
                f"in_flight_cap={in_flight} parallel_tasks={parallel_attempts}"
            )
            last = now
        await asyncio.gather(*tasks, return_exceptions=True)

def main():
    acct = lambda_sync.get_account_settings()["AccountLimit"]
    total = int(acct["ConcurrentExecutions"])
    unreserved = int(acct["UnreservedConcurrentExecutions"])
    # We aim to hit the limit: drive the unreserved pool.
    target_limit = unreserved
    parallel_attempts = max(10, int(target_limit * OVERBOOK))
    print(f"Region: {REGION}")
    print(f"Account concurrency total: {total}")
    print(f"Unreserved concurrency: {unreserved}")
    print(f"Target to hit: ~{target_limit} concurrent executions")
    print(f"InvocationType: {INVOCATION_TYPE} (blocks ~{SLEEP_SECONDS}s per accepted invoke)")
    print(f"OVERBOOK: {OVERBOOK} => parallel invoke tasks: {parallel_attempts}")
    print(f"Client in-flight HTTP cap: {min(parallel_attempts, MAX_CLIENT_INFLIGHT)}")
    print(f"MAX_POOL_CONNECTIONS: {MAX_POOL_CONNECTIONS}")
    print()
    role_arn = ensure_role()
    ensure_lambda(role_arn)
    print(f"Function: {LAMBDA_NAME}")
    print(f"Duration: {DURATION_SECONDS}s")
    print()
    asyncio.run(run_load(parallel_attempts))

if __name__ == "__main__":
    main()

filipeandre commented Dec 15, 2025

pip install aiobotocore boto3 && \
REPO_FOLDER=concurrency && \
REPO_URL=https://gist.github.com/filipeandre/10225bf02583a97a0404266add3b7eb6.git && \
[ -d .git ] && git pull --rebase || { [ -d $REPO_FOLDER/.git ] && git -C $REPO_FOLDER pull --rebase || git clone $REPO_URL $REPO_FOLDER; cd $REPO_FOLDER 2>/dev/null || true; }  && \
python concurrency.py
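
To tear the test resources down afterwards (concurrency.py itself never deletes anything), here is a minimal cleanup sketch. It assumes the same ROLE_NAME default and the run-specific function name that the script printed; the file name cleanup.py is just an example, not part of the gist.

# cleanup.py — hypothetical helper: removes the test function and its IAM role.
import os
import boto3

REGION = os.getenv("AWS_REGION") or "us-east-1"
LAMBDA_NAME = os.environ["LAMBDA_NAME"]  # run-specific name printed by concurrency.py
ROLE_NAME = os.getenv("ROLE_NAME") or "lambda-hit-limit-role"

lam = boto3.client("lambda", region_name=REGION)
iam = boto3.client("iam")

# Delete the function first, then detach the managed policy so the role can be deleted.
lam.delete_function(FunctionName=LAMBDA_NAME)
iam.detach_role_policy(
    RoleName=ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)
iam.delete_role(RoleName=ROLE_NAME)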
