ADR-001: Tax Insights Data Pipeline Architecture

Status

Proposed

Date

2025-02-11

Context

We need to process four CSV files stored in S3, ranging from megabytes to terabytes, and produce JSON messages containing tax payment insights. Because file sizes are unpredictable, the architecture must scale efficiently across both small and large datasets.

Requirements

  • Process CSV files ranging from MB to TB
  • Extract tax payment information and compute insights
  • Produce JSON messages for downstream consumption
  • Handle schema variations across files
  • Ensure data privacy and compliance (tax data is sensitive)
  • Minimize cost while maintaining performance

Decision

We will implement a size-adaptive streaming pipeline on AWS-native services: Lambda for small files, AWS Glue for medium files, and Apache Spark on EMR for large files.

Architecture Overview

flowchart TB
    subgraph Input["S3 Input Bucket"]
        CSV1[("transactions.csv")]
        CSV2[("withholdings.csv")]
        CSV3[("adjustments.csv")]
        CSV4[("payments.csv")]
    end

    subgraph Orchestration["Step Functions Orchestrator"]
        direction TB
        S1[Discover Files]
        S2[Classify by Size]
        S3{Size Router}
        S4[Aggregate Results]
        S5[Publish Insights]

        S1 --> S2 --> S3
        S3 -->|"< 1GB"| Lambda
        S3 -->|"1-100GB"| Glue
        S3 -->|"> 100GB"| EMR
        Lambda --> S4
        Glue --> S4
        EMR --> S4
        S4 --> S5
    end

    subgraph Processing["Processing Tier"]
        Lambda["Lambda + Pandas\n(small files)"]
        Glue["AWS Glue\n(medium files)"]
        EMR["EMR Spark\n(large files)"]
    end

    subgraph Output["Output"]
        SQS["SQS Queue"]
        JSON[("JSON Messages")]
        DLQ["Dead Letter Queue"]
    end

    Input --> S1
    S5 --> SQS --> JSON
    SQS -.->|failures| DLQ
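The Size Router above can be a thin classification step in front of the Step Functions Choice state: look up the object's size with a HEAD request and return a route token matching the thresholds in the flowchart. A minimal sketch in Python with boto3; the event shape and route names are illustrative assumptions, not a fixed contract.

import boto3

GB = 1024 ** 3
s3 = boto3.client("s3")

def classify_handler(event, context):
    """Classify an S3 object by size and return a routing hint for the Choice state.

    Assumes an event like {"bucket": "...", "key": "..."} produced by the
    Discover Files step; adjust to the real state input.
    """
    head = s3.head_object(Bucket=event["bucket"], Key=event["key"])
    size_bytes = head["ContentLength"]

    if size_bytes < 1 * GB:
        route = "LAMBDA"      # small files: in-memory pandas processing
    elif size_bytes < 100 * GB:
        route = "GLUE"        # medium files: Glue Spark job
    else:
        route = "EMR"         # large files: transient EMR cluster

    return {**event, "sizeBytes": size_bytes, "route": route}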

Data Flow Sequence

sequenceDiagram
    autonumber
    participant S3 as S3 Bucket
    participant SF as Step Functions
    participant Proc as Processor<br/>(Lambda/Glue/EMR)
    participant Calc as Tax Calculator
    participant SQS as SQS Queue
    participant Consumer as Downstream<br/>Consumer

    S3->>SF: S3 Event (new file)
    SF->>S3: HEAD request (get file size)
    SF->>SF: Route by size threshold
    SF->>Proc: Invoke appropriate processor

    loop For each chunk/partition
        Proc->>S3: Stream CSV chunk
        Proc->>Calc: Calculate tax insights
        Calc-->>Proc: Insight record
        Proc->>SQS: Publish JSON message
    end

    Proc->>SF: Processing complete
    SQS->>Consumer: Deliver messages
    Consumer-->>SQS: Acknowledge
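Steps 5-8 of the sequence map naturally onto chunked reads: the small-file (Lambda + Pandas) path can stream the CSV from S3 in fixed-size chunks and publish one message per chunk. A minimal sketch; compute_insights is a hypothetical stand-in for the Tax Calculator, the queue URL is a placeholder, and totalChunks is omitted because it is unknown while streaming.

import json
import uuid
from datetime import datetime, timezone

import boto3
import pandas as pd

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/tax-insights"  # placeholder

def compute_insights(chunk: pd.DataFrame) -> list:
    # Hypothetical stand-in for the Tax Calculator; the real logic computes
    # taxOwed, refundOrDue, flags, etc. from the source columns.
    return chunk.to_dict(orient="records")

def process_small_file(bucket: str, key: str, run_id: str, chunk_rows: int = 50_000):
    """Stream a CSV from S3 in chunks and publish one insight message per chunk."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]

    for index, chunk in enumerate(pd.read_csv(body, chunksize=chunk_rows)):
        message = {
            "messageId": str(uuid.uuid4()),
            "correlationId": run_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sourceFile": f"s3://{bucket}/{key}",
            "insights": compute_insights(chunk),
            "metadata": {
                "processorType": "LAMBDA",
                "recordsProcessed": len(chunk),
                "chunkIndex": index,
                "schemaVersion": "1.0.0",
            },
        }
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(message))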

JSON Message Schema

classDiagram
    class TaxInsightMessage {
        +String messageId
        +String correlationId
        +DateTime timestamp
        +String sourceFile
        +TaxInsight[] insights
        +Metadata metadata
    }

    class TaxInsight {
        +String taxpayerId
        +String taxYear
        +Decimal grossIncome
        +Decimal taxableIncome
        +Decimal taxWithheld
        +Decimal taxOwed
        +Decimal taxPaid
        +Decimal refundOrDue
        +String[] flags
    }

    class Metadata {
        +String processorType
        +Integer recordsProcessed
        +Integer chunkIndex
        +Integer totalChunks
        +String schemaVersion
    }

    TaxInsightMessage "1" *-- "many" TaxInsight
    TaxInsightMessage "1" *-- "1" Metadata
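The same schema expressed as Python dataclasses can keep producer and consumer code aligned with the class diagram. A minimal sketch; field names deliberately mirror the diagram rather than PEP 8, and Decimal is used for monetary amounts.

from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import List

@dataclass
class TaxInsight:
    taxpayerId: str
    taxYear: str
    grossIncome: Decimal
    taxableIncome: Decimal
    taxWithheld: Decimal
    taxOwed: Decimal
    taxPaid: Decimal
    refundOrDue: Decimal
    flags: List[str] = field(default_factory=list)

@dataclass
class Metadata:
    processorType: str
    recordsProcessed: int
    chunkIndex: int
    totalChunks: int
    schemaVersion: str

@dataclass
class TaxInsightMessage:
    messageId: str
    correlationId: str
    timestamp: datetime
    sourceFile: str
    insights: List[TaxInsight]
    metadata: Metadata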

Processing States

stateDiagram-v2
    [*] --> FileDiscovered: S3 Event

    FileDiscovered --> SizeClassification: Get metadata

    SizeClassification --> SmallFile: < 1GB
    SizeClassification --> MediumFile: 1-100GB
    SizeClassification --> LargeFile: > 100GB

    SmallFile --> Processing: Lambda
    MediumFile --> Processing: Glue Job
    LargeFile --> Processing: EMR Cluster

    Processing --> Validation: Records extracted

    Validation --> TaxCalculation: Schema valid
    Validation --> ErrorHandling: Invalid schema

    TaxCalculation --> MessagePublishing: Insights computed

    MessagePublishing --> Complete: All messages sent
    MessagePublishing --> Retry: Transient failure

    Retry --> MessagePublishing: Backoff elapsed
    Retry --> ErrorHandling: Max retries exceeded

    ErrorHandling --> DLQ: Send to dead letter
    DLQ --> [*]

    Complete --> [*]
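The MessagePublishing → Retry → ErrorHandling path translates to exponential backoff around the SQS publish, with a hand-off to the dead letter queue once retries are exhausted. A minimal sketch; the queue URLs and limits are placeholders, and in practice an SQS redrive policy or a Step Functions retrier can replace hand-rolled loops.

import json
import time

import boto3
from botocore.exceptions import ClientError

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/tax-insights"      # placeholder
DLQ_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/tax-insights-dlq"    # placeholder

def publish_with_retry(message: dict, max_retries: int = 3, base_delay_s: float = 1.0) -> bool:
    """Publish to SQS with exponential backoff; route to the DLQ after max retries."""
    body = json.dumps(message)
    for attempt in range(max_retries):
        try:
            sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=body)
            return True
        except ClientError:
            time.sleep(base_delay_s * (2 ** attempt))   # 1s, 2s, 4s, ...
    # Max retries exceeded: park the message on the dead letter queue for inspection.
    sqs.send_message(QueueUrl=DLQ_URL, MessageBody=body)
    return False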

Component Interactions

C4Context
    title Tax Insights Pipeline - System Context

    Person(analyst, "Tax Analyst", "Consumes tax insights")

    System_Boundary(pipeline, "Tax Insights Pipeline") {
        System(ingestion, "Ingestion Layer", "S3 + EventBridge")
        System(orchestration, "Orchestration", "Step Functions")
        System(processing, "Processing", "Lambda/Glue/EMR")
        System(messaging, "Messaging", "SQS + SNS")
    }

    System_Ext(source, "Source Systems", "ERP, Payroll")
    System_Ext(downstream, "Downstream Systems", "Reporting, Compliance")

    Rel(source, ingestion, "Uploads CSV files")
    Rel(ingestion, orchestration, "Triggers workflow")
    Rel(orchestration, processing, "Invokes processors")
    Rel(processing, messaging, "Publishes JSON")
    Rel(messaging, downstream, "Delivers insights")
    Rel(downstream, analyst, "Presents reports")

Cost Optimization Strategy

pie showData
    title Processing Cost Distribution (Estimated)
    "Lambda (small files)" : 5
    "Glue (medium files)" : 25
    "EMR Spot (large files)" : 45
    "S3 Storage" : 10
    "SQS Messaging" : 5
    "Step Functions" : 10
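EMR on Spot is the largest estimated slice, so the large-file path assumes a transient cluster: Spot core nodes, termination when the steps finish, and an idle auto-termination policy as a backstop. A minimal boto3 sketch; the instance types, counts, release label, and IAM role names are illustrative assumptions.

import boto3

emr = boto3.client("emr")

def launch_transient_cluster() -> str:
    """Launch a transient EMR cluster with Spot workers and idle auto-termination."""
    response = emr.run_job_flow(
        Name="tax-insights-large-file",
        ReleaseLabel="emr-7.1.0",                      # example release
        Applications=[{"Name": "Spark"}],
        Instances={
            "InstanceGroups": [
                {"Name": "driver", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1, "Market": "ON_DEMAND"},
                {"Name": "workers", "InstanceRole": "CORE",
                 "InstanceType": "m5.2xlarge", "InstanceCount": 4, "Market": "SPOT"},
            ],
            "KeepJobFlowAliveWhenNoSteps": False,      # terminate when steps finish
        },
        AutoTerminationPolicy={"IdleTimeout": 900},    # shut down after 15 minutes idle
        ServiceRole="EMR_DefaultRole",                 # assumed role names
        JobFlowRole="EMR_EC2_DefaultRole",
    )
    return response["JobFlowId"]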

JSON Message Format

{
  "messageId": "uuid-v4",
  "correlationId": "pipeline-run-id",
  "timestamp": "2025-02-11T14:30:00Z",
  "sourceFile": "s3://bucket/transactions.csv",
  "insights": [
    {
      "taxpayerId": "HASH-123",
      "taxYear": "2024",
      "grossIncome": 85000.00,
      "taxableIncome": 72000.00,
      "taxWithheld": 18500.00,
      "taxOwed": 17280.00,
      "taxPaid": 18500.00,
      "refundOrDue": 1220.00,
      "flags": ["OVERPAYMENT", "ELIGIBLE_REFUND"]
    }
  ],
  "metadata": {
    "processorType": "GLUE",
    "recordsProcessed": 50000,
    "chunkIndex": 1,
    "totalChunks": 10,
    "schemaVersion": "1.0.0"
  }
}
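Producers can validate each payload against a JSON Schema before publishing, which doubles as a guard against schema drift. A minimal sketch using the jsonschema package; only a few required fields are shown, and the full schema would be versioned alongside schemaVersion.

from jsonschema import ValidationError, validate

TAX_INSIGHT_MESSAGE_SCHEMA = {
    "type": "object",
    "required": ["messageId", "correlationId", "timestamp", "sourceFile", "insights", "metadata"],
    "properties": {
        "messageId": {"type": "string"},
        "correlationId": {"type": "string"},
        "timestamp": {"type": "string"},
        "sourceFile": {"type": "string"},
        "insights": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["taxpayerId", "taxYear", "refundOrDue"],
                "properties": {
                    "taxpayerId": {"type": "string"},
                    "taxYear": {"type": "string"},
                    "refundOrDue": {"type": "number"},
                    "flags": {"type": "array", "items": {"type": "string"}},
                },
            },
        },
        "metadata": {"type": "object"},
    },
}

def is_valid_message(message: dict) -> bool:
    """Return True if the message conforms to the published schema."""
    try:
        validate(instance=message, schema=TAX_INSIGHT_MESSAGE_SCHEMA)
        return True
    except ValidationError:
        return False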

Consequences

Positive

  • Scalability: Handles files from KB to TB without code changes
  • Cost efficiency: Right-sized compute for each file size
  • Resilience: Dead letter queue captures failures for retry
  • Auditability: Full traceability via correlation IDs
  • Decoupling: JSON messages enable independent downstream evolution

Negative

  • Complexity: Three processing paths require maintenance
  • Latency variance: Large files take longer to complete
  • Cost unpredictability: EMR costs vary with data volume

Risks

  • Schema drift across CSV files: implement a schema registry with validation
  • PII exposure in messages: hash/tokenize taxpayer IDs before publishing (see the sketch after this list)
  • Cost overrun on large files: set EMR cluster auto-termination and use Spot instances
  • Message ordering: include sequence numbers; the consumer handles ordering
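For the PII mitigation, taxpayer identifiers can be tokenized with a keyed hash (HMAC-SHA256) before any message leaves the processing tier, so raw IDs never reach SQS or downstream consumers. A minimal sketch; in a hardened setup the key comes from a secrets manager rather than an environment variable.

import hashlib
import hmac
import os

# Assumed to be provisioned out of band (e.g., a secrets manager); env var for brevity.
TOKENIZATION_KEY = os.environ.get("TAXPAYER_TOKEN_KEY", "dev-only-key").encode()

def tokenize_taxpayer_id(raw_id: str) -> str:
    """Return a stable, non-reversible token such as 'HASH-3f2a...' for a taxpayer ID."""
    digest = hmac.new(TOKENIZATION_KEY, raw_id.encode(), hashlib.sha256).hexdigest()
    return f"HASH-{digest[:16]}"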

Alternatives Considered

  1. Single Spark cluster for all sizes - Rejected due to cost inefficiency for small files
  2. Kinesis streaming - Rejected; batch processing better fits CSV file semantics
  3. AWS Batch - Viable but less integrated than Glue for S3/CSV workflows
