Spark has become extremely popular because it is easy to use, fast, and powerful for large-scale distributed data processing. Spark is developer friendly, and because it works well with many popular data analysis programming languages, such as Python, R, Scala, and Java, everyone from application developers to data scientists can readily take advantage of its capabilities.
However, Spark doesn’t come without its operational challenges. As a frequent Spark user who works with many other Spark users on a daily basis, I regularly encounter four common issues that tend to unnecessarily waste development time, slow delivery schedules, and complicate operational tasks that impact distributed system performance.
These issues aren’t related to Spark’s fundamental distributed processing capacity. Instead, they typically result from how Spark is being used. In this article, I will describe these common issues and provide guidance on how to address them quickly and easily so that you can optimize Spark performance and the time you spend configuring and operating Spark installations and jobs.
To set the context, keep in mind the three main Spark application entities: the Driver, the Cluster Manager, and the Cache.
Now let’s look at some of the ways Spark is commonly misused and how to address these issues to boost Spark performance and improve output.
Data skew is probably the most common problem Spark users run into. Data is skewed when it isn’t evenly distributed across partitions. Skewed data hurts both performance and parallelism. The main reason data becomes skewed is that transformations like join, groupBy, and orderBy change data partitioning. This can leave the data unevenly distributed across the cluster, which prevents Spark from processing it fully in parallel.
Data skew can cause performance problems because a single task that is taking too long to process gives the impression that your overall Spark SQL or Spark job is slow. Actually, it's only a problem with one task, or more accurately, with skewed data underlying that task. Once the skewed data problem is fixed, processing performance usually improves, and the job will finish more quickly.
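To make the idea of skew concrete, here is a minimal sketch in plain Python (no Spark required) that simulates hash partitioning and reports how unbalanced the partitions are. The helper names `partition_sizes` and `skew_ratio`, the sample keys, and the use of `zlib.crc32` as the hash are all assumptions made for illustration; Spark uses its own partitioner and hash function.

```python
import zlib
from statistics import median

def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition under hash partitioning."""
    sizes = [0] * num_partitions
    for key in keys:
        # crc32 is stable across runs; it stands in for Spark's own hash here.
        sizes[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return sizes

def skew_ratio(sizes):
    """Largest partition divided by the median partition (1.0 means balanced)."""
    mid = median(sizes)
    return max(sizes) / mid if mid else float("inf")

# Hypothetical data: one hot key ("US") dominates 100 otherwise uniform keys.
keys = ["US"] * 9000 + [f"k{i}" for i in range(100)] * 10
sizes = partition_sizes(keys, num_partitions=8)
print(sizes, round(skew_ratio(sizes), 1))
```

A ratio far above 1.0 means one task will be handed far more records than the others, which is the slow-task symptom described above.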
The rule of thumb is to use about 128 MB per partition so that tasks can be executed quickly. At this size, the cost of reading the underlying blocks stays reasonable, and Spark can execute large numbers of tasks in parallel, which is ultimately more efficient than overloading one particular partition.
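The arithmetic behind this rule of thumb is simple enough to sketch. The function name `suggested_partitions` is hypothetical, and the sketch treats 128 MB as 128 MiB; the result is only a starting point that could, for example, be passed to a repartition call.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # the 128 MB rule of thumb

def suggested_partitions(dataset_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Number of partitions needed so each holds at most target_bytes."""
    if dataset_bytes <= 0:
        return 1
    return max(1, math.ceil(dataset_bytes / target_bytes))

# A 10 GiB input works out to 80 partitions of 128 MiB each.
print(suggested_partitions(10 * 1024**3))
```

Rounding up rather than down keeps every partition at or below the target size, at the cost of one slightly smaller partition.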
Executors can run several Spark tasks in parallel. Although conventional logic states that the greater the number of executors, the faster the computation, this isn’t always the case. The reality is that more executors can sometimes create unnecessary processing overhead and lead to slow compute processes.
How does this happen? Although Spark users can create as many executors as there are tasks, this can create issues with cache access. If too many executors are created, individual executors will need to query the data from the underlying data sources and won’t benefit from rapid cache access.
The second common mistake with executor configuration is to create a single executor that is too big or tries to do too much. This can create memory allocation issues when all data can’t be read by the single task and additional resources are needed to run other processes that, for example, support running the OS. Dynamic allocation can help, but not in all cases.
The best way to think about the right number of executors is to determine the nature of the workload, data spread, and how clusters can best share resources. Dynamic allocation can help by enabling Spark applications to request executors when there is a backlog of pending tasks and free up executors when idle.
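As a rough illustration, dynamic allocation is switched on through a handful of Spark properties. The sketch below builds them as a plain dictionary and renders them as `spark-submit --conf` arguments. The helper names and the executor bounds (2 and 20) are assumptions chosen for the example, not recommendations, and the external shuffle service must also be set up on the cluster for the last property to be useful.

```python
def dynamic_allocation_conf(min_executors, max_executors):
    """Build Spark properties that turn on dynamic allocation."""
    if not 0 <= min_executors <= max_executors:
        raise ValueError("expected 0 <= min_executors <= max_executors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        # Lets idle executors be released without losing their shuffle files.
        "spark.shuffle.service.enabled": "true",
    }

def to_submit_args(conf):
    """Render properties as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args

print(" ".join(to_submit_args(dynamic_allocation_conf(2, 20))))
```

Setting an upper bound matters on shared clusters, since an unbounded application can otherwise claim executors that other workloads need.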
Spark applications require significant memory overhead when they shuffle data as part of group or join operations. Remember that normal data shuffling is handled by the executor process, and if the executor is overloaded, it can’t handle shuffle requests. This issue can be handled with an external shuffle service. Keep in mind that data skew is especially problematic for data sets with joins. Joins can quickly create massive imbalances that can impact queries and performance.
Cartesian products frequently degrade Spark application performance because the number of output rows grows with the product of the input sizes. By using nested structures or types, you can work with fewer rows at every stage, rather than moving data around.
The key is to fix the data layout. Salting the key to distribute data is usually the best option. Pay attention to the reduce phase as well, which then runs in two stages: first on the salted keys, and then on the unsalted keys. Another strategy is to isolate the keys that destroy performance and compute them separately.
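The two-stage reduce on salted keys can be sketched in plain Python. This is a simulation of the technique under stated assumptions, not Spark code: the function name `salted_sum`, the number of salts, and the sample records are all made up for the example.

```python
import random
from collections import defaultdict

def salted_sum(records, num_salts, seed=0):
    """Sum values per key in two stages so one hot key is split across tasks."""
    rng = random.Random(seed)

    # Stage 1: attach a random salt to each key and aggregate on (key, salt).
    partial = defaultdict(int)
    for key, value in records:
        partial[(key, rng.randrange(num_salts))] += value

    # Stage 2: drop the salt and combine the partial sums per original key.
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals), len(partial)

# Hypothetical skewed input: "hot" has 100 times more records than "cold".
records = [("hot", 1)] * 1000 + [("cold", 1)] * 10
totals, stage_one_groups = salted_sum(records, num_salts=8)
print(totals, stage_one_groups)
```

In the first stage the hot key is spread over up to `num_salts` groups that could be processed by different tasks; the second stage only has to merge that small number of partial results. This works for aggregations such as sums and counts, where partial results can be combined in any order.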
Also, note that on YARN the Spark external shuffle service typically runs as an auxiliary service inside the NodeManager. The NodeManager memory is about 1 GB, and apps that do a lot of data shuffling are liable to fail when the NodeManager runs out of memory. This brings up issues of configuration and memory, which we’ll look at next.
Spark users will invariably get an out-of-memory condition at some point in their development, which is not unusual. Spark is based on a memory-centric architecture. These memory issues are typically observed in the driver node, executor nodes, and in the NodeManager.
Note that Spark’s in-memory processing is directly tied to its performance and scalability. In order to get the most out of your Spark applications and data pipelines, there are a few things you should try when you encounter memory issues.
First, avoid driver shuffles at all costs. Use reduceByKey rather than groupByKey: with groupByKey, every value goes into the shuffle memory of the executor, so avoid it whenever you can. Prefer treeReduce to the standard reduce. Use complex and nested structures rather than Cartesian joins. Finally, pay attention to the ordering of data, particularly for historical data.
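To see why reduceByKey is preferred over groupByKey, the following plain-Python sketch counts how many records would cross the shuffle in each case. It is a simplified model, not Spark's implementation; the function names and the sample partitions are assumptions for illustration.

```python
def shuffle_group_by_key(partitions):
    """groupByKey style: every (key, value) record crosses the shuffle."""
    return [record for part in partitions for record in part]

def shuffle_reduce_by_key(partitions, combine):
    """reduceByKey style: combine within each partition before the shuffle."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = combine(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled

def reduce_side(shuffled, combine):
    """Merge the shuffled records per key on the receiving side."""
    out = {}
    for key, value in shuffled:
        out[key] = combine(out[key], value) if key in out else value
    return out

def add(a, b):
    return a + b

# Hypothetical input: 4 partitions, each holding 500 records for 2 keys.
partitions = [[("a", 1)] * 500 + [("b", 1)] * 500 for _ in range(4)]
grouped = shuffle_group_by_key(partitions)
combined = shuffle_reduce_by_key(partitions, add)
print(len(grouped), len(combined))
```

Both paths produce the same totals, but the pre-combined path shuffles one record per key per partition instead of every input record, which is what relieves the executor's shuffle memory.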
When you get an error message about being out of memory, it’s usually the result of a driver failure. Spark users may encounter this frequently, but it’s a fixable issue. Keep in mind that Spark distributes workloads among various machines, and that a driver is an orchestrator of that distribution. As a result, a driver is not provisioned with the same amount of memory as executors, so it’s critical that you do not rely too heavily on the driver.
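One way to lean less on the driver is the tree-style reduction mentioned earlier. The plain-Python sketch below compares how many partial results the final merge step (the driver's share of the work) has to handle with a flat reduce versus a tree reduce. It is a simplified model: the `fan_in` parameter and function names are hypothetical, and Spark's treeReduce is controlled by a depth setting instead. The sketch assumes a non-empty list of partial results.

```python
from functools import reduce

def flat_reduce(partials, combine):
    """reduce style: the final step merges every partition's partial result."""
    return reduce(combine, partials), len(partials)

def tree_reduce(partials, combine, fan_in=2):
    """treeReduce style: merge in rounds so the final step sees few partials."""
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")
    level = list(partials)
    # Merge groups of fan_in until at most fan_in partials are left.
    while len(level) > fan_in:
        level = [reduce(combine, level[i:i + fan_in])
                 for i in range(0, len(level), fan_in)]
    return reduce(combine, level), len(level)

def add(a, b):
    return a + b

# Hypothetical partial results, one per partition.
partials = list(range(1, 65))
print(flat_reduce(partials, add), tree_reduce(partials, add))
```

The answer is the same either way; the difference is that the intermediate rounds can run on executors, so the last step receives only a couple of values instead of one per partition.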
There are a number of other issues Spark users encounter, including modernizing the data science infrastructure and planning to run Kubernetes. These, and others, are big topics, and we will take them up in a later post in detail.