Data Engineering — On call 3

Amit Singh Rathore · Published in Dev Genius · 4 min read · Aug 27, 2023


Third instalment of the on-call series.

DE On call 1 | DE On call 2 | DE On call 3 | DE On call 4 | DE On call 5

Issue 1

Writing to a partitioned table with the df.write.insertInto command fails.

df.write.mode('append').insertInto('mybigtable')

Error:

org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2562)
at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitions(HiveExternalCatalog.scala:1238)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitions(ExternalCatalogWithListener.scala:254)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitions(SessionCatalog.scala:1234)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:436)

As seen in the stack trace above, Spark makes Hive metastore calls to fetch partition information. The getPartitions() API in Hive 2.3.9 (the version Spark uses) pulls the metadata for all partitions of the table in a single call. With the huge number of partitions (~6 lakh, roughly 600K), the metadata size exceeds the 2 GB limit on the Thrift transport buffer, so the call fails with a socket timeout. To fix this issue we have two options:

  1. Implement a batching API in the Hive client (HIVE-27505).
  2. Switch to the Hive SerDe for the DataFrame write operation by setting spark.sql.hive.convertMetastoreOrc or spark.sql.hive.convertMetastoreParquet to false, as shown in the sketch below.
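
A minimal sketch of option 2 in Scala, assuming the same mybigtable example as above. The two configs route the insert through Hive's own SerDe write path instead of InsertIntoHadoopFsRelationCommand, the command that lists all partitions in the stack trace:

// Disable the built-in Parquet/ORC converters so the write uses the Hive SerDe
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "false")

df.write.mode("append").insertInto("mybigtable")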

Issue 2

The Spark catalog is not getting refreshed with new data inserted into an external Hive table.

We have a streaming job that reads messages from a Kafka topic and queries a Hive table based on the received info. When querying the table, the job was not seeing the latest data.

Since Spark 2.4, Spark caches the file listing for tables and requires a REFRESH TABLE if the file listing has changed outside of Spark. A metadata-cache TTL can also be set so the cache expires on its own:

spark.sql.metadataCacheTTLSeconds 180s
spark.sql.catalogImplementation hive

Ref: SPARK-30616
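
A hedged sketch of wiring these settings into the streaming job in Scala; the table name db.events is a placeholder, and the explicit refreshTable call is the alternative to waiting for the TTL to expire:

import org.apache.spark.sql.SparkSession

// Build the session with the Hive catalog and a 180-second metadata-cache TTL
val spark = SparkSession.builder()
  .enableHiveSupport()                                  // spark.sql.catalogImplementation=hive
  .config("spark.sql.metadataCacheTTLSeconds", "180s")
  .getOrCreate()

// Alternative: force-refresh the cached file listing before each lookup
spark.catalog.refreshTable("db.events")
val latest = spark.table("db.events")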

Issue 3

One of the Spark jobs failed with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure:
ExecutorLostFailure (executor 4 exited caused by one of the running tasks)
Reason: Container from a bad node: container_xxx_xx_000009 on host: xx.xx.com. Exit status: 50.

Looking at the executor log, we see the following error:

ERROR Executor: Exception in task 102.0 in stage 884.0 (TID 31106)
java.lang.StackOverflowError
at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:763)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:600)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:600)

The executor memory settings control how the heap is divided, but this error is a stack overflow: it concerns the JVM thread stack size, which is configured separately from the heap. The deeply recursive flowAnalysis() calls in Janino (the compiler behind Spark's code generation) exhausted the default thread stack.

Spark allows us to set JVM properties via extraJavaOptions, so we ran the job with the following Spark configuration.

# set stack size to 1G
"spark.executor.extraJavaOptions=-Xss1G"

Issue 4

One of the jobs failed with the following error:

Caused by: org.apache.spark.SparkException: Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations.

When Spark serializes Datasets, references to the SparkContext and SparkSession are nulled out (by being marked @transient or via the closure cleaner). As a result, Dataset methods that reference these driver-side-only objects (e.g. actions or transformations) see null references and fail with the above exception.
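
A minimal sketch of the anti-pattern that triggers this exception, with hypothetical Datasets ds and lookupDs: the inner Dataset call runs on an executor, where the deserialized Dataset's SparkSession reference is null.

import spark.implicits._

// Anti-pattern: a Dataset action invoked inside another Dataset's transformation
val counts = ds.map { row =>
  lookupDs.count()   // executed on an executor -> "can only be invoked by the driver"
}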

In the code, the user persisted a DataFrame and then passed it to sparkContext.broadcast. They then performed Dataset operations on the broadcast variable, which caused the issue.

// order of calls in the user's code
df.cache()
val bc = spark.sparkContext.broadcast(df)   // broadcast the DataFrame itself
// bc.value was then used inside another Dataset transformation -> the error above

We asked the user to remove the broadcast and the code worked fine.
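
If a broadcast join was the original intent, the DataFrame-level broadcast hint is the idiomatic route rather than sparkContext.broadcast; a hedged sketch, where otherDf and the join key "id" are placeholders:

import org.apache.spark.sql.functions.broadcast

df.cache()
// Let Spark ship df to the executors as part of a broadcast hash join
val joined = otherDf.join(broadcast(df), Seq("id"), "left")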

Issue 5

error: type mismatch;
found : org.apache.spark.sql.types.DecimalType.type
required: org.apache.spark.sql.types.DataType
StructField("col_name", DecimalType, true)

Refresher on Hive & Parquet metadata

There are two key differences between Hive and Parquet from the perspective of table schema processing.

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

For this reason, we must reconcile the Hive metastore schema with the Parquet schema when converting a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

Fields that have the same name in both schemas must have the same data type regardless of nullability. The reconciled field takes the data type of the Parquet side, so that nullability is respected.

The reconciled schema contains exactly those fields defined in the Hive metastore schema:

  • Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
  • Any fields that only appear in the Hive metastore schema are added as nullable fields in the reconciled schema.

The solution to the above problem was to rewrite the Parquet files with the same data type as defined in the Hive table, as sketched below.
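
A hedged sketch of that rewrite in Scala; the paths and the decimal precision/scale are placeholders and should match the Hive DDL:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

// Read the existing files, cast the column to the type declared in Hive, and rewrite
val fixed = spark.read.parquet("/data/mytable")
  .withColumn("col_name", col("col_name").cast(DecimalType(38, 18)))

fixed.write.mode("overwrite").parquet("/data/mytable_fixed")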

Thanks !!
