Quickly Transform Huge CSV Files Using AWS Lambda with Amazon S3
When building small, focused pieces of functionality, you don’t want to worry about server management, logging, performance tuning, or scaling. That’s where AWS Lambda comes into play.
Processing CSV Files
Imagine an application that lets users upload CSV files to Amazon S3, where each uploaded file gets processed afterward. To avoid blocking your application and to rate-limit the traffic, the processing should happen in a background job.
With S3 event notifications (S3 Triggers), it is easy to specify that every time a file is uploaded to a specific bucket, an action is triggered; in this case, an AWS Lambda function.
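If you prefer to wire up the trigger in code rather than through the S3 console, a minimal sketch with boto3 might look like the following; the bucket name and function ARN are placeholders for this example.

import boto3

s3_client = boto3.client("s3")

# Prerequisite: the Lambda function's resource policy must already allow S3
# to invoke it (lambda add-permission), or this call will fail validation.
s3_client.put_bucket_notification_configuration(
    Bucket="amazon-search-terms",  # placeholder upload bucket
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "csv-upload-trigger",
                "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:process-csv",
                "Events": ["s3:ObjectCreated:*"],
                # Only fire for CSV uploads.
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}
                },
            }
        ]
    },
)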
AWS Lambda Function
To read and process the S3 files, we’re going to use the Amazon Web Services (AWS) SDK for Python, Boto3, along with s3fs for convenient file-like access to S3 objects.
import io
import os
import csv
import time
import uuid
import boto3
import s3fs
import re
from datetime import datetime
from urllib.parse import unquote_plus
from botocore.exceptions import ClientError

# You might need to change some things to fit your specific needs.
# If incoming data doesn't specify a country, you have to pass a default value.
# Specify a default country code in ISO 3166-1 alpha-2 format.
defaultCountry = "US"

# You probably don't need to change any variables below this point.
AWS_REGION = os.environ["AWS_REGION"]
startTime = datetime.now()
s3 = s3fs.S3FileSystem(anon=False)
output_bucket = "amazon-search-terms"


def lambda_handler(event, context):
    print("Received event: " + str(event))
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        input_file = os.path.join(bucket, unquote_plus(key))
        # Skip files that have already been processed to avoid re-triggering.
        if input_file.endswith("processed.csv"):
            return
        output_file_name = (
            os.path.splitext(os.path.basename(input_file))[0] + "_processed.csv"
        )
        output_file = os.path.join(output_bucket, output_file_name)
        process_incoming_file(input_file, output_file)


# Change the column names, validate and reformat some of the input, and then
# write to output files.
def process_incoming_file(input_file, output_file):
    # Counters for tracking the number of records and endpoints processed.
    line_count = 0
    create_count = 0
    update_count = 0

    with s3.open(output_file, "w", newline="", encoding="utf-8-sig") as new_csvfile:
        header = ["reporting_range", "period_starting"]
        csvwriter = csv.writer(new_csvfile)
        reporting_range = ""
        period_starting = ""
        with s3.open(input_file, "r", newline="", encoding="utf-8-sig") as old_csvfile:
            csvreader = csv.reader(old_csvfile)
            for row in csvreader:
                # The first line holds report metadata; extract it and move on.
                if line_count == 0:
                    reporting_range = re.search(
                        r"Reporting Range=\[(.+?)\]", row[3]
                    ).group(1)
                    period_starting = (
                        re.search(r"Viewing=\[(.+?)\]", row[4]).group(1).split("-")[0]
                    )
                    line_count += 1
                    continue
                # The second line is the header row; append the new column names.
                if line_count == 1:
                    csvwriter.writerow([*row, *header])
                    line_count += 1
                    continue
                # Every other line is a data row; append the extracted metadata.
                csvwriter.writerow(row + [reporting_range, period_starting])
                line_count += 1
This function reads a CSV file uploaded to amazon-search-terms and writes a new file to the same bucket with a _processed.csv suffix, which separates processed files from their originals (and is why the handler skips files whose names already end in processed.csv).
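To sanity-check the handler before deploying, you can feed it a hand-written S3 event trimmed to the fields it actually reads. The bucket and key below are placeholders; running this locally assumes your AWS credentials can access the bucket and that AWS_REGION is set in the environment.

sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "amazon-search-terms"},
                "object": {"key": "search-terms-2020-01.csv"},
            }
        }
    ]
}

# Invoke the handler directly; the context argument is unused here.
lambda_handler(sample_event, None)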
AWS Lambda Limitations
You can now configure your AWS Lambda functions to run up to 15 minutes per execution. Previously, the maximum execution time (timeout) for a Lambda function was 5 minutes.
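Keep in mind that the default timeout is only a few seconds, so you need to raise it explicitly. A quick sketch with boto3, using a placeholder function name:

import boto3

lambda_client = boto3.client("lambda")

# Raise the timeout to the 15-minute maximum (900 seconds) and give the
# function more memory, which also increases its CPU share.
lambda_client.update_function_configuration(
    FunctionName="process-csv",  # placeholder function name
    Timeout=900,
    MemorySize=1024,
)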
If you’re dealing with files over 1 GB, consider AWS Athena instead, or optimize your AWS Lambda function to read the file as a stream rather than loading it into memory all at once, as in the sketch below.
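Here is a rough sketch of streaming rows with boto3; the bucket and key are placeholders, and it assumes the CSV has no newlines embedded inside quoted fields.

import csv
import boto3

s3_client = boto3.client("s3")
response = s3_client.get_object(Bucket="amazon-search-terms", Key="huge-file.csv")

# iter_lines() reads the body in chunks, so memory use stays roughly constant
# regardless of the file size.
lines = (line.decode("utf-8-sig") for line in response["Body"].iter_lines())
for row in csv.reader(lines):
    pass  # process each row here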
AWS Lambda Execution Role
To read and write data in AWS S3, your AWS Lambda function needs an execution role with the right policies attached; otherwise you’ll get an “Access Denied” error when you attempt to open a file for reading or writing.
You can attach the IAMReadOnlyAccess and AmazonS3FullAccess managed policies, or use the example policy below, which is intentionally broad and intended for learning purposes only.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"iam:GetPolicyVersion",
"iam:GetAccountPasswordPolicy",
"iam:ListRoleTags",
"iam:ListServerCertificates",
"logs:*",
"iam:GenerateServiceLastAccessedDetails",
"iam:ListServiceSpecificCredentials",
"iam:ListSigningCertificates",
"iam:ListVirtualMFADevices",
"iam:ListSSHPublicKeys",
"iam:SimulateCustomPolicy",
"iam:SimulatePrincipalPolicy",
"iam:ListAttachedRolePolicies",
"iam:ListRolePolicies",
"iam:GetAccountAuthorizationDetails",
"iam:GetCredentialReport",
"iam:ListPolicies",
"iam:GetServerCertificate",
"iam:GetRole",
"iam:ListSAMLProviders",
"iam:GetPolicy",
"iam:GetAccessKeyLastUsed",
"iam:ListEntitiesForPolicy",
"iam:GetUserPolicy",
"iam:ListGroupsForUser",
"iam:GetGroupPolicy",
"iam:GetOpenIDConnectProvider",
"iam:GetRolePolicy",
"iam:GetAccountSummary",
"iam:GenerateCredentialReport",
"iam:GetServiceLastAccessedDetailsWithEntities",
"iam:ListPoliciesGrantingServiceAccess",
"iam:ListMFADevices",
"iam:GetServiceLastAccessedDetails",
"iam:GetGroup",
"iam:GetContextKeysForPrincipalPolicy",
"iam:GetOrganizationsAccessReport",
"iam:GetServiceLinkedRoleDeletionStatus",
"iam:ListInstanceProfilesForRole",
"iam:GenerateOrganizationsAccessReport",
"iam:ListAttachedUserPolicies",
"iam:ListAttachedGroupPolicies",
"iam:GetSAMLProvider",
"iam:ListAccessKeys",
"iam:GetInstanceProfile",
"s3:*",
"iam:ListGroupPolicies",
"iam:GetSSHPublicKey",
"iam:ListRoles",
"iam:ListUserPolicies",
"iam:ListInstanceProfiles",
"iam:GetContextKeysForCustomPolicy",
"iam:ListPolicyVersions",
"iam:ListOpenIDConnectProviders",
"iam:ListAccountAliases",
"iam:ListUsers",
"iam:GetUser",
"iam:ListGroups",
"iam:GetLoginProfile",
"iam:ListUserTags"
],
"Resource": "*"
}
]
}
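In practice you’ll want something much narrower than the policy above. Here is a hedged sketch of creating a least-privilege policy with boto3, scoped to the bucket used in this example; the policy and role names are placeholders.

import json
import boto3

iam_client = boto3.client("iam")

# A tighter policy granting only what the function actually uses:
# CloudWatch Logs for logging, plus read/write access to the one bucket.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
            ],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::amazon-search-terms",
                "arn:aws:s3:::amazon-search-terms/*",
            ],
        },
    ],
}

response = iam_client.create_policy(
    PolicyName="process-csv-lambda-policy",  # placeholder name
    PolicyDocument=json.dumps(policy_document),
)

# Attach it to the function's execution role (placeholder role name).
iam_client.attach_role_policy(
    RoleName="process-csv-lambda-role",
    PolicyArn=response["Policy"]["Arn"],
)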
Conclusion
I have to say I’m pretty happy with the result: the files I’m processing are all under 600 MB, and each one takes around 5–10 minutes to process. For larger workloads, you can look into AWS Athena and AWS SQS.