aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java98
1 files changed, 51 insertions, 47 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
index 65528c4..96ecf08 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
@@ -32,7 +32,7 @@ public class CalculateAverage_isolgpus {
public static final int HISTOGRAMS_LENGTH = 1024 * 32;
public static final int HISTOGRAMS_MASK = HISTOGRAMS_LENGTH - 1;
- public static final int THREAD_COUNT = 8;
+ public static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final String FILE = "./measurements.txt";
public static final byte SEPERATOR = 59;
public static final byte OFFSET = 48;
@@ -46,7 +46,7 @@ public class CalculateAverage_isolgpus {
File file = Paths.get(FILE).toFile();
long length = file.length();
- long chunksCount = Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE));
+ long chunksCount = length < 8_000_000 ? 1 : Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE));
long estimatedChunkSize = length / chunksCount;
@@ -80,7 +80,7 @@ public class CalculateAverage_isolgpus {
while (measurementCollectorFromChunk != null) {
MeasurementCollector currentMergedResult = mergedResults.get(new String(measurementCollectorFromChunk.name));
if (currentMergedResult == null) {
- currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name);
+ currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name, measurementCollectorFromChunk.nameSum);
mergedResults.put(new String(currentMergedResult.name), currentMergedResult);
}
currentMergedResult.merge(measurementCollectorFromChunk);
@@ -102,16 +102,10 @@ public class CalculateAverage_isolgpus {
MappedByteBuffer r = channel.map(FileChannel.MapMode.READ_ONLY, seekStart, length);
- byte[] nameBuffer = new byte[100];
boolean isNegative;
byte[] valueBuffer = new byte[3];
MeasurementCollector[] measurementCollectors = new MeasurementCollector[HISTOGRAMS_LENGTH];
- int valueIndex = 0;
- int nameBufferIndex = 0;
- int nameSum = 0;
- boolean parsingName = true;
- long i = 0;
- int hashResult = 0;
+ int i = 0;
// seek to the start of the next message
if (estimatedStart != 0) {
@@ -123,38 +117,49 @@ public class CalculateAverage_isolgpus {
try {
- while (i <= lengthOfChunk || !parsingName) {
+ while (i <= lengthOfChunk) {
+ int nameSum = 0;
+ int hashResult = 0;
+ int nameStart;
byte aChar;
- if (parsingName) {
-
- while ((aChar = r.get()) != SEPERATOR) {
- nameBuffer[nameBufferIndex++] = aChar;
- nameSum += aChar;
- hashResult = 31 * hashResult + aChar;
+ nameStart = i;
+ int nameBufferIndex = 0;
+ int valueIndex = 0;
+
+ // optimistically assume that the name is at least 4 bytes
+ int firstInt = r.getInt();
+ nameBufferIndex = 4;
+ nameSum = firstInt;
+ hashResult = 31 * firstInt;
+
+ while ((aChar = r.get()) != SEPERATOR) {
+ nameSum += aChar;
+ // hash as we go, stolen after a discussion with palmr
+ hashResult = 31 * hashResult + aChar;
+ nameBufferIndex++;
+
+ // oh no we read too much, do it the byte for byte way instead
+ if (aChar == NEW_LINE) {
+ r.position(i);
+ nameBufferIndex = 0;
+ nameSum = 0;
+ hashResult = 0;
}
- parsingName = false;
- i += nameBufferIndex + 1;
}
- else {
- isNegative = (aChar = r.get()) == NEGATIVE;
- valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r);
- byte decimalValue = r.get();
+ i += nameBufferIndex + 1;
- int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative);
- // new line character
- r.get();
+ isNegative = (aChar = r.get()) == NEGATIVE;
+ valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r);
- MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameBuffer, nameBufferIndex, nameSum);
+ int decimalValue = r.getShort() >> 8;
- measurementCollector.feed(value);
- i += valueIndex + (isNegative ? 4 : 3);
- valueIndex = 0;
- nameBufferIndex = 0;
- nameSum = 0;
- parsingName = true;
- hashResult = 0;
- }
+ int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative);
+
+ MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameStart, nameBufferIndex, nameSum, r);
+
+ measurementCollector.feed(value);
+ i += valueIndex + (isNegative ? 4 : 3);
}
}
@@ -168,18 +173,22 @@ public class CalculateAverage_isolgpus {
return measurementCollectors;
}
- private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, byte[] nameBuffer, int nameBufferIndex,
- int nameSum) {
+ private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, int nameStart, int nameBufferLength,
+ int nameSum, MappedByteBuffer r) {
MeasurementCollector measurementCollector = measurementCollectors[hash & HISTOGRAMS_MASK];
if (measurementCollector == null) {
- measurementCollector = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex));
+ byte[] nameBuffer = new byte[nameBufferLength];
+ r.get(nameStart, nameBuffer, 0, nameBufferLength);
+ measurementCollector = new MeasurementCollector(nameBuffer, nameSum);
measurementCollectors[hash & HISTOGRAMS_MASK] = measurementCollector;
}
else {
// collision unhappy path, try to avoid
- while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferIndex)) {
+ while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferLength)) {
if (measurementCollector.link == null) {
- measurementCollector.link = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex));
+ byte[] nameBuffer = new byte[nameBufferLength];
+ r.get(nameStart, nameBuffer, 0, nameBufferLength);
+ measurementCollector.link = new MeasurementCollector(nameBuffer, nameSum);
measurementCollector = measurementCollector.link;
break;
}
@@ -201,7 +210,7 @@ public class CalculateAverage_isolgpus {
return incomingNameSum == existingNameSum;
}
- private static int resolveValue(int valueIndex, byte[] valueBuffer, byte decimalValue, boolean isNegative) {
+ private static int resolveValue(int valueIndex, byte[] valueBuffer, int decimalValue, boolean isNegative) {
int value;
if (valueIndex == 1) {
value = ((valueBuffer[0] - OFFSET) * 10) + (decimalValue - OFFSET);
@@ -238,13 +247,9 @@ public class CalculateAverage_isolgpus {
private int min = Integer.MAX_VALUE;
private int max = Integer.MIN_VALUE;
- public MeasurementCollector(byte[] name) {
+ public MeasurementCollector(byte[] name, int nameSum) {
this.name = name;
- int nameSum = 0;
- for (int i = 0; i < name.length; i++) {
- nameSum += name[i];
- }
this.nameSum = nameSum;
}
@@ -279,7 +284,6 @@ public class CalculateAverage_isolgpus {
@Override
public String toString() {
- // Abha=-24.9/18.0/61.7
return name + "=" + min + "/" + mean + "/" + max;
}