Data Engineer On call 5

Amit Singh Rathore · Published in Dev Genius · Sep 20, 2023 · 2 min read

Issues faced in daily BAU

Data Engineering On call 1
Data Engineering On call 2
Data Engineering On call 3
Data Engineering On call 4

ISSUE 1

A Spark JDBC read fails to find the table in the actual database.

df = spark.read.format("jdbc") \
    .option("url", "<host url>") \
    .option("dbtable", "CSDTL.MY_TABLE") \
    .option("user", "postgres") \
    .option("password", "<password>") \
    .option("numPartitions", 50) \
    .option("fetchsize", 20) \
    .load()

After looking at the code and the table, we observed the following.

The database schema and table name are all in uppercase. Spark converts the identifiers to lowercase, so the query it sends does not match the actual table in the database.

We enabled case-sensitive resolution and the read worked:

spark.conf.set("spark.sql.caseSensitive", "true")

ISSUE 2

One job was running a large query with over 87k tasks. Broadcast joins were causing Spark to throw the following error:

Total size of serialized results … is bigger than spark.driver.maxResultSize.

Setting spark.driver.maxResultSize to 0 (unlimited) didn’t work.

We finally disabled broadcast joins entirely, and the job finished.

spark.sql.autoBroadcastJoinThreshold -1
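
A minimal sketch of the workaround, assuming the setting is applied on the running session (it can equally be passed as a --conf at submit time):

# Disable automatic broadcast joins; Spark falls back to shuffle-based
# (e.g. sort-merge) joins, so the driver no longer has to collect and
# hold the serialized broadcast tables.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")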

ISSUE 3

One user had a Spark job that read a table from a database. While reading the data, Spark was inferring one column as boolean, although the database had this column as INT with only 1 and 0 values.

We asked the user to add the customSchema option to the JDBC read.

spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://host:port/dbname") \
.option("query", f"SELECT * FROM {table}") \
.option("user", user) \
.option("password", password) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("customSchema", "COLUMN_NAME INT") \
.load() \
.registerTempTable(table_name)
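
A quick verification sketch, reusing the table_name registered above: with customSchema in place, the column should come back as an integer rather than a boolean.

# customSchema overrides the type Spark inferred from the JDBC metadata
spark.table(table_name).printSchema()  # COLUMN_NAME should now show as int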

ISSUE 4

One Spark job was failing because of a bad data node: executors launched on that node kept failing. We reported this to the platform team, and they said they would remove the node in the next maintenance window. In the meantime, we asked the user to exclude the node in spark-submit.

spark.yarn.exclude.nodes = node_host_name

Note: this worked because the app was using dynamic allocation.
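
A sketch of the equivalent setting applied at session build time (the host name and app name below are placeholders); passing --conf spark.yarn.exclude.nodes=<node_host_name> to spark-submit achieves the same thing.

from pyspark.sql import SparkSession

# Exclude the bad host so YARN does not allocate executors on it
spark = (SparkSession.builder
         .appName("my_job")  # placeholder app name
         .config("spark.yarn.exclude.nodes", "bad-node.example.com")  # placeholder host
         .getOrCreate())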

ISSUE 5

One of the jobs migrated from Spark 2.4 started to fail in Spark 3.2. The job failed with the following error:

Caused by: java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column keywords is repeated.

Looking at the physical plan, we found that a filter had been added on an array-type column:

DataFilters: [isnotnull(ColName#1), array_contains(ColName#1, FN)]

This is a bug in Spark 3.2.0 that was fixed in 3.2.1+. To make the job work on 3.2.0, we asked the user to set the following, which disables Parquet filter push-down.

spark.sql.parquet.filterPushdown=false
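
A sketch of the failing pattern and the workaround (the Parquet path is a placeholder; the column name and value come from the plan above):

from pyspark.sql import functions as F

# Workaround for Spark 3.2.0: do not push filters down to the Parquet reader
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

# The kind of filter that triggered the error: array_contains on a repeated (array) column
df = spark.read.parquet("/path/to/table")  # placeholder path
filtered = df.filter(F.array_contains("keywords", "FN"))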

ISSUE 6

One user complained that AQE was not applying the skew join optimization, although the user had enabled both AQE and skew join:

spark.sql.adaptive.enabled true
spark.sql.adaptive.skewJoin.enabled true

We looked at the user's code and found that the user was doing an explicit repartition before the join.

This is a known behavior: if we manually alter the number of partitions (which introduces an extra shuffle) or cache the DataFrame, AQE skips its optimization for that part of the plan.

We asked the user to remove the repartition from the code, and AQE kicked in (see the sketch below).
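
A before/after sketch (the DataFrame and column names are placeholders):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Before: a user-specified repartition pins the shuffle partitioning,
# so AQE will not split the skewed partitions of this join
# joined = fact_df.repartition(2000, "customer_id").join(dim_df, "customer_id")

# After: let AQE choose the partitioning and handle the skew at runtime
joined = fact_df.join(dim_df, "customer_id")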

Note: AQE's behavior with caching has since improved; with a later fix we can set spark.sql.optimizer.canChangeCachedPlanOutputPartitioning to true, which allows AQE to change the output partitioning of a cached plan.
