Quickly Transform Huge CSV Files Using AWS Lambda with Amazon S3


When building small, single-purpose functions, you don't want to worry about server management, logging, performance tuning, or scaling. That's where AWS Lambda comes into play.

Processing CSV Files

Since AWS added S3 event notifications, it is easy to specify that every time a file gets uploaded to a specific bucket, an action fires: in this case, an AWS Lambda function.
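As a quick sketch, wiring the trigger up with boto3 might look like the following. The bucket name and function ARN are placeholders, and S3 must already have permission to invoke the function (granted via aws lambda add-permission):

import boto3

# Hypothetical names -- replace with your own bucket and function ARN.
BUCKET = "amazon-search-terms"
LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:process-csv"

s3_client = boto3.client("s3")

# Invoke the Lambda function for every object created in the bucket.
s3_client.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": LAMBDA_ARN,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)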

AWS Lambda Function

import os
import csv
import re
import s3fs
from urllib.parse import unquote_plus

s3 = s3fs.S3FileSystem(anon=False)
output_bucket = "amazon-search-terms"


def lambda_handler(event, context):
    print("Received event: " + str(event))
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        input_file = os.path.join(bucket, unquote_plus(key))
        # Skip already-processed files so the output doesn't re-trigger
        # the function and cause an infinite loop.
        if input_file.endswith("processed.csv"):
            continue
        output_file_name = (
            os.path.splitext(os.path.basename(input_file))[0] + "_processed.csv"
        )
        output_file = os.path.join(output_bucket, output_file_name)
        process_incoming_file(input_file, output_file)


# Pull the reporting metadata out of the first row, then rewrite the file
# with that metadata appended to the header and to every data row.
def process_incoming_file(input_file, output_file):
    line_count = 0
    with s3.open(output_file, "w", newline="", encoding="utf-8-sig") as new_csvfile:
        header = ["reporting_range", "period_starting"]
        csvwriter = csv.writer(new_csvfile)
        reporting_range = ""
        period_starting = ""
        with s3.open(input_file, "r", newline="", encoding="utf-8-sig") as old_csvfile:
            csvreader = csv.reader(old_csvfile)
            for row in csvreader:
                # First row: report metadata, e.g. "Reporting Range=[...]".
                if line_count == 0:
                    reporting_range = re.search(
                        r"Reporting Range=\[(.+?)\]", row[3]
                    ).group(1)
                    period_starting = (
                        re.search(r"Viewing=\[(.+?)\]", row[4]).group(1).split("-")[0]
                    )
                    line_count += 1
                    continue
                # Second row: column header; extend it with the new columns.
                if line_count == 1:
                    csvwriter.writerow([*row, *header])
                    line_count += 1
                    continue
                # Data rows: append the parsed metadata to each one.
                csvwriter.writerow(row + [reporting_range, period_starting])
                line_count += 1
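For reference, the regular expressions above expect the report's first row to carry the metadata in its fourth and fifth cells, along these lines (a made-up illustration, not a real report):

"Amazon Search Terms","Report","...","Reporting Range=[Weekly]","Viewing=[10/04/20 - 10/10/20]"

Here reporting_range would be "Weekly", and period_starting would be the text before the dash in the Viewing field.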

This function reads a CSV file uploaded to amazon-search-terms and creates a new processed file under the same bucket, suffixed with _processed.csv. The suffix separates processed files from their originals and lets the handler skip files it has already produced, so the output never re-triggers the function.
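To smoke-test the handler locally before deploying, you can feed it a hand-built S3 event. This is a sketch; it assumes valid AWS credentials for s3fs, and the bucket and key below are placeholders:

# Hypothetical event for local testing -- bucket and key are placeholders.
fake_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "amazon-search-terms"},
                "object": {"key": "weekly_report.csv"},
            }
        }
    ]
}

lambda_handler(fake_event, None)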

AWS Lambda Limitations

Lambda caps execution time at 15 minutes and memory at 10 GB, so if you're dealing with files over 1 GB, you should consider AWS Athena instead, or optimize your Lambda function to read the file as a stream rather than loading the whole thing into memory.
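As a sketch of the streaming approach (separate from the function above; the bucket and key are placeholders), boto3's get_object returns a file-like StreamingBody that csv.reader can consume one row at a time:

import csv
import codecs
import boto3

s3_client = boto3.client("s3")

# Placeholders -- substitute your own bucket and key.
response = s3_client.get_object(Bucket="amazon-search-terms", Key="big_report.csv")

# StreamingBody is a binary file-like object; wrap it so csv.reader
# sees text. Rows are pulled from S3 as needed, never all at once.
rows = csv.reader(codecs.getreader("utf-8")(response["Body"]))
for row in rows:
    pass  # process one row at a time here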

AWS Lambda Execution Role

The execution role needs IAM read-only permissions, S3 full access, and CloudWatch Logs access (so the function can log). You can find an example policy below; it is deliberately broad and intended for learning purposes only, so lock it down before using it in production.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iam:GetPolicyVersion",
                "iam:GetAccountPasswordPolicy",
                "iam:ListRoleTags",
                "iam:ListServerCertificates",
                "logs:*",
                "iam:GenerateServiceLastAccessedDetails",
                "iam:ListServiceSpecificCredentials",
                "iam:ListSigningCertificates",
                "iam:ListVirtualMFADevices",
                "iam:ListSSHPublicKeys",
                "iam:SimulateCustomPolicy",
                "iam:SimulatePrincipalPolicy",
                "iam:ListAttachedRolePolicies",
                "iam:ListRolePolicies",
                "iam:GetAccountAuthorizationDetails",
                "iam:GetCredentialReport",
                "iam:ListPolicies",
                "iam:GetServerCertificate",
                "iam:GetRole",
                "iam:ListSAMLProviders",
                "iam:GetPolicy",
                "iam:GetAccessKeyLastUsed",
                "iam:ListEntitiesForPolicy",
                "iam:GetUserPolicy",
                "iam:ListGroupsForUser",
                "iam:GetGroupPolicy",
                "iam:GetOpenIDConnectProvider",
                "iam:GetRolePolicy",
                "iam:GetAccountSummary",
                "iam:GenerateCredentialReport",
                "iam:GetServiceLastAccessedDetailsWithEntities",
                "iam:ListPoliciesGrantingServiceAccess",
                "iam:ListMFADevices",
                "iam:GetServiceLastAccessedDetails",
                "iam:GetGroup",
                "iam:GetContextKeysForPrincipalPolicy",
                "iam:GetOrganizationsAccessReport",
                "iam:GetServiceLinkedRoleDeletionStatus",
                "iam:ListInstanceProfilesForRole",
                "iam:GenerateOrganizationsAccessReport",
                "iam:ListAttachedUserPolicies",
                "iam:ListAttachedGroupPolicies",
                "iam:GetSAMLProvider",
                "iam:ListAccessKeys",
                "iam:GetInstanceProfile",
                "s3:*",
                "iam:ListGroupPolicies",
                "iam:GetSSHPublicKey",
                "iam:ListRoles",
                "iam:ListUserPolicies",
                "iam:ListInstanceProfiles",
                "iam:GetContextKeysForCustomPolicy",
                "iam:ListPolicyVersions",
                "iam:ListOpenIDConnectProviders",
                "iam:ListAccountAliases",
                "iam:ListUsers",
                "iam:GetUser",
                "iam:ListGroups",
                "iam:GetLoginProfile",
                "iam:ListUserTags"
            ],
            "Resource": "*"
        }
    ]
}
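One way to attach this document as an inline policy to the function's execution role, assuming the JSON is saved as policy.json (the role and policy names below are placeholders):

import boto3

iam = boto3.client("iam")

# Hypothetical names -- use your Lambda function's actual execution role.
with open("policy.json") as f:
    policy_document = f.read()

iam.put_role_policy(
    RoleName="csv-lambda-role",
    PolicyName="csv-lambda-inline-policy",
    PolicyDocument=policy_document,
)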

Conclusion

With an S3 trigger and a few dozen lines of Python, you can transform CSV files the moment they land in a bucket, with no servers to manage, no scaling to plan, and logging handled by CloudWatch. For files beyond Lambda's limits, reach for streaming reads or AWS Athena.