Skip to content

Instantly share code, notes, and snippets.

@bengerman13
Last active June 11, 2019 18:52
Show Gist options
  • Select an option

  • Save bengerman13/4b31709aa1915da73900fd9bcfeaae92 to your computer and use it in GitHub Desktop.

Select an option

Save bengerman13/4b31709aa1915da73900fd9bcfeaae92 to your computer and use it in GitHub Desktop.
"""
A handful of convenience methods for parsing logs. Requires python 3.7+
start with:
aws s3 cp --recursive s3://my-log-bucket/path/to/some/logs .
cat *.log > my_big_logfile.txt
ipython
In [1]: import log_parser
In [2]: logs = log_parser.parse_alb_logs('my_big_logfile.txt')
In [3]: import pandas as pd
In [4]: df = pd.DataFrame(logs)
In [5]: df[df['target_status_code'].isna()]
Out [5]: < all the requests during the given time where the target did not respond >
"""
import datetime
import re
from typing import Dict, List, Union
from pathlib import Path
# source: https://docs.aws.amazon.com/athena/latest/ug/application-load-balancer-logs.html
# Pattern for one Application Load Balancer access-log entry. Every field is
# exposed as a named group holding the raw text; type conversion happens later.
#
# The pattern is compiled with re.VERBOSE, so unescaped whitespace inside it is
# ignored. Every literal space therefore has to be written as "\ " -- including
# the one inside the request_proto alternative, which was previously unescaped
# and so silently reduced the alternative to a bare dash.
ALB_LOG_LINE_REGEX = re.compile(
    r"""
    (?P<type>[^ ]*)
    \ (?P<time>[^ ]*)
    \ (?P<elb>[^ ]*)
    \ (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)
    # a lone dash target yields an empty target_ip and an empty target_port
    \ (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*)
    \ (?P<request_processing_time>[-.0-9]*)
    \ (?P<target_processing_time>[-.0-9]*)
    \ (?P<response_processing_time>[-.0-9]*)
    \ (?P<elb_status_code>[-0-9]*)
    \ (?P<target_status_code>[-0-9]*)
    \ (?P<received_bytes>[-0-9]*)
    \ (?P<sent_bytes>[-0-9]*)
    \ "(?P<request_verb>[^ ]*)
    \ (?P<request_url>[^ ]*)
    # first alternative: a dash followed by a space, as in a dash-only request
    \ (?P<request_proto>-\ |[^ ]*)"
    \ "(?P<user_agent>[^"]*)"
    \ (?P<ssl_cipher>[A-Z0-9-]+)
    \ (?P<ssl_protocol>[A-Za-z0-9.-]*)
    \ (?P<target_group_arn>[^ ]*)
    \ "(?P<trace_id>[^"]*)"
    \ "(?P<domain_name>[^"]*)"
    \ "(?P<chosen_cert_arn>[^"]*)"
    \ (?P<matched_rule_priority>[-.0-9]*)
    \ (?P<request_creation_time>[^ ]*)
    \ "(?P<actions_executed>[^"]*)"
    \ "(?P<redirect_url>[^"]*)"
    # either end of line, or one more quoted field; when present the capture
    # keeps its leading space and its quotes
    (?P<lambda_error_reason>$|\ "[^ ]*")
    # whatever follows the known fields on the same line
    (?P<new_field>.*)
    """,
    re.VERBOSE,
)
# Fallback pattern for log lines that start directly with the timestamp (no
# leading "type" field) and stop after the SSL protocol. Group names are shared
# with ALB_LOG_LINE_REGEX so both kinds of line produce the same keys.
#
# Compiled with re.VERBOSE: literal spaces must be escaped as "\ ".
ELB_LOG_LINE_REGEX = re.compile(
    r"""
    (?P<time>[^ ]*)
    \ (?P<elb>[^ ]*)
    \ (?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)
    # a lone dash target yields an empty target_ip and an empty target_port
    \ (?P<target_ip>[^ ]*)[:-](?P<target_port>[0-9]*)
    \ (?P<request_processing_time>[-.0-9]*)
    \ (?P<target_processing_time>[-.0-9]*)
    \ (?P<response_processing_time>[-.0-9]*)
    \ (?P<elb_status_code>[-0-9]*)
    \ (?P<target_status_code>[-0-9]*)
    \ (?P<received_bytes>[-0-9]*)
    \ (?P<sent_bytes>[-0-9]*)
    \ "(?P<request_verb>[^ ]*)
    \ (?P<request_url>[^ ]*)
    # first alternative: a dash followed by a space, as in a dash-only request
    \ (?P<request_proto>-\ |[^ ]*)"
    # quotes sit outside the group so the captured value has the same shape
    # as the user_agent captured by ALB_LOG_LINE_REGEX
    \ "(?P<user_agent>[^"]*)"
    \ (?P<ssl_cipher>[A-Z0-9-]+)
    \ (?P<ssl_protocol>[A-Za-z0-9.-]*)
    """,
    re.VERBOSE,
)
def _parse_log_timestamp(value):
    """Parse a timestamp such as 2018-07-02T22:23:00.186641Z into a datetime."""
    return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")


# Converter applied to each named regex group, keyed by group name. Each
# converter receives the raw captured string.
ALB_LOGS_FIELD_TYPES = {
    "type": str,
    "time": _parse_log_timestamp,
    "elb": str,
    # endpoints
    "client_ip": str,
    "client_port": int,
    "target_ip": str,
    "target_port": int,
    # timings
    "request_processing_time": float,
    "target_processing_time": float,
    "response_processing_time": float,
    # status codes and sizes
    "elb_status_code": int,
    "target_status_code": int,
    "received_bytes": int,
    "sent_bytes": int,
    # request line and client details
    "request_verb": str,
    "request_url": str,
    "request_proto": str,
    "user_agent": str,
    "ssl_cipher": str,
    "ssl_protocol": str,
    # fields only captured by the ALB pattern
    "target_group_arn": str,
    "trace_id": str,
    "domain_name": str,
    "chosen_cert_arn": str,
    "matched_rule_priority": str,
    "request_creation_time": _parse_log_timestamp,
    "actions_executed": str,
    "redirect_url": str,
    "lambda_error_reason": str,
    "new_field": str,
}
def parse_alb_logs(logfile: Union[str, Path]) -> List[Dict[str, object]]:
    """Read a log file and return one converted dict per recognised line.

    Each line is tried against ALB_LOG_LINE_REGEX first and against
    ELB_LOG_LINE_REGEX second. Lines that match neither are printed to
    stdout and skipped.

    Args:
        logfile: Path of the text file to read, one log entry per line.

    Returns:
        A list of dicts in file order, each produced by match_to_nice_dict.
        Values are not only strings: depending on the field they may be
        int, float, datetime or None, which is why the value type is
        annotated as object rather than str.

    Raises:
        ValueError: Propagated from match_to_nice_dict when a field holds
            text that its converter rejects.
    """
    recognised = []
    with open(logfile, "r") as handle:
        for raw_line in handle:
            hit = ALB_LOG_LINE_REGEX.match(raw_line)
            if hit is None:
                hit = ELB_LOG_LINE_REGEX.match(raw_line)
            if hit is None:
                print("failed to match: \n", raw_line)
                continue
            recognised.append(hit)
    # Conversion runs only after the whole file has been scanned, so every
    # unmatched line is reported before any conversion error can surface.
    return [match_to_nice_dict(hit) for hit in recognised]
def match_to_nice_dict(match: re.Match) -> Dict:
    """Turn a log-line regex match into a dict of converted field values.

    Every named group that has an entry in ALB_LOGS_FIELD_TYPES is passed
    through its converter. When the converter raises ValueError and the raw
    text is "-" or empty, the field becomes None.

    Args:
        match: A match object whose named groups are the log fields.

    Returns:
        The match's group dict with converted values. Groups without a
        converter entry keep their raw value.

    Raises:
        ValueError: If a converter rejects a value other than "-" or "".
    """
    record = match.groupdict()
    for name, cast in ALB_LOGS_FIELD_TYPES.items():
        if name not in record:
            continue
        raw = record[name]
        try:
            record[name] = cast(raw)
        except ValueError:
            # "-" and the empty string stand for "no value"; anything else
            # is a genuine conversion failure and must not be hidden.
            if raw not in ("-", ""):
                raise
            record[name] = None
    return record
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment