It is well known that S3 and Athena are a match made in heaven. But because the data lives in S3 and Athena is serverless, we have to use a Glue crawler to store metadata about what those S3 locations contain.
Even small master tables, metrics tables, or daily incremental transactional data with schema changes must be crawled to create a table in Athena.
In the beginning, my team and I wrote Python scripts that uploaded CSV files to S3 and then triggered a Lambda function, which invoked the relevant crawler and created or updated the table in Athena. It was a troublesome and tedious process. That is when “AWS Data Wrangler” came to the rescue.
In this article, I want to focus on using Data Wrangler to read data from Athena → transform the data → create a new table from the transformed dataframe directly in Athena.
NOTE: AWS Data Wrangler builds on pandas and is custom-tailored for AWS.
NO crawler == NO hassle
This can be done both from your local machine and from a Glue Python shell job.
Before we start the implementation, make sure Data Wrangler is installed:
pip install awswrangler
Also, verify that the appropriate S3 bucket and Glue table policies are attached to the respective role/user. Data Wrangler internally uses boto3 to perform actions. You can check which identity your session is running as:
import awswrangler as wr
name = wr.sts.get_current_identity_name()
arn = wr.sts.get_current_identity_arn()
The sample test code looks something like this:
# Python lib path for Glue Python shell job
# If you are using a Glue Python shell job, make sure you add this Python lib path every time you create a new job. The latest release can be found here
# AWS data wrangler read data from Athena
import awswrangler as wr
import pandas as pd
import numpy as np
#databases = wr.catalog.databases()
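# SELECT * is assumed here; the database name is taken from the later examples
df = wr.athena.read_sql_query("""
SELECT *
FROM table_name limit 100
""", database="test_database")
df = df.melt(id_vars=['col1','col2','col3'])  # reshape wide to long, keeping the id columns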
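To make the transform step concrete, here is a minimal pandas-only sketch of what melt does to a wide table. The column values and the extra `sales` and `returns` columns are hypothetical; only the `id_vars` call shape comes from the snippet above.

```python
import pandas as pd

# Hypothetical wide table: three identifier columns plus one column per metric.
wide = pd.DataFrame(
    {
        "col1": ["store_a", "store_b"],
        "col2": ["2021-01-01", "2021-01-01"],
        "col3": ["north", "south"],
        "sales": [100, 250],
        "returns": [5, 10],
    }
)

# Identifier columns are kept as-is; every other column is unpivoted
# into a (variable, value) pair, one row per original cell.
long = wide.melt(id_vars=["col1", "col2", "col3"])

print(long)
```

Each metric column turns into rows, so two stores with two metrics give four rows with the columns col1, col2, col3, variable, and value. This long shape is often easier to append to over time, because a new metric becomes new rows rather than a schema change.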
# AWS data wrangler write data to Athena as table
Using Data Wrangler, you can read data of any type (CSV, Parquet, Athena query, etc.) anywhere (locally or in Glue) as a pandas dataframe, write it back to S3 as an object, and create the table in Athena at the same time. For instance, the following snippet does exactly that:
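res = wr.s3.to_parquet(
    df=df,  # the transformed dataframe from the previous step
    path="s3://xxxxxxx/aws-wrangler-test",  # s3 path where you want to dump
    dataset=True,  # required when database and table are passed
    database="test_database",  # <your database name>
    table="table_test_wrangler",  # <your new table name>
)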
More complex features like partitioning, casting, and catalog integration can be achieved with this to_parquet API. For an exhaustive list of the parameters this method supports, click here
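As a rough sketch of the partitioning and casting options, the helper below collects keyword arguments for wr.s3.to_parquet. The helper names `build_to_parquet_kwargs` and `write_partitioned`, the sample dataframe, and the choice of partition column are all hypothetical; `partition_cols`, `dtype`, `mode`, and `dataset` are parameters of to_parquet. The awswrangler import is deferred so the argument-building part can be checked without AWS access.

```python
from typing import Any, Dict, List, Optional

import pandas as pd


def build_to_parquet_kwargs(
    df: pd.DataFrame,
    path: str,
    database: str,
    table: str,
    partition_cols: Optional[List[str]] = None,
    dtype: Optional[Dict[str, str]] = None,
    mode: str = "append",
) -> Dict[str, Any]:
    """Hypothetical helper: assemble keyword arguments for wr.s3.to_parquet."""
    # Fail early if a requested partition column is not in the dataframe.
    missing = [c for c in (partition_cols or []) if c not in df.columns]
    if missing:
        raise ValueError(f"partition columns not in dataframe: {missing}")

    kwargs: Dict[str, Any] = {
        "df": df,
        "path": path,
        "dataset": True,  # needed for catalog integration and partitioning
        "database": database,
        "table": table,
        "mode": mode,  # "append", "overwrite", or "overwrite_partitions"
    }
    if partition_cols:
        kwargs["partition_cols"] = partition_cols
    if dtype:
        kwargs["dtype"] = dtype  # column name -> Athena type, e.g. "double"
    return kwargs


def write_partitioned(**kwargs: Any) -> Any:
    """Hypothetical wrapper: performs the actual write (needs AWS credentials)."""
    import awswrangler as wr  # deferred so the helper above works offline

    return wr.s3.to_parquet(**kwargs)


# Hypothetical long-format dataframe, as produced by a melt step.
sample = pd.DataFrame(
    {"col1": ["a", "b"], "variable": ["sales", "sales"], "value": [100, 250]}
)
kwargs = build_to_parquet_kwargs(
    sample,
    path="s3://xxxxxxx/aws-wrangler-test",
    database="test_database",
    table="table_test_wrangler",
    partition_cols=["col1"],
    dtype={"value": "double"},
    mode="overwrite_partitions",
)
# write_partitioned(**kwargs)  # uncomment to write to S3 and register the table
```

Here `overwrite_partitions` is chosen on the assumption that a daily job should replace only the partitions it touches; plain `append` or `overwrite` may suit other loads better.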
BONUS: Athena cache
When calling read_sql_query, instead of just running the query, we can now check whether the same query has been run before. This is disabled by default and can be enabled by passing a max_cache_seconds value greater than 0.
When max_cache_seconds > 0 and the query string matches one that was run within the prescribed number of seconds, Wrangler returns the earlier results from S3 instead of re-running the query, provided they are still available in S3.
df = wr.athena.read_sql_query(query, database="test_database", max_cache_seconds=900)
AWS claims this improves performance by more than 100x, but it must be used with caution: the query string must exactly match a query that was run within the last 900 seconds (15 minutes), as set by the max_cache_seconds parameter here.
A detailed walkthrough of implementing the Athena cache can be found here
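The matching rule described above can be pictured with a small toy model. This is not Wrangler's implementation: the `QueryCache` class, its injected clock, and the fake result strings are hypothetical, and it compares query strings with plain equality, whereas the library may normalize the text before comparing.

```python
import time
from typing import Callable, Dict, Tuple


class QueryCache:
    """Toy model of string-keyed result caching with a maximum age."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock
        self._entries: Dict[str, Tuple[float, str]] = {}  # query -> (run time, result)
        self.executions = 0  # how many times a query was actually "run"

    def run(self, query: str, max_cache_seconds: int = 0) -> str:
        now = self._clock()
        hit = self._entries.get(query)
        # Reuse only if caching is enabled, the text matches, and the entry is fresh.
        if max_cache_seconds > 0 and hit is not None and now - hit[0] <= max_cache_seconds:
            return hit[1]
        self.executions += 1
        result = f"result-{self.executions}"  # stand-in for real query output
        self._entries[query] = (now, result)
        return result


# A fake clock makes the expiry behaviour deterministic.
fake_now = [0.0]
cache = QueryCache(clock=lambda: fake_now[0])

first = cache.run("SELECT * FROM table_name LIMIT 100", max_cache_seconds=900)
second = cache.run("SELECT * FROM table_name LIMIT 100", max_cache_seconds=900)
other = cache.run("SELECT * FROM table_name LIMIT 200", max_cache_seconds=900)

fake_now[0] = 901.0  # just past the 900-second window
expired = cache.run("SELECT * FROM table_name LIMIT 100", max_cache_seconds=900)

print(cache.executions)
```

In this model the second call is served from the cache, while the query with a different LIMIT and the call made after the window has passed both trigger a fresh execution. That is the behaviour to keep in mind when the query text is generated dynamically, for example with a timestamp embedded in it.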