This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def mergeFiles(spark: SparkSession, grouped: ListBuffer[ListBuffer[String]], targetDirectory: String): Unit = { | |
| val startedAt = System.currentTimeMillis() | |
| val forkJoinPool = new ForkJoinPool(grouped.size) | |
| val parllelBatches = grouped.par | |
| parllelBatches.tasksupport = new ForkJoinTaskSupport(forkJoinPool) | |
| parllelBatches foreach (batch => { | |
| logger.debug(s"Merging ${batch.size} files into one") | |
| try { | |
| spark.read.parquet(batch.toList: _*).coalesce(1).write.mode("append").parquet(targetDirectory.stripSuffix("/") + "/") | |
| } catch { |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def makeMergeBatches(fileSizesMap: scala.collection.immutable.Map[String, Long], maxTargetFileSize: Long): ListBuffer[ListBuffer[String]] = { | |
| val sortedFileSizes = fileSizesMap.toSeq.sortBy(_._2) | |
| val groupedFiles = ListBuffer[ListBuffer[String]]() | |
| groupedFiles += ListBuffer[String]() | |
| for (aFile <- smallerFiles) { | |
| val lastBatch = groupedFiles.last | |
| if ((sizeOfThisBatch(lastBatch) + aFile._2) < maxTargetFileSize) { | |
| lastBatch += aFile._1 + "|" + aFile._2.toString | |
| } else { | |
| val newBatch = ListBuffer[String]() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| val (inputBucket, prefix) = getBucketNameAndPrefix(args(1)) | |
| val targetDirectory = args(2) | |
| val maxIndividualMergedFileSize = args(3).toLong | |
| val inputDirs = listDirectoriesInS3(inputBucket, prefix).map(prefix => "s3://" + inputBucket + "/" + prefix) | |
| logger.info(s"Total directories found : ${inputDirs.size}") | |
| val startedAt = System.currentTimeMillis() | |
| //You may want to tweak the following to set how many input directories to process in parallel | |
| val forkJoinPool = new ForkJoinPool(inputDirs.size) | |
| val parallelBatches = inputDirs.par |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def getFileSizes(bucketName: String, prefix: String): scala.collection.immutable.Map[String, Long] = { | |
| val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build() | |
| val listing = s3.listObjectsV2(bucketName, prefix) | |
| val files = listing.getObjectSummaries.asScala.map(_.getKey).filter(!_.split("/").last.startsWith("_")) | |
| val filesSizeMap = collection.mutable.Map[String, Long]() | |
| files.foreach(obj => { | |
| val meta = s3.getObjectMetadata(new GetObjectMetadataRequest(bucketName, obj)) | |
| filesSizeMap += (obj -> meta.getContentLength) | |
| }) | |
| filesSizeMap.toMap |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Media | Transfer Rate per second | Rate in MB per second |
|---|---|---|
| Mobile 5G | 60Mb | 7.5 |
| WiFi | 125Mb | 15 |
| Fiber Internet | | |
| Cable Internet | 100Mb | 12.5 |
| Ethernet cable (LAN) | 10Gb | 1250 |
| SD Card | 10MB | 10 |
| HDD | 200MB | 200 |
| SSD | 550MB | 550 |
| RAM | 25GB | 25000 |