Created
February 6, 2026 00:26
-
-
Save mdrakiburrahman/0907828f6df6c99f317de4522771aa5e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package me.rakirahman.connection.fabric.sql | |
| import me.rakirahman.connection.fabric.MetadataManager | |
| import me.rakirahman.feeds.authentication.jwt.JwtScopeExtensions._ | |
| import me.rakirahman.feeds.authentication.jwt.JwtScopes | |
| import com.azure.core.credential.{TokenCredential, TokenRequestContext} | |
| import org.apache.http.client.methods.{HttpGet, HttpPost} | |
| import org.apache.http.entity.{ContentType, StringEntity} | |
| import org.apache.http.impl.client.HttpClients | |
| import org.apache.http.util.EntityUtils | |
| import org.apache.spark.internal.Logging | |
| import org.json4s._ | |
| import org.json4s.jackson.JsonMethods._ | |
| import scala.util.{Try, Success, Failure} | |
| // @formatter:off | |
/** Fabric SQL Endpoint metadata manager for handling metadata refresh
  * operations.
  *
  * A refresh is triggered with a POST to
  * `<baseUrl>/<workspaceId>/sqlEndpoints/<sqlEndpointId>/refreshMetadata`.
  * When the service answers HTTP 202 the returned operation is polled at
  * `<operationsUrl>/<operationId>` until it reports `Succeeded` or `Failed`.
  *
  * Reference sample:
  * https://github.com/microsoft/fabric-toolbox/blob/main/samples/notebook-refresh-tables-in-sql-endpoint/RefreshTableinSQLEndpoint.py
  *
  * @param credential
  *   credential used to acquire a bearer token for every HTTP request
  * @param pollDurationInSeconds
  *   polling interval (seconds) used when the service does not return a
  *   usable `retry-after` header
  */
class FabricSqlEndpointManager(
    credential: TokenCredential,
    pollDurationInSeconds: Int = 15
) extends MetadataManager
    with Logging {

  implicit val formats: Formats = DefaultFormats

  private val baseUrl = "https://api.powerbi.com/v1/workspaces"
  private val operationsUrl = "https://api.powerbi.com/v1/operations"
  private val operationIdHeader = "x-ms-operation-id"
  private val retryAfterHeader = "retry-after"

  /** Acquires an access token for the Fabric default scope.
    *
    * Blocks the calling thread until the credential responds.
    *
    * @throws IllegalStateException
    *   if the credential completes without producing a token
    */
  private def getAccessToken: String =
    Option(
      credential
        .getToken(
          new TokenRequestContext()
            .addScopes(Seq(JwtScopes.fabric.toDefaultScope()): _*)
        )
        .block()
    ).map(_.getToken())
      .getOrElse(
        throw new IllegalStateException("Credential returned no access token for the Fabric scope")
      )

  /** Adds the bearer token and JSON content type headers to `request`. */
  private def authorize(request: org.apache.http.HttpMessage): Unit = {
    request.setHeader("Authorization", s"Bearer ${getAccessToken}")
    request.setHeader("Content-Type", "application/json")
  }

  /** Executes `request`, reads the response body (empty string when the
    * response carries no entity) and passes both to `handle`.
    *
    * The response is always closed before this method returns, so `handle`
    * must not let the response object escape.
    */
  private def withResponse[A](
      client: org.apache.http.impl.client.CloseableHttpClient,
      request: org.apache.http.client.methods.HttpUriRequest
  )(handle: (org.apache.http.HttpResponse, String) => A): A = {
    val response = client.execute(request)
    try {
      val body = Option(response.getEntity)
        .flatMap(entity => Option(EntityUtils.toString(entity)))
        .getOrElse("")
      handle(response, body)
    } finally {
      response.close()
    }
  }

  /** Triggers a metadata refresh of the SQL endpoint and waits for it to
    * finish.
    *
    *   - HTTP 200: the refresh completed synchronously.
    *   - HTTP 202: the refresh runs asynchronously; the operation is polled
    *     until it succeeds or fails.
    *   - HTTP 500: logged as "sync already running" and treated as a no-op.
    *   - anything else: raised as a failure.
    *
    * @param workspaceId
    *   workspace identifier placed in the request URL
    * @param sqlEndpointId
    *   SQL endpoint identifier placed in the request URL
    * @throws RuntimeException
    *   if the service returns an unexpected status code, a 202 response
    *   lacks the operation id header, or the asynchronous operation fails
    */
  override def refresh(
      workspaceId: String,
      sqlEndpointId: String
  ): Unit = {
    val client = HttpClients.createDefault()
    try {
      // Built inside the try so a token failure cannot leak the client.
      val url = s"$baseUrl/$workspaceId/sqlEndpoints/$sqlEndpointId/refreshMetadata"
      val request = new HttpPost(url)
      authorize(request)
      request.setEntity(new StringEntity("{}", ContentType.APPLICATION_JSON))

      // Decide what to do while the response is open, but poll only after it
      // has been closed so the connection is not held for the whole wait.
      val pendingOperation =
        withResponse[Option[(String, Int)]](client, request) { (response, responseBody) =>
          response.getStatusLine.getStatusCode match {
            case 200 =>
              logDebug("SQL Endpoint metadata refresh completed synchronously (HTTP 200)")
              logTrace(s"$responseBody")
              None
            case 202 =>
              val operationId = Option(response.getFirstHeader(operationIdHeader))
                .map(_.getValue)
                .getOrElse(
                  throw new RuntimeException("Async refresh started but no operation ID found")
                )
              // Retry-After may legally be an HTTP-date; anything that is not
              // a non-negative integer falls back to the configured interval.
              val retryAfter = Option(response.getFirstHeader(retryAfterHeader))
                .flatMap(header => Try(header.getValue.trim.toInt).toOption)
                .filter(_ >= 0)
                .getOrElse(pollDurationInSeconds)
              logDebug(s"SQL Endpoint metadata refresh started asynchronously. Operation ID: $operationId")
              Some((operationId, retryAfter))
            case 500 =>
              // NOTE(review): every HTTP 500 is assumed to mean a sync is
              // already in progress; other server errors are hidden here.
              logDebug("The sync is already running")
              logTrace(s"Response: $responseBody")
              None
            case statusCode =>
              logError(s"Failed to refresh SQL Endpoint metadata. Status code: $statusCode, Response: $responseBody")
              throw new RuntimeException(s"Failed to refresh SQL Endpoint metadata: HTTP $statusCode")
          }
        }

      pendingOperation.foreach { case (operationId, retryAfter) =>
        handleAsyncOperation(client, operationId, retryAfter)
      }
    } finally {
      client.close()
    }
  }

  /** Polls the long-running operation until it reaches a terminal state and,
    * on success, fetches and logs its result.
    *
    * @param client
    *   open HTTP client, owned (and closed) by the caller
    * @param operationId
    *   identifier taken from the `x-ms-operation-id` response header
    * @param retryAfter
    *   delay in seconds between status checks
    */
  private def handleAsyncOperation(
      client: org.apache.http.impl.client.CloseableHttpClient,
      operationId: String,
      retryAfter: Int
  ): Unit = {
    val lroUri = s"$operationsUrl/$operationId"
    val lroResultUri = s"$lroUri/result"
    pollUntilComplete(client, lroUri, lroResultUri, retryAfter)
  }

  /** Sleeps, checks the operation status once, and repeats until the status
    * is `Succeeded` (result is then fetched) or `Failed` (raises).
    *
    * NOTE(review): there is no upper bound on the number of polls; an
    * operation that never reaches a terminal state keeps this loop running.
    */
  @scala.annotation.tailrec
  private def pollUntilComplete(
      client: org.apache.http.impl.client.CloseableHttpClient,
      lroUri: String,
      lroResultUri: String,
      retryAfter: Int
  ): Unit = {
    // Long arithmetic: `retryAfter * 1000` overflows Int for large delays.
    Thread.sleep(retryAfter * 1000L)

    val lroRequest = new HttpGet(lroUri)
    authorize(lroRequest)

    val succeeded = withResponse[Boolean](client, lroRequest) { (lroResponse, lroResponseBody) =>
      logDebug(s"LRO status check: $lroUri")
      logDebug(s"LRO response: $lroResponseBody")
      val statusCode = lroResponse.getStatusLine.getStatusCode
      if (statusCode == 200) {
        val status = (parse(lroResponseBody) \ "status").extract[String]
        status match {
          case "Succeeded" => true
          case "Failed" =>
            logError(s"SQL Endpoint metadata refresh failed: $lroResponseBody")
            throw new RuntimeException(s"SQL Endpoint metadata refresh failed: $lroResponseBody")
          case _ =>
            logDebug(s"Operation still in progress. Status: $status")
            false
        }
      } else {
        logError(s"Failed to check operation status. Status code: $statusCode")
        throw new RuntimeException(s"Failed to check operation status: HTTP $statusCode")
      }
    }

    if (succeeded) logOperationResult(client, lroResultUri)
    else pollUntilComplete(client, lroUri, lroResultUri, retryAfter)
  }

  /** Fetches the result document of a succeeded operation and logs it. The
    * HTTP status of the result request is not inspected.
    */
  private def logOperationResult(
      client: org.apache.http.impl.client.CloseableHttpClient,
      lroResultUri: String
  ): Unit = {
    val resultRequest = new HttpGet(lroResultUri)
    authorize(resultRequest)
    withResponse[Unit](client, resultRequest) { (_, resultResponseBody) =>
      logInfo("SQL Endpoint metadata refresh completed successfully")
      logDebug(s"Final result: $resultResponseBody")
    }
  }
}
| // @formatter:on |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment