Hi again!
So, if you came here after reading the introduction post, we’ll be talking about the part that we run on EMR cluster i.e. ETL job. Across BigData community, the term ETL generally refers to Extract, Transform and Load. And for this project, I’m using Apache Spark, which happens to be one of the most popular open-source projects currently.
Data Ingestion Setup
Since we’ll be running multiple ETL jobs for this project, I’ll mostly focus on the first one here i.e. processing Covid-19 statistics. As mentioned earlier, this data is published to Github in daily basis by Center for Systems Science and Engineering (CSSE) at Johns Hopkins University. So, we’ll start by fetching this data and putting it in our S3 datalake bucket. To do this, I’m using one of my EC2 instance, however any machine can be used for this purpose. Basically, we’ll clone this Github repository to local folder, and then sync it to our S3 bucket. Then, to regularly sync new data, I’m adding this as a cronjob.
# Clone Github repo $ git clone https://github.com/CSSEGISandData/COVID-19.git # Here's a simple bash script that'll fetch new files from Github and upload to S3 location $ cat git-sync-s3.sh #!/usr/bin/bash cd ~/COVID-19/ echo =========== Git Pull =========== git pull echo =========== S3 Sync =========== aws s3 sync . s3://<s3-bucket>/<prefix>/ --acl bucket-owner-full-control --exclude "*" --include "*.csv" echo =============================== # Make this file executable $ chmod +x git-sync-s3.sh # Create a cronjob that runs in every 8 hours $ crontab -e 0 */8 * * * ./git-sync-s3.sh
Note that I’m using AWS CLI to run S3 sync command. Since I’m running this in EC2 instance, this tool is by default installed. And I’ve also attached an IAM role with necessary permissions to this EC2 instance, so I do not need to explicitly install CLI or configure it with credentials. If you were to try this in your PC or other machine, you’d need to install the AWS CLI and configure it with your AWS IAM credentials. Also, I’m specifying “–acl bucket-owner-full-control” because I’m putting my S3 datalake and applications in separate accounts. If everything is in same account, this ACL doesn’t necessarily be passed. Lastly, I’m using exclude and include filters to only upload CSV extension files to S3.
Considerations for EMR Cluster
Now that we’ve covered data ingestion part, let’s look into the EMR and Spark setup. In case you aren’t familiar with EMR, it stands for Elastic Map Reduce and it is an AWS managed Hadoop framework. As of now, it provides native support for about a dozen popular BigData applications. And with some additional work, one can also install any other applications as needed, though such applications aren’t managed by EMR team. Some of the main benefits of using EMR are:
- Managed cluster on top of EC2 service. So, it utilizes other components like EBS volumes, VPC, Auto-Scaling, Spot instances, and so on.
- Managed support for major applications like Spark, Hive, Tez, Presto, HBase, etc. So, no need to install them manually or to worry about dependencies.
- Strong integration with other core services like IAM, S3, DynamoDB, and Glue Data Catalog. These also allow EMR to be utilized as a transient cluster rather than being always running cluster. This can help a lot in cost saving if the cluster doesn’t need to run all the time.
Since there are lot of theoretical concepts related to EMR, Hadoop or the BigData applications, it won’t be feasible to cover all of them here. So, I’ll just walk you through the setup and will mention the relevant concept along the way. To get started with launching an EMR cluster, here are the things that we need to consider beforehand:
- A VPC and Subnet where we’ll be launching the EMR EC2 instances. The subnet can be both public or private depending on your use case. To allow the cluster to communicate with services like S3, we will also utilize VPC endpoints so that the traffic can remain private. If you’re launching your first EMR cluster from AWS Console, it can also create a basic VPC, subnet and security groups for you.
- IAM roles to be used by EMR. Mainly, it requires a service role and a instance profile role. Additionally, there can be an Auto-scaling role if auto-scaling is to be applied to the cluster. From application’s perspective, instance profile aka EMR EC2 role is the one where the necessary permissions need to be granted. If launching the first cluster from Console, it can also create these default IAM roles. Alternatively, we can also create these roles and then attach them to the cluster.
- As there are a lot of options and configurations to be used in EMR, it’s better to also have some tentative plan while launching a cluster. For example, which applications do we want to install, whether we want to utilize Glue catalog for Hive metastore, any security configuration, or any particular Hadoop/application configurations, etc. If you are just starting out, no need to worry about getting these configuration right the first time itself because we can always modify most of the configuration by accessing the cluster nodes. That way, we can test and determine which are the proper configuration for the use case and then use them in launching new clusters.
Setting up EMR Cluster
Based on above considerations, I’m launching my cluster with below properties:
- A public subnet in a VPC that has VPC endpoints for S3 and DynamoDB services.
- Default EMR and EC2 instance profile roles.
- Applications to be installed are Hadoop, Spark, Hive, Tez, Hue, Livy, Zeppelin. I don’t essentially need all of these. However, I’ve some topics to cover in regards to these applications, so installing them here.
- During configuration, I’m also choosing to use Glue Data Catalog for Hive and Spark metastore. This way, my Hive table metadata will be persisted in Glue catalog and won’t go away every time I terminate my EMR cluster. Then, I’m also doing some additional configurations such as setting Python version for Spark, EMRFS properties for S3, etc.
- And choose the desired EMR release version, EC2 instance type, count, EC2 SSH key pair, and so on.
I’ll be sharing a CloudFormation template that will also launch the EMR cluster being used in this use case. So, you can refer to it if you would like to setup similar environment. Whereas for general EMR launch, you can also go through AWS EMR documentations.
Working with Apache Spark
Now, let’s get into the actual processing task using Spark. Here, I’m using PySpark, which exposes Spark programming model to Python language. My main motivation to use PySpark in this project is because of its simplicity and ease of use. Also, it is more human understandable, so ideal for demonstrating and explaining to other people.
Before I go about writing the actual ETL script with PySpark, I’d like to briefly talk about some interactive options to try using PySpark. This can be very helpful for quick proof-of-concept or to also test out the logic. For this, I generally like to use PySparkShell through SSH into my master node. Once a shell is launched and ready, it’ll already create an application (Resource manager like YARN) containing a Spark Session. Then, we can just enter Python code similar to regular Python shell/Idle and interact with Spark APIs or actions. After trying and verifying the code in this shell, we can easily use them to prepare a Python script that we can then run with spark-submit.
Alternatively, we can also utilize Notebook such as Jupyter or Zeppelin running on top of EMR. We can then perform similar code development and testing. In a way, this is a much better option as it allows code, queries, equations, models and narrative texts within notebook cells. Additionally, one can also perform data visualization in the notebook itself. Plus, we can run the notebook and export it along with its results. Therefore, it is a very handy and useful tool.
In EMR, we can use both Jupyter and Zeppelin notebook. For JupyterLab, EMR also provides a serverless notebook that can be attached to a running EMR Spark cluster. Similarly, one can also install Zeppelin during cluster launch and use this notebook. For reference, I’ve published my notebooks in this Github repository along with all other scripts and CloudFormation template.
To summarize, here are the high level actions performed in this ETL job:
- After the job script is started, it takes the date (year, month and day) as parameter and determines the corresponding Covid-19 report file in S3.
- After Spark reads the CSV file, it renames the columns to standard names so that the resulting data will have consistent schema. One important thing to note here is that the data in Github contains varying schema, which evolved over time. So, to correctly process all these data, I first made a list of all known schema and performed corresponding mappings. Similarly, if the data layout changes in future, the job will log it accordingly.
- Then, it performs few other actions such as filling nulls, dropping duplicates, casting proper data types, etc. Also, the data refers to Country field as country name, so it also fetches the country code for all countries. For this, we are utilizing Pycountry module and using search_fuzzy() method. During my tests, I observed that there were two noticeable issues with this: i) slow performance and ii) several unidentified countries. So to address both, I initiated a Python dictionary to add each uniquely identified country and also maintained a list of all unidentified countries along with their expected codes. This helped to optimize execution time and ensure correct mapping for all countries.
- Then, it performs a GroupBy operation on the dataframe and sums on columns such as confirmed, deaths and recovered. This way, we’ll have aggregated data that can be easily used in further processing.
- Lastly, it writes both the aggregated and unaggregated to S3 output location.
After testing this logic in Notebook or PySparkShell, I wrote a Python script that can then be submitted as Spark job using EMR Step. At this stage, I also added more logic in the code to utilize argument parser and DynamoDB table for keeping track of jobs. This same DynamoDB table is also used for notifications via Stream and Lambda function.
That’s the overview of the ETL1 job. I’d encourage you to take a look into the Python Scripts available in the linked Github repo and to also test out the Notebooks if you’ve access to it. That’s all for this post. In next post, I’ll explain about the other ETL jobs and subsequently on how I’m scheduling/managing these jobs using Lambda function.
Till then, stay safe!
Leave a Reply