JW Player Pipeline Conversion Project

Jul 6, 2016


Introduction

JW Player receives an average of 45,000 log entries per second (or “pings”) into a lambda architecture data pipeline. This equates to a daily data set of between 3 and 4 billion rows. To improve query performance and save on storage costs, the Data team at JW Player has started an initiative to redefine this data domain: instead of individual pings, we will store sessions in a columnar format.

Current situation

Currently, we store the pings in Avro format on Amazon S3. We then process this data in several periodic batch pipelines consisting of Hive jobs that run on EMR. Our real-time pipelines process the same data using Kafka and Storm/Trident as the main technologies. For task scheduling within the batch pipelines we use Luigi. After processing, the aggregated reports are stored in MySQL or MongoDB.

Figure 1: Schematic view of our Lambda Architecture
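To give a feel for how the batch side is scheduled, here is a minimal Luigi sketch. The task names, the external dependency and the marker-file output are purely illustrative and not our actual pipeline code; the real run() step would submit a Hive job on EMR.

import luigi

class RawPingsOnS3(luigi.ExternalTask):
    # Hypothetical external dependency: the raw Avro pings for one day.
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/pings/%s.avro' % self.date.isoformat())

class AggregateDailyPings(luigi.Task):
    # Hypothetical daily aggregation task; in the real pipeline this step
    # would kick off a Hive job on EMR instead of writing a local marker file.
    date = luigi.DateParameter()

    def requires(self):
        return RawPingsOnS3(date=self.date)

    def output(self):
        return luigi.LocalTarget('/tmp/reports/%s.done' % self.date.isoformat())

    def run(self):
        with self.output().open('w') as out:
            out.write('aggregated pings for %s\n' % self.date.isoformat())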

The problem with storing our data this way is that the format is very narrow: every ping repeats a lot of data that has already been sent with previous pings. Storing duplicate data wastes resources and increases processing times.

Therefore, we wanted to fix the data redundancy issue by switching to a newer, wider, columnar storage format. We also wanted to combine the individual pings into sessions. Our goal was to leverage Parquet’s efficient storage capabilities for this new data domain.

 

Figure 2: Converting pings to sessions

The wider, session-based data format will result in far fewer rows being written to persistent storage every day, which in turn will decrease processing times.
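As a rough illustration of this conversion, the PySpark sketch below collapses a handful of toy pings into one wide row per session. The column names (session_id, ts, position) are made up for the example and are not our real schema.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pings-to-sessions").getOrCreate()

# A few toy pings: narrow rows, one per ping, each repeating the session id.
pings = spark.createDataFrame(
    [("s1", 1, 0), ("s1", 2, 10), ("s1", 3, 20), ("s2", 1, 0)],
    ["session_id", "ts", "position"],
)

# Collapse the narrow per-ping rows into one wide row per session, keeping
# the per-ping details as a nested array, which is the shape we want to persist.
sessions = pings.groupBy("session_id").agg(
    F.min("ts").alias("session_start"),
    F.max("ts").alias("session_end"),
    F.count(F.lit(1)).alias("ping_count"),
    F.collect_list(F.struct("ts", "position")).alias("pings"),
)

sessions.show(truncate=False)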

Designing a new pipeline

Before designing the new pipeline, we set the following goals:

The pipeline’s performance should improve by 6X or better.

The technology for this pipeline should be limited to what is compatible with the Hortonworks HDP stack.

The pipeline must use the Parquet storage format.

The pipeline must be able to read/write data to S3 since our data is stored there.

During early prototyping of a session-based data domain, we found that a 6X performance improvement would be possible; the current pipeline takes about 1.5 hours per job.

We wanted to limit the technologies we use to what is compatible with the Hortonworks HDP stack, because HDP clusters are less expensive to roll out than Amazon EMR clusters and give us more control over the compute resources.

Since the data was going to be widened, we also wanted to switch to a storage format better suited to it than Avro. Parquet is a columnar storage format designed for processing data on Hadoop, with built-in support for compression and complex nested structures.
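Continuing the session sketch from earlier, writing those wide rows out as Parquet might look like the snippet below (using the current PySpark DataFrame API). The output path and the choice of Snappy compression are just examples, not our production settings.

# Write the wide session rows as Parquet; the nested pings array is stored
# natively in Parquet's columnar layout and the columns are compressed.
sessions.write.mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("/tmp/sessions_parquet")

# Reading it back shows the nested schema that Parquet preserves.
spark.read.parquet("/tmp/sessions_parquet").printSchema()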

Researching Technologies

We started out by researching technologies that could handle data at this scale and that would be faster than Hive. Our shortlist consisted of the following:

Apache Impala

PrestoDB

Apache Spark

The general idea was that each team member would pick a technology, evaluate it, and run a small set of queries against both a subset of production data and a full day’s worth of production data. Afterwards, we would compare findings and benchmark results to decide which technology to move forward with.

Impala

Impala is an open-source analytic database shipped by Cloudera, MapR, Oracle and Amazon. It allows querying data with SQL in real time, and it uses the same metadata, SQL syntax (Hive SQL), ODBC driver, and user interface (Hue Beeswax) as Apache Hive, which makes it a familiar and unified platform for both batch-oriented and real-time queries.

One of the first problems we encountered was that Impala is not supported on the HDP stack, so it would have had to be installed manually, which proved challenging. We therefore evaluated it on the Cloudera stack, where Impala comes pre-installed. However, we immediately hit another wall when we tried to run our first query: array support in Impala is limited. Although Impala can store and read arrays, you cannot easily select a single element of an array. This was a real deal breaker, because our queries frequently require selecting the first element of an array.

Apache Spark

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. There is currently a lot of buzz around Apache Spark: Databricks claims that Spark programs run up to 100 times faster than conventional MapReduce, and Cloudera is replacing MapReduce with Spark as the default processing engine for its platform.

We were very excited about Spark because of its built-in support for reading and writing the Parquet storage format, as well as for reading from and writing to S3 using the third-generation S3 filesystem. The third-generation S3 filesystem (s3a:) is the successor to the S3 Native (s3n:) filesystem: it supports larger files (no more 5 GB limit), promises higher performance, and is designed to be a drop-in replacement for S3 Native.

Furthermore, we were excited about Spark’s high-level APIs in Python. Many of our projects are written in Python, so that was an extra plus for us.
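Put together, reading and writing Parquet on S3 through s3a from PySpark looks roughly like this. The bucket name and paths are made up, and the sketch assumes the cluster has the hadoop-aws libraries and AWS credentials configured.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3a-parquet-example").getOrCreate()

# Read one day of session data straight from S3 via the s3a filesystem.
sessions = spark.read.parquet("s3a://example-bucket/sessions/2016-07-06/")

# ... transformations and aggregations would go here ...

# Write the results back to S3, again through s3a.
sessions.write.mode("overwrite").parquet("s3a://example-bucket/reports/2016-07-06/")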

Because Spark is a general-purpose computing system it is very flexible, and we are also looking into using it for other projects within the data team, which counted in Spark’s favor as well.

PrestoDB

Besides Impala and Spark, we also investigated PrestoDB. Presto is an open-source distributed SQL query engine developed by Facebook. It allows you to query data where it lives, whether that is in Hive, Cassandra, relational databases or even proprietary data stores.

A couple of colleagues over at Netflix wrote a very interesting blog post on Presto.

After reading their blog, we were pretty keen on trying out and evaluating Presto ourselves.

Installing Presto on our cluster was not trivial. There is an Ambari service available for Presto, but it is not compatible with Ubuntu (the OS we run HDP on). That meant we had to install Presto ourselves on all the nodes in the cluster; we wrote a couple of bash deployment scripts and let parallel-ssh (pssh) do the rest.

Presto also required a bit more tuning than Spark and Hive before it would execute our queries successfully. It is fairly picky about memory settings: query.max-memory-per-node should not be set higher than roughly 60% of the available JVM heap per node, otherwise Presto throws an IllegalArgumentException because it does not have enough heap space available. With the 26 GB heap we gave each node (see the JVM config below), that works out to about 15 GB per node.

During our evaluation of Presto we also ran into its memory limits. We managed to come up with a query that could not complete on our cluster because there was not enough query memory available, and we were unable to increase the query memory and heap space far enough for it to finish. The same query did run successfully in both Spark and Hive.

Benchmarking

During the investigation we tested Impala, Spark and Presto against an entire day’s worth of production data in the new wide format. The data was stored in Parquet, was about 1 TB in size, and consisted of 817 million rows. It was located in an S3 bucket, and to access it we used the third-generation S3 filesystem (s3a).

Cluster composition

Nodes:

1 x m4.xlarge

5 x m4.2xlarge

The nodes ran HDP 2.4 on top of Ubuntu.

The m4.xlarge node was set up as Ambari master and all the other nodes were set up as processing nodes.

We used the default configuration for YARN, except for some memory/CPU resource tuning; we set the following values:

yarn.nodemanager.resource.memory-mb = 29696 (MB)

yarn.nodemanager.resource.percentage-physical-cpu-limit = 90 (%)

yarn.scheduler.maximum-allocation-mb = 29696 (MB)

For Spark we used the default configuration, but when launching the PySpark shell we set the number of executors to 50 and gave each executor 2 GB of memory.
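For reference, launching the shell with those settings looks roughly like this (the exact flags depend on the Spark version, so treat this as an approximation):

pyspark --master yarn --num-executors 50 --executor-memory 2G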

We used the following configurations for Presto:

Coordinator etc/config.properties:

coordinator=true

node-scheduler.include-coordinator=true

http-server.http.port=9999

query.max-memory=80GB

query.max-memory-per-node=15GB

discovery-server.enabled=true

discovery.uri=http://:9999

Worker etc/config.properties:

coordinator=false

http-server.http.port=9999

query.max-memory=80GB

query.max-memory-per-node=15GB

discovery.uri=http://:9999

JVM config etc/jvm.config:

-server

-Xmx26G

-XX:+UseG1GC

-XX:G1HeapRegionSize=32M

-XX:+UseGCOverheadLimit

-XX:+ExplicitGCInvokesConcurrent

-XX:+HeapDumpOnOutOfMemoryError

-XX:OnOutOfMemoryError=kill -9 %p

Technologies compared – the numbers

We ran two queries. The first query is a simple row count, to see how fast each technology can perform a full table scan. The second query is representative of a query the data team currently uses in the existing pipeline.

Query 1

SELECT COUNT(*) FROM Table;

Query 2

SELECT Column1, Column2, Column3[0] AS Column3,
       SUM(Column4) AS Column4, SUM(Column5) AS Column5,
       SUM(Column6) AS Column6, SUM(Column7) AS Column7
FROM Table
GROUP BY Column1, Column2, Column3
ORDER BY Column1 ASC, Column2 ASC, Column3 ASC;

Those queries gave the following results:

Technology | Query 1 | Query 2
Spark | 87 seconds | 213 seconds
Presto | 57 seconds | 248 seconds
Hive (with Tez) | 291 seconds | 937 seconds

As you can see, both Spark and Presto easily outperform Hive. We’re not sure why Presto is so much faster than Spark for Query 1, but we suspect it has to do with Spark’s job startup overhead.

Conclusion

We’ve decided to build our new pipeline on top of Spark. The benchmark results show it is much faster than Hive (with Tez). Furthermore, Spark integrates very well with the HDP stack, whereas Presto does not. We also found Spark to be more stable and mature than Presto, which required a lot of memory tuning before we could run our queries. For all these reasons we are very excited about Spark, and we look forward to using it in more projects.