Covid-19 Analysis with BigData Applications – Part 2

Hi again!

In this post, I’ll explain the next two ETL jobs: the first one processes the Twitter data related to Covid-19, and the second one combines the outputs of the previous two ETL jobs. As we have already covered the basic EMR concepts earlier, I’ll get straight into explaining what is being done in these tasks.

1. For ETL2, I’m creating a Hive table beforehand because this Twitter data is semicolon-delimited and isn’t easily parsed as regular CSV or other data formats. So instead, I’m using a regular expression to parse the fields out of this data. I like to use a regex tester tool like “https://regex101.com/” for this kind of purpose. I then specify the regex in the table definition, which looks like below in JSON format:
{
	"Name": "<table-name>",
	"StorageDescriptor": {
		"Columns": [{
				"Name": "time",
				"Type": "bigint"
			},
			{
				"Name": "tweet",
				"Type": "string"
			},
			{
				"Name": "country",
				"Type": "string"
			},
			{
				"Name": "followers",
				"Type": "int"
			},
			{
				"Name": "id",
				"Type": "bigint"
			},
			{
				"Name": "hashtags",
				"Type": "string"
			}
		],
		"Location": "s3://<your-bucket>/<prefix>",
		"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
		"OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
		"SerdeInfo": {
			"SerializationLibrary": "org.apache.hadoop.hive.serde2.RegexSerDe"
		}
	},
	"PartitionKeys": [{
			"Name": "year",
			"Type": "string"
		},
		{
			"Name": "month",
			"Type": "string"
		},
		{
			"Name": "day",
			"Type": "string"
		}
	],
	"TableType": "EXTERNAL_TABLE"
}
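
For reference, here is a minimal sketch of how a table definition like this could be registered in the AWS Glue Data Catalog with boto3 (the JSON structure above matches Glue’s TableInput). The database name and file path are placeholders of my own, not values from the project:

# Sketch: register the table definition shown above via the Glue API.
# Database name and file path are placeholders, not from the original project.
import json

import boto3

glue = boto3.client("glue")

with open("covid_tweets_table.json") as f:  # the JSON definition shown above
    table_input = json.load(f)

glue.create_table(
    DatabaseName="<your-database>",  # placeholder database name
    TableInput=table_input,
)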

2. Similar to the ETL1 job from the previous post, here we also take the date (format YYYY-mm-dd) as a parameter and derive the year, month, and day from it.
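
A rough sketch of that step (the argument handling is my assumption; the actual job may read the parameter differently):

# Sketch: split a YYYY-mm-dd argument into year, month, and day strings.
import sys

run_date = sys.argv[1]             # e.g. "2020-05-17"
year, month, day = run_date.split("-")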

3. Then, I’m creating a Spark DataFrame by querying this table with the above date as the partition condition. Note that the partition needs to exist before it can be queried; otherwise, the query will just return empty results. In my case, I’m adding the new partitions from the scheduler Lambda function, which I’ll explain in the next post.
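
A minimal PySpark sketch of such a partition-filtered query could look like this, reusing the year/month/day values from the previous sketch. The database name is a placeholder, and the column list follows the table definition above:

from pyspark.sql import SparkSession

# Hive support lets Spark resolve the external table from the metastore.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Query only the partition for the given date; if that partition has not been
# added yet, this simply returns an empty DataFrame.
tweets_df = spark.sql(f"""
    SELECT time, tweet, country, followers, id, hashtags
    FROM <your-database>.<table-name>
    WHERE year = '{year}' AND month = '{month}' AND day = '{day}'
""")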

4. Now that the DataFrame is ready, I first get all the unique countries present in it. Based on this list, I loop through each country and calculate the top 10 hashtags and words used in the tweets for that country. For this purpose, I’m utilizing the flatMap() transformation. Similarly, I’m also calculating the total number of tweets for that date and country.
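
To illustrate the kind of per-country aggregation described here, a sketch using flatMap() might look like the following. It only shows the hashtags part (the words calculation would follow the same pattern on the tweet column), and it assumes the hashtags column holds a space-separated string, which may differ from the actual data layout:

# Sketch: top 10 hashtags and the tweet count for each country.
countries = [row.country for row in tweets_df.select("country").distinct().collect()]

results = []
for country in countries:
    country_df = tweets_df.filter(tweets_df.country == country)

    top_hashtags = (
        country_df.rdd
        .flatMap(lambda row: (row.hashtags or "").split())   # one record per hashtag
        .map(lambda tag: (tag, 1))
        .reduceByKey(lambda a, b: a + b)
        .takeOrdered(10, key=lambda kv: -kv[1])               # top 10 by count
    )
    tweet_count = country_df.count()
    results.append((country, tweet_count, [tag for tag, _ in top_hashtags]))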

5. Lastly, I create a new Spark DataFrame from the information calculated in the loop, write it to S3, and save it as a table in Hive.
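
A sketch of that final step, assuming the per-country results are collected into a list of tuples as in the previous sketch; the output path, table name, and column names are placeholders of my own:

# Sketch: build a DataFrame from the loop results and persist it to S3 as a
# Hive table. Output location and table name are placeholders.
summary_df = spark.createDataFrame(
    results, schema=["country", "tweet_count", "top_hashtags"]
)

(summary_df.write
    .mode("overwrite")
    .option("path", "s3://<your-bucket>/<etl2-output-prefix>/")
    .saveAsTable("<your-database>.<etl2-output-table>"))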

The full code for this ETL job is also available in the GitHub repo mentioned earlier. Feel free to try out the corresponding notebook in Jupyter or Zeppelin as well.

Next is ETL3: it simply takes the date as a parameter and creates two Spark DataFrames by querying the output tables from the previous two ETL jobs. Then, it joins them by country and date and writes the output data to S3. I initially planned to also write this output data into a relational database like MySQL or PostgreSQL. This way, I could build a custom visualization application that would query the data from the relational database. However, for this project, I later chose to use the QuickSight service for visualization, so I won’t be showing the use of this ETL job. Nevertheless, it can still serve as an example for other use cases where you may need to implement further ETL jobs.
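
A minimal sketch of that join is below. The table names, the exact join keys (both outputs are assumed to carry country and date columns), and the output format are assumptions on my part:

# Sketch of the ETL3 join: combine the outputs of ETL1 and ETL2.
cases_df = spark.sql("SELECT * FROM <your-database>.<etl1-output-table>")
tweets_summary_df = spark.sql("SELECT * FROM <your-database>.<etl2-output-table>")

# Join on country and date, assuming both tables expose these columns.
combined_df = cases_df.join(tweets_summary_df, on=["country", "date"], how="inner")

combined_df.write.mode("overwrite").parquet("s3://<your-bucket>/<etl3-output-prefix>/")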

That’s the brief overview of the ETL2 and ETL3 jobs used in this project. Since these are mostly about programming, I’m not walking through the full code here, as it is already published to GitHub. Instead, I’m thinking of making a walk-through video explaining the logic. For now, that’s all I have for today. Next, I’ll write about the scheduling setup with Lambda and the notification process. Until then, stay safe. And if you have any questions or feedback, please share them in the comments section.

