Created
October 21, 2025 13:39
-
-
Save vladdenisov/dfa3c28c1ffbd2934806e5e4cc34fdf3 to your computer and use it in GitHub Desktop.
denisov_parallel_v1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.itmo; | |
| import java.io.BufferedReader; | |
| import java.io.BufferedWriter; | |
| import java.io.IOException; | |
| import java.io.UncheckedIOException; | |
| import java.nio.charset.StandardCharsets; | |
| import java.nio.file.*; | |
| import java.util.*; | |
| import java.util.concurrent.*; | |
| import java.util.concurrent.atomic.LongAdder; | |
| public class Alert5MultiThread { | |
| private static final Path OUTPUT_FILE = Paths.get("out.txt"); // строго out.txt по условию | |
| private static final int ERROR_THRESHOLD = 5; | |
| public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { | |
| int threads = Math.max(32, Runtime.getRuntime().availableProcessors()); | |
| final ConcurrentHashMap<String, ConcurrentHashMap<Long, LongAdder>> counts = new ConcurrentHashMap<>(700_000, 0.70f, threads); // ip -> (minuteStart -> count) | |
| final ConcurrentHashMap<String, String> ipPool = new ConcurrentHashMap<>(20_000, 0.70f, threads); | |
| // List<Alert> alerts = new ArrayList<>(); | |
| var INPUT_DIR = Paths.get(args[0]); | |
| var startTime = System.currentTimeMillis(); | |
| if (!Files.isDirectory(INPUT_DIR)) { | |
| throw new IllegalArgumentException("INPUT_DIR is not a directory: " + INPUT_DIR.toAbsolutePath()); | |
| } | |
| ExecutorService pool = Executors.newFixedThreadPool(threads); | |
| List<Path> files; | |
| try (var stream = Files.list(INPUT_DIR)) { | |
| files = stream.filter(Files::isRegularFile).toList(); | |
| } | |
| // List<Future<Map<String, Map<Long, Integer>>>> futures = new ArrayList<>(files.size()); | |
| // | |
| // for (Path p : files) { | |
| // futures.add(pool.submit(() -> { | |
| // try { | |
| // Map<String, Map<Long, Integer>> local = new HashMap<>(); | |
| // readFile(p, local, ipPool); | |
| // | |
| // | |
| // | |
| // return local; | |
| // } catch (IOException e) { | |
| // throw new UncheckedIOException(e); | |
| // } | |
| // })); | |
| // } | |
| // | |
| // for (Future<Map<String, Map<Long, Integer>>> f : futures) { | |
| // Map<String, Map<Long, Integer>> local = f.get(); | |
| // mergeLocalIntoGlobal(local, counts); | |
| // } | |
| List<Future<List<Alert>>> futures = new ArrayList<>(files.size()); | |
| for (Path p : files) { | |
| futures.add(pool.submit(() -> { | |
| try { | |
| Map<String, Map<Long, Integer>> local = new HashMap<>(); | |
| readFile(p, local, ipPool); | |
| // List<Alert> localAlerts = new ArrayList<>(); | |
| for (var ipEntry : local.entrySet()) { | |
| String ip = ipEntry.getKey(); | |
| Map<Long, Integer> perMinuteLocal = ipEntry.getValue(); | |
| ConcurrentHashMap<Long, LongAdder> perMinuteGlobal = | |
| counts.computeIfAbsent(ip, k -> new ConcurrentHashMap<>()); | |
| for (var mEntry : perMinuteLocal.entrySet()) { | |
| long minute = mEntry.getKey(); | |
| int delta = mEntry.getValue(); | |
| perMinuteGlobal.computeIfAbsent(minute, k -> new LongAdder()).add(delta); | |
| } | |
| } | |
| // for (Map.Entry<String, Map<Long, Integer>> eIp : local.entrySet()) { | |
| // String ip = eIp.getKey(); | |
| // for (Map.Entry<Long, Integer> eMin : eIp.getValue().entrySet()) { | |
| // Integer c = eMin.getValue(); | |
| // if (c >= ERROR_THRESHOLD) { | |
| // localAlerts.add(new Alert(ip, eMin.getKey(), c)); | |
| // } | |
| // } | |
| // } | |
| return null; | |
| } catch (IOException e) { | |
| throw new UncheckedIOException(e); | |
| } | |
| })); | |
| } | |
| for (Future<List<Alert>> f : futures) { | |
| f.get(); | |
| } | |
| pool.shutdown(); | |
| System.out.println("Finished reading files in " + (System.currentTimeMillis() - startTime) + " ms"); | |
| // Собираем алерты | |
| List<Alert> alerts = new ArrayList<>(); | |
| for (Map.Entry<String, ConcurrentHashMap<Long, LongAdder>> eIp : counts.entrySet()) { | |
| String ip = eIp.getKey(); | |
| for (Map.Entry<Long, LongAdder> eMin : eIp.getValue().entrySet()) { | |
| LongAdder c = eMin.getValue(); | |
| if (c.intValue() >= ERROR_THRESHOLD) { | |
| alerts.add(new Alert(ip, eMin.getKey(), c.intValue())); | |
| } | |
| } | |
| } | |
| System.out.println("Finished collecting alerts in " + (System.currentTimeMillis() - startTime) + " ms"); | |
| Alert[] alertsArr = alerts.toArray(new Alert[0]); | |
| Arrays.parallelSort(alertsArr, new AlertsComparator()); | |
| alerts = Arrays.asList(alertsArr); | |
| // // Сортировка: по ip (лексикографически), затем по window_start (возрастание) | |
| // alerts.sort(Comparator | |
| // .comparing((Alert a) -> a.ip) | |
| // .thenComparingLong(a -> a.windowStart)); | |
| System.out.println("Finished sorting alerts in " + (System.currentTimeMillis() - startTime) + " ms"); | |
| // Запись в out.txt (JSON Lines) | |
| try (BufferedWriter w = Files.newBufferedWriter( | |
| OUTPUT_FILE, StandardCharsets.UTF_8, | |
| StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) { | |
| StringBuilder sb = new StringBuilder(128); | |
| for (Alert a : alerts) { | |
| long windowEnd = a.windowStart + 59; | |
| sb.setLength(0); | |
| sb.append("{\"type\":\"alert\",\"ip\":\"") | |
| .append(a.ip) // IPv4 — безопасно, без экранирования | |
| .append("\",\"window_start\":") | |
| .append(a.windowStart) | |
| .append(",\"window_end\":") | |
| .append(windowEnd) | |
| .append(",\"error_count\":") | |
| .append(a.count) | |
| .append("}\n"); | |
| w.write(sb.toString()); | |
| } | |
| } | |
| var endTime = System.currentTimeMillis(); | |
| System.out.println("Done. Alerts written to " + OUTPUT_FILE.toAbsolutePath()); | |
| System.out.println("Total time: " + (endTime - startTime) + " ms"); | |
| } | |
| private static void readFile(Path file, Map<String, Map<Long, Integer>> counts, Map<String, String> ipPool) throws IOException { | |
| try (BufferedReader br = Files.newBufferedReader(file, StandardCharsets.UTF_8)) { | |
| String line; | |
| while ((line = br.readLine()) != null) { | |
| parseAndAccumulate(line, counts, ipPool); | |
| } | |
| } | |
| } | |
| // private static void mergeLocalIntoGlobal(Map<String, Map<Long, Integer>> local, | |
| // ConcurrentHashMap<String, ConcurrentHashMap<Long, LongAdder>> global) { | |
| // for (var ipEntry : local.entrySet()) { | |
| // String ip = ipEntry.getKey(); | |
| // Map<Long, Integer> perMinuteLocal = ipEntry.getValue(); | |
| // | |
| // ConcurrentHashMap<Long, LongAdder> perMinuteGlobal = | |
| // global.computeIfAbsent(ip, k -> new ConcurrentHashMap<>()); | |
| // | |
| // for (var mEntry : perMinuteLocal.entrySet()) { | |
| // long minute = mEntry.getKey(); | |
| // int delta = mEntry.getValue(); | |
| // perMinuteGlobal.computeIfAbsent(minute, k -> new LongAdder()).add(delta); | |
| // } | |
| // } | |
| // } | |
| private static void parseAndAccumulate(String line, Map<String, Map<Long, Integer>> counts, Map<String, String> ipPool) { | |
| // Формат: | |
| // <timestamp> userId=<user> ip=<ip> level=<LEVEL> msg="<...>" | |
| // Нам нужны: timestamp, ip, level | |
| int s1 = line.indexOf(' '); | |
| if (s1 <= 0) return; | |
| long ts; | |
| try { | |
| ts = Long.parseLong(line.substring(0, s1)); | |
| } catch (NumberFormatException ex) { | |
| return; // по условию валидно, но на всякий случай | |
| } | |
| int pos = s1 + 1; | |
| // userId=... | |
| int spaceAfterUser = line.indexOf(' ', pos); | |
| if (spaceAfterUser < 0) return; | |
| // ip=... | |
| int ipFieldStart = spaceAfterUser + 1; // позиция 'i' в "ip=" | |
| if (ipFieldStart + 3 >= line.length() || !line.startsWith("ip=", ipFieldStart)) return; | |
| int spaceAfterIp = line.indexOf(' ', ipFieldStart); | |
| if (spaceAfterIp < 0) return; | |
| String ipRaw = line.substring(ipFieldStart + 3, spaceAfterIp); | |
| // Интернируем ip (уменьшаем число дубликатов строк в памяти) | |
| String ip = ipPool.computeIfAbsent(ipRaw, k -> k); | |
| // level=... | |
| int levelFieldStart = spaceAfterIp + 1; | |
| if (levelFieldStart + 6 >= line.length() || !line.startsWith("level=", levelFieldStart)) return; | |
| int spaceAfterLevel = line.indexOf(' ', levelFieldStart); | |
| String level = (spaceAfterLevel < 0) | |
| ? line.substring(levelFieldStart + 6) | |
| : line.substring(levelFieldStart + 6, spaceAfterLevel); | |
| if (!"ERROR".equals(level)) return; | |
| long minuteStart = ts - (ts % 60); | |
| Map<Long, Integer> byMinute = counts.computeIfAbsent(ip, k -> new HashMap<>()); | |
| byMinute.merge(minuteStart, 1, Integer::sum); | |
| } | |
| private static final class Alert{ | |
| final String ip; | |
| final long windowStart; | |
| final int count; | |
| Alert(String ip, long windowStart, int count) { | |
| this.ip = ip; this.windowStart = windowStart; this.count = count; | |
| } | |
| } | |
| private static final class AlertsComparator implements Comparator<Alert>{ | |
| @Override | |
| public int compare(Alert a1, Alert a2) { | |
| int ipComp = a1.ip.compareTo(a2.ip); | |
| if (ipComp != 0) { | |
| return ipComp; | |
| } | |
| return Long.compare(a1.windowStart, a2.windowStart); | |
| } | |
| } | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment