We have a stock analytics app that reads one million stock metrics using AWS Athena. A Lambda function computes these metrics, so we need to decide how to save them all to S3 at once. Each record is about 150 bytes. Here is an example record:
{
"timestamp": "2023-06-17 17:56:54.000",
"symbol_key": "AAPL",
"parameter_key": "money_flow_index_14",
"parameter_value": 75.72768213608795
}
Constraints
Our customers want the data they receive to be as fresh as possible and, naturally, they want to retrieve it fast. We, in turn, want the solution to be cheap.
In short, we want the solution to be cheap to own, fast to write, and fast to read.
So, how would we do that?
Let's generate some options…
Options
- Simply write each object to S3 as a JSON file.
- Group objects, convert them to a single JSON file using the Lambda function, and write the file once.
- Group objects, convert them to a single Parquet file using a library like pandas in the Lambda function, and write the file once.
- Send objects to Kinesis Firehose, which will group them and deliver them to S3 as Parquet files.
Option 1 is in the list just to be ruled out explicitly:
- We need 1M records to be available to customers in Athena as soon as possible and our solution has to be cheap.
- Writing 1M records to S3 one by one will take about 1600 minutes and cost $5.
- By grouping records into a single file, we can achieve 100–200 milliseconds of write time at negligible cost (a rough estimate follows this list).
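The estimate below assumes ~100 ms per sequential S3 PUT and $0.005 per 1,000 PUT requests — typical latency and list pricing, not measurements from our app.

# Rough estimate; assumes ~100 ms per sequential S3 PUT and
# $0.005 per 1,000 PUT requests (typical list price, region-dependent).
records = 1_000_000
put_latency_s = 0.1
put_price_per_1000 = 0.005

one_by_one_minutes = records * put_latency_s / 60       # ~1,667 minutes
one_by_one_cost = records / 1_000 * put_price_per_1000  # ~$5.00
single_file_cost = 1 / 1_000 * put_price_per_1000       # one PUT, ~$0.000005

print(one_by_one_minutes, one_by_one_cost, single_file_cost)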
Anatomy of an Option
After dropping Option 1, each remaining option has only two aspects:
- the output file format (JSON, Parquet).
- the way we create the output file (a library call in Lambda, Kinesis Firehose).
So it looks like we need to make a decision about each aspect considering the constraints we have. Let's start then.
Decision 1. The output file format
Cheap to own
What should we take into account when choosing the file format?
Intuitively, it feels like file size is important because a smaller file must be cheaper to store.
The intuition is right: file size is an important cost-driving factor. The reason, however, is wrong.
The thing is, when you use Athena, the main cost driver is the amount of data scanned per query. There are optimization techniques, but the simplest way to reduce costs is to use a file format that packs your data as tightly as possible.
So if you pack the one million 150-byte records from our example into ND-JSON, you get about 150 MB, while in Parquet format it comes to around 18 MB.
Compare the monthly Athena cost for 1,000 read queries per day (a quick calculation is sketched below):
- ND-JSON monthly costs are $21.76
- Parquet monthly costs are just $2.61
S3 storage is negligible in comparison: storing 150 MB on S3 costs about $0.00345 per month, so we skip the storage cost comparison.
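The sketch assumes Athena's list price of $5 per TB scanned and that every query scans the whole file; the figures above were computed from the actual file sizes, so these rounded inputs land close to, not exactly at, the quoted costs.

# Approximate monthly Athena scan cost, assuming $5 per TB scanned
# and that every query scans the entire file.
PRICE_PER_TB = 5.0
QUERIES_PER_DAY = 1_000
DAYS = 30

def monthly_scan_cost(file_size_mb: float) -> float:
    scanned_tb = file_size_mb * QUERIES_PER_DAY * DAYS / 1024 ** 2
    return scanned_tb * PRICE_PER_TB

print(f"ND-JSON: ${monthly_scan_cost(150):.2f}")  # ~$21.5 per month
print(f"Parquet: ${monthly_scan_cost(18):.2f}")   # ~$2.6 per month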
Fast to write
The write time for a single file depends on its size, so Parquet should win here as well. However, the difference is small, so we can consider ND-JSON and Parquet roughly equal here.
Fast to read
OK, scanning ND-JSON in Athena is more expensive and writing speed is almost the same, but what about query execution time for ND-JSON and Parquet?
Let's run the same query on 1M records stored in JSON and Parquet and then compare query execution time.
explain analyze
select *
from <TABLE_NAME>
where timestamp > timestamp'2023-04-28' and
parameter_key = 'money_flow_index_14' and
symbol_key = 'AAPL'
Results show ND-JSON is about 3 times slower:
TABLE_NAME = nd_json_table
Queued: 224.94us, Analysis: 128.22ms, Planning: 176.07ms, Execution: 2.78s
TABLE_NAME = parquet_table
Queued: 220.56us, Analysis: 130.52ms, Planning: 65.62ms, Execution: 998.42ms
So we will use Parquet because it is almost 9x cheaper and about 3x faster to read than ND-JSON. Now, let's decide how we pack one million records into a Parquet file.
Decision 2. A library call in Lambda vs Firehose
If you go to the AWS Kinesis Firehose product page, you should see this:
Notice the "Parquet transformation without building your own processing pipelines" piece in the third column.
So it looks like we should go no further, and just use Kinesis Firehose, right?
Not really.
We used Kinesis Firehose for quite a while, following this mantra, until we recently discovered that Python pandas can export a DataFrame to a Parquet file with a single line of code.
df.to_parquet(path='/tmp/output.parquet', compression='snappy')
Having our "own processing pipeline" suddenly does not look like a huge overhead.
So let's compare two pieces of code that would be executed at the end of our producer Lambda.
This function saves records as a Parquet file using the pandas DataFrame.to_parquet call.
import pandas as pd  # to_parquet requires pyarrow (or fastparquet)


def write_parquet(s3_client, records, bucket_name):
    if not bucket_name:
        raise ValueError('bucket_name is not provided')
    # Build a DataFrame from the list of record dicts
    df = pd.DataFrame(records)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Write a snappy-compressed Parquet file to the Lambda /tmp storage
    file_path = '/tmp/output.parquet'
    df.to_parquet(path=file_path, compression='snappy')
    # Upload the file to the S3 prefix backing our Athena table
    return s3_client.upload_file(
        file_path,
        bucket_name,
        'glue-db/parquet-data/output.parquet'
    )
This function delegates the Parquet conversion to Kinesis Firehose.
import json


def write_kinesis_firehose(firehose_client, records, firehose_name):
    print('firehose_name', firehose_name)
    if not firehose_name:
        raise ValueError('firehose_name is not provided')
    # Append a newline character to each JSON record
    _input = [json.dumps(record) + '\n' for record in records]
    batch_size = 500  # Max batch size for Firehose put_record_batch
    response = {'FailedPutCount': 0, 'RequestResponses': []}
    # Split the input into batches of 500 and send each batch to Firehose
    for i in range(0, len(_input), batch_size):
        # Convert string data to bytes
        batch_records = [
            {'Data': record.encode()} for record in _input[i:i + batch_size]
        ]
        out = firehose_client.put_record_batch(
            DeliveryStreamName=firehose_name,
            Records=batch_records
        )
        response['FailedPutCount'] += out['FailedPutCount']
        response['RequestResponses'].extend(out['RequestResponses'])
    return response
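For completeness, here is a minimal sketch of how a producer Lambda handler might call either function. The handler shape, environment variable names, and the way records arrive in the event are hypothetical; only the two functions above are part of the actual comparison.

import os

import boto3

# Hypothetical wiring; adjust env vars and the event shape to your setup.
s3_client = boto3.client('s3')
firehose_client = boto3.client('firehose')

def lambda_handler(event, context):
    records = event['records']  # assumed: a list of dicts like the example record
    if os.environ.get('USE_FIREHOSE') == 'true':
        return write_kinesis_firehose(
            firehose_client, records, os.environ['FIREHOSE_NAME'])
    return write_parquet(s3_client, records, os.environ['BUCKET_NAME'])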
So let's evaluate both functions against our constraints.
Cheap to own
In both options, we pay for reading a Parquet file of similar size, so we can assume the Athena query costs are the same.
For the write_kinesis_firehose function, our costs consist of two parts:
- Lambda execution time to complete the write_kinesis_firehose function.
- Kinesis Firehose charges for 1M records ingestion and format conversion.
1M objects written to S3 as Parquet will cost $0.52: $0.32 for ingestion and $0.20 for format conversion to Parquet.
For the write_parquet function we only pay for Lambda execution time.
For 1M records, the Lambda execution cost is about $0.0016, which is negligible compared with the Kinesis charges above.
So it looks like we pay $0.52 when using Firehose, while a call to the pandas to_parquet() function costs almost nothing.
Fast to write
Experiments have shown that it takes about 60 seconds to execute write_kinesis_firehose, while write_parquet completes in less than 2 seconds.
Why is that?
If you look closer at the write_kinesis_firehose code, you may notice that it calls put_record_batch for every 500 records sequentially.
That's 2,000 calls for one million records!
So, can we improve the write_kinesis_firehose function?
We cannot increase the batch size: 500 is the maximum allowed by AWS.
We can try to parallelize the writes to make it faster (a rough sketch follows below), but it will still be slower than the write_parquet function.
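A parallelized version might look roughly like this, fanning the 2,000 batches out over a thread pool. It is an untested sketch: the worker count is arbitrary, and Firehose throttling and per-record failures would still need proper handling.

import json
from concurrent.futures import ThreadPoolExecutor

def write_kinesis_firehose_parallel(firehose_client, records, firehose_name,
                                    max_workers=20):
    # Same 500-record batching as before, but batches are sent concurrently.
    _input = [json.dumps(record) + '\n' for record in records]
    batches = [
        [{'Data': r.encode()} for r in _input[i:i + 500]]
        for i in range(0, len(_input), 500)
    ]

    def send(batch):
        return firehose_client.put_record_batch(
            DeliveryStreamName=firehose_name, Records=batch)

    response = {'FailedPutCount': 0, 'RequestResponses': []}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for out in pool.map(send, batches):
            response['FailedPutCount'] += out['FailedPutCount']
            response['RequestResponses'].extend(out['RequestResponses'])
    return response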
So we will use a pandas df.to_parquet() call in the Lambda function to convert one million records to Parquet format, because it is about 30x faster and drastically cheaper than using Kinesis Firehose.
Final words
These are a few important takeaways from our experiments with writing one million small objects to S3 for Athena consumers.
- Use Parquet (or a similar file format) that packs your data as tightly as possible, because the main cost driver for Athena-based solutions is the amount of data scanned per query.
- Using Kinesis Firehose to save millions of small objects to S3 as a single Parquet file does not make sense: you'll end up paying more for a slower method.
Thank you for your time! I hope it was worth it! If it was, please don't hesitate to clap a couple of times. 👏 👏