Full GC causes the driver to crash with an OOM exception

Problems

A common cause is a memory bottleneck on the Spark driver. When this happens, the driver crashes with an out-of-memory (OOM) exception and is restarted, or becomes unresponsive due to frequent full garbage collection (GC).

Our system ran four jobs on a single interactive cluster; some of them were scheduled to run every minute, which degraded overall job performance.

Attempts

  • Reduce the interval of periodic GC cleaning by setting spark.cleaner.periodicGC.interval to 15min (the default is 30min); see the sketch after this list.
  • Clear the SparkContext after each scheduled job finishes by setting spark.gracefullyShutdown to true. We found that the SparkContext still existed: cluster memory kept increasing every minute, in step with the scheduled jobs.
  • Upgrade the library used to integrate with Azure EventHub, com.microsoft.azure:azure-eventhubs-spark_2.12, to the latest version.
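
The first two attempts were applied as Spark configuration. A minimal sketch of how they could be set when building the session (the application name is a placeholder, and spark.gracefullyShutdown is quoted from the attempt above rather than a standard Spark setting):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("scheduled-job")                              // placeholder name
      .config("spark.cleaner.periodicGC.interval", "15min")  // default is 30min
      .config("spark.gracefullyShutdown", "true")            // as attempted above
      .getOrCreate()

On Databricks the same keys can also be entered as key-value pairs in the cluster's Spark config instead of in code.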

Resolutions

  • We detached the interactive cluster from each running job and replaced it with an automated (job) cluster, one job per cluster. This improved job performance significantly, in exchange for additional infrastructure cost and slower deployment, because automated clusters have more startup overhead than interactive clusters (which can be mitigated with Databricks Pools).
  • We also guaranteed that the SparkContext is initialized only once by modifying the code as in the following snippet:
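
The original snippet is not reproduced here; a minimal sketch of one-time initialization, assuming a Scala entry point (the object name and input path are placeholders), could look like this:

    import org.apache.spark.sql.SparkSession

    object ScheduledJob {
      // getOrCreate() reuses an existing session instead of building a new one,
      // so repeated scheduled runs share a single SparkContext.
      lazy val spark: SparkSession = SparkSession.builder()
        .appName("scheduled-job")
        .getOrCreate()

      def main(args: Array[String]): Unit = {
        val df = spark.read.json("dbfs:/mnt/input")  // placeholder input path
        df.show()
      }
    }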

Key Takeaways

  • Avoid memory-intensive driver-side operations, such as the Spark SQL collect function and large DataFrame transformations, or scale up the cluster memory appropriately (see the sketch after this list).
  • Regardless of cluster size, Spark driver functionality (including logs) cannot be separated per job within a cluster. That is why we found mixed logs from every running job.
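
To illustrate the first takeaway, a hedged sketch (input and output paths are placeholders): pulling a whole DataFrame into the driver with collect is exactly the kind of operation that exhausts driver memory, while bounded samples or writing results back to storage keep the data on the executors.

    val df = spark.read.parquet("dbfs:/mnt/events")             // placeholder input

    // Risky: materializes the entire DataFrame in driver memory.
    // val rows = df.collect()

    // Safer: inspect a bounded sample, or write results back to storage.
    val sample = df.limit(100).collect()
    df.write.mode("overwrite").parquet("dbfs:/mnt/events-out")  // placeholder output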

Sending Standard output and Standard error from Databricks to Application Insights directly

Problems

We wanted to send logs from all Databricks clusters to Application Insights by configuring a custom log4j appender.
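
For context, the attempted setup looked roughly like the following hypothetical log4j.properties fragment (the appender name and instrumentation key are placeholders; the appender class comes from the Application Insights log4j 1.2 adapter):

    # Route driver logs to Application Insights via the custom appender
    log4j.rootLogger=INFO, aiAppender
    log4j.appender.aiAppender=com.microsoft.applicationinsights.log4j.v1_2.ApplicationInsightsAppender
    log4j.appender.aiAppender.instrumentationKey=<instrumentation-key>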

However, the Spark job could not find TelemetryConfiguration after the library com.microsoft.azure:applicationinsights-logging-log4j1_2:2.5.1 was uploaded.

Telemetry Exception

Resolutions

  • There is no supported method to send Standard output and Standard error from Databricks to Application Insights directly.
  • We saved the Spark driver logs to the Databricks File System (DBFS) instead (see the sketch after this list). Still, this is not an ideal solution because of poor discoverability.
  • The repository https://github.com/AdamPaternostro/Azure-Databricks-Log4J-To-AppInsights shows how to send log4j logs to Application Insights.
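
One way to land driver logs in DBFS is Databricks cluster log delivery, configured on the cluster or through the Clusters API. A hypothetical fragment of the cluster specification, with a placeholder destination path:

    {
      "cluster_log_conf": {
        "dbfs": {
          "destination": "dbfs:/cluster-logs"
        }
      }
    }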

DBFS logging

LimitExceededException from writing text data that is too large

Problems

Some jobs processing large JSON data retrieved from EventHub threw the following exception:

com.databricks.spark.util.LimitedOutputStream$LimitExceededException: Exceeded 2097152 bytes (current = 2099270)

com.fasterxml.jackson.databind.JsonMappingException: Exceeded 2097152 bytes (current = 2099270) (through reference chain: org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart['physicalPlanDescription'])

Resolutions

Increase the maximum record size that the Spark event log accepts: spark.conf.set("spark.eventLog.unknownRecord.maxSize", "16m"). Strangely, the in-code configuration was ineffective, so we had to configure it through the Databricks Jobs API instead (see the sketch below).
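
A hypothetical fragment of the Jobs API request body, assuming the job runs on a new automated cluster (the runtime version, node type, and worker count are placeholders), carries the setting under new_cluster.spark_conf:

    {
      "new_cluster": {
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 2,
        "spark_conf": {
          "spark.eventLog.unknownRecord.maxSize": "16m"
        }
      }
    }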

ReceiverDisconnectedException

Problems

This exception is thrown when more than one epoch receiver connects to the same partition with different epoch values.

According to the Microsoft documentation, with com.microsoft.azure:azure-eventhubs-spark_2.12 version 2.3.2 and above, the existing epoch receiver is disconnected when a new receiver connects to the same partition of the same consumer group with a higher epoch value than the existing one.

Therefore, we had to make sure that our streaming application contains only one consumer group per running Spark session.
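
A hedged sketch of wiring a dedicated consumer group into the stream using the connector's EventHubsConf (the connection string, EventHub name, and consumer group are placeholders):

    import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}

    val connectionString = ConnectionStringBuilder("<eventhub-connection-string>")
      .setEventHubName("<eventhub-name>")
      .build

    // One dedicated consumer group per streaming application avoids competing
    // epoch receivers on the same partition.
    val ehConf = EventHubsConf(connectionString)
      .setConsumerGroup("<dedicated-consumer-group>")

    val rawStream = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)
      .load()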

Resolutions

  • Limit the EventHub stream by storing EventHub messages in DBFS in Parquet format, then creating another stream that reads from DBFS and continues the logging streams (see the sketch after this note).

For production-grade use, it is recommended to replace in-memory DBFS with other persistent storage, such as RocksDB.
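
A hedged sketch of the two-stage pipeline (paths and checkpoint locations are placeholders; ehConf is as in the previous sketch): stage one persists the EventHub stream to DBFS as Parquet, stage two reads those files back as a file-based stream, so only one receiver is attached to each EventHub partition.

    // Stage 1: land the EventHub stream in DBFS as Parquet.
    val rawStream = spark.readStream
      .format("eventhubs")
      .options(ehConf.toMap)                 // ehConf as defined above
      .load()

    rawStream.writeStream
      .format("parquet")
      .option("path", "dbfs:/mnt/landing/eventhub")                    // placeholder
      .option("checkpointLocation", "dbfs:/mnt/checkpoints/landing")   // placeholder
      .start()

    // Stage 2: downstream consumers stream from DBFS instead of EventHub.
    val landed = spark.readStream
      .schema(rawStream.schema)              // file streams require an explicit schema
      .format("parquet")
      .load("dbfs:/mnt/landing/eventhub")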

DBFS logging

  • If we want to write the output of a streaming query to multiple sinks (e.g. logging and a database) using foreachBatch or foreach, be mindful that the output can be recomputed (and the input even reread) every time we write it. Caching the batch DataFrame avoids this issue, as in the sketch below.
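
A hedged sketch of a foreachBatch sink writing each micro-batch to two destinations (the rate source, output paths, and JDBC options are placeholders); persisting the batch DataFrame prevents Spark from recomputing it once per sink:

    import org.apache.spark.sql.DataFrame

    val streamingDF = spark.readStream.format("rate").load()  // placeholder source

    val jdbcOptions = Map(                                    // placeholder sink options
      "url" -> "jdbc:sqlserver://<host>;database=<db>",
      "dbtable" -> "logs")

    def writeToSinks(batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()                                       // cache once, reuse for every sink
      batchDF.write.mode("append").parquet("dbfs:/mnt/logs")  // sink 1: logging
      batchDF.write.format("jdbc").options(jdbcOptions).mode("append").save()  // sink 2: database
      batchDF.unpersist()
    }

    streamingDF.writeStream
      .foreachBatch(writeToSinks _)
      .option("checkpointLocation", "dbfs:/mnt/checkpoints/multi-sink")  // placeholder
      .start()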