aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling/onebrc
diff options
context:
space:
mode:
authorJairo GraterĂ³n <58091322+jgrateron@users.noreply.github.com>2024-01-11 16:22:58 -0400
committerGitHub <noreply@github.com>2024-01-11 21:22:58 +0100
commit20e52aaadfba5d830a5e099fe1f40633f7263efa (patch)
treec763ada07f102b0285295c6fa6531ab4a1a57b37 /src/main/java/dev/morling/onebrc
parent11a740c5d07d316113374dc148f0a51b90269af4 (diff)
Divide the reading of the file by parts (#254)
* divide the reading of the file by parts * fix format * add number of core partition * fix format * implement strToDouble * fix strtodouble * add locale, fix read file, tests pass * delete unnecessary method clean
Diffstat (limited to 'src/main/java/dev/morling/onebrc')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java296
1 files changed, 221 insertions, 75 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java
index c6edb26..251f26d 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java
@@ -16,115 +16,267 @@
package dev.morling.onebrc;
-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
+import java.util.Locale;
import java.util.Queue;
-import java.util.TreeMap;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
public class CalculateAverage_jgrateron {
private static final String FILE = "./measurements.txt";
- private static int MAX_LINES = 100000;
-
- public static void main(String[] args) throws IOException, InterruptedException {
- // long startTime = System.nanoTime();
-
- var tasks = new ArrayList<TaskCalcular>();
- try (var reader = new BufferedReader(new FileReader(FILE))) {
- String line;
- var listaLineas = new LinkedList<String>();
- while ((line = reader.readLine()) != null) {
- listaLineas.add(line);
- if (listaLineas.size() > MAX_LINES) {
- var taskCalcular = new TaskCalcular(listaLineas);
- listaLineas = new LinkedList<String>();
- tasks.add(taskCalcular);
+ private static final int MAX_LENGTH_LINE = 115;
+ private static final int MAX_BUFFER = 1024 * 8;
+ private static boolean DEBUG = false;
+
+ public record Particion(long offset, long size) {
+ }
+
+ public record Tupla(String str, double num) {
+ }
+
+ /*
+ *
+ */
+ public static List<Particion> dividirArchivo(File archivo) throws IOException {
+ var particiones = new ArrayList<Particion>();
+ var buffer = new byte[MAX_LENGTH_LINE];
+ var length = archivo.length();
+ int cores = Runtime.getRuntime().availableProcessors();
+ var sizeParticion = length / cores;
+ if (sizeParticion > MAX_BUFFER) {
+ var ini = 0l;
+ try (var rfile = new RandomAccessFile(archivo, "r")) {
+ for (;;) {
+ var size = sizeParticion;
+ var pos = ini + size;
+ if (pos > length) {
+ pos = length - 1;
+ size = length - ini;
+ }
+ rfile.seek(pos);
+ int count = rfile.read(buffer);
+ if (count == -1) {
+ break;
+ }
+ for (int i = 0; i < count; i++) {
+ size++;
+ if (buffer[i] == '\n' || buffer[i] == '\r') {
+ break;
+ }
+ }
+ var particion = new Particion(ini, size);
+ particiones.add(particion);
+ if (count != buffer.length) {
+ break;
+ }
+ ini += size;
}
}
- if (listaLineas.size() > 0) {
- var taskCalcular = new TaskCalcular(listaLineas);
- tasks.add(taskCalcular);
- }
}
- // combinar todas las particiones
- var totalMediciones = new TreeMap<String, Medicion>();
- for (var task : tasks) {
- task.join();
- var mediciones = task.getMediciones();
- for (var entry : mediciones.entrySet()) {
- var medicion = totalMediciones.get(entry.getKey());
- if (medicion == null) {
- totalMediciones.put(entry.getKey(), entry.getValue());
+ else {
+ particiones.add(new Particion(0, length));
+ }
+ return particiones;
+ }
+
+ public static void main(String[] args) throws InterruptedException, IOException {
+ Locale.setDefault(Locale.US);
+ var startTime = System.nanoTime();
+ var archivo = new File(FILE);
+ var totalMediciones = new HashMap<Integer, Medicion>();
+ var tareas = new ArrayList<Thread>();
+ var particiones = dividirArchivo(archivo);
+
+ for (var p : particiones) {
+ var hilo = Thread.ofVirtual().start(() -> {
+ var mediciones = new HashMap<Integer, Medicion>();
+ try (var miArchivo = new MiArchivo(archivo)) {
+ miArchivo.seek(p);
+ for (;;) {
+ var tuples = miArchivo.readTuples();
+ if (tuples.isEmpty()) {
+ break;
+ }
+ for (;;) {
+ var tuple = tuples.poll();
+ if (tuple == null) {
+ break;
+ }
+ var estacion = tuple.str;
+ var temp = tuple.num;
+ var hashCode = estacion.hashCode();
+ var medicion = mediciones.get(hashCode);
+ if (medicion == null) {
+ medicion = new Medicion(estacion, 1, temp, temp, temp);
+ mediciones.put(hashCode, medicion);
+ }
+ else {
+ medicion.update(1, temp, temp, temp);
+ }
+ }
+ }
}
- else {
- var otraMed = entry.getValue();
- medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum);
+ catch (IOException e) {
+ System.exit(-1);
}
- }
+ synchronized (totalMediciones) {
+ for (var entry : mediciones.entrySet()) {
+ var medicion = totalMediciones.get(entry.getKey());
+ if (medicion == null) {
+ totalMediciones.put(entry.getKey(), entry.getValue());
+ }
+ else {
+ var otraMed = entry.getValue();
+ medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum);
+ }
+ }
+ }
+ });
+ tareas.add(hilo);
}
+ for (var hilo : tareas) {
+ hilo.join();
+ }
+
+ Comparator<Entry<Integer, Medicion>> comparar = (a, b) -> {
+ return a.getValue().estacion.compareTo(b.getValue().estacion);
+ };
+
var result = totalMediciones.entrySet().stream()//
- .map(e -> e.getKey() + "=" + e.getValue())//
+ .sorted(comparar)//
+ .map(e -> e.getValue().toString())//
.collect(Collectors.joining(", "));
System.out.println("{" + result + "}");
+ if (DEBUG) {
+ System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000 + "ms");
+ }
- // System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000);
}
/*
*
*/
- static class TaskCalcular {
+ public static double strToDouble(byte linea[], int posSeparator, int len) {
+ double number[] = { 0, 0 };
+ int pos = 0;
+ boolean esNegativo = false;
+ for (int i = posSeparator + 1; i < len; i++) {
+ switch (linea[i]) {
+ case '0', '1', '2', '3', '4':
+ case '5', '6', '7', '8', '9':
+ number[pos] = number[pos] * 10;
+ number[pos] = number[pos] + (linea[i] - 48);
+ break;
+ case '-':
+ esNegativo = true;
+ break;
+ case '.':
+ pos = 1;
+ break;
+ }
+ }
+ double num = number[0];
+ if (number[1] > 0) {
+ num += (number[1] / 10);
+ }
+ if (esNegativo) {
+ num = num * -1;
+ }
+ return num;
+ }
- private Queue<String> listaLineas;
- private Map<String, Medicion> mediciones;
- private Thread hilo;
+ /*
+ *
+ */
+ static class MiArchivo implements AutoCloseable {
+ private final RandomAccessFile rFile;
+ private final byte buffer[] = new byte[MAX_BUFFER];
+ private final byte line[] = new byte[MAX_LENGTH_LINE];
+ private final byte rest[] = new byte[MAX_LENGTH_LINE];
+ private int lenRest = 0;
+ private long maxRead = 0;
+ private long totalRead = 0;
+ private Queue<Tupla> tuples = new LinkedList<Tupla>();
- public TaskCalcular(Queue<String> listaLineas) {
- this.listaLineas = listaLineas;
- mediciones = new HashMap<String, Medicion>();
- hilo = Thread.ofPlatform().unstarted(() -> {
- run();
- });
- hilo.start();
+ public MiArchivo(File file) throws IOException {
+ rFile = new RandomAccessFile(file, "r");
}
- public void join() throws InterruptedException {
- hilo.join();
+ public void seek(Particion particion) throws IOException {
+ maxRead = particion.size;
+ rFile.seek(particion.offset);
+ }
+
+ @Override
+ public void close() throws IOException {
+ rFile.close();
}
- public void run() {
- String linea;
- int pos;
- while ((linea = listaLineas.poll()) != null) {
- pos = linea.indexOf(";");
- var estacion = linea.substring(0, pos);
- var temp = Double.parseDouble(linea.substring(pos + 1));
- var medicion = mediciones.get(estacion);
- if (medicion == null) {
- medicion = new Medicion(estacion, 1, temp, temp, temp);
- mediciones.put(estacion, medicion);
+ public Queue<Tupla> readTuples() throws IOException {
+ if (totalRead == maxRead) {
+ return tuples;
+ }
+ long numBytes = rFile.read(buffer);
+ if (numBytes == -1) {
+ return tuples;
+ }
+ var totalLeidos = totalRead + numBytes;
+ if (totalLeidos > maxRead) {
+ numBytes = maxRead - totalRead;
+ }
+ totalRead += numBytes;
+ int pos = 0;
+ int len = 0;
+ int idx = 0;
+ while (pos < numBytes) {
+ if (buffer[pos] == '\n' || buffer[pos] == '\r') {
+ if (lenRest > 0) {
+ System.arraycopy(rest, 0, line, 0, lenRest);
+ System.arraycopy(buffer, idx, line, lenRest, len);
+ len += lenRest;
+ lenRest = 0;
+ }
+ else {
+ System.arraycopy(buffer, idx, line, 0, len);
+ }
+ int semicolon = 0;
+ for (int i = 0; i < len; i++) {
+ if (line[i] == ';') {
+ semicolon = i;
+ break;
+ }
+ }
+ var temperatura = strToDouble(line, semicolon, len);
+ var tupla = new Tupla(new String(line, 0, semicolon), temperatura);
+ tuples.add(tupla);
+ idx = pos + 1;
+ len = 0;
}
else {
- medicion.update(1, temp, temp, temp);
+ len++;
}
+ pos++;
}
- }
-
- public Map<String, Medicion> getMediciones() {
- return mediciones;
+ if (len > 0) {
+ System.arraycopy(buffer, idx, rest, 0, len);
+ lenRest = len;
+ }
+ return tuples;
}
}
/*
*
*/
- static class Medicion implements Comparable<Medicion> {
+ static class Medicion {
private String estacion;
private int count;
private double tempMin;
@@ -154,13 +306,7 @@ public class CalculateAverage_jgrateron {
@Override
public String toString() {
double tempPro = tempSum / count;
- return "%.1f/%.1f/%.1f".formatted(tempMin, tempPro, tempMax);
- }
-
- @Override
- public int compareTo(Medicion medicion) {
- return estacion.compareTo(medicion.estacion);
+ return "%s=%.1f/%.1f/%.1f".formatted(estacion, tempMin, tempPro, tempMax);
}
}
-
}