package me.rakirahman.quality.table.deltalake

import me.rakirahman.config.DeltaLakeConfiguration
import me.rakirahman.logging.level.LoggingConstants
import me.rakirahman.metastore.MetastoreOperations
import me.rakirahman.quality.deequ.repository.metric.spark.table.DataQualityMetadata
import me.rakirahman.quality.table.TableAnalyzer
import io.delta.tables._
import java.time.LocalDateTime
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer
/** Metadata information for individual Parquet files within a Delta table.
  *
  * @param tableName
  *   The name of the Delta table
  * @param timestamp
  *   When this analysis was performed
  * @param filename
  *   Relative path of the Parquet file within the table
  * @param vorderEnabled
  *   Whether V-Order optimization is enabled
  * @param vorderLevel
  *   The V-Order optimization level applied
  * @param rowCount
  *   Total number of rows in this Parquet file
  * @param rowGroups
  *   Number of row groups in this Parquet file
  * @param createdBy
  *   Tool/library that created this Parquet file
  */
case class ParquetFileInfo(
    tableName: String,
    timestamp: String,
    filename: String,
    vorderEnabled: String,
    vorderLevel: String,
    rowCount: Long,
    rowGroups: Long,
    createdBy: String
)
/** Row group statistics for Parquet files within a Delta table.
  *
  * @param tableName
  *   The name of the Delta table
  * @param timestamp
  *   When this analysis was performed
  * @param filename
  *   Parquet file containing this row group
  * @param rowGroupId
  *   Sequential ID of this row group within the file
  * @param totalFileRowGroups
  *   Total number of row groups in the parent file
  * @param rowCount
  *   Number of rows in this specific row group
  * @param compressedSize
  *   Size in bytes after compression
  * @param uncompressedSize
  *   Original size in bytes before compression
  * @param compressionRatio
  *   Ratio of compressed to uncompressed size
  */
case class RowGroupInfo(
    tableName: String,
    timestamp: String,
    filename: String,
    rowGroupId: Int,
    totalFileRowGroups: Int,
    rowCount: Long,
    compressedSize: Long,
    uncompressedSize: Long,
    compressionRatio: Float
)
/** Column chunk metadata and statistics from Parquet files.
  *
  * @param tableName
  *   The name of the Delta table
  * @param timestamp
  *   When this analysis was performed
  * @param filename
  *   Parquet file containing this column chunk
  * @param columnChunkId
  *   Sequential ID of the column chunk
  * @param path
  *   Column path within the Parquet schema
  * @param codec
  *   Compression codec used for this column
  * @param primitiveType
  *   Data type of the column
  * @param statistics
  *   Column statistics (min, max, null count, etc.)
  * @param totalSize
  *   Compressed size in bytes for this column chunk
  * @param totalUncompressedSize
  *   Uncompressed size in bytes
  * @param valueCount
  *   Number of values in this column chunk
  * @param hasDict
  *   Whether dictionary encoding is used
  * @param dictOffset
  *   Byte offset of the dictionary page
  * @param encodings
  *   List of encodings applied to this column
  */
case class ColumnChunkInfo(
    tableName: String,
    timestamp: String,
    filename: String,
    columnChunkId: Int,
    path: String,
    codec: String,
    primitiveType: String,
    statistics: String,
    totalSize: Long,
    totalUncompressedSize: Long,
    valueCount: Long,
    hasDict: String,
    dictOffset: Long,
    encodings: String
)
/** Column-level analysis results including cardinality and storage metrics.
  *
  * @param tableName
  *   The name of the Delta table
  * @param timestamp
  *   When this analysis was performed
  * @param columnName
  *   Name of the analyzed column
  * @param distinctCount
  *   Number of unique values in the column
  * @param primitiveType
  *   Data type of the column
  * @param columnSize
  *   Total compressed storage size for this column
  * @param columnSizeUncompressed
  *   Total uncompressed size for this column
  * @param totalRows
  *   Total number of rows in the table
  * @param tableSize
  *   Total compressed size of the entire table
  * @param cardinalityOfTotalRows
  *   Percentage of unique values relative to total rows
  * @param sizePercentOfTable
  *   Percentage of table storage consumed by this column
  */
case class ColumnAnalysisInfo(
    tableName: String,
    timestamp: String,
    columnName: String,
    distinctCount: Long,
    primitiveType: String,
    columnSize: Long,
    columnSizeUncompressed: Long,
    totalRows: Long,
    tableSize: Long,
    cardinalityOfTotalRows: Double,
    sizePercentOfTable: Double
)
/** Complete analysis results for a Delta table including all DataFrames and a
  * summary.
  *
  * @param parquetFilesDF
  *   DataFrame containing file-level metadata and statistics
  * @param rowGroupsDF
  *   DataFrame containing row group statistics
  * @param columnChunksDF
  *   DataFrame containing column chunk metadata
  * @param columnsDF
  *   DataFrame containing column-level analysis results
  * @param summaryReport
  *   Human-readable summary of key table metrics
  */
case class DeltaLakeAnalysisResult(
    parquetFilesDF: DataFrame,
    rowGroupsDF: DataFrame,
    columnChunksDF: DataFrame,
    columnsDF: DataFrame,
    summaryReport: String
)
object DeltaLakeAnalyzerConstants {
  val CARDINALITY_OF_TOTAL_ROWS = "cardinality_of_total_rows"
  val CLIENT_REQUEST_ID = "client_request_id"
  val CODEC = "codec"
  val COLUMN_CHUNK_ID = "column_chunk_id"
  val COLUMN_NAME = "column_name"
  val COLUMN_SIZE = "column_size"
  val COLUMN_SIZE_UNCOMPRESSED = "column_size_uncompressed"
  val COMPRESSED_SIZE = "compressed_size"
  val COMPRESSION_RATIO = "compression_ratio"
  val CREATED_BY = "created_by"
  val DICT_OFFSET = "dict_offset"
  val DISTINCT_COUNT = "distinct_count"
  val ENCODINGS = "encodings"
  val FILE_NAME = "file_name"
  val FILE_SYSTEM_ID = "file_system_id"
  val HAS_DICT = "has_dict"
  val NONE_DETECTED = "none_detected"
  val NOT_FOUND = "not_found"
  val PATH = "path"
  val PRIMITIVE_TYPE = "primitive_type"
  val RATIO_OF_TOTAL_TABLE_ROWS = "ratio_of_total_table_rows"
  val REPORT_GENERATION_TIMESTAMP = DataQualityMetadata.ColResultTimestamp
  val REQUEST_PATH_URI = "request_path_uri"
  val ROW_COUNT = "row_count"
  val ROW_GROUP_ID = "row_group_id"
  val ROW_GROUPS = "row_groups"
  val SIZE_PERCENT_OF_TABLE = "size_percent_of_table"
  val STATISTICS = "statistics"
  val TABLE_NAME = "table_name"
  val TABLE_SIZE = "table_size"
  val TOTAL_FILE_ROW_GROUPS = "total_file_row_groups"
  val TOTAL_ROWS = "total_rows"
  val TOTAL_SIZE = "total_size"
  val TOTAL_TABLE_ROW_GROUPS = "total_table_row_groups"
  val TOTAL_TABLE_ROWS = "total_table_rows"
  val TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size"
  val UNCOMPRESSED_SIZE = "uncompressed_size"
  val VALUE_COUNT = "value_count"
  val VORDER_ENABLED = "vorder_enabled"
  val VORDER_LEVEL = "vorder_level"
}
/** Analyzes Delta Lake tables to provide detailed metadata and statistics.
  *
  * @param spark
  *   Spark session for data operations
  * @param sqlMetastore
  *   Operations for interacting with the metastore
  */
// @formatter:off
class DeltaLakeAnalyzer(
    spark: SparkSession,
    sqlMetastore: MetastoreOperations
) extends TableAnalyzer[DeltaLakeAnalysisResult] {

  /** @inheritdoc
    */
  def analyze(
      databaseName: String,
      tableName: String,
      skipColumnCardinality: Boolean = false
  ): DeltaLakeAnalysisResult = {
    spark.catalog.setCurrentDatabase(databaseName)
    val reportGenerationTime = LocalDateTime.now().toString
    val tableFqdn = s"$databaseName.$tableName"
    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName, Some(databaseName)))
    val deltaTablePath = sqlMetastore.getLocation(databaseName, tableName)
    deltaLog.update()
    val fullHistoryDF = DeltaTable.forName(spark, s"${databaseName}.${tableName}").history(1)
    val operationParametersMapping = fullHistoryDF.select(DeltaLakeConfiguration.DELTA_LOG_OPERATION_PARAMETER).head()(0).asInstanceOf[Map[String, Any]]
    val zOrderBy = operationParametersMapping.get(DeltaLakeConfiguration.DELTA_LOG_Z_ORDER_IDENTIFIER)
    val parquetFiles = ListBuffer[ParquetFileInfo]()
    val rowGroups = ListBuffer[RowGroupInfo]()
    val columnChunks = ListBuffer[ColumnChunkInfo]()
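    // Walk every active data file in the current Delta snapshot, open its Parquet
    // footer, and collect file-, row-group-, and column-chunk-level metadata.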
    val totalNumberRowGroups = deltaLog.snapshot.allFiles
      .collect()
      .map { file =>
        val path = file.path
        val configuration = new Configuration()
        val parquetFileReader = ParquetFileReader.open(configuration, new Path(deltaTablePath + "/" + path))
        val rowGroupMeta = parquetFileReader.getRowGroups()
        val parquetFileMetaData = parquetFileReader.getFileMetaData()
        val parquetFileMetaDataKeyValues = parquetFileMetaData.getKeyValueMetaData()
        val parquetRecordCount = parquetFileReader.getRecordCount()
        val rowGroupSize = rowGroupMeta.size
        parquetFiles.append(
          ParquetFileInfo(
            tableName = tableName,
            timestamp = reportGenerationTime,
            filename = path,
            vorderEnabled = parquetFileMetaDataKeyValues.get(DeltaLakeConfiguration.V_ORDER_ENABLED),
            vorderLevel = parquetFileMetaDataKeyValues.get(DeltaLakeConfiguration.V_ORDER_LEVEL),
            rowCount = parquetRecordCount,
            rowGroups = rowGroupSize,
            createdBy = parquetFileMetaData.getCreatedBy
          )
        )
        for (rowGroupNumber <- 0 to rowGroupSize - 1) {
          val rowGroup = rowGroupMeta.get(rowGroupNumber)
          val rowGroupColumns = rowGroup.getColumns
          for (columnChunkId <- 0 to rowGroupColumns.size - 1) {
            val columnStat = rowGroupColumns.get(columnChunkId)
            columnChunks.append(
              ColumnChunkInfo(
                tableName = tableName,
                timestamp = reportGenerationTime,
                filename = path,
                columnChunkId = columnChunkId,
                path = columnStat.getPath().toString,
                codec = columnStat.getCodec().toString,
                primitiveType = columnStat.getPrimitiveType().toString,
                statistics = columnStat.getStatistics().toString().replace("\"", "'"),
                totalSize = columnStat.getTotalSize(),
                totalUncompressedSize = columnStat.getTotalUncompressedSize(),
                valueCount = columnStat.getValueCount(),
                hasDict = columnStat.hasDictionaryPage().toString,
                dictOffset = columnStat.getDictionaryPageOffset(),
                encodings = columnStat.getEncodings().toString
              )
            )
          }
          rowGroups.append(
            RowGroupInfo(
              tableName = tableName,
              timestamp = reportGenerationTime,
              filename = path,
              rowGroupId = rowGroupNumber,
              totalFileRowGroups = rowGroupSize,
              rowCount = rowGroup.getRowCount,
              compressedSize = rowGroup.getCompressedSize,
              uncompressedSize = rowGroup.getTotalByteSize,
              compressionRatio = rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize
            )
          )
        }
        // Release the footer reader once this file's metadata has been captured.
        parquetFileReader.close()
        rowGroupSize
      }
      .sum
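    // Materialize the collected metadata as DataFrames and derive the table-level aggregates used below.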
    val parquetFilesDF = createParquetFilesDataFrame(parquetFiles.toSeq)
    val rowGroupsDF = createRowGroupsDataFrame(rowGroups.toSeq)
    val columnChunksDF = createColumnChunksDataFrame(columnChunks.toSeq)
    val totalRows = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
    val tableSize = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.COMPRESSED_SIZE)).first().getLong(0)
    val totalRowGroups = parquetFilesDF.agg(sum(DeltaLakeAnalyzerConstants.ROW_GROUPS)).first().getLong(0)
    val columnsDF = analyzeColumns(
      databaseName,
      tableName,
      tableFqdn,
      columnChunksDF,
      totalRows,
      tableSize,
      reportGenerationTime,
      skipColumnCardinality
    )
    val summaryReport = generateSummaryReport(
      tableFqdn,
      reportGenerationTime,
      totalRows,
      tableSize,
      parquetFilesDF,
      rowGroupsDF,
      columnChunksDF,
      totalRowGroups,
      zOrderBy
    )
    DeltaLakeAnalysisResult(
      parquetFilesDF = parquetFilesDF.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROWS, lit(totalRows))
        .withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROW_GROUPS, lit(totalRowGroups)),
      rowGroupsDF = rowGroupsDF.withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROWS, lit(totalRows))
        .withColumn(DeltaLakeAnalyzerConstants.RATIO_OF_TOTAL_TABLE_ROWS, col(DeltaLakeAnalyzerConstants.ROW_COUNT) * 100.0 / lit(totalRows))
        .withColumn(DeltaLakeAnalyzerConstants.TOTAL_TABLE_ROW_GROUPS, lit(totalRowGroups)),
      columnChunksDF = columnChunksDF,
      columnsDF = columnsDF,
      summaryReport = summaryReport
    )
  }
  private def createParquetFilesDataFrame(
      parquetFiles: Seq[ParquetFileInfo]
  ): DataFrame = {
    spark
      .createDataFrame(parquetFiles.map(pf =>
        (
          pf.tableName,
          pf.timestamp,
          pf.filename,
          pf.vorderEnabled,
          pf.vorderLevel,
          pf.rowCount,
          pf.rowGroups,
          pf.createdBy
        )))
      .toDF(
        DeltaLakeAnalyzerConstants.TABLE_NAME,
        DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
        DeltaLakeAnalyzerConstants.FILE_NAME,
        DeltaLakeAnalyzerConstants.VORDER_ENABLED,
        DeltaLakeAnalyzerConstants.VORDER_LEVEL,
        DeltaLakeAnalyzerConstants.ROW_COUNT,
        DeltaLakeAnalyzerConstants.ROW_GROUPS,
        DeltaLakeAnalyzerConstants.CREATED_BY
      )
      .withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
      .withColumn(DeltaLakeAnalyzerConstants.VORDER_ENABLED, col(DeltaLakeAnalyzerConstants.VORDER_ENABLED).cast(BooleanType))
  }
  private def createRowGroupsDataFrame(
      rowGroups: Seq[RowGroupInfo]
  ): DataFrame = {
    spark
      .createDataFrame(rowGroups.map(rg =>
        (
          rg.tableName,
          rg.timestamp,
          rg.filename,
          rg.rowGroupId,
          rg.totalFileRowGroups,
          rg.rowCount,
          rg.compressedSize,
          rg.uncompressedSize,
          rg.compressionRatio
        )))
      .toDF(
        DeltaLakeAnalyzerConstants.TABLE_NAME,
        DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
        DeltaLakeAnalyzerConstants.FILE_NAME,
        DeltaLakeAnalyzerConstants.ROW_GROUP_ID,
        DeltaLakeAnalyzerConstants.TOTAL_FILE_ROW_GROUPS,
        DeltaLakeAnalyzerConstants.ROW_COUNT,
        DeltaLakeAnalyzerConstants.COMPRESSED_SIZE,
        DeltaLakeAnalyzerConstants.UNCOMPRESSED_SIZE,
        DeltaLakeAnalyzerConstants.COMPRESSION_RATIO
      )
      .withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
  }
  private def createColumnChunksDataFrame(
      columnChunks: Seq[ColumnChunkInfo]
  ): DataFrame = {
    spark
      .createDataFrame(columnChunks.map(cc =>
        (
          cc.tableName,
          cc.timestamp,
          cc.filename,
          cc.columnChunkId,
          cc.path,
          cc.codec,
          cc.primitiveType,
          cc.statistics,
          cc.totalSize,
          cc.totalUncompressedSize,
          cc.valueCount,
          cc.hasDict,
          cc.dictOffset,
          cc.encodings
        )))
      .toDF(
        DeltaLakeAnalyzerConstants.TABLE_NAME,
        DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
        DeltaLakeAnalyzerConstants.FILE_NAME,
        DeltaLakeAnalyzerConstants.COLUMN_CHUNK_ID,
        DeltaLakeAnalyzerConstants.PATH,
        DeltaLakeAnalyzerConstants.CODEC,
        DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE,
        DeltaLakeAnalyzerConstants.STATISTICS,
        DeltaLakeAnalyzerConstants.TOTAL_SIZE,
        DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE,
        DeltaLakeAnalyzerConstants.VALUE_COUNT,
        DeltaLakeAnalyzerConstants.HAS_DICT,
        DeltaLakeAnalyzerConstants.DICT_OFFSET,
        DeltaLakeAnalyzerConstants.ENCODINGS
      )
      .withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
  }
  private def analyzeColumns(
      databaseName: String,
      tableName: String,
      tableFqdn: String,
      columnChunksDF: DataFrame,
      totalRows: Long,
      tableSize: Long,
      reportGenerationTime: String,
      skipColumnCardinality: Boolean
  ): DataFrame = {
    val columns = ListBuffer[ColumnAnalysisInfo]()
    val columnList = spark.table(tableFqdn).columns
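    // Small tables (<= 1,000,000 rows) compute all distinct counts in a single wide
    // aggregation; larger tables fall back to per-column queries, which may return 0
    // when skipColumnCardinality is set or the row-count threshold is exceeded.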
    if (totalRows <= 1000000) {
      val distinctCountsSQL = buildDistinctCountsSQL(databaseName, tableName, columnList)
      val distinctCountsDF = spark.sql(distinctCountsSQL)
      for (column <- columnList) {
        val filterCondition = s"Path = '[$column]'"
        val distinctCount = distinctCountsDF.select(col(column).cast("long")).first().getLong(0)
        val columnSize = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
        val primitiveType = columnChunksDF.filter(filterCondition).select(col(DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE)).first().getString(0)
        val columnSizeUncompressed = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE)).first().getLong(0)
        columns.append(
          ColumnAnalysisInfo(
            tableName = tableName,
            timestamp = reportGenerationTime,
            columnName = column,
            distinctCount = distinctCount,
            primitiveType = primitiveType,
            columnSize = columnSize,
            columnSizeUncompressed = columnSizeUncompressed,
            totalRows = totalRows,
            tableSize = tableSize,
            cardinalityOfTotalRows = distinctCount.toDouble / totalRows * 100.0,
            sizePercentOfTable = columnSize.toDouble / tableSize * 100.0
          )
        )
      }
    } else {
      for (column <- columnList) {
        val filterCondition = s"Path = '[$column]'"
        val filteredRows = columnChunksDF.filter(filterCondition).count()
        val (primitiveType, columnSize, columnSizeUncompressed) =
          if (filteredRows > 0) {
            val pt = columnChunksDF.filter(filterCondition).select(col(DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE)).first().getString(0)
            val cs = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
            val csu = columnChunksDF.filter(filterCondition).agg(sum(DeltaLakeAnalyzerConstants.TOTAL_UNCOMPRESSED_SIZE)).first().getLong(0)
            (pt, cs, csu)
          } else {
            (DeltaLakeAnalyzerConstants.NONE_DETECTED, 0L, 0L)
          }
        val distinctCount = calculateDistinctCount(
          column,
          tableFqdn,
          totalRows,
          skipColumnCardinality
        )
        columns.append(
          ColumnAnalysisInfo(
            tableName = tableName,
            timestamp = reportGenerationTime,
            columnName = column,
            distinctCount = distinctCount,
            primitiveType = primitiveType,
            columnSize = columnSize,
            columnSizeUncompressed = columnSizeUncompressed,
            totalRows = totalRows,
            tableSize = tableSize,
            cardinalityOfTotalRows = distinctCount.toDouble / totalRows * 100.0,
            sizePercentOfTable = columnSize.toDouble / tableSize * 100.0
          )
        )
      }
    }
    spark
      .createDataFrame(columns.toSeq.map(c =>
        (
          c.tableName,
          c.timestamp,
          c.columnName,
          c.distinctCount,
          c.primitiveType,
          c.columnSize,
          c.columnSizeUncompressed,
          c.totalRows,
          c.tableSize,
          c.cardinalityOfTotalRows,
          c.sizePercentOfTable
        )))
      .toDF(
        DeltaLakeAnalyzerConstants.TABLE_NAME,
        DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP,
        DeltaLakeAnalyzerConstants.COLUMN_NAME,
        DeltaLakeAnalyzerConstants.DISTINCT_COUNT,
        DeltaLakeAnalyzerConstants.PRIMITIVE_TYPE,
        DeltaLakeAnalyzerConstants.COLUMN_SIZE,
        DeltaLakeAnalyzerConstants.COLUMN_SIZE_UNCOMPRESSED,
        DeltaLakeAnalyzerConstants.TOTAL_ROWS,
        DeltaLakeAnalyzerConstants.TABLE_SIZE,
        DeltaLakeAnalyzerConstants.CARDINALITY_OF_TOTAL_ROWS,
        DeltaLakeAnalyzerConstants.SIZE_PERCENT_OF_TABLE
      )
      .withColumn(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP, col(DeltaLakeAnalyzerConstants.REPORT_GENERATION_TIMESTAMP).cast(TimestampType))
  }
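  /** Builds a single SELECT statement that computes COUNT(DISTINCT ...) for every
    * column of the table, so that small tables need only one scan.
    *
    * For a hypothetical table `sales.orders` with columns `id` and `status`, the
    * generated statement would look like:
    * {{{
    * SELECT 1 as dummy, COUNT(DISTINCT id) AS id, COUNT(DISTINCT status) AS status FROM sales.orders
    * }}}
    */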
  private def buildDistinctCountsSQL(
      databaseName: String,
      tableName: String,
      columnList: Array[String]
  ): String = {
    val selectClauses = columnList.map(column => s"COUNT(DISTINCT $column) AS $column")
    s"SELECT 1 as dummy, ${selectClauses.mkString(", ")} FROM $databaseName.$tableName"
  }
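  /** Computes COUNT(DISTINCT) for a single column of the table.
    *
    * Returns 0 instead of scanning when the column is a known high-cardinality
    * request/identifier column, when the table holds 10,000,000 rows or more, or
    * when skipColumnCardinality is set.
    */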
  private def calculateDistinctCount(
      column: String,
      tableFqdn: String,
      totalRows: Long,
      skipColumnCardinality: Boolean
  ): Long = {
    if (
      List(DeltaLakeAnalyzerConstants.REQUEST_PATH_URI, DeltaLakeAnalyzerConstants.CLIENT_REQUEST_ID, DeltaLakeAnalyzerConstants.FILE_SYSTEM_ID).contains(column)
    ) {
      0L
    } else if (totalRows >= 10000000 || skipColumnCardinality) {
      0L
    } else {
      val sql = s"SELECT COUNT(DISTINCT $column) AS ${column} FROM ${tableFqdn}"
      val distinctDF = spark.sql(sql)
      distinctDF.select(col(column).cast("long")).first().getLong(0)
    }
  }
  private def generateSummaryReport(
      tableFqdn: String,
      reportGenerationTime: String,
      totalRows: Long,
      tableSize: Long,
      parquetFilesDF: DataFrame,
      rowGroupsDF: DataFrame,
      columnChunksDF: DataFrame,
      totalRowGroups: Long,
      zOrderBy: Option[Any]
  ): String = {
    val parquetFileCount = parquetFilesDF.count()
    val maxRowgroupRowCount = rowGroupsDF.agg(max(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
    val minRowgroupRowCount = rowGroupsDF.agg(min(DeltaLakeAnalyzerConstants.ROW_COUNT)).first().getLong(0)
    val hasVOrder = parquetFilesDF.agg(min(DeltaLakeAnalyzerConstants.VORDER_ENABLED)).first().getAs[Boolean](0)
    val formatter = java.text.NumberFormat.getIntegerInstance(new java.util.Locale("en", "US"))
    val tableSizeFromColumnChunks = columnChunksDF.agg(sum(DeltaLakeAnalyzerConstants.TOTAL_SIZE)).first().getLong(0)
    val tableSizeFromRowGroups = rowGroupsDF.agg(sum(DeltaLakeAnalyzerConstants.COMPRESSED_SIZE)).first().getLong(0)
    s"""
       |${LoggingConstants.mainDivider}
       |Table: $tableFqdn
       |${LoggingConstants.mainDivider}
       |Report Generated: $reportGenerationTime
       |Total Rows: ${formatter.format(totalRows)}
       |Total Size (Bytes): ${formatter.format(tableSize)}
       |Total Parquet Files: ${formatter.format(parquetFileCount)}
       |Total Row Groups: ${formatter.format(totalRowGroups)}
       |Max Rowgroup Row Count: ${formatter.format(maxRowgroupRowCount)}
       |Min Rowgroup Row Count: ${formatter.format(minRowgroupRowCount)}
       |Table Size from Column Chunks (Bytes): ${formatter.format(tableSizeFromColumnChunks)}
       |Table Size from Row Groups (Bytes): ${formatter.format(tableSizeFromRowGroups)}
       |Z-Order By: ${zOrderBy.getOrElse(DeltaLakeAnalyzerConstants.NOT_FOUND)}
       |Has V-Order Enabled: $hasVOrder
       |${LoggingConstants.mainDivider}
       |""".stripMargin
  }
}
// @formatter:on
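// ---------------------------------------------------------------------------
// Usage sketch (assumes a live SparkSession named `spark` and a
// MetastoreOperations implementation named `metastore`; both names are
// placeholders, not defined in this file):
//
//   val analyzer = new DeltaLakeAnalyzer(spark, metastore)
//   val result   = analyzer.analyze("sales_db", "orders", skipColumnCardinality = true)
//   println(result.summaryReport)
//   result.rowGroupsDF.orderBy("file_name", "row_group_id").show(truncate = false)
// ---------------------------------------------------------------------------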