package me.rakirahman.quality.table.deltalake
import me.rakirahman.config.DeltaLakeConfiguration
import me.rakirahman.logging.level.LoggingConstants
import me.rakirahman.metastore.MetastoreOperations
import me.rakirahman.quality.deequ.repository.metric.spark.table.DataQualityMetadata
import me.rakirahman.quality.table.TableAnalyzer
import io.delta.tables._
import java.time.LocalDateTime
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer
/** Metadata information for individual Parquet files within a Delta table.
*
* @param tableName
* The name of the Delta table
* @param timestamp
* When this analysis was performed
* @param filename
* Relative path of the Parquet file within the table
* @param vorderEnabled
* Whether V-Order optimization is enabled
* @param vorderLevel
* The V-Order optimization level applied
* @param rowCount
* Total number of rows in this Parquet file
* @param rowGroups
* Number of row groups in this Parquet file
* @param createdBy
* Tool/library that created this Parquet file
*/
case class ParquetFileInfo(
tableName: String,
timestamp: String,
filename: String,
vorderEnabled: String,
vorderLevel: String,
rowCount: Long,
rowGroups: Long,
createdBy: String
)
/** Row group statistics for Parquet files within a Delta table.
*
* @param tableName
* The name of the Delta table
* @param timestamp
* When this analysis was performed
* @param filename
* Parquet file containing this row group
* @param rowGroupId
* Sequential ID of this row group within the file
* @param totalFileRowGroups
* Total number of row groups in the parent file
* @param rowCount
* Number of rows in this specific row group
* @param compressedSize
* Size in bytes after compression
* @param uncompressedSize
* Original size in bytes before compression
* @param compressionRatio
* Ratio of compressed to uncompressed size
*/
case class RowGroupInfo(
tableName: String,
timestamp: String,
filename: String,
rowGroupId: Int,
totalFileRowGroups: Int,
rowCount: Long,
compressedSize: Long,
uncompressedSize: Long,
compressionRatio: Float
)
/** Column chunk metadata and statistics from Parquet files.
*
* @param tableName
* The name of the Delta table
* @param timestamp
* When this analysis was performed
* @param filename
* Parquet file containing this column chunk
* @param columnChunkId
* Sequential ID of the column chunk
* @param path
* Column path within the Parquet schema
* @param codec
* Compression codec used for this column
* @param primitiveType
* Data type of the column
* @param statistics
* Column statistics (min, max, null count, etc.)
* @param totalSize
* Compressed size in bytes for this column chunk
* @param totalUncompressedSize
* Uncompressed size in bytes
* @param valueCount
* Number of values in this column chunk
* @param hasDict
* Whether dictionary encoding is used
* @param dictOffset
* Byte offset of the dictionary page
* @param encodings
* List of encodings applied to this column
*/
case class ColumnChunkInfo(
tableName: String,
timestamp: String,
filename: String,
columnChunkId: Int,
path: String,
codec: String,
primitiveType: String,
statistics: String,
totalSize: Long,
totalUncompressedSize: Long,
valueCount: Long,
hasDict: String,
dictOffset: Long,
encodings: String
)
/** Column-level analysis results including cardinality and storage metrics.
*
* @param tableName
* The name of the Delta table
* @param timestamp
* When this analysis was performed
* @param columnName
* Name of the analyzed column
* @param distinctCount
* Number of unique values in the column
* @param primitiveType
* Data type of the column
* @param columnSize
* Total compressed storage size for this column
* @param columnSizeUncompressed
* Total uncompressed size for this column
* @param totalRows
* Total number of rows in the table
* @param tableSize
* Total compressed size of the entire table
* @param cardinalityOfTotalRows
* Percentage of unique values relative to total rows
* @param sizePercentOfTable
* Percentage of table storage consumed by this column
*/
case class ColumnAnalysisInfo(
tableName: String,
timestamp: String,
columnName: String,
distinctCount: Long,
primitiveType: String,
columnSize: Long,
columnSizeUncompressed: Long,
totalRows: Long,
tableSize: Long,
cardinalityOfTotalRows: Double,
sizePercentOfTable: Double
)
/** Complete analysis results for a Delta table including all DataFrames and
* summary.
*
* @param parquetFilesDF
* DataFrame containing file-level metadata and statistics
* @param rowGroupsDF
* DataFrame containing row group statistics
* @param columnChunksDF
* DataFrame containing column chunk metadata
* @param columnsDF
* DataFrame containing column-level analysis results
* @param summaryReport
* Human-readable summary of key table metrics
*/
case class DeltaLakeAnalysisResult(
parquetFilesDF: DataFrame,
rowGroupsDF: DataFrame,
columnChunksDF: DataFrame,
columnsDF: DataFrame,
summaryReport: String
)
object DeltaLakeAnalyzerConstants {
val CARDINALITY_OF_TOTAL_ROWS = "cardinality_of_total_rows"
val CLIENT_REQUEST_ID = "client_request_id"
val CODEC = "codec"
val COLUMN_CHUNK_ID = "column_chunk_id"
val COLUMN_NAME = "column_name"
val COLUMN_SIZE = "column_size"
val COLUMN_SIZE_UNCOMPRESSED = "column_size_uncompressed"
val COMPRESSED_SIZE = "compressed_size"
val COMPRESSION_RATIO = "compression_ratio"
val CREATED_BY = "created_by"
val DICT_OFFSET = "dict_offset"
val DISTINCT_COUNT = "distinct_count"
val ENCODINGS = "encodings"
val FILE_NAME = "file_name"
val FILE_SYSTEM_ID = "file_system_id"
val HAS_DICT = "has_dict"
val NONE_DETECTED = "none_detected"
val NOT_FOUND = "not_found"
val PATH = "path"
val PRIMITIVE_TYPE = "primitive_type"
val RATIO_OF_TOTAL_TABLE_ROWS = "ratio_of_total_table_rows"
val REPORT_GENERATION_TIMESTAMP = DataQualityMetadata.ColResultTimestamp
val REQUEST_PATH_URI = "request_path_uri"
val ROW_COUNT = "row_count"
val ROW_GROUP_ID = "row_group_id"
val ROW_GROUPS = "row_groups"
val SIZE_PERCENT_OF_TABLE = "size_percent_of_table"
val STATISTICS = "statistics"
val TABLE_NAME = "table_name"
val TABLE_SIZE = "table_size"
val TOTAL_FILE_ROW_GROUPS = "total_file_row_groups"
val TOTAL_ROWS = "total_rows"
val TOTAL_SIZE = "total_size"
val TOTAL_TABLE_ROW_GROUPS = "total_table_row_groups"
val TOTAL_TABLE_ROWS = "total_table_rows"
val TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"
val UNCOMPRESSED_SIZE = "uncompressed_size"
val VALUE_COUNT = "value_count"
val VORDER_ENABLED = "vorder_enabled"
val VORDER_LEVEL = "vorder_level"
}
/** Analyzes Delta Lake tables to provide detailed metadata and statistics.
*
* @param spark
* Spark session for data operations
* @param sqlMetastore
* Operations for interacting with the metastore
*/
// @formatter:off
class DeltaLakeAnalyzer(
spark: SparkSession,
sqlMetastore: MetastoreOperations
) extends TableAnalyzer[DeltaLakeAnalysisResult] {
/** @inheritdoc
*/
def analyze(
databaseName: String,
tableName: String,
skipColumnCardinality: Boolean = false
): DeltaLakeAnalysisResult = {
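// Resolve the table location and its DeltaLog, then read the most recent history
// entry; its operationParameters are used to surface any Z-Order columns in the summary.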
spark.catalog.setCurrentDatabase(databaseName)
val reportGenerationTime = LocalDateTime.now().toString
val tableFqdn = s"$databaseName.$tableName"
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName, Some(databaseName)))
val deltaTablePath = sqlMetastore.getLocation(databaseName, tableName)
deltaLog.update()
val fullHistoryDF = DeltaTable.forName(spark, s"${databaseName}.${tableName}").history(1)
val operationParametersMapping = fullHistoryDF.select(DeltaLakeConfiguration.DELTA_LOG_OPERATION_PARAMETER).head()(0).asInstanceOf[Map[String, Any]]
val zOrderBy = operationParametersMapping.get(DeltaLakeConfiguration.DELTA_LOG_Z_ORDER_IDENTIFIER)
val parquetFiles = ListBuffer[ParquetFileInfo]()
val rowGroups = ListBuffer[RowGroupInfo]()
val columnChunks = ListBuffer[ColumnChunkInfo]()
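// Walk every data file in the current snapshot: open each Parquet footer, capture
// file-level metadata (including the V-Order key/value entries), then per-row-group
// and per-column-chunk details. Each mapped value is the file's row-group count.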
val totalNumberRowGroups = deltaLog.snapshot.allFiles
.collect()
.map { file =>
val path = file.path
val configuration = new Configuration()
val parquetFileReader = ParquetFileReader.open(configuration, new Path(deltaTablePath + "/" + path))
val rowGroupMeta = parquetFileReader.getRowGroups()
val parquetFileMetaData = parquetFileReader.getFileMetaData()
val parquetFileMetaDataKeyValues = parquetFileMetaData.getKeyValueMetaData()
val parquetRecordCount = parquetFileReader.getRecordCount()
val rowGroupSize = rowGroupMeta.size
parquetFiles.append(
ParquetFileInfo(
tableName = tableName,
timestamp = reportGenerationTime,
filename = path,
vorderEnabled = parquetFileMetaDataKeyValues.get(DeltaLakeConfiguration.V_ORDER_ENABLED),
vorderLevel = parquetFileMetaDataKeyValues.get(DeltaLakeConfiguration.V_ORDER_LEVEL),
rowCount = parquetRecordCount,
rowGroups = rowGroupSize,
createdBy = parquetFileMetaData.getCreatedBy
)
)
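// For each row group, record one ColumnChunkInfo per column chunk and one
// RowGroupInfo with row count and compressed/uncompressed sizes.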
for (rowGroupNumber <- 0 to rowGroupSize - 1) {
val rowGroup = rowGroupMeta.get(rowGroupNumber)
val rowGroupColumns = rowGroup.getColumns
for (columnChunkId <- 0 to rowGroupColumns.size - 1) {
val columnStat = rowGroupColumns.get(columnChunkId)
columnChunks.append(
ColumnChunkInfo(
tableName = tableName,
timestamp = reportGenerationTime,
filename = path,
columnChunkId = columnChunkId,
path = columnStat.getPath().toString,
codec = columnStat.getCodec().toString,
primitiveType = columnStat.getPrimitiveType().toString,
statistics = columnStat.getStatistics().toString().replace("\"", "'"),
totalSize = columnStat.getTotalSize(),
totalUncompressedSize = columnStat.getTotalUncompressedSize(),
valueCount = columnStat.getValueCount(),
hasDict = columnStat.hasDictionaryPage().toString,
dictOffset = columnStat.getDictionaryPageOffset(),
encodings = columnStat.getEncodings().toString
)
)
}
rowGroups.append(
RowGroupInfo(
tableName = tableName,
timestamp = reportGenerationTime,
filename = path,
rowGroupId = rowGroupNumber,
totalFileRowGroups = rowGroupSize,
rowCount = rowGroup.getRowCount,
compressedSize = rowGroup.getCompressedSize,
uncompressedSize = rowGroup.getTotalByteSize,
compressionRatio = rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize
)
)
}
rowGroupSize
}
.sum
val parquetFilesDF = createParquetFilesDataFrame(parquetFiles.toSeq)
val rowGroupsDF = createRowGroupsDataFrame(rowGroups.toSeq)
val columnChunksDF = createColumnChunksDataFrame(columnChunks.toSeq)
val totalRows = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
val tableSize = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.COMPRESSED_SIZE)).first().getLong(0)
val totalRowGroups = parquetFilesDF.agg(sum(DeltaLakeAnalyzerConstants.ROW_GROUPS)).first().getLong(0)
val columnsDF = analyzeColumns(
databaseName,
tableName,
tableFqdn,
columnChunksDF,
totalRows,
tableSize,
reportGenerationTime,
skipColumnCardinality
)
val summaryReport = generateSummaryReport(
tableFqdn,
reportGenerationTime,
totalRows,
tableSize,
parquetFilesDF,
rowGroupsDF,
columnChunksDF,
totalRowGroups,
zOrderBy
)
DeltaLakeAnalysisResult(
parquetFilesDF = parquetFilesDF.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROWS, lit(totalRows))
.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROW_GROUPS, lit(totalRowGroups)),
rowGroupsDF = rowGroupsDF.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROWS, lit(totalRows))
.withColumn(DeltaLakeAnalyzerConstants.RATIO_OF_TOTAL_TABLE_ROWS, col(DeltaLakeAnalyzerConstants.ROW_COUNT) * 100.0 / lit(totalRows))
.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROW_GROUPS, lit(totalRowGroups)),
columnChunksDF = columnChunksDF,
columnsDF = columnsDF,
summaryReport = summaryReport
)
}
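/** Converts the collected ParquetFileInfo records into a DataFrame, casting the
* report timestamp to TimestampType and the V-Order flag to BooleanType.
*/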
private def createParquetFilesDataFrame(
parquetFiles: Seq[ParquetFileInfo]
): DataFrame = {
spark
.createDataFrame(parquetFiles.map(pf =>
(
pf.tableName,
pf.timestamp,
pf.filename,
pf.vorderEnabled,
pf.vorderLevel,
pf.rowCount,
pf.rowGroups,
pf.createdBy
)))
.toDF(
DeltaLakeAnalyzerConstants.TABLE_NAME,
DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
DeltaLakeAnalyzerConstants.FILE_NAME,
DeltaLakeAnalyzerConstants.VORDER_ENABLED,
DeltaLakeAnalyzerConstants.VORDER_LEVEL,
DeltaLakeAnalyzerConstants.ROW_COUNT,
DeltaLakeAnalyzerConstants.ROW_GROUPS,
DeltaLakeAnalyzerConstants.CREATED_BY
)
.withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
.withColumn(DeltaLakeAnalyzerConstants.VORDER_ENABLED, col(DeltaLakeAnalyzerConstants.VORDER_ENABLED).cast(BooleanType))
}
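/** Converts the collected RowGroupInfo records into a DataFrame; compression_ratio
* is carried through as collected (compressed bytes / uncompressed bytes).
*/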
private def createRowGroupsDataFrame(
rowGroups: Seq[RowGroupInfo]
): DataFrame = {
spark
.createDataFrame(rowGroups.map(rg =>
(
rg.tableName,
rg.timestamp,
rg.filename,
rg.rowGroupId,
rg.totalFileRowGroups,
rg.rowCount,
rg.compressedSize,
rg.uncompressedSize,
rg.compressionRatio
)))
.toDF(
DeltaLakeAnalyzerConstants.TABLE_NAME,
DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
DeltaLakeAnalyzerConstants.FILE_NAME,
DeltaLakeAnalyzerConstants.ROW_GROUP_ID,
DeltaLakeAnalyzerConstants.TOTAL_FILE_ROW_GROUPS,
DeltaLakeAnalyzerConstants.ROW_COUNT,
DeltaLakeAnalyzerConstants.COMPRESSED_SIZE,
DeltaLakeAnalyzerConstants.UNCOMPRESSED_SIZE,
DeltaLakeAnalyzerConstants.COMPRESSION_RATIO
)
.withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
}
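/** Converts the collected ColumnChunkInfo records into a DataFrame; the statistics
* column holds the stringified Parquet column statistics with double quotes
* replaced by single quotes.
*/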
private def createColumnChunksDataFrame(
columnChunks: Seq[ColumnChunkInfo]
): DataFrame = {
spark
.createDataFrame(columnChunks.map(cc =>
(
cc.tableName,
cc.timestamp,
cc.filename,
cc.columnChunkId,
cc.path,
cc.codec,
cc.primitiveType,
cc.statistics,
cc.totalSize,
cc.totalUncompressedSize,
cc.valueCount,
cc.hasDict,
cc.dictOffset,
cc.encodings
)))
.toDF(
DeltaLakeAnalyzerConstants.TABLE_NAME,
DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
DeltaLakeAnalyzerConstants.FILE_NAME,
DeltaLakeAnalyzerConstants.COLUMN_CHUNK_ID,
DeltaLakeAnalyzerConstants.PATH,
DeltaLakeAnalyzerConstants.CODEC,
DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE,
DeltaLakeAnalyzerConstants.STATISTICS,
DeltaLakeAnalyzerConstants.TOTAL_SIZE,
DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE,
DeltaLakeAnalyzerConstants.VALUE_COUNT,
DeltaLakeAnalyzerConstants.HAS_DICT,
DeltaLakeAnalyzerConstants.DICT_OFFSET,
DeltaLakeAnalyzerConstants.ENCODINGS
)
.withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
}
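/** Computes per-column metrics: distinct count, compressed and uncompressed size,
* cardinality as a percentage of total rows, and the column's share of table size.
*/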
private def analyzeColumns(
databaseName: String,
tableName: String,
tableFqdn: String,
columnChunksDF: DataFrame,
totalRows: Long,
tableSize: Long,
reportGenerationTime: String,
skipColumnCardinality: Boolean
): DataFrame = {
val columns = ListBuffer[ColumnAnalysisInfo]()
val columnList = spark.table(tableFqdn).columns
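// For tables up to 1,000,000 rows, compute every distinct count in a single SQL
// statement; for larger tables fall back to per-column handling, where
// calculateDistinctCount may skip cardinality entirely.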
if (totalRows <= 1000000) {
val distinctCountsSQL = buildDistinctCountsSQL(databaseName, tableName, columnList)
val distinctCountsDF = spark.sql(distinctCountsSQL)
for (column <- columnList) {
val filterCondition = s"Path = '[$column]'"
val distinctCount = distinctCountsDF.select(col(column).cast("long")).first().getLong(0)
val columnSize = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
val primitiveType = columnChunksDF.filter(filterCondition).select(col(DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE)).first().getString(0)
val columnSizeUncompressed = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE)).first().getLong(0)
columns.append(
ColumnAnalysisInfo(
tableName = tableName,
timestamp = reportGenerationTime,
columnName = column,
distinctCount = distinctCount,
primitiveType = primitiveType,
columnSize = columnSize,
columnSizeUncompressed = columnSizeUncompressed,
totalRows = totalRows,
tableSize = tableSize,
cardinalityOfTotalRows = distinctCount.toDouble / totalRows * 100.0,
sizePercentOfTable = columnSize.toDouble / tableSize * 100.0
)
)
}
} else {
for (column <- columnList) {
val filterCondition = s"Path = '[$column]'"
val filteredRows = columnChunksDF.filter(filterCondition).count()
val (primitiveType, columnSize, columnSizeUncompressed) =
if (filteredRows > 0) {
val pt = columnChunksDF.filter(filterCondition).select(col(DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE)).first().getString(0)
val cs = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
val csu = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE)).first().getLong(0)
(pt, cs, csu)
} else {
(DeltaLakeAnalyzerConstants.NONE_DETECTED, 0L, 0L)
}
val distinctCount = calculateDistinctCount(
column,
tableFqdn,
totalRows,
skipColumnCardinality
)
columns.append(
ColumnAnalysisInfo(
tableName = tableName,
timestamp = reportGenerationTime,
columnName = column,
distinctCount = distinctCount,
primitiveType = primitiveType,
columnSize = columnSize,
columnSizeUncompressed = columnSizeUncompressed,
totalRows = totalRows,
tableSize = tableSize,
cardinalityOfTotalRows = distinctCount.toDouble / totalRows * 100.0,
sizePercentOfTable = columnSize.toDouble / tableSize * 100.0
)
)
}
}
spark
.createDataFrame(columns.toSeq.map(c =>
(
c.tableName,
c.timestamp,
c.columnName,
c.distinctCount,
c.primitiveType,
c.columnSize,
c.columnSizeUncompressed,
c.totalRows,
c.tableSize,
c.cardinalityOfTotalRows,
c.sizePercentOfTable
)))
.toDF(
DeltaLakeAnalyzerConstants.TABLE_NAME,
DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
DeltaLakeAnalyzerConstants.COLUMN_NAME,
DeltaLakeAnalyzerConstants.DISTINCT_COUNT,
DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE,
DeltaLakeAnalyzerConstants.COLUMN_SIZE,
DeltaLakeAnalyzerConstants.COLUMN_SIZE_UNCOMPRESSED,
DeltaLakeAnalyzerConstants.TOTAL_ROWS,
DeltaLakeAnalyzerConstants.TABLE_SIZE,
DeltaLakeAnalyzerConstants.CARDINALITY_OF_TOTAL_ROWS,
DeltaLakeAnalyzerConstants.SIZE_PERCENT_OF_TABLE
)
.withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
}
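/** Builds a single SELECT statement that computes COUNT(DISTINCT <column>) for
* every column of the table in one scan.
*/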
private def buildDistinctCountsSQL(
databaseName: String,
tableName: String,
columnList: Array[String]
): String = {
val selectClauses = columnList.map(col => s"COUNT(DISTINCT $col) AS $col")
s"SELECT 1 as dummy, ${selectClauses.mkString(", ")} FROM $databaseName.$tableName"
}
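/** Returns 0 for known request/ID-style columns, for tables with at least
* 10,000,000 rows, or when skipColumnCardinality is set; otherwise runs
* COUNT(DISTINCT) against the table.
*/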
private def calculateDistinctCount(
column: String,
tableFqdn: String,
totalRows: Long,
skipColumnCardinality: Boolean
): Long = {
if (
List(DeltaLakeAnalyzerConstants.REQUEST_PATH_URI, DeltaLakeAnalyzerConstants.CLIENT_REQUEST_ID, DeltaLakeAnalyzerConstants.FILE_SYSTEM_ID).contains(column)
) {
0L
} else if (totalRows >= 10000000 || skipColumnCardinality) {
0L
} else {
val sql = s"SELECT COUNT(DISTINCT $column) AS ${column} FROM ${tableFqdn}"
val distinctDF = spark.sql(sql)
distinctDF.select(col(column).cast("long")).first().getLong(0)
}
}
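/** Produces a human-readable summary: row and size totals, row-group extremes,
* Z-Order columns from the latest commit, and an aggregated V-Order flag across
* files.
*/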
private def generateSummaryReport(
tableFqdn: String,
reportGenerationTime: String,
totalRows: Long,
tableSize: Long,
parquetFilesDF: DataFrame,
rowGroupsDF: DataFrame,
columnChunksDF: DataFrame,
totalRowGroups: Long,
zOrderBy: Option[Any]
): String = {
val parquetFileCount = parquetFilesDF.count()
val maxRowgroupRowCount = rowGroupsDF.agg(max(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
val minRowgroupRowCount = rowGroupsDF.agg(min(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
val hasVOrder = parquetFilesDF.agg(min(DeltaLakeAnalyzerConstants.VORDER_ENABLED)).first().getAs[Boolean](0)
val formatter = java.text.NumberFormat.getIntegerInstance(new java.util.Locale("en", "US"))
val tableSizeFromColumnChunks = columnChunksDF.agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
val tableSizeFromRowGroups = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.COMPRESSED_SIZE)).first().getLong(0)
s"""
|${LoggingConstants.mainDivider}
|Table: $tableFqdn
|${LoggingConstants.mainDivider}
|Report Generated: $reportGenerationTime
|Total Rows: ${formatter.format(totalRows)}
|Total Size (Bytes): ${formatter.format(tableSize)}
|Total Parquet Files: ${formatter.format(parquetFileCount)}
|Total Row Groups: ${formatter.format(totalRowGroups)}
|Max Rowgroup Row Count: ${formatter.format(maxRowgroupRowCount)}
|Min Rowgroup Row Count: ${formatter.format(minRowgroupRowCount)}
|Table Size from Column Chunks (Bytes): ${formatter.format(tableSizeFromColumnChunks)}
|Table Size from Row Groups (Bytes): ${formatter.format(tableSizeFromRowGroups)}
|Z-Order By: ${zOrderBy.getOrElse(DeltaLakeAnalyzerConstants.NOT_FOUND)}
|Has V-Order Enabled: $hasVOrder
|${LoggingConstants.mainDivider}
|""".stripMargin
}
}
// @formatter:on
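// Example usage (illustrative sketch, not part of the original source). It assumes a
// Hive-registered Delta table `sales.orders` and a concrete MetastoreOperations
// implementation bound to `metastore`; both names are placeholders.
//
//   val analyzer = new DeltaLakeAnalyzer(spark, metastore)
//   val result = analyzer.analyze("sales", "orders", skipColumnCardinality = true)
//   println(result.summaryReport)
//   result.parquetFilesDF.show(truncate = false)
//   result.rowGroupsDF.show(truncate = false)
//   result.columnsDF.orderBy(col("size_percent_of_table").desc).show(truncate = false)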