Skip to content

Instantly share code, notes, and snippets.

@mdrakiburrahman
Created February 6, 2026 00:26
Show Gist options
  • Select an option

  • Save mdrakiburrahman/0907828f6df6c99f317de4522771aa5e to your computer and use it in GitHub Desktop.

Select an option

Save mdrakiburrahman/0907828f6df6c99f317de4522771aa5e to your computer and use it in GitHub Desktop.
package me.rakirahman.connection.fabric.sql
import me.rakirahman.connection.fabric.MetadataManager
import me.rakirahman.feeds.authentication.jwt.JwtScopeExtensions._
import me.rakirahman.feeds.authentication.jwt.JwtScopes
import com.azure.core.credential.{TokenCredential, TokenRequestContext}
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.internal.Logging
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}
// @formatter:off
/** Fabric SQL Endpoint metadata manager for handling metadata refresh
  * operations.
  *
  * Sends a `refreshMetadata` POST for a SQL endpoint and, when the service
  * answers with HTTP 202, polls the returned long-running operation until it
  * reports `Succeeded` or `Failed`.
  *
  * >>> https://github.com/microsoft/fabric-toolbox/blob/main/samples/notebook-refresh-tables-in-sql-endpoint/RefreshTableinSQLEndpoint.py
  *
  * @param credential
  *   credential used to acquire a fresh bearer token for every HTTP request
  * @param pollDurationInSeconds
  *   polling interval (seconds) used when the 202 response carries no usable
  *   `retry-after` header
  */
class FabricSqlEndpointManager(
    credential: TokenCredential,
    pollDurationInSeconds: Int = 15
) extends MetadataManager
    with Logging {

  implicit val formats: Formats = DefaultFormats

  private val baseUrl = "https://api.powerbi.com/v1/workspaces"
  private val operationsUrl = "https://api.powerbi.com/v1/operations"
  private val operationIdHeader = "x-ms-operation-id"
  private val retryAfterHeader = "retry-after"

  /** Acquires a bearer token for the default scope derived from
    * `JwtScopes.fabric`, blocking until the credential answers.
    */
  private def getAccessToken: String = {
    credential
      .getToken(
        new TokenRequestContext()
          .addScopes(Seq(JwtScopes.fabric.toDefaultScope()): _*)
      )
      .block()
      .getToken()
  }

  /** Reads a response entity as a string; a missing entity or missing content
    * stream yields an empty string instead of an exception.
    */
  private def readBody(entity: org.apache.http.HttpEntity): String =
    Option(entity).flatMap(e => Option(EntityUtils.toString(e))).getOrElse("")

  /** Resolves the polling interval in seconds from a raw `retry-after` value.
    *
    * Falls back to `pollDurationInSeconds` when the header is absent, is not a
    * plain integer (e.g. an HTTP-date), or is not strictly positive.
    */
  private def resolveRetryAfter(headerValue: Option[String]): Int =
    headerValue
      .flatMap(value => Try(value.trim.toInt).toOption)
      .filter(_ > 0)
      .getOrElse(pollDurationInSeconds)

  /** Triggers a metadata refresh for the given SQL endpoint.
    *
    * Outcomes by HTTP status of the initial POST:
    *   - 200: refresh completed synchronously.
    *   - 202: refresh runs asynchronously; this method blocks and polls the
    *     operation until it finishes.
    *   - 500: logged as "sync already running" and treated as a no-op.
    *     NOTE(review): every HTTP 500 is handled this way — confirm the service
    *     does not use 500 for other failures.
    *   - anything else: failure.
    *
    * @param workspaceId
    *   workspace identifier interpolated into the request URL
    * @param sqlEndpointId
    *   SQL endpoint identifier interpolated into the request URL
    * @throws RuntimeException
    *   on an unexpected status code, on a 202 without an operation ID, or when
    *   the asynchronous operation fails
    */
  override def refresh(
      workspaceId: String,
      sqlEndpointId: String
  ): Unit = {
    val url = s"$baseUrl/$workspaceId/sqlEndpoints/$sqlEndpointId/refreshMetadata"

    // Build the request (including token acquisition, which may throw) before
    // opening the client so a failure here cannot leak an unclosed client.
    val request = new HttpPost(url)
    request.setHeader("Authorization", s"Bearer ${getAccessToken}")
    request.setHeader("Content-Type", "application/json")
    request.setEntity(new StringEntity("{}", ContentType.APPLICATION_JSON))

    // Some((operationId, retryAfterSeconds)) when the service accepted the
    // refresh asynchronously, None when there is nothing left to wait for.
    val pendingOperation: Option[(String, Int)] = {
      val client = HttpClients.createDefault()
      try {
        val response = client.execute(request)
        try {
          def headerValue(name: String): Option[String] =
            Option(response.getFirstHeader(name)).flatMap(header => Option(header.getValue))

          val statusCode = response.getStatusLine.getStatusCode
          val responseBody = readBody(response.getEntity)
          statusCode match {
            case 200 =>
              logDebug("SQL Endpoint metadata refresh completed synchronously (HTTP 200)")
              logTrace(s"$responseBody")
              None
            case 202 =>
              val retryAfter = resolveRetryAfter(headerValue(retryAfterHeader))
              val operationId = headerValue(operationIdHeader)
                .map(_.trim)
                .filter(_.nonEmpty)
                .getOrElse(throw new RuntimeException("Async refresh started but no operation ID found"))
              logDebug(s"SQL Endpoint metadata refresh started asynchronously. Operation ID: $operationId")
              Some((operationId, retryAfter))
            case 500 =>
              logDebug("The sync is already running")
              logTrace(s"Response: $responseBody")
              None
            case _ =>
              logError(s"Failed to refresh SQL Endpoint metadata. Status code: $statusCode, Response: $responseBody")
              throw new RuntimeException(s"Failed to refresh SQL Endpoint metadata: HTTP $statusCode")
          }
        } finally {
          response.close()
        }
      } finally {
        client.close()
      }
    }

    // Poll only after the initial response and client are released, so no
    // connection is held open for the duration of the long-running operation.
    pendingOperation.foreach { case (operationId, retryAfter) =>
      handleAsyncOperation(operationId, retryAfter)
    }
  }

  /** Polls a long-running operation until it reaches a terminal state.
    *
    * Sleeps `retryAfter` seconds before every status check. On `Succeeded` the
    * operation result is fetched and logged; on `Failed`, or on any non-200
    * status-check response, a `RuntimeException` is thrown. Any other status is
    * treated as "still in progress". There is no upper bound on the number of
    * polls.
    *
    * @param operationId
    *   identifier of the operation to poll
    * @param retryAfter
    *   seconds to wait between polls; negative values are treated as 0
    */
  private def handleAsyncOperation(
      operationId: String,
      retryAfter: Int
  ): Unit = {
    val lroUri = s"$operationsUrl/$operationId"
    val lroResultUri = s"$lroUri/result"
    // Long arithmetic avoids Int overflow; clamping avoids the
    // IllegalArgumentException Thread.sleep raises for negative durations.
    val sleepMillis = math.max(retryAfter, 0) * 1000L

    val client = HttpClients.createDefault()
    try {
      // Authenticated GET returning (status code, body); the response is
      // always closed so its connection is released after every poll.
      def get(uri: String): (Int, String) = {
        val request = new HttpGet(uri)
        request.setHeader("Authorization", s"Bearer ${getAccessToken}")
        request.setHeader("Content-Type", "application/json")
        val response = client.execute(request)
        try {
          (response.getStatusLine.getStatusCode, readBody(response.getEntity))
        } finally {
          response.close()
        }
      }

      var continue = true
      while (continue) {
        Thread.sleep(sleepMillis)
        val (lroStatusCode, lroResponseBody) = get(lroUri)
        logDebug(s"LRO status check: $lroUri")
        logDebug(s"LRO response: $lroResponseBody")
        if (lroStatusCode == 200) {
          val lroJson = parse(lroResponseBody)
          val status = (lroJson \ "status").extract[String]
          status match {
            case "Succeeded" =>
              val (_, resultResponseBody) = get(lroResultUri)
              logInfo("SQL Endpoint metadata refresh completed successfully")
              logDebug(s"Final result: $resultResponseBody")
              continue = false
            case "Failed" =>
              logError(s"SQL Endpoint metadata refresh failed: $lroResponseBody")
              throw new RuntimeException(s"SQL Endpoint metadata refresh failed: $lroResponseBody")
            case _ => logDebug(s"Operation still in progress. Status: $status")
          }
        } else {
          logError(s"Failed to check operation status. Status code: $lroStatusCode")
          throw new RuntimeException(s"Failed to check operation status: HTTP $lroStatusCode")
        }
      }
    } finally {
      client.close()
    }
  }
}
// @formatter:on
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment