On-call week in Data Engineering

Amit Singh Rathore · Dev Genius · Apr 13, 2023

Issues worked on during my on-call days

DE On call 1 | DE On call 2 | DE On call 3 | DE On call 4 | DE On call 5

Last week I was on call for the Data Platform Engineering team. In this blog, I cover the issues that came up during the week and their solutions.

Pandas to pandas_on_spark

The job was scheduled from Airflow. The task running the Spark job failed, but the task log did not have any error messages. I looked at the driver pod in K8s and found that the driver got killed.

State: Terminated
Reason: OOMKilled
Exit Code: 137

Then I looked at the code and found it was a pandas job running on the Spark cluster. The user was reading the data into a Spark DataFrame and then calling toPandas() to convert it to a pandas DataFrame. Since this collects everything from the executors onto the driver, the driver ran out of memory.

Asked the user to use the pandas-on-Spark API (pyspark.pandas) rather than plain pandas. After minor edits to some functions, the job finished without any errors.
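
Roughly, the change looked like the sketch below (the input path and column names are illustrative, not the user's actual ones):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Before: toPandas() collects the entire dataset onto the driver -> OOMKilled
# pdf = spark.read.parquet("/data/events").toPandas()

# After: the pandas-on-Spark API keeps the data distributed across executors
psdf = spark.read.parquet("/data/events").pandas_api()

# Most pandas-style operations work unchanged on the distributed frame
result = psdf.groupby("country")["amount"].sum()
print(result.head(10))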

Class not found exception (Spark serialization error)

A new job on Spark 3.2 failed with a class-not-found exception. After looking at the code I found that they had a class definition that was not serializable. Asked the user to change the code / fix the object reference so that the class becomes serializable.
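
The job in question was on the JVM side, but the same class of problem shows up in PySpark when a non-picklable object is captured in a UDF closure. A rough sketch of the pattern and the fix (the client object is hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)

# Problematic pattern: capturing a non-serializable object in the closure.
# client = SomeNonPicklableClient()          # lives on the driver
# lookup = udf(lambda x: client.get(x))      # fails when Spark serializes the closure

# Fix: construct the object inside the function so each executor builds its own copy.
def lookup_value(x):
    # stand-in for the real, non-serializable client
    client = dict(zip(range(5), "abcde"))
    return client.get(x, "unknown")

lookup = udf(lookup_value, StringType())
df.withColumn("value", lookup("id")).show()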

PyArrow compatibility issue

A production job failed after the user upgraded the NumPy version in the conda environment. When the job ran, it failed with the following error:

pyarrow.lib.check_status pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object

Asked the user to upgrade the PyArrow version in the conda environment to 4.0.1, which solved the compatibility issue.

Insert Overwrite on the same partition

A job that was running fine on Spark 2.3 failed when it ran with Spark 3.2. The error message was:

AnalysisException: Cannot overwrite a path that is also being read from.

Looking at the query we found that the user was trying to do an insert overwrite from one partition into another partition of the same table. Spark checks the input and output paths at the table's location rather than at the partition location, hence the error message. This is an open issue with Spark. Asked the user to set spark.sql.hive.convertMetastoreParquet to false so that for the insert overwrite Spark uses the Hive SerDe rather than its built-in writer. The Hive SerDe follows the staging-directory approach, so the issue does not occur there.
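
A rough sketch of the workaround (database, table, and partition values are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Fall back to the Hive SerDe for Parquet tables so INSERT OVERWRITE goes
# through Hive's staging-directory mechanism instead of Spark's built-in writer.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

# Hypothetical example: overwrite one partition using data read from another
# partition of the same table.
spark.sql("""
    INSERT OVERWRITE TABLE db.events PARTITION (dt = '2023-04-12')
    SELECT col1, col2
    FROM db.events
    WHERE dt = '2023-04-11'
""")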

Ref Spark Commit: https://github.com/apache/spark/pull/35608

Ambiguous columns

A job failed with the following error:

org.apache.spark.sql.AnalysisException: Reference is ambiguous

When looking at the code I found that one of the tables used in the join had gained a new column that was also present in a different table. The code was using select($"joined_df1.*"). This is an issue in Spark, where it does not remove duplicate columns when we do star expansion of a subquery alias. Asked the user to drop the duplicate column from one of the DataFrames. Also created a backlog ticket to fix this in the source code.
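
A minimal sketch of the situation and the workaround (DataFrame and column names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

orders = spark.createDataFrame([(1, "US")], ["order_id", "region"])
customers = spark.createDataFrame([(1, "Acme")], ["order_id", "name"])

# A 'region' column later appears in customers as well:
customers = customers.withColumn("region", customers["order_id"].cast("string"))

joined = orders.join(customers, "order_id")
# joined.select("region")  # -> AnalysisException: Reference 'region' is ambiguous

# Workaround: drop the duplicate column from one side before the join.
fixed = orders.join(customers.drop("region"), "order_id")
fixed.select("region").show()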

Ref Spark Commits:

Job Failure during write

A job that had been running fine failed. Looking at the logs I saw the following error:

An Error occurred while calling o5331.saveAsTable
java.lang.OutOfMemoryError: Java Heap Space

On further inspecting the History Server for executor details, I found that it was a case of uneven partitioning: some partitions had much more data than others. Since the user code was not repartitioning the DataFrame before the write, the executor handling the skewed partition was throwing the OOM error. To get the job through in production I asked the user to increase the executor memory, and the job succeeded with more memory. But I also asked the user to repartition the DataFrame before writing.
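
A rough sketch of the suggested change (partition count, paths, and table names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

df = spark.read.parquet("/data/skewed_input")

# Redistribute rows evenly across tasks before writing so no single
# executor has to hold a disproportionately large partition in memory.
df.repartition(200).write.mode("overwrite").saveAsTable("db.output_table")

# Alternatively, repartition by a well-distributed column (or add a salt):
# df.repartition(200, "customer_id").write.mode("overwrite").saveAsTable("db.output_table")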

Null Value in spark query, but data present in Hive table

A user reported that they were seeing null values when querying a table in Spark SQL, but querying the same table from Hive returned values.

This happens because of the way Spark reads ORC files with its native reader versus how Hive reads the same files. Asked the user to run the Spark query with spark.sql.hive.convertMetastoreOrc set to false. After setting this, Spark started to give the same result as Hive.
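
A minimal sketch of the setting (the table name is made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Use the Hive SerDe instead of Spark's native ORC reader so Spark resolves
# the columns the same way Hive does.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

spark.sql("SELECT * FROM db.orc_table LIMIT 10").show()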

Spark local dir full issue

One of the jobs failed. Looking at the logs I found the following error:

java.nio.file.FileSystemException: No space left on device

This happens when the scratch space (spark.local.dir) gets full. In this case a rogue job had written too much data to the space configured as the local dir. Since we run Spark on a K8s cluster, we have a DaemonSet on each node for node-level housekeeping. I used that DaemonSet to clean up the disk on the affected node, then reran the job and it succeeded without any errors.
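
If a job is expected to spill a lot, pointing the scratch space at a volume with more headroom is another option; a hedged sketch (the mount path is just an example, not our actual setup):

from pyspark.sql import SparkSession

# Point Spark's scratch space at a disk with enough free space for shuffle
# spill and temporary files.
spark = (
    SparkSession.builder
    .config("spark.local.dir", "/mnt/large-disk/spark-scratch")
    .getOrCreate()
)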

The broadcast hint not working

In one of the jobs, the user reported that Spark was not honoring the join hint. The table was ~800 MB in size. When I looked at the code, the user had created an alias for the table but was giving the hint using the actual table name. Asked them to provide the hint on the alias, and then the Spark plan showed that it was broadcasting the table.
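
A minimal sketch in Spark SQL (table and column names are made up); the hint has to name the alias used in the query:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# The hint must reference the alias ("d"), not the underlying table name.
plan_df = spark.sql("""
    SELECT /*+ BROADCAST(d) */ f.*, d.category
    FROM db.fact_table f
    JOIN db.dim_table d
      ON f.dim_id = d.dim_id
""")
plan_df.explain()   # should now show a BroadcastHashJoin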

Unable to use PySpark with Python 3.8 in Livy

When running PySpark with Python 3.8 via Livy we get the following error:

TypeError: required field “type_ignores” missing from Module

Python 3.8 introduced a change in the ast module's API (ast.Module now requires a type_ignores field), which causes the above error. The fix was to incorporate the changes from this Livy PR:

LIVY-795

Null Pointer Exception in PySpark

One of the users reported getting an exception in one of the production jobs:

java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_subExpr_5$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)

The issue was that the Parquet schema (specifically the case of a column name) was different in two different partitions. There were a few options to fix it:

  • Query the partitions separately and union the results (querying each partition on its own returned correct data).
  • Make the column names consistent across partitions.
  • Most generic approach (see the sketch below):
    1. Read each partition path and get the schema of that partition.
    2. Cast the columns to string to ensure the data types are compatible between schemas.
    3. Convert the DataFrame to a JSON RDD before doing a union over the partitions.
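
A rough sketch of the generic option, using cast-to-string plus unionByName as a simpler variant of the JSON-RDD route (partition paths and column handling are illustrative):

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

partition_paths = [
    "/warehouse/db/events/dt=2023-04-10",
    "/warehouse/db/events/dt=2023-04-11",
]

normalized = []
for path in partition_paths:
    df = spark.read.parquet(path)
    # Normalize column-name case and cast everything to string so the
    # per-partition schemas line up before the union.
    df = df.select([col(c).cast("string").alias(c.lower()) for c in df.columns])
    normalized.append(df)

# unionByName matches columns by (now lower-cased) name rather than position.
combined = reduce(lambda a, b: a.unionByName(b), normalized)
combined.show(5)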

Executor dying due to OOM

For one of the jobs, executors were dying due to OOM. After looking at the code I found that they were using coalesce(8) to reduce the number of partitions before processing further. Since coalesce only merges existing partitions without a shuffle, it can produce a few very large partitions. Switching to repartition(8), which does a full shuffle, created more balanced partitions and the executor OOM error went away.
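
A minimal sketch of the change (input and output paths are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("/data/wide_input")   # hypothetical input with many partitions

# coalesce(8) only merges existing partitions (no shuffle), so a few merged
# partitions can end up far larger than the rest and blow executor memory.
# small = df.coalesce(8)

# repartition(8) does a full shuffle and produces evenly sized partitions.
small = df.repartition(8)
small.write.mode("overwrite").parquet("/data/output")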

Hope it was useful!!!
