Databricks Observability: Processing Collected Ganglia Metrics

Introduction

In this article, we will explore how to use collected Ganglia metrics and prepare them for observability tools like Grafana. We will begin by examining how the metrics files are organized and what kind of content they hold. Then, we will discuss how the metrics are going to be processed by creating a notebook that ingests the XML files. Finally, we will push the metrics to Azure Data Explorer.

This article depends on the previous story: Databricks Observability: Collecting Ganglia Metrics and uses the outcome of the init-script discussed there.

The processing flow

Before the implementation part starts, let’s briefly discuss the logic behind the processing.

In a nutshell, it has a few steps:

  • Move logs from the landing to the processing directory
  • Generate a PySpark dataframe by processing the XML logs
  • Push the content of the dataframe to Azure Data Explorer
processing_logic_2.png
Figure 1: The processing flow

Each of these steps will be covered later in the implementation part.

Folder structure and the metrics content

The init script generates metrics using the following structure:

/dbfs/logs/ganglia_metrics/in/<cluster-name>/<driver_ip>_<timestamp>.xml

Each cluster has its own directory, and each log filename starts with the driver IP as its first token. The driver’s IP is needed to tell the driver apart from the workers, since Ganglia does not distinguish between them:

<GANGLIA_XML VERSION="3.6.0" SOURCE="gmetad">
    <GRID NAME="unspecified" AUTHORITY="<http://0411-080835-6k8t703k-172-16-0-5/ganglia/>"
        LOCALTIME="1681494185">
        <CLUSTER NAME="cluster" LOCALTIME="1681494184" OWNER="unspecified" LATLONG="unspecified"
            URL="unspecified">
            <HOST NAME="172.16.0.4" IP="172.16.0.4" REPORTED="1681494182" TN="3" TMAX="20" DMAX="0"
                LOCATION="unspecified" GMOND_STARTED="1681493542" TAGS="">
                <METRIC NAME="cpu_num" VAL="4" TYPE="uint16" UNITS="CPUs" TN="43" TMAX="1200"
                    DMAX="0" SLOPE="zero" SOURCE="gmond">
                    <EXTRA_DATA>
                        <EXTRA_ELEMENT NAME="GROUP" VAL="cpu" />
                        <EXTRA_ELEMENT NAME="DESC" VAL="Total number of CPUs" />
                        <EXTRA_ELEMENT NAME="TITLE" VAL="CPU Count" />
                    </EXTRA_DATA>
                </METRIC>
            </HOST>
            <HOST NAME="172.16.0.5" IP="172.16.0.5" REPORTED="1681494177" TN="8" TMAX="20" DMAX="0"
                LOCATION="unspecified" GMOND_STARTED="1681493528" TAGS="">
                <METRIC NAME="cpu_num" VAL="4" TYPE="uint16" UNITS="CPUs" TN="56" TMAX="1200"
                    DMAX="0" SLOPE="zero" SOURCE="gmond">
                    <EXTRA_DATA>
                        <EXTRA_ELEMENT NAME="GROUP" VAL="cpu" />
                        <EXTRA_ELEMENT NAME="DESC" VAL="Total number of CPUs" />
                        <EXTRA_ELEMENT NAME="TITLE" VAL="CPU Count" />
                    </EXTRA_DATA>
                </METRIC>
            </HOST>
        </CLUSTER>
    </GRID>
</GANGLIA_XML>

Each file may hold multiple metrics for every host of the cluster, but it contains exactly one XML payload, which is a snapshot of a single moment in time.
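To make the payload structure more tangible, here is a minimal sketch that flattens such a payload with Python's standard library. It is not the processing code used later in this article (that relies on spark-xml); the trimmed sample payload and the function name flatten_payload are assumptions made for illustration, and the driver IP is passed in explicitly rather than taken from a filename.

```python
import xml.etree.ElementTree as ET

# Trimmed-down payload in the same shape as the example above (illustrative only).
SAMPLE_XML = """
<GANGLIA_XML VERSION="3.6.0" SOURCE="gmetad">
  <GRID NAME="unspecified">
    <CLUSTER NAME="cluster">
      <HOST NAME="172.16.0.4" IP="172.16.0.4" REPORTED="1681494182">
        <METRIC NAME="cpu_num" VAL="4" UNITS="CPUs" />
      </HOST>
      <HOST NAME="172.16.0.5" IP="172.16.0.5" REPORTED="1681494177">
        <METRIC NAME="cpu_num" VAL="4" UNITS="CPUs" />
      </HOST>
    </CLUSTER>
  </GRID>
</GANGLIA_XML>
"""


def flatten_payload(xml_text, driver_ip):
    """Return one dict per (host, metric) pair found in the payload."""
    rows = []
    root = ET.fromstring(xml_text)
    for host in root.iter("HOST"):
        # Ganglia does not mark the driver, so it is recognised by its IP.
        host_type = "Driver" if host.get("NAME") == driver_ip else "Worker"
        for metric in host.iter("METRIC"):
            rows.append(
                {
                    "host": host.get("NAME"),
                    "metric": metric.get("NAME"),
                    "value": metric.get("VAL"),
                    "units": metric.get("UNITS"),
                    "reported": int(host.get("REPORTED")),
                    "host_type": host_type,
                }
            )
    return rows


# Assumption for this sketch: 172.16.0.5 is the driver.
rows = flatten_payload(SAMPLE_XML, driver_ip="172.16.0.5")
for row in rows:
    print(row)
```

Each HOST element becomes one row per nested METRIC element, which is the same flat shape the notebook produces later.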

Implementation of the processing logic

Step 1: Install extra libraries

Because the gathered metrics are XML payloads and the destination is Azure Data Explorer, the following libraries must be installed:

  • Maven: com.databricks:spark-xml_2.12:0.16.0
  • PyPI: azure-kusto-data
  • PyPI: azure-kusto-ingest

This can be done manually by adjusting cluster settings:

cluster_installed_libraries.png
Figure 2: Installed custom libraries

If the cluster is managed by Terraform, the missing libraries can be added using library blocks in the scope of the cluster resource:

...
library {
  maven {
    coordinates = "com.databricks:spark-xml_2.12:0.16.0"
  }
}
library {
  pypi {
    package = "azure-kusto-data"
  }
}
library {
  pypi {
    package = "azure-kusto-ingest"
  }
}
...
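If neither the UI nor Terraform is an option, the same three libraries could presumably be installed through the Databricks Libraries REST API (POST /api/2.0/libraries/install). The sketch below only builds the request body and sends nothing; the helper name build_install_payload and the placeholder cluster ID are assumptions for illustration.

```python
import json


def build_install_payload(cluster_id):
    """Build a request body for the Libraries API install endpoint."""
    return {
        "cluster_id": cluster_id,
        "libraries": [
            {"maven": {"coordinates": "com.databricks:spark-xml_2.12:0.16.0"}},
            {"pypi": {"package": "azure-kusto-data"}},
            {"pypi": {"package": "azure-kusto-ingest"}},
        ],
    }


# "<cluster-id>" is a placeholder; replace it with a real cluster ID.
payload = build_install_payload("<cluster-id>")
print(json.dumps(payload, indent=2))
```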

Step 2: Move logs to the processing folder

The following snippet moves files from /logs/ganglia_metrics/in to /logs/ganglia_metrics/processing. It terminates the notebook if the root ganglia_metrics folder does not exist or if there are no input logs to process.

path_ganglia_root = "/logs/ganglia_metrics"
path_ganglia_input = f"{path_ganglia_root}/in"
path_ganglia_processing = f"{path_ganglia_root}/processing"
path_ganglia_processing_fetch_pattern = f"{path_ganglia_processing}/*/*"

# COMMAND ----------

# Check whether the Ganglia root folder exists;
# if it does not, terminate the notebook flow
try:
    dbutils.fs.ls(path_ganglia_root)
except Exception as e:
    print(e)
    dbutils.notebook.exit("Ganglia metrics gathering is not enabled on any cluster in this opco, terminating...")

# Check whether the input folder with metrics files exists
if "in/" not in [x.name for x in dbutils.fs.ls(path_ganglia_root)]:
    dbutils.notebook.exit("No input to process, terminating...")

# COMMAND ----------

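# Move incoming logs to the processing folder to isolate the files being processed from other processes.
# The move is attempted up to 3 times.
try_nr = 1
moved = False
while not moved and try_nr <= 3:
    print(f"Moving files to processing folder. Try {try_nr}")
    try:
        dbutils.fs.mv(path_ganglia_input, path_ganglia_processing, True)
        moved = True
    except Exception as e:  # Replace Exception with something more specific.
        print(e)
        try_nr += 1

# Stop here if the move never succeeded, so later steps do not read a missing or stale folder
if not moved:
    dbutils.notebook.exit("Could not move input files to the processing folder, terminating...")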
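The retry logic above can also be factored into a small reusable helper. The following is a sketch, not part of the original notebook: the names retry and flaky_move are made up for illustration, and flaky_move merely simulates a move that fails twice so that the example runs outside Databricks.

```python
import time


def retry(action, attempts=3, delay_seconds=0.0):
    """Call action() up to `attempts` times; return True on the first success."""
    for try_nr in range(1, attempts + 1):
        try:
            action()
            return True
        except Exception as e:  # Narrow this down in real code.
            print(f"Try {try_nr} failed: {e}")
            if try_nr < attempts:
                time.sleep(delay_seconds)
    return False


# Stand-in for dbutils.fs.mv: fails on the first two calls, then succeeds.
calls = {"count": 0}


def flaky_move():
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("simulated transient failure")


moved = retry(flaky_move)
print(f"moved={moved} after {calls['count']} calls")
```

In the notebook, the action would be something like lambda: dbutils.fs.mv(path_ganglia_input, path_ganglia_processing, True), and a False result is a reasonable signal to terminate the run.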

Step 3: Generate dataframe by processing moved files

The following PySpark snippet reads the metric payloads and extracts cluster and metric details:

from pyspark.sql.functions import input_file_name
df = (
    spark
    .read
    .format("xml")
    .option("rowTag", "HOST")
    .load(path_ganglia_processing_fetch_pattern)
    .withColumn("path", input_file_name())
    .selectExpr(
        "SPLIT(path, '/')[4] as cluster",
        "_NAME AS host",
        "METRIC._NAME AS metric",
        "METRIC._VAL AS value",
        "METRIC._UNITS AS units",
        "from_unixtime(_REPORTED) AS timestamp",
        "CASE WHEN SPLIT(SPLIT(path, '/')[5], '_')[0] = _NAME THEN 'Driver' ELSE 'Worker' END AS host_type"
    )
)

The outcome is a tabular flat dataset:

df.limit(5).show(truncate=False)
Figure 3: The dataframe content
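The two SPLIT expressions in the query are easier to follow when replayed in plain Python. The sketch below assumes that input_file_name() returns a path of the form dbfs:/logs/ganglia_metrics/processing/<cluster-name>/<driver_ip>_<timestamp>.xml; the cluster name, the timestamp and the function name describe_path are invented for illustration.

```python
def describe_path(path, host):
    """Mirror the SPLIT-based expressions of the Spark query in plain Python."""
    parts = path.split("/")
    # parts[0] is "dbfs:", parts[1..3] are logs, ganglia_metrics, processing
    cluster = parts[4]
    file_name = parts[5]
    # The first underscore-separated token of the file name is the driver IP.
    driver_ip = file_name.split("_")[0]
    host_type = "Driver" if driver_ip == host else "Worker"
    return cluster, host_type


# Hypothetical path in the assumed dbfs:/ form.
path = "dbfs:/logs/ganglia_metrics/processing/my-cluster/172.16.0.5_1681494185.xml"

print(describe_path(path, "172.16.0.5"))
print(describe_path(path, "172.16.0.4"))
```

If the path prefix differs in your workspace (for example, a different mount depth), the indexes 4 and 5 need to be adjusted accordingly.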

The dataframe df can be saved as a Delta table. In some scenarios, this is enough to analyze the health and utilization of the clusters. However, for a better user experience in observability tools like Grafana, the data will be exported to a more robust append-only query engine: Azure Data Explorer.

Step 4: Push the content of the dataframe to Azure Data Explorer

Warning: this is going to be the longest step, since a few prerequisites must be in place before the code can run.

Prerequisites:

  1. Create a service principal (SPN) or reuse an existing one, and create an application secret for it. The Kusto client requires both for authentication.
  2. Create an instance of Azure Data Explorer and the database ganglia-metrics.
  3. Grant the Ingestor permission to the SPN from step 1 on the database created in step 2:
    • .add database ganglia-metrics ingestors ('aadapp=<Step 1 Application ID>')
  4. Run the following DDL script in ganglia-metrics to create the table cluster_usage:
    • .create table cluster_usage ( cluster: string, host: string, metric: string, value: decimal, units: string, timestamp: datetime, host_type:string)
  5. Create the Databricks secret scope ganglia-observability.
  6. Create the following secrets in that secret scope:
    1. ingest-connectionstring: the Data Ingestion URI of ADX (see Figure 4)
    2. client-id: Application (client) ID
    3. client-secret: the secret string created in step 1
    4. tenant-id: Directory (tenant) ID
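The column list in step 4 has to line up with the dataframe produced in Step 3, because, as far as I understand, CSV ingestion without an explicit mapping assigns values to columns by position. One way to keep the two in sync is to define the schema once and render the DDL from it. This is only a sketch; TABLE_COLUMNS and create_table_command are names introduced here for illustration.

```python
# Column order mirrors the selectExpr of the dataframe in Step 3.
TABLE_COLUMNS = [
    ("cluster", "string"),
    ("host", "string"),
    ("metric", "string"),
    ("value", "decimal"),
    ("units", "string"),
    ("timestamp", "datetime"),
    ("host_type", "string"),
]


def create_table_command(table, columns):
    """Render a .create table control command from (name, type) pairs."""
    rendered = ", ".join(f"{name}: {dtype}" for name, dtype in columns)
    return f".create table {table} ({rendered})"


command = create_table_command("cluster_usage", TABLE_COLUMNS)
print(command)
```

The same list can then be compared against df.columns in the notebook before ingesting, to catch a reordered or renamed column early.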

The Data Ingestion URI can be found on the Azure Data Explorer overview page:

adx_endpoints.png
Figure 4: ADX endpoints

Once the ADX prerequisites are in place, the following script can be run. It converts the Spark dataframe into a CSV string and pushes that string asynchronously to Azure Data Explorer using QueuedIngestClient from the native Python client:

from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.ingest import QueuedIngestClient
from azure.kusto.ingest.ingestion_properties import IngestionProperties, DataFormat
import io

kusto_database = "ganglia-metrics"
kusto_table = "cluster_usage"

secret_scope = "ganglia-observability"
kusto_ingest_uri = dbutils.secrets.get(secret_scope, "ingest-connectionstring")
client_id = dbutils.secrets.get(secret_scope, "client-id")
client_secret = dbutils.secrets.get(secret_scope, "client-secret")
tenant_id = dbutils.secrets.get(secret_scope, "tenant-id")

# initialises client to push messages
kusto_ingest_client = QueuedIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        connection_string=kusto_ingest_uri,
        aad_app_id=client_id,
        app_key=client_secret,
        authority_id=tenant_id
    )
)

# controls ingestion behavior
ingestion_properties = IngestionProperties(
    database=kusto_database,
    table=kusto_table,
    data_format=DataFormat.CSV,
    flush_immediately=True,
)

# converts the dataframe to csv string
adx_csv_output = df.toPandas().to_csv(index=False, header=False,)
payload_text = io.StringIO(adx_csv_output)

# pushes the payload message to data explorer
kusto_ingest_client.ingest_from_stream(
    payload_text, 
    ingestion_properties
)
# each CSV row ends with a newline, so counting newlines gives the row count
processed_rows = adx_csv_output.count("\n")
print(f"{processed_rows} rows appended to {kusto_database}.{kusto_table}")

# COMMAND ----------

# Final step: cleanup of the processing directory
dbutils.fs.rm(path_ganglia_processing, True)
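The CSV conversion and the row count can be tried out without Spark or pandas. The sketch below uses only the standard library; the sample rows and the function name rows_to_csv are invented for illustration, and the real notebook uses df.toPandas().to_csv(...) instead.

```python
import csv
import io


def rows_to_csv(rows):
    """Render rows as header-less CSV text, one line per row."""
    buffer = io.StringIO()
    # Explicit "\n" terminator so that counting newlines equals counting rows.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


# Illustrative rows in the column order of the cluster_usage table.
rows = [
    ("my-cluster", "172.16.0.4", "cpu_num", 4, "CPUs", "2023-04-14 17:43:02", "Worker"),
    ("my-cluster", "172.16.0.5", "cpu_num", 4, "CPUs", "2023-04-14 17:42:57", "Driver"),
]

csv_text = rows_to_csv(rows)
row_count = csv_text.count("\n")
print(csv_text, end="")
print(f"{row_count} rows")
```

Note that converting the whole dataframe to a single string happens on the driver, so for large volumes of metrics it may be worth splitting the payload into several smaller ingestions.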

If everything went well, the target table cluster_usage should contain the metrics rows:

adx_dataset.png
Figure 5: The content of ADX table ‘cluster_usage’

In the next articles, I will talk about using the metrics data in Grafana and give a more detailed overview of ADX as an observability engine.

The processing notebook code put together

The following notebook can be scheduled to process the data periodically and refresh the target dataset:

# Databricks notebook source
path_ganglia_root = "/logs/ganglia_metrics"
path_ganglia_input = f"{path_ganglia_root}/in"
path_ganglia_processing = f"{path_ganglia_root}/processing"

path_ganglia_processing_fetch_pattern = f"{path_ganglia_processing}/*/*"

# COMMAND ----------

# Check whether the Ganglia root folder exists;
# if it does not, terminate the notebook flow
try:
    dbutils.fs.ls(path_ganglia_root)
except Exception as e:
    print(e)
    dbutils.notebook.exit("Ganglia metrics gathering is not enabled on any cluster in this opco, terminating...")

# Check whether the input folder with metrics files exists
if "in/" not in [x.name for x in dbutils.fs.ls(path_ganglia_root)]:
    dbutils.notebook.exit("No input to process, terminating...")

# COMMAND ----------

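# Move incoming logs to the processing folder to isolate the files being processed from other processes.
# The move is attempted up to 3 times.
try_nr = 1
moved = False
while not moved and try_nr <= 3:
    print(f"Moving files to processing folder. Try {try_nr}")
    try:
        dbutils.fs.mv(path_ganglia_input, path_ganglia_processing, True)
        moved = True
    except Exception as e:  # Replace Exception with something more specific.
        print(e)
        try_nr += 1

# Stop here if the move never succeeded, so later steps do not read a missing or stale folder
if not moved:
    dbutils.notebook.exit("Could not move input files to the processing folder, terminating...")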

# COMMAND ----------

# Ingests the XML files using PySpark; the result is later converted to pandas to control the generation of the CSV output
from pyspark.sql.functions import input_file_name
df = (
    spark
    .read
    .format("xml")
    .option("rowTag", "HOST")
    .load(path_ganglia_processing_fetch_pattern)
    .withColumn("path", input_file_name())
    .selectExpr(
        "SPLIT(path, '/')[4] as cluster",
        "_NAME AS host",
        "METRIC._NAME AS metric",
        "METRIC._VAL AS value",
        "METRIC._UNITS AS units",
        "from_unixtime(_REPORTED) AS timestamp",
        "CASE WHEN SPLIT(SPLIT(path, '/')[5], '_')[0] = _NAME THEN 'Driver' ELSE 'Worker' END AS host_type"
    )
)

# COMMAND ----------

from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.ingest import QueuedIngestClient
from azure.kusto.ingest.ingestion_properties import IngestionProperties, DataFormat
import io

kusto_database = "ganglia-metrics"
kusto_table = "cluster_usage"

# to establish a connection with a data explorer
secret_scope = "ganglia-observability"
kusto_ingest_uri = dbutils.secrets.get(secret_scope, "ingest-connectionstring")
client_id = dbutils.secrets.get(secret_scope, "client-id")
client_secret = dbutils.secrets.get(secret_scope, "client-secret")
tenant_id = dbutils.secrets.get(secret_scope, "tenant-id")

# initialises client to push messages
kusto_ingest_client = QueuedIngestClient(
    KustoConnectionStringBuilder.with_aad_application_key_authentication(
        connection_string=kusto_ingest_uri,
        aad_app_id=client_id,
        app_key=client_secret,
        authority_id=tenant_id
    )
)

# controls ingestion behavior
ingestion_properties = IngestionProperties(
    database=kusto_database,
    table=kusto_table,
    data_format=DataFormat.CSV,
    flush_immediately=True,
)

# converts the dataframe to csv string
adx_csv_output = df.toPandas().to_csv(index=False, header=False,)
payload_text = io.StringIO(adx_csv_output)

# pushes the payload message to data explorer
kusto_ingest_client.ingest_from_stream(
    payload_text, 
    ingestion_properties
)
# each CSV row ends with a newline, so counting newlines gives the row count
processed_rows = adx_csv_output.count("\n")
print(f"{processed_rows} rows appended to {kusto_database}.{kusto_table}")

# COMMAND ----------

# Final step: cleanup of the processing directory
dbutils.fs.rm(path_ganglia_processing, True)

Final words

In this article, we explored the processing flow of the Ganglia metrics. Databricks parses the XML payloads and then pushes the resulting data to the chosen query engine, Azure Data Explorer. In the following article, I will discuss the combination of Azure Data Explorer and Grafana and build charts that render CPU and memory stats.