Photo by Petr Macháček on Unsplash

Data Engineer — On call 2

Amit Singh Rathore
Dev Genius
Published in
6 min readAug 18, 2023

--

Issues observed during on call

Note: This is a continuation of the series on On-call issues in data engineering. You can read the other blogs here.

DE On call 1 | DE On call 2 | DE On call 3 | DE On call 4 | DE On call 5

General investigation process

  1. Open Spark History Server
  2. Check the Environment tab to see the Spark Properties
  3. Track failures/slowness Job → Stage → Task
  4. Look for skew & scheduler delay (mean vs max)
  5. For the Stage see the DAG, Event Timeline & Additional Metrics
  6. Under executor check for executor loss reason, shuffle read & write size
  7. See the SQL tab especially the Details tab to see the physical plan
  8. Check the executor & driver logs
  9. Add extraJavaOption(like -XX:+PrintGCDetails -XX:+PrintGCDateStamps) for more verbose logs

Mismatched Partitions in the disk and Hive Metastore

Job failure with the following error:

>> spark.sql("select max(<partition_col>) from schema.table_name")

Caused by: java.util.concurrent.ExecutionException:
org.apache.hadoop.mapred.InvalidInputException:
Input path does not exist:

This error means that there is a mismatch in the metatsore and actual data file present on the storage. Spark gets the metadata(partition location) from the HMS and then reads files directly from the partition location. If any of the partition locations are not present the spark throws the above-mentioned error. To resolve this we have following options

  1. Running MSCK REPAIR on the hive table. This will sync the table's metadata with the partitions present on the disk. Beware this method will remove the capability to trace back any partition as the Hive does not history of partition locations. So if for any reason (compliance) you wish to keep the partition location take a backup of the current partition info.
MSCK REPAIR TABLE table_identifier [{ADD|DROP|SYNC} PARTITIONS]

# ADD - adds new partitions (DEFAULT)
# DROP - drops all partitions from the session catalog that have non-existing locations
# SYNC - combination of DROP and ADD.

Note: we can also use ALTER TABLE table_identifier RECOVER PARTITIONS

2. verifyPartitionPath — This is a deprecated flag and you will get a Warning when you set this. By setting we tell Spark to filter partition locations received from HMS by traversing the locations on disk and only keep those locations that match both on disk and hive. If we have many partitions this will significantly degrade the performance. spark.sql.hive.verifyPartitionPath=true

3. ignoreMissingFiles — This is a new flag that instructs Spark to ignore any missing files on the disk. This is just a failsafe. When this flag is set spark catches any file not found exception and handles it by just printing a warning message. spark.files.ignoreMissingFiles=true .

4. spark.sql.optimizer.metadataOnly — If the aggregate operation we are performing is only on the partition column then we can set this flag. This flag tells Spark to just do the computation on the metadata and it will not actually check files in those partitions. This might give wrong results when you don’t have data in certain partitions but hive has the partition info. spark.sql.optimizer.metadataOnly=true

Dataframe Union in a loop

One user complained that his job has started to fail with a GC overhead error.

INFO yarn.ApplicationMaster: 
Unregistering ApplicationMaster with FAILED (diag message: User class threw exception:
java.lang.OutOfMemoryError:
GC overhead limit exceeded

He was running with 32G memory. And we did not have the scope to increase the driver memory any further. So I started to look into their code to see from where the issue might be coming. I found code like the following (I have translated this in Pyspark it was a scala code).

df = Empty DF
for files_metadata in file_metadata_list: # 10 items
for files_location in files_metadata: # 60 locations
cur_df = spark.read.parquet(files_location)
cur_df = cur_df.withColumn("new col", "logic based on files_metadata")
df = df.union(cur_df)

The above code is looping and creating so many references to data frames (600). This puts a lot of pressure on the driver. The same was confirmed by JVM heap dump as well.

def assign_col_based_on_metadata(col_val):
'''
logic here
'''
return new_val

helper_udf = udf(assign_col_based_on_metadata, returnType)

df = spark.read.parquet(glob_exp)
df = df.withColumn("file_name", input_file_name())
df = df.withColumn("new col", helper_udf(file_name))

This removed the driver GC overhead error.

Lookup for membership check

One of the users complained that their job is running very slowly. Looking at the code it was found that they were performing a lookup to get all the rows from one table not present in another table. Both the table were having billions of records. The query was something like below

select a.* from huge_table a where a.id not in (select id from big_table)

The above query kept running for almost 4 hrs.

To fix the slowness asked the user to use LEFT ANTI JOIN.

SELECT a.* FROM huge_table a LEFT ANTI JOIN big_table b ON a.id = b.id

This was completed in 8 mins.

Excessive Shuffle to do aggregation within a loop

One user complained that her job is running slow. Her job was running fine on smaller data in the lower environment. On looking at the stage metrics I noticed there were huge amount of data shuffle happening. Then I looked at her code and found code something like below:

def aggregate_calls(df, t_age, agg_func):
return df.filter(df['t_age'] <= t_age).groupBy('pos_type').agg(agg_func)

agg_funcs = [F.sum('t_amount'), F.count('*'), F.avg('t_amount')]

transaction_age = [1, 7, 15, 30, 90]

for t_age, agg_func in itertools.product(transaction_age, agg_funcs):
result = aggregate_calls(df, t_age, agg_func)
# other business logic

The above code looks straightforward and easy to understand. She preferred this approach because by filtering the data first, the amount of data going into the groupBy operation is smaller. But by calling groupBy multiple times inside a loop, we force data to be shuffled multiple times.

Asked her to rewrite the logic by moving the filter in the group by the operation itself.

aggregations = []

for t_age in [1, 7, 15, 30, 90]:
aggregations.extend([
F.sum(F.when(df['t_age'] <= t_age, df['t_amount'])).alias(f'total_transcation_amount_last_{t_age}_days'),
F.count(F.when(df['t_age'] <= t_age, True)).alias(f'num_transaction_last_{t_age}_days'),
(F.sum(F.when(df['t_age'] <= t_age, df['t_amount'])) / F.count(F.when(df['t_age'] <= t_age, True))).alias(f'avg_tran_amount_last_{days}_days')
])
result = df.groupBy('pos_type').agg(*aggregations)

In the above code, we use a loop to generate the aggregation expressions for each aggregate function and each number of days. This reduces the amount of shuffle happening to 1.

Spark History server stopped working

One of the tenants’ history server stopped to come up.

Spark History Server uses level_db/rocks_db for caching the application details to fasten the UI. The issue was that to catch up on old data in parallel one team had started many streaming jobs. This flooded the application and the disk containing the level_db data was getting filled up fast.

To fix this we can remove the setting spark.history.store.path and restarted the history server.

Slowness in Job

One user reported that an application that used to take 30 min was running for 4 hrs and was not completed. On looking at the spark history server we saw that multiple jobs (2.5k+) getting launched and completed in minutes. When I compared it with older days run it was noticed that these jobs used to take around 300ms. Digging further it was found that there was a DB call for checking duplicates and dropping it. When I checked the logs of the job for the DB connection it was taking 4–5 seconds. Also, this duplicated check was called in every job. So this delay was caused by the slowness of the DB.

Asked the DB team to fix the slowness issue, in the meantime suggested application owner to move the DB connection logic to a single place. After that job started to finish within 40 min.

Proleptic Gregorian Calendar

Caused by: org.apache.spark.SparkUpgradeException:
Spark 3.0+'s Proleptic Gregorian calendar.

This was an easy one.

Spark 3.2 was reading a file written by Spark 2.3 and the old date were using a different format. In Spark version 2.4 and below, uses hybrid calendar (Julian + Gregorian). While in Spark 3.2 Proleptic Gregorian calendar is used. The changes impact on the results for dates before October 15, 1582 (Gregorian).

Spark 3.2 will fail the reading if it sees ancient dates/timestamps that are ambiguous between the two calendars.

--conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED 
--conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED

By setting the above conf, Spark will not perform the rebase and read the dates/timestamps as it is.

Thanks !!!

--

--