AWS Big Data Blog
Optimize downstream data processing with Amazon Data Firehose and Amazon EMR running Apache Spark
February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.
For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge. Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis. Amazon S3 is a natural landing spot for data of all types. However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing. Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files. Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Data Firehose to merge many small messages into larger messages for delivery to Amazon S3. This results in faster processing with Amazon EMR running Spark.
Like Amazon Kinesis Data Streams, Amazon Data Firehose accepts a maximum incoming message size of 1 MB. If a single message is greater than 1 MB, it can be compressed before placing it on the stream. However, at large volumes, a message or file size of 1 MB or less is usually too small. Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.
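To make the file-count concern concrete, the following Java sketch compares how many objects the same dataset produces at 1 MB per object and at 128 MB per object. The 1 TiB dataset size and the 5 ms per-file overhead are illustrative assumptions, not measurements from this post, and the class and method names are hypothetical.

```java
// Rough arithmetic only: object counts and cumulative per-file overhead.
class FileCountEstimate {
    static final long MIB = 1024L * 1024L;

    /** Objects needed to hold totalBytes when each object holds at most objectBytes. */
    static long objectCount(long totalBytes, long objectBytes) {
        return (totalBytes + objectBytes - 1) / objectBytes; // ceiling division
    }

    /** Open/metadata/close overhead summed across all objects, in milliseconds. */
    static long totalOverheadMillis(long objects, long perFileMillis) {
        return objects * perFileMillis;
    }

    public static void main(String[] args) {
        long datasetBytes = 1024L * 1024L * MIB; // assumed dataset size: 1 TiB
        long perFileMillis = 5;                  // assumed overhead per file

        for (long objectMiB : new long[] {1, 128}) {
            long count = objectCount(datasetBytes, objectMiB * MIB);
            System.out.println(objectMiB + " MiB objects: " + count + " files, "
                    + totalOverheadMillis(count, perFileMillis) / 1000
                    + " s summed overhead");
        }
    }
}
```

Under these assumptions, 1 MiB objects yield 1,048,576 files, while 128 MiB objects yield 8,192. The overhead figure is summed across all files, so in practice it is spread over parallel Spark tasks rather than observed directly as wall-clock time.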
This post also shows how to use Apache Spark to read compressed files in Amazon S3 that do not have a proper file name extension, and how to store the results back in Amazon S3 in parquet format.
Solution overview
The steps we follow in this blog post are:
- Create a virtual private cloud (VPC) and an Amazon S3 bucket.
- Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
- Provision Amazon Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
- Generate test data with custom code running on an Amazon EC2 instance.
- Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.
The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to Amazon Data Firehose. We include this step because most customers need to enrich their data before it arrives in Amazon S3.
Amazon Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.
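The "whichever condition is met first" behavior can be illustrated with a toy model. The following Java sketch is not how Data Firehose is implemented; it is a simplified, hypothetical class that flushes its buffer when either the accumulated size or the elapsed time reaches a threshold. As a simplification, it checks the interval only when a record arrives, whereas a real service would also flush on a timer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of size-or-interval buffering; all names here are hypothetical. */
class BufferingSketch {
    private final long maxBytes;
    private final long maxIntervalMillis;
    private long bufferedBytes = 0;
    private long windowStartMillis = -1; // -1 means the buffer is empty
    private final List<Long> deliveredSizes = new ArrayList<>();

    BufferingSketch(long maxBytes, long maxIntervalMillis) {
        this.maxBytes = maxBytes;
        this.maxIntervalMillis = maxIntervalMillis;
    }

    /** Buffers one record and flushes if either threshold has been reached. */
    void add(long recordBytes, long nowMillis) {
        if (windowStartMillis < 0) {
            windowStartMillis = nowMillis; // first record opens a new window
        }
        bufferedBytes += recordBytes;
        boolean sizeReached = bufferedBytes >= maxBytes;
        boolean intervalReached = nowMillis - windowStartMillis >= maxIntervalMillis;
        if (sizeReached || intervalReached) {
            flush();
        }
    }

    private void flush() {
        deliveredSizes.add(bufferedBytes); // stands in for one delivered S3 object
        bufferedBytes = 0;
        windowStartMillis = -1;
    }

    List<Long> deliveredSizes() {
        return deliveredSizes;
    }

    public static void main(String[] args) {
        // 128 MB size threshold and 900 second interval, as described above.
        BufferingSketch buffer = new BufferingSketch(128L * 1024 * 1024, 900_000L);
        long recordBytes = 900L * 1024; // roughly the test message size used later
        for (int i = 0; i < 146; i++) {
            buffer.add(recordBytes, 0L); // a burst of records: size threshold fires
        }
        buffer.add(recordBytes, 1_000L);
        buffer.add(recordBytes, 901_000L); // slow trickle: interval threshold fires
        System.out.println(buffer.deliveredSizes());
    }
}
```

In this model, a burst of records produces one large object as soon as the size threshold is crossed, while a slow trickle produces a smaller object once the interval elapses.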
An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.
Considerations
The maximum size of a record sent to Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Data Firehose is the best approach. Data Firehose also offers compression of messages after they are written to the Data Firehose data stream. Unfortunately, this does not overcome the message size limitation, because this compression happens after the message is written. When Data Firehose delivers a previously compressed message to Amazon S3 it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing because a “.gz” extension is required.
We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.
Prerequisites and assumptions
To follow the steps outlined in this blog post, you need the following:
- An AWS account that provides access to AWS services.
- An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
- The templates and code are intended to work in the US East (N. Virginia) Region only.
Additionally, be aware of the following:
- We configure all services in the same VPC to simplify networking considerations.
- Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without modification.
Implementing the solution
You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.
This template takes the following parameters. Some of the parameters have default values, and you can’t edit these. These predefined names are hardcoded in the code. For some of the parameters, you must provide the values. The following table provides additional details.
| For this parameter | Provide this | 
| StackName | Provide the stack name. | 
| ClientIP | The IP address range of the client that is allowed to connect to the cluster using SSH. | 
| FirehoseDeliveryStreamName | The name of the Amazon Data Firehose delivery stream. Default value is set to “AWSBlogs-LambdaToFireHose”. | 
| InstanceType | The EC2 instance type. | 
| KeyName | The name of an existing EC2 key pair to enable access to login. | 
| KinesisStreamName | The name of the Amazon Kinesis Stream. Default value is set to “AWS-Blog-BaseKinesisStream” | 
| Region | AWS Region – By default it is us-east-1 — US East (N. Virginia). Do not change this as the scripts are developed to work in this Region only. | 
| EMRClusterName | A name for the EMR cluster. | 
| S3BucketName | The name of the bucket that is created in your account. Provide some unique name to this bucket. This bucket is used for storing the messages and output from the Spark code. | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names and for I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. Then choose Create.
If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.
To create each component individually, use the following steps.
1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket
In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second inbound rule allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard Amazon S3 bucket with the provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this | 
| StackName | Provide the stack name. | 
| S3BucketName | Provide a unique Amazon S3 bucket. This bucket is created in your account. | 
| ClientIp | Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from “checkip.amazon.com”. | 
After you specify the template details, choose Next. On the Review page, choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value | 
| StackName | Name | 
| VPCID | Vpc-xxxxxxx | 
| SubnetID | subnet-xxxxxxxx | 
| SecurityGroup | sg-xxxxxxxxxx | 
| S3BucketDomain | <S3_BUCKET_NAME>.s3.amazonaws.com | 
| S3BucketARN | arn:aws:s3:::<S3_BUCKET_NAME> | 
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
2. Use the AWS CloudFormation template to create necessary IAM Roles
In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances. The second IAM role is used by the Amazon Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this | 
| StackName | Provide the stack name. | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value | 
| LambdaRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole | 
| FirehoseRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole | 
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
3. Use an AWS CloudFormation template to configure the Amazon Data Firehose data stream
In this step, we set up Amazon Data Firehose with Amazon S3 as the destination for the incoming messages. We select the Uncompressed option for the compression format and set the buffering options to a size of 128 MB and an interval of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this | 
| StackName | Provide the stack name. | 
| FirehoseDeliveryStreamName | Provide the name of the Amazon Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose” | 
| Role | Provide the Data Firehose IAM role ARN that was created as part of step 2. | 
| S3BucketARN | Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output. | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function
In this step, we set up a Kinesis data stream and an AWS Lambda function. We can use the AWS Lambda function to process incoming messages in a Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards and the Lambda function is created with a Java 8 runtime. We allocate memory size of 1920 MB and timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides details.
| For this parameter | Do this | 
| StackName | Provide the stack name. | 
| KinesisStreamName | Provide the name of the Amazon Kinesis stream. Default value is set to ‘AWS-Blog-BaseKinesisStream’ | 
| Role | Provide the IAM Role created for Lambda function as part of the second AWS CloudFormation template. Get the value from the output of second AWS CloudFormation template. | 
| S3Bucket | Provide the existing Amazon S3 bucket name that was created using first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only. | 
| Region | Select the AWS Region. By default it is us-east-1 — US East (N. Virginia). | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
5. Use an AWS CloudFormation template to configure the Amazon EMR cluster
In this step, we set up an Amazon EMR 5.16.0 cluster with “Spark”, “Ganglia” and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR hive metastore. This Amazon EMR cluster is used to process the messages in Amazon S3 bucket that are created by the Amazon Data Firehose data stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this | 
| EMRClusterName | Provide the name for the EMR cluster. | 
| ClusterSecurityGroup | Select the security group ID that was created as part of the first AWS CloudFormation template. | 
| ClusterSubnetID | Select the subnet ID that was created as part of the first AWS CloudFormation template. | 
| AllowedCIDR | Provide the IP address range of the client that is allowed to connect to the cluster. | 
| KeyName | Provide the name of an existing EC2 key pair to access the Amazon EMR cluster. | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value | 
| EMRClusterMaster | ssh hadoop@ec2-XX-XXX-XXX-XXX.us-east-1.compute.amazonaws.com -i <KEY_PAIR_NAME>.pem | 
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data
In this step, we set up an Amazon EC2 instance. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs open-jdk version 1.8. Second, it downloads a Java program jar file to the ec2-user home directory on the EC2 instance. We use this Java program to generate test data messages of approximately 900 KB each. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java jar file name is: “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.
You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
This template takes the following parameters. The following table provides additional details.
| For this parameter | Do this | 
| EC2SecurityGroup | Select the security group ID that was created from the first AWS CloudFormation template. | 
| EC2Subnet | Select the subnet that was created from the first AWS CloudFormation template. | 
| InstanceType | Select the provided instance type. By default, it selects r4.4xlarge instance. | 
| KeyName | Name of an existing EC2 key pair to enable SSH access to the EC2 instance. | 
After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names, and then choose Create.
When the stack launch is complete, it should return outputs similar to the following.
| Key | Value | 
| EC2Instance | ssh ec2-user@<Public-IP> -i <KEY_PAIR_NAME>.pem | 
Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:
$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
7. Generate the test dataset and load into Kinesis Data Streams
After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of step 6. Use the “ssh” command as shown in the CloudFormation stack output. The template copies the “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar” file to the instance; we use this file to generate the test data and send it to Amazon Kinesis Data Streams. You can find the code corresponding to this sample Kinesis producer in this Git repository.
Make sure your EC2 instance’s security group allows ssh port 22 (Inbound) from your IP address. If not, update your security group inbound access.
- ssh ec2-user@<Public IP Address of the EC2 Instance> -i <SSH_KEY_PAIR_NAME>.pem
Run the following commands to generate some test data.
$ cd;
$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
$ java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000
This Java program uses the PutRecords API operation, which allows many records to be sent with a single HTTP request. For more information, see this AWS blog. After you run the Java program, its output shows that messages are being sent to the Kinesis data stream.
When running the sample Kinesis producer jar, notice that the number of messages is 10,000. This program generates the test data messages and is not a replacement for your load testing tool. This is created to demonstrate the use case presented in this post.
After all of the messages are generated and sent to Amazon Kinesis Data Streams, the program exits gracefully.
The sample JSON input message format is shown as follows:
Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of step 4. Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 minutes to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda
As part of the previously described setup, we also use an AWS Lambda function (named LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads the content of each message and appends “additional data” to it, which makes the message size more than 1 MB. Several customers have a use case like this, in which they enrich incoming messages by adding information. After the AWS Lambda function appends the additional data to the incoming messages, it sends them to Amazon Data Firehose. Because Data Firehose accepts only messages that are less than 1 MB, we must compress the messages before sending them. In the Lambda function, we compress each message with gzip before sending it to Data Firehose. We also append a newline character (“\n”) to each message after compressing it, to separate the messages.
We set the buffer size to 128 MB and the buffer interval to 900 seconds when creating the Data Firehose delivery stream. This helps merge the incoming compressed messages into larger messages before they are sent to the provided Amazon S3 bucket.
The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.
If we do not compress the message before sending to Data Firehose, it throws this error message in the Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.
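The snippet from the Lambda function is not reproduced in this text. As a stand-in, here is a minimal Java sketch of the compression step described above, using only the standard library; it is not the code from the repository. The class name, method names, and sample payload are hypothetical, and the call that sends the bytes to Data Firehose is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** Sketch of gzip-compressing a message before it is sent as one record. */
class GzipBeforeFirehoseSketch {
    // Record size limit stated in this post: 1,000 KB.
    static final int MAX_RECORD_BYTES = 1000 * 1024;

    /** Gzip-compresses the message, then appends a newline byte as a separator. */
    static byte[] compressWithNewline(String message) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(message.getBytes(StandardCharsets.UTF_8));
        } // closing the gzip stream writes the gzip trailer
        out.write('\n'); // ByteArrayOutputStream remains writable after close
        return out.toByteArray();
    }

    /** True if the payload is within the record size limit. */
    static boolean fitsInRecord(byte[] payload) {
        return payload.length <= MAX_RECORD_BYTES;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical enriched message: repetitive JSON-like text above the limit.
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < 100_000; i++) {
            message.append("{\"sensor\":\"a\",\"value\":42}");
        }
        byte[] raw = message.toString().getBytes(StandardCharsets.UTF_8);
        byte[] payload = compressWithNewline(message.toString());

        System.out.println("raw bytes: " + raw.length + ", fits: " + fitsInRecord(raw));
        System.out.println("compressed bytes: " + payload.length
                + ", fits: " + fitsInRecord(payload));
    }
}
```

How much a payload shrinks depends on its content. Repetitive text like this sample compresses very well, but data that is already compressed or close to random may not fit under the limit even after gzip, so checking the size before sending is a reasonable safeguard.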
You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

The files generated by Data Firehose do not have any extension. By default, Data Firehose does not add an extension to the files that it generates in the Amazon S3 bucket unless you select a compression option. In our use case, because the uncompressed input message is larger than 1 MB, we compress it before sending it to Data Firehose. Because the message is already compressed, we do not select a compression option in Data Firehose; doing so would double-compress the message, and the downstream Spark application could not process it.
9. Reading and converting the data into parquet format using Apache Spark program with Amazon EMR
As we saw in the previous screenshot, Data Firehose by default does not add a file extension to the files that it writes to the Amazon S3 bucket. This creates a problem when reading the files with Apache Spark. By default, Apache Spark checks for a valid file name extension to determine whether a file is compressed. For gzip compression, it looks for <filename>.gz to read the file successfully.
To overcome this issue, we can use Amazon S3 API operations, particularly the AmazonS3Client class, to list all the Amazon S3 keys, and then use Spark’s parallelize method to read the contents of the files. After reading the file content, we can uncompress it using the GZIPInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.
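The snippet from the Spark program is not reproduced in this text. The following Java sketch illustrates only the decompression part of the idea, using the standard library: gzip data is decoded from a stream regardless of what the object is named. It is not the code from the repository, and the class and method names are hypothetical. In the pipeline described here, the input stream would come from the Amazon S3 object content rather than from an in-memory byte array. The sketch handles a single gzip-compressed message; how the actual program splits the multiple messages in one delivered object is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Sketch of reading gzip data by content, without relying on a .gz extension. */
class GunzipWithoutExtensionSketch {

    /** Decompresses a gzip stream fully and decodes the result as UTF-8 text. */
    static String gunzipToString(InputStream compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(compressed);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Helper for the demo only: produces gzip bytes to stand in for an S3 object. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // The "object" has no name or extension at all; only its bytes matter.
        byte[] objectBytes = gzip("{\"sensor\":\"a\",\"value\":42}");
        String text = gunzipToString(new ByteArrayInputStream(objectBytes));
        System.out.println(text);
    }
}
```

Because the decoder works on the bytes themselves, the missing “.gz” extension no longer matters once the application reads the objects directly instead of relying on extension-based codec detection.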
Once the Amazon EMR cluster is created successfully, log in to the Amazon EMR master machine using the following command. You can get the “ssh” login command from the “EMRClusterMaster” output of the AWS CloudFormation stack created in step 5.
- ssh hadoop@ec2-XX-XX-XX-XX.compute-1.amazonaws.com -i <KEYPAIR_NAME>.pem
- Make sure that port 22 is open in the security group so that you can connect to the Amazon EMR master machine.
Run the Spark program using the following Spark submit command.
spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/
Replace the S3_BUCKET_NAME and YEAR placeholders in the previous Spark command with your own values.
| Argument # | Property | Value | 
| 1 | --class | com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet | 
| 2 | --master | yarn | 
| 3 | --deploy-mode | client | 
| 4 | s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar | |
| 5 | S3_BUCKET_NAME | The Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket. | 
| 6 | <INPUT S3 LOCATION> | “fromfirehose/<YYYY>/”. The input files are created in this Amazon S3 key location under the bucket that was created. “YYYY” represents the current year. For example, “fromfirehose/2018/” | 
| 7 | <OUTPUT S3 LOCATION> | Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example: “output-emr-parquet/” | 
When the program finishes running, you can check the Amazon S3 output location to see the files that are written in parquet format.
Cleaning up
After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if there are any files in the Amazon S3 bucket that was created. Make sure that you empty the bucket before deleting the AWS CloudFormation stacks.
Conclusion
In this post, we described the process of avoiding small file creation in Amazon S3 by sending the incoming messages to Amazon Data Firehose. We also went through the process of reading and storing the data in parquet format using Apache Spark with an Amazon EMR cluster.
About the Author
 Srikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.
