Force AWS Lambda concurrency limit: a load-test script that creates a sleeping Lambda and drives it with parallel async invokes until the account's unreserved concurrency limit is hit (for example, to justify a limit increase request).
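Every knob is an environment variable, so a run with non-default settings can be launched from a small wrapper as well as from a shell. A purely illustrative sketch follows; the file name hit_lambda_limit.py and the values are placeholders, not part of the gist.

# Illustrative only: the file name and values below are placeholders.
import os
import subprocess

env = dict(
    os.environ,
    AWS_REGION="us-east-1",
    DURATION_SECONDS="60",  # shorter test window
    OVERBOOK="1.5",         # gentler overbooking
)
subprocess.run(["python3", "hit_lambda_limit.py"], env=env, check=True)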
#!/usr/bin/env python3
import asyncio
import io
import json
import os
import time
import zipfile

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from aiobotocore.session import get_session

# -----------------------------
# Settings
# -----------------------------
REGION = os.getenv("AWS_REGION") or "us-east-1"
RUN_ID = os.getenv("RUN_ID") or time.strftime("%Y%m%d-%H%M%S")
LAMBDA_NAME = os.getenv("LAMBDA_NAME") or f"lambda-hit-limit-{RUN_ID}"
ROLE_NAME = os.getenv("ROLE_NAME") or "lambda-hit-limit-role"
LAMBDA_RUNTIME = os.getenv("LAMBDA_RUNTIME") or "python3.12"
ARCHITECTURE = ["arm64"]
HANDLER = "lambda_function.lambda_handler"
SLEEP_SECONDS = int(os.getenv("SLEEP_SECONDS") or "5")

# How long to run
DURATION_SECONDS = int(os.getenv("DURATION_SECONDS") or "180")
REPORT_EVERY_SECONDS = int(os.getenv("REPORT_EVERY_SECONDS") or "5")

# Invocation type: "RequestResponse" makes concurrency "real" (calls block ~sleep time)
INVOCATION_TYPE = os.getenv("INVOCATION_TYPE") or "RequestResponse"

# Overbook factor: how many parallel invoke attempts vs account unreserved limit.
# Example: limit=400, OVERBOOK=2 => attempt 800 in parallel; some run, some throttle (acceptable).
OVERBOOK = float(os.getenv("OVERBOOK") or "2.0")

# Client connection pool & cap
MAX_POOL_CONNECTIONS = int(os.getenv("MAX_POOL_CONNECTIONS") or "2000")
MAX_CLIENT_INFLIGHT = int(os.getenv("MAX_CLIENT_INFLIGHT") or "1200")  # cap for open HTTP requests

sync_config = Config(
    region_name=REGION,
    max_pool_connections=MAX_POOL_CONNECTIONS,
    retries={"max_attempts": 0},
)

iam = boto3.client("iam", region_name=REGION)
lambda_sync = boto3.client("lambda", config=sync_config)
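
# NOTE: retries are disabled on both the sync and async clients, presumably so that a
# throttled invoke surfaces as TooManyRequestsException and is counted, rather than
# being retried transparently by botocore and skewing the numbers.
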
# -----------------------------
# IAM / Lambda creation
# -----------------------------
def ensure_role() -> str:
    assume_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }
    try:
        return iam.get_role(RoleName=ROLE_NAME)["Role"]["Arn"]
    except iam.exceptions.NoSuchEntityException:
        pass
    role = iam.create_role(
        RoleName=ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(assume_policy),
    )
    iam.attach_role_policy(
        RoleName=ROLE_NAME,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
    )
    # IAM propagation
    time.sleep(10)
    return role["Role"]["Arn"]


def build_lambda_zip() -> bytes:
    code = f"""\
import time
def lambda_handler(event, context):
    time.sleep({SLEEP_SECONDS})
    return {{"ok": True, "sleep": {SLEEP_SECONDS}}}
"""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
        z.writestr("lambda_function.py", code)
    return buf.getvalue()


def wait_for_active(function_name: str, timeout: int = 180) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        cfg = lambda_sync.get_function_configuration(FunctionName=function_name)
        st = cfg.get("State")
        if st == "Active":
            return
        if st == "Failed":
            raise RuntimeError(f"Lambda Failed: {cfg.get('StateReason')}")
        time.sleep(2)
    raise TimeoutError(f"Timed out waiting for {function_name} to become Active")

def ensure_lambda(role_arn: str) -> None:
    zip_bytes = build_lambda_zip()
    try:
        lambda_sync.get_function(FunctionName=LAMBDA_NAME)
        # Architectures is an UpdateFunctionCode parameter; UpdateFunctionConfiguration
        # does not accept it.
        lambda_sync.update_function_code(
            FunctionName=LAMBDA_NAME,
            ZipFile=zip_bytes,
            Architectures=ARCHITECTURE,
            Publish=True,
        )
        # Wait for the code update to finish, otherwise the configuration update below
        # can be rejected with ResourceConflictException while the update is in progress.
        lambda_sync.get_waiter("function_updated_v2").wait(FunctionName=LAMBDA_NAME)
        lambda_sync.update_function_configuration(
            FunctionName=LAMBDA_NAME,
            Runtime=LAMBDA_RUNTIME,
            Role=role_arn,
            Handler=HANDLER,
            Timeout=30,
            MemorySize=128,
        )
    except lambda_sync.exceptions.ResourceNotFoundException:
        lambda_sync.create_function(
            FunctionName=LAMBDA_NAME,
            Runtime=LAMBDA_RUNTIME,
            Architectures=ARCHITECTURE,
            Role=role_arn,
            Handler=HANDLER,
            Code={"ZipFile": zip_bytes},
            Timeout=30,
            MemorySize=128,
            Publish=True,
        )
    wait_for_active(LAMBDA_NAME)
    # IMPORTANT: do NOT reserve concurrency. Also, remove it if some previous run left it behind.
    try:
        lambda_sync.delete_function_concurrency(FunctionName=LAMBDA_NAME)
    except ClientError:
        pass

# -----------------------------
# Load test logic
# -----------------------------
class Counters:
    __slots__ = ("accepted", "throttled", "errors", "completed", "lock")

    def __init__(self):
        self.accepted = 0
        self.throttled = 0
        self.errors = 0
        self.completed = 0
        self.lock = asyncio.Lock()

    async def bump(self, field: str, n: int = 1):
        async with self.lock:
            setattr(self, field, getattr(self, field) + n)

    async def snapshot_and_reset(self):
        async with self.lock:
            a, t, e, c = self.accepted, self.throttled, self.errors, self.completed
            self.accepted = self.throttled = self.errors = self.completed = 0
            return a, t, e, c


async def invoke_forever(lambda_client, sem: asyncio.Semaphore, counters: Counters, stop_at: float):
    while time.time() < stop_at:
        async with sem:
            try:
                resp = await lambda_client.invoke(
                    FunctionName=LAMBDA_NAME,
                    InvocationType=INVOCATION_TYPE,
                )
                # IMPORTANT: drain/close payload so the HTTP connection returns to the pool.
                # Without this, RequestResponse often plateaus far below the account concurrency limit.
                payload = resp.get("Payload")
                if payload is not None:
                    await payload.read()
                    payload.close()
                sc = int(resp.get("StatusCode", 0))
                if sc in (200, 202, 204):
                    await counters.bump("accepted")
                else:
                    await counters.bump("errors")
            except lambda_client.exceptions.TooManyRequestsException:
                # Throttling is acceptable: it means we hit a limit (concurrency and/or invoke rate).
                await counters.bump("throttled")
            except Exception:
                await counters.bump("errors")
            finally:
                await counters.bump("completed")


async def run_load(parallel_attempts: int):
    stop_at = time.time() + DURATION_SECONDS
    counters = Counters()

    # cap in-flight HTTP requests to protect the client
    in_flight = min(parallel_attempts, MAX_CLIENT_INFLIGHT)
    sem = asyncio.Semaphore(in_flight)

    session = get_session()
    async with session.create_client(
        "lambda",
        region_name=REGION,
        config=Config(
            max_pool_connections=MAX_POOL_CONNECTIONS,
            retries={"max_attempts": 0},
        ),
    ) as lambda_async:
        # Keep task count reasonable: tasks are just "producers"; semaphore controls in-flight.
        # Still, if parallel_attempts is huge, task scheduling overhead can dominate.
        tasks = [
            asyncio.create_task(invoke_forever(lambda_async, sem, counters, stop_at))
            for _ in range(parallel_attempts)
        ]

        last = time.time()
        while time.time() < stop_at:
            await asyncio.sleep(REPORT_EVERY_SECONDS)
            now = time.time()
            accepted, throttled, errors, completed = await counters.snapshot_and_reset()
            window = max(1e-6, now - last)
            accepted_per_sec = accepted / window
            # Est concurrency ~= throughput * avg runtime (sleep)
            est_conc = accepted_per_sec * SLEEP_SECONDS
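            # (Little's law: concurrency L ≈ arrival rate λ × time in system W;
            #  here λ is accepted/s and W ≈ SLEEP_SECONDS, e.g. 80 accepted/s * 5 s ≈ 400 in flight.)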
            print(
                f"[{int(now)}] "
                f"accepted={accepted} throttled={throttled} errors={errors} completed={completed} "
                f"accepted/s={accepted_per_sec:.1f} est_conc≈{int(est_conc)} "
                f"in_flight_cap={in_flight} parallel_tasks={parallel_attempts}"
            )
            last = now

        await asyncio.gather(*tasks, return_exceptions=True)


def main():
    acct = lambda_sync.get_account_settings()["AccountLimit"]
    total = int(acct["ConcurrentExecutions"])
    unreserved = int(acct["UnreservedConcurrentExecutions"])

    # We aim to hit the limit: drive the unreserved pool.
    target_limit = unreserved
    parallel_attempts = max(10, int(target_limit * OVERBOOK))

    print(f"Region: {REGION}")
    print(f"Account concurrency total: {total}")
    print(f"Unreserved concurrency: {unreserved}")
    print(f"Target to hit: ~{target_limit} concurrent executions")
    print(f"InvocationType: {INVOCATION_TYPE} (blocks ~{SLEEP_SECONDS}s per accepted invoke)")
    print(f"OVERBOOK: {OVERBOOK} => parallel invoke tasks: {parallel_attempts}")
    print(f"Client in-flight HTTP cap: {min(parallel_attempts, MAX_CLIENT_INFLIGHT)}")
    print(f"MAX_POOL_CONNECTIONS: {MAX_POOL_CONNECTIONS}")
    print()

    role_arn = ensure_role()
    ensure_lambda(role_arn)
    print(f"Function: {LAMBDA_NAME}")
    print(f"Duration: {DURATION_SECONDS}s")
    print()

    asyncio.run(run_load(parallel_attempts))


if __name__ == "__main__":
    main()
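
The script does not delete anything it creates; the role is even designed to be reused across runs. If you want to clean up afterwards, a minimal teardown sketch follows. It assumes the default ROLE_NAME and the LAMBDA_NAME printed by a run, and is not part of the gist itself.

# Teardown sketch (assumption: default role name; pass the function name a run printed).
import boto3

def teardown(function_name: str, role_name: str = "lambda-hit-limit-role", region: str = "us-east-1") -> None:
    lam = boto3.client("lambda", region_name=region)
    iam_client = boto3.client("iam", region_name=region)
    try:
        lam.delete_function(FunctionName=function_name)
    except lam.exceptions.ResourceNotFoundException:
        pass
    try:
        iam_client.detach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )
        iam_client.delete_role(RoleName=role_name)
    except iam_client.exceptions.NoSuchEntityException:
        pass

# Example (hypothetical name following the default RUN_ID pattern):
# teardown("lambda-hit-limit-20251215-180000")

Note that DeleteRole only succeeds once every attached policy has been detached, so detach anything attached out of band before calling it.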