about | summary | refs | log | tree | commit | diff
path: root/src/main/java/dev/morling/onebrc
diff options
context:
space:
mode:
author: Roman Romanchuk <roman.romanchuk@zalando.de> 2024-01-11 18:24:41 +0100
committer: Gunnar Morling <gunnar.morling@googlemail.com> 2024-01-11 19:49:51 +0100
commit: 6381aefcc16f5d46c8322214e37aba716c98b1ef (patch)
tree: b3c556c18250348b5bd5141b8b22a26cc8f5eb62 /src/main/java/dev/morling/onebrc
parent: 6761d784a2d6190f1b3681b3e478ac5bd7dbbd2a (diff)
Fixed failing tests
Diffstat (limited to 'src/main/java/dev/morling/onebrc')
-rw-r--r-- src/main/java/dev/morling/onebrc/CalculateAverage_fatroom.java 74
1 files changed, 54 insertions, 20 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_fatroom.java b/src/main/java/dev/morling/onebrc/CalculateAverage_fatroom.java
index 5e1eb0c..7fa9b2e 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_fatroom.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_fatroom.java
@@ -69,7 +69,7 @@ public class CalculateAverage_fatroom {
long fileSize = file.length();
long position = 0;
- List<Callable<Map<String, MeasurementAggregator>>> tasks = new ArrayList<>();
+ List<Callable<Map<Station, MeasurementAggregator>>> tasks = new LinkedList<>();
while (position < fileSize) {
long end = Math.min(position + SEGMENT_LENGTH, fileSize);
int length = (int) (end - position);
@@ -88,42 +88,48 @@ public class CalculateAverage_fatroom {
for (var future : executor.invokeAll(tasks)) {
var segmentAggregates = future.get();
for (var entry : segmentAggregates.entrySet()) {
- aggregates.merge(entry.getKey(), entry.getValue(), MeasurementAggregator::combineWith);
+ aggregates.merge(entry.getKey().toString(), entry.getValue(), MeasurementAggregator::combineWith);
}
}
executor.shutdown();
- executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+
+ // no sense to wait longer than base case
+ executor.awaitTermination(5, TimeUnit.MINUTES);
System.out.println(aggregates);
}
- private static Map<String, MeasurementAggregator> processBuffer(MappedByteBuffer source, int length) {
- Map<String, MeasurementAggregator> aggregates = new HashMap<>(500);
- String station;
- byte[] buffer = new byte[64];
+ private static Map<Station, MeasurementAggregator> processBuffer(MappedByteBuffer source, int length) {
+
+ Map<Station, MeasurementAggregator> aggregates = new HashMap<>(500);
+ Station station;
+ byte[] buffer = new byte[200];
+ byte[] measurement = new byte[5];
int measurementLength;
int idx = 0;
+ int hash = 1;
for (int i = 0; i < length; ++i) {
byte b = source.get(i);
+ hash = 31 * hash + b;
buffer[idx++] = b;
if (b == ';') {
- station = new String(buffer, 0, idx - 1, StandardCharsets.UTF_8);
+ station = new Station(hash, buffer, idx - 1);
measurementLength = 3;
- source.position(i + 1);
- buffer[0] = source.get(++i);
- buffer[1] = source.get(++i);
- buffer[2] = source.get(++i);
- buffer[3] = source.get(++i);
- if (buffer[3] != '\n') {
+ measurement[0] = source.get(++i);
+ measurement[1] = source.get(++i);
+ measurement[2] = source.get(++i);
+ measurement[3] = source.get(++i);
+ if (measurement[3] != '\n') {
measurementLength++;
- buffer[4] = source.get(++i);
- if (buffer[4] != '\n') {
+ measurement[4] = source.get(++i);
+ if (measurement[4] != '\n') {
i++;
measurementLength++;
}
}
- aggregates.computeIfAbsent(station, s -> new MeasurementAggregator()).consume(parseMeasurement(buffer, measurementLength));
+ aggregates.computeIfAbsent(station, s -> new MeasurementAggregator()).consume(parseMeasurement(measurement, measurementLength));
idx = 0;
+ hash = 1;
}
}
return aggregates;
@@ -132,10 +138,38 @@ public class CalculateAverage_fatroom {
static double parseMeasurement(byte[] source, int size) {
int isNegativeSignPresent = ~(source[0] >> 4) & 1;
int firstDigit = source[isNegativeSignPresent] - '0';
- int secondDigit = source[size - 3] - '0';
- int thirdDigit = source[size - 1] - '0';
+ int secondDigit = source[size - 3];
+ int thirdDigit = source[size - 1];
int has4 = (size - isNegativeSignPresent) >> 2;
- int value = has4 * firstDigit * 100 + secondDigit * 10 + thirdDigit;
+ int value = has4 * firstDigit * 100 + secondDigit * 10 + thirdDigit - 528;
return -isNegativeSignPresent ^ value - isNegativeSignPresent;
}
+
+ static class Station {
+ private byte[] bytes;
+ private int hash;
+
+ public Station(int hash, byte[] bytes, int length) {
+ this.bytes = new byte[length];
+ System.arraycopy(bytes, 0, this.bytes, 0, length);
+ this.hash = hash;
+ }
+
+ public String toString() {
+ return new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ Station station = (Station) o;
+ if (hash != station.hash)
+ return false;
+ return Arrays.equals(bytes, station.bytes);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+ }
}