Status: Proposed
Date: 2025-02-11
We need to process four CSV files stored in S3, ranging from megabytes to terabytes, and produce JSON messages containing tax payment insights. Because file sizes are unpredictable, the architecture must handle both small and large datasets efficiently.
- Process CSV files ranging from MB to TB
- Extract tax payment information and compute insights
- Produce JSON messages for downstream consumption
- Handle schema variations across files
- Ensure data privacy and compliance (tax data is sensitive)
- Minimize cost while maintaining performance
We will implement a size-adaptive streaming pipeline built on AWS-native services, routing each file to Lambda, AWS Glue, or EMR (Apache Spark) based on its size.

```mermaid
flowchart TB
subgraph Input["S3 Input Bucket"]
CSV1[("transactions.csv")]
CSV2[("withholdings.csv")]
CSV3[("adjustments.csv")]
CSV4[("payments.csv")]
end
subgraph Orchestration["Step Functions Orchestrator"]
direction TB
S1[Discover Files]
S2[Classify by Size]
S3{Size Router}
S4[Aggregate Results]
S5[Publish Insights]
S1 --> S2 --> S3
S3 -->|"< 1GB"| Lambda
S3 -->|"1-100GB"| Glue
S3 -->|"> 100GB"| EMR
Lambda --> S4
Glue --> S4
EMR --> S4
S4 --> S5
end
subgraph Processing["Processing Tier"]
Lambda["Lambda + Pandas\n(small files)"]
Glue["AWS Glue\n(medium files)"]
EMR["EMR Spark\n(large files)"]
end
subgraph Output["Output"]
SQS["SQS Queue"]
JSON[("JSON Messages")]
DLQ["Dead Letter Queue"]
end
Input --> S1
S5 --> SQS --> JSON
SQS -.->|failures| DLQ
```
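As a concrete sketch of the "Classify by Size" step, the Lambda below issues the HEAD request shown in the sequence diagram further down and returns a route for the Choice state. The thresholds mirror the size router above (1 GB and 100 GB); the `bucket`/`key` event shape and the route labels are illustrative assumptions, not a fixed contract.

```python
import boto3

s3 = boto3.client("s3")

# Thresholds taken from the size router above.
GB = 1024 ** 3
SMALL_MAX = 1 * GB      # < 1 GB   -> Lambda + Pandas
MEDIUM_MAX = 100 * GB   # 1-100 GB -> Glue; larger -> EMR Spark


def handler(event, context):
    """Classify an S3 object by size so the Choice state can route it.

    Assumes the state machine passes {"bucket": ..., "key": ...}.
    """
    bucket, key = event["bucket"], event["key"]
    size = s3.head_object(Bucket=bucket, Key=key)["ContentLength"]

    if size < SMALL_MAX:
        route = "LAMBDA"
    elif size < MEDIUM_MAX:
        route = "GLUE"
    else:
        route = "EMR"

    return {"bucket": bucket, "key": key, "sizeBytes": size, "route": route}
```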
```mermaid
sequenceDiagram
autonumber
participant S3 as S3 Bucket
participant SF as Step Functions
participant Proc as Processor<br/>(Lambda/Glue/EMR)
participant Calc as Tax Calculator
participant SQS as SQS Queue
participant Consumer as Downstream<br/>Consumer
S3->>SF: S3 Event (new file)
SF->>S3: HEAD request (get file size)
SF->>SF: Route by size threshold
SF->>Proc: Invoke appropriate processor
loop For each chunk/partition
Proc->>S3: Stream CSV chunk
Proc->>Calc: Calculate tax insights
Calc-->>Proc: Insight record
Proc->>SQS: Publish JSON message
end
Proc->>SF: Processing complete
SQS->>Consumer: Deliver messages
Consumer-->>SQS: Acknowledge
```
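The per-chunk loop above, sketched for the small-file (Lambda + Pandas) path under a few assumptions: a hypothetical `calculate_insights` helper stands in for the tax calculator, the CSV is assumed to carry `taxOwed`/`taxPaid` columns, and `totalChunks` is omitted because it is not known while streaming. Glue and EMR would follow the same read-compute-publish shape on their own DataFrame abstractions.

```python
import json
import uuid
from datetime import datetime, timezone

import boto3
import pandas as pd

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

CHUNK_ROWS = 10_000  # rows per chunk, sized to stay within Lambda memory


def calculate_insights(frame: pd.DataFrame) -> list:
    """Stand-in for the tax calculator; assumes taxOwed/taxPaid columns exist."""
    frame = frame.assign(refundOrDue=frame["taxPaid"] - frame["taxOwed"])
    # The to_json/loads round trip yields plain Python types that json.dumps accepts.
    return json.loads(frame.to_json(orient="records"))


def process_small_file(bucket: str, key: str, queue_url: str, correlation_id: str) -> None:
    """Stream a CSV from S3 in chunks, compute insights, publish one message per chunk."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]

    for chunk_index, frame in enumerate(pd.read_csv(body, chunksize=CHUNK_ROWS), start=1):
        message = {
            "messageId": str(uuid.uuid4()),
            "correlationId": correlation_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "sourceFile": f"s3://{bucket}/{key}",
            "insights": calculate_insights(frame),
            "metadata": {
                "processorType": "LAMBDA",
                "recordsProcessed": len(frame),
                "chunkIndex": chunk_index,
                # totalChunks is unknown while streaming; it can be reported in
                # the final "processing complete" callback instead.
                "schemaVersion": "1.0.0",
            },
        }
        sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
```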
```mermaid
classDiagram
class TaxInsightMessage {
+String messageId
+String correlationId
+DateTime timestamp
+String sourceFile
+TaxInsight[] insights
+Metadata metadata
}
class TaxInsight {
+String taxpayerId
+String taxYear
+Decimal grossIncome
+Decimal taxableIncome
+Decimal taxWithheld
+Decimal taxOwed
+Decimal taxPaid
+Decimal refundOrDue
+String[] flags
}
class Metadata {
+String processorType
+Integer recordsProcessed
+Integer chunkIndex
+Integer totalChunks
+String schemaVersion
}
TaxInsightMessage "1" *-- "many" TaxInsight
TaxInsightMessage "1" *-- "1" Metadata
```mermaid
stateDiagram-v2
[*] --> FileDiscovered: S3 Event
FileDiscovered --> SizeClassification: Get metadata
SizeClassification --> SmallFile: < 1GB
SizeClassification --> MediumFile: 1-100GB
SizeClassification --> LargeFile: > 100GB
SmallFile --> Processing: Lambda
MediumFile --> Processing: Glue Job
LargeFile --> Processing: EMR Cluster
Processing --> Validation: Records extracted
Validation --> TaxCalculation: Schema valid
Validation --> ErrorHandling: Invalid schema
TaxCalculation --> MessagePublishing: Insights computed
MessagePublishing --> Complete: All messages sent
MessagePublishing --> Retry: Transient failure
Retry --> MessagePublishing: Backoff elapsed
Retry --> ErrorHandling: Max retries exceeded
ErrorHandling --> DLQ: Send to dead letter
DLQ --> [*]
Complete --> [*]
```
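A sketch of the Retry and ErrorHandling transitions for the publishing step, assuming exponential backoff with jitter, a hypothetical `MAX_RETRIES` of 5, and an explicit send to the dead letter queue; in practice an SQS redrive policy can also perform the dead-lettering automatically.

```python
import json
import random
import time

import boto3
from botocore.exceptions import ClientError

sqs = boto3.client("sqs")

MAX_RETRIES = 5  # assumption; align with the Step Functions retry policy


def publish_with_retry(queue_url: str, dlq_url: str, message: dict) -> None:
    """Publish a JSON message, retrying transient failures with exponential backoff.

    After MAX_RETRIES the message is routed to the DLQ (ErrorHandling -> DLQ).
    """
    body = json.dumps(message)
    for attempt in range(MAX_RETRIES):
        try:
            sqs.send_message(QueueUrl=queue_url, MessageBody=body)
            return  # MessagePublishing -> Complete
        except ClientError:
            # Retry -> MessagePublishing once the backoff elapses.
            time.sleep((2 ** attempt) + random.random())

    # Retry -> ErrorHandling -> DLQ: max retries exceeded.
    sqs.send_message(QueueUrl=dlq_url, MessageBody=body)
```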
```mermaid
C4Context
title Tax Insights Pipeline - System Context
Person(analyst, "Tax Analyst", "Consumes tax insights")
System_Boundary(pipeline, "Tax Insights Pipeline") {
System(ingestion, "Ingestion Layer", "S3 + EventBridge")
System(orchestration, "Orchestration", "Step Functions")
System(processing, "Processing", "Lambda/Glue/EMR")
System(messaging, "Messaging", "SQS + SNS")
}
System_Ext(source, "Source Systems", "ERP, Payroll")
System_Ext(downstream, "Downstream Systems", "Reporting, Compliance")
Rel(source, ingestion, "Uploads CSV files")
Rel(ingestion, orchestration, "Triggers workflow")
Rel(orchestration, processing, "Invokes processors")
Rel(processing, messaging, "Publishes JSON")
Rel(messaging, downstream, "Delivers insights")
Rel(downstream, analyst, "Presents reports")
```
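One way to realize the "Triggers workflow" relationship between the ingestion and orchestration systems, sketched with boto3: enable EventBridge notifications on the input bucket and route Object Created events to the Step Functions state machine. The bucket name, rule name, and ARNs are placeholders.

```python
import json

import boto3

s3 = boto3.client("s3")
events = boto3.client("events")

BUCKET = "tax-insights-input"                                        # placeholder
STATE_MACHINE_ARN = "arn:aws:states:...:stateMachine:tax-insights"   # placeholder
EVENTS_ROLE_ARN = "arn:aws:iam::...:role/eventbridge-invoke-sfn"     # placeholder

# 1. Send the bucket's object-level events to EventBridge.
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={"EventBridgeConfiguration": {}},
)

# 2. Match "Object Created" events for this bucket.
events.put_rule(
    Name="tax-csv-uploaded",
    EventPattern=json.dumps({
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {"bucket": {"name": [BUCKET]}},
    }),
)

# 3. Start the orchestration workflow for each matching event.
events.put_targets(
    Rule="tax-csv-uploaded",
    Targets=[{"Id": "sfn", "Arn": STATE_MACHINE_ARN, "RoleArn": EVENTS_ROLE_ARN}],
)
```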
```mermaid
pie showData
title Processing Cost Distribution (Estimated)
"Lambda (small files)" : 5
"Glue (medium files)" : 25
"EMR Spot (large files)" : 45
"S3 Storage" : 10
"SQS Messaging" : 5
"Step Functions" : 10
```json
{
"messageId": "uuid-v4",
"correlationId": "pipeline-run-id",
"timestamp": "2025-02-11T14:30:00Z",
"sourceFile": "s3://bucket/transactions.csv",
"insights": [
{
"taxpayerId": "HASH-123",
"taxYear": "2024",
"grossIncome": 85000.00,
"taxableIncome": 72000.00,
"taxWithheld": 18500.00,
"taxOwed": 17280.00,
"taxPaid": 18500.00,
"refundOrDue": 1220.00,
"flags": ["OVERPAYMENT", "ELIGIBLE_REFUND"]
}
],
"metadata": {
"processorType": "GLUE",
"recordsProcessed": 50000,
"chunkIndex": 1,
"totalChunks": 10,
"schemaVersion": "1.0.0"
}
}
```

- Scalability: Handles files from MB to TB without code changes
- Cost efficiency: Right-sized compute for each file size
- Resilience: Dead letter queue captures failures for retry
- Auditability: Full traceability via correlation IDs
- Decoupling: JSON messages enable independent downstream evolution
- Complexity: Three processing paths require maintenance
- Latency variance: Large files take longer to complete
- Cost unpredictability: EMR costs vary with data volume (bounded with Spot capacity and auto-termination; see the EMR launch sketch below)
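A sketch of how the large-file path could apply the cost mitigations noted above and in the risk table that follows: Spot capacity for core nodes and automatic termination once the Spark step completes. Cluster name, instance types, counts, IAM roles, and the job script location are placeholders.

```python
import boto3

emr = boto3.client("emr")

# Launch a transient Spark cluster for a single large file (> 100 GB path).
response = emr.run_job_flow(
    Name="tax-insights-large-file",
    ReleaseLabel="emr-7.1.0",
    Applications=[{"Name": "Spark"}],
    ServiceRole="EMR_DefaultRole",
    JobFlowRole="EMR_EC2_DefaultRole",
    Instances={
        "KeepJobFlowAliveWhenNoSteps": False,  # terminate as soon as the step finishes
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "Market": "ON_DEMAND",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "Market": "SPOT",
             "InstanceType": "r5.2xlarge", "InstanceCount": 8},
        ],
    },
    Steps=[{
        "Name": "process-large-csv",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "s3://bucket/jobs/process_csv.py"],  # placeholder
        },
    }],
)
print(response["JobFlowId"])
```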
| Risk | Mitigation |
|---|---|
| Schema drift across CSV files | Implement schema registry with validation |
| PII exposure in messages | Hash/tokenize taxpayer IDs before publishing (see the tokenization sketch below) |
| Cost overrun on large files | Set EMR cluster auto-termination, use Spot |
| Out-of-order message delivery (standard SQS) | Include sequence numbers; consumer handles ordering |
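The taxpayer-ID tokenization from the PII row, sketched as a keyed hash (HMAC-SHA-256) so the same taxpayer maps to a stable token across files without exposing the raw ID. The key source and token length are assumptions; the `HASH-` prefix follows the sample message above.

```python
import hashlib
import hmac

# Pipeline-level secret, e.g. loaded from AWS Secrets Manager at startup (placeholder).
HASH_KEY = b"replace-with-secret-from-secrets-manager"


def tokenize_taxpayer_id(raw_id: str) -> str:
    """Replace a raw taxpayer ID with a stable, non-reversible token before publishing."""
    digest = hmac.new(HASH_KEY, raw_id.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"HASH-{digest[:16]}"  # truncated for compact messages; adjust as needed
```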
- Single Spark cluster for all sizes - Rejected due to cost inefficiency for small files
- Kinesis streaming - Rejected; batch processing better fits CSV file semantics
- AWS Batch - Viable but less integrated than Glue for S3/CSV workflows