aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Jamie Stansfield <80010454+isolgpus@users.noreply.github.com> 2024-01-07 19:55:30 +0000
committer GitHub <noreply@github.com> 2024-01-07 20:55:30 +0100
commit dbd8ca4562bc527423e2ce0cc66c336e59f7db5d (patch)
tree cbfebdccb98b383646847d3d46eb0a7d48f0ce1a
parent 22d8580ac8e29a8801d24e7bd2c0740edc389ae4 (diff)
isolgpus: submission 2 - about a 25% improvement on submission 1. (#168)
* isolgpus: fix chunk sizing when not at 8 threads
  use as many cores as are available
  don't buffer the station name, only use it when we need it.
  get rid of the main branch
  move variables inside the loop

* isolgpus: optimistically assume we can read a whole int for the station name, but roll back if we get it wrong. This should be very beneficial on a dataset where station names are mostly over 4 chars

---------

Co-authored-by: Jamie Stansfield <jalstansfield@gmail.com>
-rw-r--r-- src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java 98
1 files changed, 51 insertions, 47 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
index 65528c4..96ecf08 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java
@@ -32,7 +32,7 @@ public class CalculateAverage_isolgpus {
public static final int HISTOGRAMS_LENGTH = 1024 * 32;
public static final int HISTOGRAMS_MASK = HISTOGRAMS_LENGTH - 1;
- public static final int THREAD_COUNT = 8;
+ public static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final String FILE = "./measurements.txt";
public static final byte SEPERATOR = 59;
public static final byte OFFSET = 48;
@@ -46,7 +46,7 @@ public class CalculateAverage_isolgpus {
File file = Paths.get(FILE).toFile();
long length = file.length();
- long chunksCount = Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE));
+ long chunksCount = length < 8_000_000 ? 1 : Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE));
long estimatedChunkSize = length / chunksCount;
@@ -80,7 +80,7 @@ public class CalculateAverage_isolgpus {
while (measurementCollectorFromChunk != null) {
MeasurementCollector currentMergedResult = mergedResults.get(new String(measurementCollectorFromChunk.name));
if (currentMergedResult == null) {
- currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name);
+ currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name, measurementCollectorFromChunk.nameSum);
mergedResults.put(new String(currentMergedResult.name), currentMergedResult);
}
currentMergedResult.merge(measurementCollectorFromChunk);
@@ -102,16 +102,10 @@ public class CalculateAverage_isolgpus {
MappedByteBuffer r = channel.map(FileChannel.MapMode.READ_ONLY, seekStart, length);
- byte[] nameBuffer = new byte[100];
boolean isNegative;
byte[] valueBuffer = new byte[3];
MeasurementCollector[] measurementCollectors = new MeasurementCollector[HISTOGRAMS_LENGTH];
- int valueIndex = 0;
- int nameBufferIndex = 0;
- int nameSum = 0;
- boolean parsingName = true;
- long i = 0;
- int hashResult = 0;
+ int i = 0;
// seek to the start of the next message
if (estimatedStart != 0) {
@@ -123,38 +117,49 @@ public class CalculateAverage_isolgpus {
try {
- while (i <= lengthOfChunk || !parsingName) {
+ while (i <= lengthOfChunk) {
+ int nameSum = 0;
+ int hashResult = 0;
+ int nameStart;
byte aChar;
- if (parsingName) {
-
- while ((aChar = r.get()) != SEPERATOR) {
- nameBuffer[nameBufferIndex++] = aChar;
- nameSum += aChar;
- hashResult = 31 * hashResult + aChar;
+ nameStart = i;
+ int nameBufferIndex = 0;
+ int valueIndex = 0;
+
+ // optimistically assume that the name is at least 4 bytes
+ int firstInt = r.getInt();
+ nameBufferIndex = 4;
+ nameSum = firstInt;
+ hashResult = 31 * firstInt;
+
+ while ((aChar = r.get()) != SEPERATOR) {
+ nameSum += aChar;
+ // hash as we go, stolen after a discussion with palmr
+ hashResult = 31 * hashResult + aChar;
+ nameBufferIndex++;
+
+ // oh no we read too much, do it the byte for byte way instead
+ if (aChar == NEW_LINE) {
+ r.position(i);
+ nameBufferIndex = 0;
+ nameSum = 0;
+ hashResult = 0;
}
- parsingName = false;
- i += nameBufferIndex + 1;
}
- else {
- isNegative = (aChar = r.get()) == NEGATIVE;
- valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r);
- byte decimalValue = r.get();
+ i += nameBufferIndex + 1;
- int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative);
- // new line character
- r.get();
+ isNegative = (aChar = r.get()) == NEGATIVE;
+ valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r);
- MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameBuffer, nameBufferIndex, nameSum);
+ int decimalValue = r.getShort() >> 8;
- measurementCollector.feed(value);
- i += valueIndex + (isNegative ? 4 : 3);
- valueIndex = 0;
- nameBufferIndex = 0;
- nameSum = 0;
- parsingName = true;
- hashResult = 0;
- }
+ int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative);
+
+ MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameStart, nameBufferIndex, nameSum, r);
+
+ measurementCollector.feed(value);
+ i += valueIndex + (isNegative ? 4 : 3);
}
}
@@ -168,18 +173,22 @@ public class CalculateAverage_isolgpus {
return measurementCollectors;
}
- private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, byte[] nameBuffer, int nameBufferIndex,
- int nameSum) {
+ private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, int nameStart, int nameBufferLength,
+ int nameSum, MappedByteBuffer r) {
MeasurementCollector measurementCollector = measurementCollectors[hash & HISTOGRAMS_MASK];
if (measurementCollector == null) {
- measurementCollector = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex));
+ byte[] nameBuffer = new byte[nameBufferLength];
+ r.get(nameStart, nameBuffer, 0, nameBufferLength);
+ measurementCollector = new MeasurementCollector(nameBuffer, nameSum);
measurementCollectors[hash & HISTOGRAMS_MASK] = measurementCollector;
}
else {
// collision unhappy path, try to avoid
- while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferIndex)) {
+ while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferLength)) {
if (measurementCollector.link == null) {
- measurementCollector.link = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex));
+ byte[] nameBuffer = new byte[nameBufferLength];
+ r.get(nameStart, nameBuffer, 0, nameBufferLength);
+ measurementCollector.link = new MeasurementCollector(nameBuffer, nameSum);
measurementCollector = measurementCollector.link;
break;
}
@@ -201,7 +210,7 @@ public class CalculateAverage_isolgpus {
return incomingNameSum == existingNameSum;
}
- private static int resolveValue(int valueIndex, byte[] valueBuffer, byte decimalValue, boolean isNegative) {
+ private static int resolveValue(int valueIndex, byte[] valueBuffer, int decimalValue, boolean isNegative) {
int value;
if (valueIndex == 1) {
value = ((valueBuffer[0] - OFFSET) * 10) + (decimalValue - OFFSET);
@@ -238,13 +247,9 @@ public class CalculateAverage_isolgpus {
private int min = Integer.MAX_VALUE;
private int max = Integer.MIN_VALUE;
- public MeasurementCollector(byte[] name) {
+ public MeasurementCollector(byte[] name, int nameSum) {
this.name = name;
- int nameSum = 0;
- for (int i = 0; i < name.length; i++) {
- nameSum += name[i];
- }
this.nameSum = nameSum;
}
@@ -279,7 +284,6 @@ public class CalculateAverage_isolgpus {
@Override
public String toString() {
- // Abha=-24.9/18.0/61.7
return name + "=" + min + "/" + mean + "/" + max;
}