Exploring AWS Glue - Part 3

Multi-faceted ETL Tool

In part three of our introduction to AWS Glue, we'll create a simple job and write code to add a calculated column to the datasets created in the previous part.

AWS Glue Jobs

 

An AWS Glue Job is used to transform your source data before loading it into the destination. In fact, a job can handle both the transform and load parts of an ETL pipeline.

When creating an AWS Glue Job, you need to specify the destination of the transformed data. The destination can be an S3 bucket, Amazon Redshift, Amazon RDS, or another relational database.

By combining the different destinations with the ability to schedule jobs or trigger them based on events, you can chain jobs together and build a solid ETL/ELT pipeline.

Creating a Job

 

Creating a new job is very straightforward. From the AWS Glue console we'll click Add Job. Next we provide a name for the job and select or create an IAM role. Please note that the role must have permission to access your sources and targets.

Until you get some experience with AWS Glue jobs, it is better to let AWS Glue generate a blueprint script for you.

It will create some code for accessing the source and writing to the target, with basic data mapping based on your configuration. You then simply modify the script to add your business logic.

Once you are more experienced, you can write your own scripts from scratch, put them into an S3 bucket, and have AWS Glue pick them up and run them.

I am going to use Scala as my scripting language. You can also use Python if you are more comfortable with it.

We also need to tell AWS Glue the name of the script file and the S3 bucket in which the generated script will be stored.

Examine the other configuration options that AWS Glue offers. Note how you can instruct AWS Glue to remember previously processed data. Check out this link for more information on “bookmarks”.
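Bookmarks are enabled in the job configuration, but the script also has to cooperate: the generated Scala script brackets its work with Job.init and Job.commit calls so that AWS Glue can record how far each run got. Here is a minimal sketch of that bracketing, assuming the standard generated GlueApp skeleton (the exact template AWS Glue produces for you may differ slightly):

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job}
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val glueContext = new GlueContext(new SparkContext())
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    // Job.init starts a bookmarked run for this job name
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // ... read the source, apply the mapping, and write to the sink here ...

    // Job.commit records the bookmark so the next run can skip already-processed data
    Job.commit()
  }
}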

You should let AWS Glue know about the extra libraries you intend to use in your script. Remember that you are writing a script here, so there is no build process to download and configure the additional libraries.

Here is my configuration:

I select schoolfiles as my data source.

Next we are going to tell AWS Glue about our target. Our first option is to update the tables in the data catalog that were created when we set up and ran the Crawler.

We can also create a new file based on the processed data and load it into a separate bucket.

In addition, we can use JDBC to load the resulting data into Amazon RDS, Amazon Redshift, or other relational databases that support JDBC connections.

For this exercise I am going to create a new file in the same S3 bucket as our source data. Our goal is to find the school district with the highest average score per subject.

Next we are going to do some data mapping. You have the option to remove columns, add new columns, set column mappings, change data types, etc.

Once this is done, AWS Glue will create a script that is fully functional and can be executed right away. It doesn't do anything special, though.

Writing the Code

 

If you are familiar with Apache Spark, the code should look familiar. In addition to DataFrames, AWS Glue has an object called a DynamicFrame.

A DynamicFrame is similar to a DataFrame, with improvements in how the schema is inferred and handled. You can read more about DynamicFrames here.
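To get a feel for what that schema handling buys you: if the Crawler sees a column with more than one type across files, a DynamicFrame can carry that ambiguity and let you resolve it explicitly, which a plain DataFrame cannot do. A minimal sketch, assuming the Scala DynamicFrame API's resolveChoice and using our avg-scr column as the example:

// If "avg-scr" was inferred as both int and double, resolve it to a single type
val resolved = datasource0.resolveChoice(specs = Seq(("avg-scr", "cast:double")))

// From here on the schema is unambiguous and safe to convert to a DataFrame
val df = resolved.toDF()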

Examine the script created by AWS Glue. Apart from creating a Glue context and a Spark context, there are basically three main sections in the code:

Data source – A DynamicFrame object built from the specifications of the source data.

Data mapping – How source columns are mapped to the destination columns. Our code will manipulate the data mapping and add a new column.

Data sink – The specifications of the destination: location, format, whether we want to encrypt the data, compression, and so on.

Please note how these three sections are created based on our configuration. As mentioned, this is complete code that you can run as is.
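Stripped of the surrounding boilerplate, those three sections look roughly like the sketch below. The mapping and sink lines are taken from the script discussed later in this post; the database and table names passed to getCatalogSource are placeholders for whatever your Crawler created:

// 1. Data source: a DynamicFrame built from the Data Catalog table created by the Crawler
val datasource0 = glueContext.getCatalogSource(database = "schooldb", tableName = "schoolfiles", transformationContext = "datasource0").getDynamicFrame()

// 2. Data mapping: source columns mapped to destination columns
val applymapping1 = datasource0.applyMapping(mappings = Seq(("sch-dist", "string", "District", "string"), ("subjt", "string", "Subject", "string")), caseSensitive = false, transformationContext = "applymapping1")

// 3. Data sink: where and how the transformed data is written (CSV on S3 in this case)
val datasink2 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://schoolfiles20180916/results"}"""), transformationContext = "datasink2", format = "csv").writeDynamicFrame(applymapping1)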

 

The result would be a set of CSV files (one per partition that AWS Glue creates) with two columns: sch-dist, which is renamed to District, and subjt, which is renamed to Subject.

In order to apply our business logic, we are going to write a simple script that uses Spark's functions to group the results by school district and subject and take the maximum average score.

We will use the aggregation functions Spark provides for a DataFrame. This means that we first need to convert our DynamicFrame object to a DataFrame, apply the logic to create a new DataFrame, and then convert the resulting DataFrame back to a DynamicFrame so that we can use it in the data mapping object.

// Spark's aggregation functions need to be imported at the top of the script
import org.apache.spark.sql.functions.max

// Convert the DynamicFrame to a Spark DataFrame
val df = datasource0.toDF()

// Group by district and subject and take the maximum average score
val results_df = df.groupBy("sch-dist", "subjt").agg(max("avg-scr") as "maxavgscr")
// Repartition to 1 so the result is written out as a single file
val results_single_df = results_df.repartition(1)

// Convert the resulting DataFrame back to a DynamicFrame
val results_dyf = DynamicFrame.apply(results_single_df, glueContext)

In addition, we are going to modify the data mapping to use our new DynamicFrame.
val applymapping1 = results_dyf.applyMapping(mappings = Seq(("sch-dist", "string", "District", "string"), ("subjt", "string", "Subject", "string"), ("maxavgscr", "double", "MaxScore", "double")), caseSensitive = false, transformationContext = "applymapping1")

That’s all we need to change for this exercise.

Once we run this job and it completes, we should be able to see the file in the bucket that we specified in our data sink configuration. (The location is the value of the "path" key below.)

val datasink2 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://schoolfiles20180916/results"}"""), transformationContext = "datasink2", format = "csv").writeDynamicFrame(applymapping1)
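For comparison, if we had gone with the JDBC option mentioned earlier, the sink would reference a Glue connection instead of an S3 path. A rough, untested sketch using the Scala API's getJDBCSink, where the connection name, database, and table are made-up placeholders:

// Hypothetical alternative sink: write to a relational database through a Glue connection
val datasink3 = glueContext.getJDBCSink(catalogConnection = "my-rds-connection", options = JsonOptions("""{"dbtable": "school_results", "database": "schooldb"}"""), transformationContext = "datasink3").writeDynamicFrame(applymapping1)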

 

The AWS Glue interface doesn't allow for much debugging. You can have AWS Glue set up a Zeppelin endpoint and notebook for you so you can debug and test your script more easily.

I highly recommend setting up a local Zeppelin notebook instead; AWS Glue endpoints are expensive, and if you forget to delete them you will accrue charges whether you use them or not. You can learn more about Zeppelin notebooks here.

Final Thoughts

 

In this three-part series we tried to give you an overview of AWS Glue and show you how powerful it can be as an ETL tool. The automatic schema inference of the Crawler, together with the scheduling and triggering abilities of Crawlers and Jobs, should give you a complete toolset for creating enterprise-scale data pipelines.

Even if you don't want to use it to move data around, the fact that you can use Amazon Athena to query your CSV and JSON files without first loading them into a staging database should be enough to consider AWS Glue for ad hoc source data analysis and discovery.

Please feel free to leave a comment and let us know what you think about AWS Glue or the series.

Part One

Part Two

AWS Glue Page

 




7 comments

Raj August 6, 2019

How do I add a column and map it so that it ends up in the destination table/file?

    Mohammad Meimandi August 6, 2019

    Hi Raj, to add a new column you would convert your data source object to a DataFrame and then use the withColumn method to add the new column:

    var newDF = datasource0.toDF()
    newDF = newDF.withColumn("newCol", newVal)

    Then you would convert back to a DynamicFrame and continue with the mapping:

    val newDatasource = DynamicFrame.apply(newDF, glueContext)

Srinivasan ramarao August 13, 2019

When converting a DynamicFrame to a Spark DataFrame, will the entire dataset be loaded onto the Glue VMs and then converted to a DataFrame, or is there an option to filter out only the required data from the S3 files before loading it into the data frames?

    Mohammad Meimandi August 13, 2019

    Hi Srinivasan, .toDF converts your DynamicFrame to a DataFrame with the same schema and records. However, nothing stops you from filtering the records with the DynamicFrame's "filter" method first and then converting to a DataFrame. That way your DataFrame will contain only the records that were not filtered out.

Shantanu September 12, 2019

How do I copy a file from a URL (machineIP/folder/file) to S3?

Dillip November 19, 2019

How do I get the source file name from a DynamicFrame? I already tried using the file_name() function but I am not getting any value.

    Mohammad Meimandi November 20, 2019

    Hi Dillip, I'm not sure I quite understand the question. Basically, you create a DynamicFrame based on the data in a Glue table, which may or may not be sourced from an object on S3. Say you create a Glue table based on the data in an RDS instance; in your job you would then create the DynamicFrame based on that Glue table. So which source file are you referring to? You are working with Glue tables.



