Skip to content

Instantly share code, notes, and snippets.

@vladdenisov
Created October 21, 2025 13:39
Show Gist options
  • Select an option

  • Save vladdenisov/dfa3c28c1ffbd2934806e5e4cc34fdf3 to your computer and use it in GitHub Desktop.

Select an option

Save vladdenisov/dfa3c28c1ffbd2934806e5e4cc34fdf3 to your computer and use it in GitHub Desktop.
denisov_parallel_v1
package org.itmo;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
/**
 * Multi-threaded ERROR-rate alerter.
 *
 * <p>Reads every regular file in the directory given as {@code args[0]} and parses lines of the form
 * {@code <timestamp> userId=<user> ip=<ip> level=<LEVEL> msg="<...>"}. It counts {@code ERROR} lines
 * per (ip, one-minute window) and emits an alert for every window whose count is at least
 * {@link #ERROR_THRESHOLD}. Alerts are written to {@code out.txt} as JSON Lines, sorted by ip
 * (as strings, lexicographically) and then by window start (ascending).
 */
public class Alert5MultiThread {
    /** Output path; must be exactly out.txt per the task statement. */
    private static final Path OUTPUT_FILE = Paths.get("out.txt");
    /** An (ip, window) pair with at least this many ERROR lines produces an alert. */
    private static final int ERROR_THRESHOLD = 5;
    /** Window length in seconds; a window covers [start, start + WINDOW_SECONDS - 1]. */
    private static final long WINDOW_SECONDS = 60;

    private static final String IP_PREFIX = "ip=";
    private static final String LEVEL_PREFIX = "level=";
    private static final String ERROR_LEVEL = "ERROR";

    /** Output order: ip compared as a string, then window start ascending. */
    private static final Comparator<Alert> ALERT_ORDER =
            Comparator.comparing(Alert::ip).thenComparingLong(Alert::windowStart);

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the input directory
     * @throws IllegalArgumentException if no argument is given or it is not a directory
     * @throws ExecutionException if reading/parsing any input file failed (cause is the original error)
     */
    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: Alert5MultiThread <INPUT_DIR>");
        }
        // At least 32 workers even on machines with few cores; presumably because tasks spend
        // time reading files rather than computing. Tune if needed.
        int threads = Math.max(32, Runtime.getRuntime().availableProcessors());
        Path inputDir = Paths.get(args[0]);
        long startTime = System.currentTimeMillis();
        if (!Files.isDirectory(inputDir)) {
            throw new IllegalArgumentException("INPUT_DIR is not a directory: " + inputDir.toAbsolutePath());
        }

        List<Path> files;
        try (var stream = Files.list(inputDir)) {
            files = stream.filter(Files::isRegularFile).toList();
        }

        Map<String, ConcurrentHashMap<Long, LongAdder>> counts = countErrors(files, threads);
        System.out.println("Finished reading files in " + (System.currentTimeMillis() - startTime) + " ms");

        List<Alert> alerts = collectAlerts(counts);
        System.out.println("Finished collecting alerts in " + (System.currentTimeMillis() - startTime) + " ms");

        Alert[] sorted = alerts.toArray(new Alert[0]);
        Arrays.parallelSort(sorted, ALERT_ORDER);
        System.out.println("Finished sorting alerts in " + (System.currentTimeMillis() - startTime) + " ms");

        writeAlerts(Arrays.asList(sorted), OUTPUT_FILE);
        long endTime = System.currentTimeMillis();
        System.out.println("Done. Alerts written to " + OUTPUT_FILE.toAbsolutePath());
        System.out.println("Total time: " + (endTime - startTime) + " ms");
    }

    /**
     * Parses all files in parallel and returns ip -> (window start -> ERROR count).
     *
     * <p>Each task first aggregates its file into a private map and then merges it into the shared
     * concurrent map once. The pool is always shut down, even when a task fails. Otherwise the
     * executor's non-daemon worker threads would keep the JVM alive after the exception propagates.
     *
     * @throws ExecutionException if any task failed; its cause is the task's exception
     */
    private static Map<String, ConcurrentHashMap<Long, LongAdder>> countErrors(List<Path> files, int threads)
            throws ExecutionException, InterruptedException {
        ConcurrentHashMap<String, ConcurrentHashMap<Long, LongAdder>> counts =
                new ConcurrentHashMap<>(700_000, 0.70f, threads);
        // Shared across tasks so equal IPs from different files map to one String instance.
        ConcurrentHashMap<String, String> ipPool = new ConcurrentHashMap<>(20_000, 0.70f, threads);

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> futures = new ArrayList<>(files.size());
            for (Path file : files) {
                Callable<Void> task = () -> {
                    Map<String, Map<Long, Integer>> local = new HashMap<>();
                    readFile(file, local, ipPool);
                    mergeLocalIntoGlobal(local, counts);
                    return null;
                };
                futures.add(pool.submit(task));
            }
            for (Future<Void> f : futures) {
                f.get();
            }
        } finally {
            // On success every task has completed and this only stops idle workers;
            // on failure it also cancels/interrupts the remaining tasks.
            pool.shutdownNow();
        }
        return counts;
    }

    /** Returns one alert per (ip, window) whose ERROR count reaches {@link #ERROR_THRESHOLD}; unsorted. */
    private static List<Alert> collectAlerts(Map<String, ConcurrentHashMap<Long, LongAdder>> counts) {
        List<Alert> alerts = new ArrayList<>();
        for (Map.Entry<String, ConcurrentHashMap<Long, LongAdder>> byIp : counts.entrySet()) {
            String ip = byIp.getKey();
            for (Map.Entry<Long, LongAdder> byMinute : byIp.getValue().entrySet()) {
                // All writer tasks have finished, so a single sum() is the final count.
                long errors = byMinute.getValue().sum();
                if (errors >= ERROR_THRESHOLD) {
                    alerts.add(new Alert(ip, byMinute.getKey(), errors));
                }
            }
        }
        return alerts;
    }

    /**
     * Writes the alerts as JSON Lines (UTF-8), creating or truncating {@code output}.
     *
     * <p>NOTE(review): the ip is written without JSON escaping. That is fine for IPv4 text, but a
     * malformed ip token containing {@code "} or {@code \} would yield an invalid line.
     */
    private static void writeAlerts(List<Alert> alerts, Path output) throws IOException {
        try (BufferedWriter w = Files.newBufferedWriter(
                output, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            StringBuilder sb = new StringBuilder(128);
            for (Alert alert : alerts) {
                long windowEnd = alert.windowStart() + WINDOW_SECONDS - 1;
                sb.setLength(0);
                sb.append("{\"type\":\"alert\",\"ip\":\"")
                        .append(alert.ip())
                        .append("\",\"window_start\":")
                        .append(alert.windowStart())
                        .append(",\"window_end\":")
                        .append(windowEnd)
                        .append(",\"error_count\":")
                        .append(alert.count())
                        .append("}\n");
                w.write(sb.toString());
            }
        }
    }

    /** Parses every line of {@code file} (UTF-8) into the per-file {@code counts} map. */
    private static void readFile(Path file, Map<String, Map<Long, Integer>> counts, Map<String, String> ipPool)
            throws IOException {
        try (BufferedReader br = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = br.readLine()) != null) {
                parseAndAccumulate(line, counts, ipPool);
            }
        }
    }

    /** Adds a file-local ip -> (minute -> count) map into the shared concurrent map. */
    private static void mergeLocalIntoGlobal(Map<String, Map<Long, Integer>> local,
                                             ConcurrentHashMap<String, ConcurrentHashMap<Long, LongAdder>> global) {
        for (Map.Entry<String, Map<Long, Integer>> ipEntry : local.entrySet()) {
            ConcurrentHashMap<Long, LongAdder> perMinuteGlobal =
                    global.computeIfAbsent(ipEntry.getKey(), k -> new ConcurrentHashMap<>());
            for (Map.Entry<Long, Integer> minuteEntry : ipEntry.getValue().entrySet()) {
                perMinuteGlobal.computeIfAbsent(minuteEntry.getKey(), k -> new LongAdder())
                        .add(minuteEntry.getValue());
            }
        }
    }

    /**
     * Parses one line and, if it is an ERROR line, increments its (ip, window start) counter.
     *
     * <p>Expected format: {@code <timestamp> userId=<user> ip=<ip> level=<LEVEL> msg="<...>"}.
     * Lines that do not match the expected field layout are silently skipped.
     */
    private static void parseAndAccumulate(String line, Map<String, Map<Long, Integer>> counts,
                                           Map<String, String> ipPool) {
        int s1 = line.indexOf(' ');
        if (s1 <= 0) {
            return;
        }
        long ts;
        try {
            ts = Long.parseLong(line, 0, s1, 10);
        } catch (NumberFormatException ex) {
            return; // input is specified as valid; skip a malformed timestamp defensively
        }

        // Skip the userId=<user> field (neither validated nor needed).
        int spaceAfterUser = line.indexOf(' ', s1 + 1);
        if (spaceAfterUser < 0) {
            return;
        }

        int ipFieldStart = spaceAfterUser + 1;
        if (!line.startsWith(IP_PREFIX, ipFieldStart)) {
            return;
        }
        int spaceAfterIp = line.indexOf(' ', ipFieldStart);
        if (spaceAfterIp < 0) {
            return;
        }

        int levelFieldStart = spaceAfterIp + 1;
        if (!line.startsWith(LEVEL_PREFIX, levelFieldStart)) {
            return;
        }
        int levelStart = levelFieldStart + LEVEL_PREFIX.length();
        int spaceAfterLevel = line.indexOf(' ', levelStart);
        int levelEnd = spaceAfterLevel < 0 ? line.length() : spaceAfterLevel;
        // Exact match on "ERROR" without allocating a substring for the level.
        if (levelEnd - levelStart != ERROR_LEVEL.length() || !line.startsWith(ERROR_LEVEL, levelStart)) {
            return;
        }

        // Canonicalize the ip only for lines that are actually counted.
        String ip = ipPool.computeIfAbsent(line.substring(ipFieldStart + IP_PREFIX.length(), spaceAfterIp), k -> k);
        // floorMod keeps negative timestamps in the window that contains them;
        // ts % 60 would be negative there and round toward zero instead.
        long minuteStart = ts - Math.floorMod(ts, WINDOW_SECONDS);
        counts.computeIfAbsent(ip, k -> new HashMap<>()).merge(minuteStart, 1, Integer::sum);
    }

    /** One alert row: {@code count} ERROR lines for {@code ip} in the window starting at {@code windowStart}. */
    private record Alert(String ip, long windowStart, long count) {
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment