about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rwxr-xr-x calculate_average_shipilev.sh 7
-rw-r--r-- src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java 156
2 files changed, 99 insertions, 64 deletions
diff --git a/calculate_average_shipilev.sh b/calculate_average_shipilev.sh
index 5d9f633..13a12cd 100755
--- a/calculate_average_shipilev.sh
+++ b/calculate_average_shipilev.sh
@@ -15,12 +15,11 @@
# limitations under the License.
#
-JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms64m -Xmx64m -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages
--XX:-TieredCompilation -XX:CICompilerCount=2 -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields
+JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms1g -Xmx1g -XX:-AlwaysPreTouch -XX:+UseTransparentHugePages
+-XX:-TieredCompilation -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields -XX:CompileThreshold=2048
--add-opens java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED
-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=quiet
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$ParsingTask::seqCompute
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$MeasurementsMap::updateSlow
--XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev::nameMatches
--XX:CompileThreshold=2048"
+-XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev\$Bucket::matches"
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_shipilev
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
index 4998986..1150f42 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
@@ -102,45 +102,12 @@ public class CalculateAverage_shipilev {
// ========================= MEATY GRITTY PARTS: PARSE AND AGGREGATE =========================
- // Little helper method to compare the array with given bytebuffer range.
- public static boolean nameMatches(Bucket bucket, ByteBuffer cand, int begin, int end) {
- byte[] orig = bucket.name;
- int origLen = orig.length;
- int candLen = end - begin;
- if (origLen != candLen) {
- return false;
- }
-
- // Check the tails first, to simplify the matches.
- if (origLen >= 8) {
- if (bucket.tail1 != cand.getLong(end - 8)) {
- return false;
- }
- if (origLen >= 16) {
- if (bucket.tail2 != cand.getLong(end - 16)) {
- return false;
- }
- origLen -= 16;
- }
- else {
- origLen -= 8;
- }
- }
-
- // Check the rest.
- for (int i = 0; i < origLen; i++) {
- if (orig[i] != cand.get(begin + i)) {
- return false;
- }
- }
- return true;
- }
-
public static final class Bucket {
- // Raw station name, its hash, and tails.
- public final byte[] name;
+ // Raw station name, its hash, and prefixes.
+ public final byte[] nameTail;
+ public final int len;
public final int hash;
- public final long tail1, tail2;
+ public final int prefix1, prefix2;
// Temperature values, in 10x scale.
public long sum;
@@ -148,10 +115,32 @@ public class CalculateAverage_shipilev {
public int min;
public int max;
- public Bucket(byte[] name, long tail1, long tail2, int hash, int temp) {
- this.name = name;
- this.tail1 = tail1;
- this.tail2 = tail2;
+ public Bucket(ByteBuffer slice, int begin, int end, int hash, int temp) {
+ len = end - begin;
+
+ // Also pick up any prefixes to simplify future matches.
+ int tailStart = 0;
+ if (len >= 8) {
+ prefix1 = slice.getInt(begin + 0);
+ prefix2 = slice.getInt(begin + 4);
+ tailStart += 8;
+ }
+ else if (len >= 4) {
+ prefix1 = slice.getInt(begin + 0);
+ prefix2 = 0;
+ tailStart += 4;
+ }
+ else {
+ prefix1 = 0;
+ prefix2 = 0;
+ }
+
+ // The rest goes to the tail byte array. We are checking the names on the hot path.
+ // Therefore, it is convenient to keep allocation for names near the buckets.
+ int tailLen = len - tailStart;
+ nameTail = new byte[tailLen];
+ slice.get(begin + tailStart, nameTail, 0, tailLen);
+
this.hash = hash;
this.sum = temp;
this.count = 1;
@@ -159,6 +148,48 @@ public class CalculateAverage_shipilev {
this.max = temp;
}
+ // Little helper method to compare the bucket name with the given ByteBuffer range.
+ public boolean matches(ByteBuffer cand, int begin, int end) {
+ int origLen = len;
+ int candLen = end - begin;
+ if (origLen != candLen) {
+ return false;
+ }
+
+ // Check the prefixes first, to simplify the matches.
+ int tailStart = 0;
+ if (origLen >= 8) {
+ if (prefix1 != cand.getInt(begin)) {
+ return false;
+ }
+ if (prefix2 != cand.getInt(begin + 4)) {
+ return false;
+ }
+ tailStart += 8;
+ }
+ else if (origLen >= 4) {
+ if (prefix1 != cand.getInt(begin)) {
+ return false;
+ }
+ tailStart += 4;
+ }
+
+ // Check the rest.
+ for (int i = 0; i < origLen - tailStart; i++) {
+ if (nameTail[i] != cand.get(begin + tailStart + i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean matches(Bucket other) {
+ return len == other.len &&
+ prefix1 == other.prefix1 &&
+ prefix2 == other.prefix2 &&
+ Arrays.equals(nameTail, other.nameTail);
+ }
+
public void merge(int value) {
sum += value;
count++;
@@ -178,8 +209,19 @@ public class CalculateAverage_shipilev {
}
public Row toRow() {
+ // Reconstruct the name
+ ByteBuffer bb = ByteBuffer.allocate(len);
+ bb.order(ByteOrder.LITTLE_ENDIAN);
+ if (len >= 4) {
+ bb.putInt(prefix1);
+ }
+ if (len >= 8) {
+ bb.putInt(prefix2);
+ }
+ bb.put(nameTail);
+
return new Row(
- new String(name),
+ new String(Arrays.copyOf(bb.array(), len)),
Math.round((double) min) / 10.0,
Math.round((double) sum / count) / 10.0,
Math.round((double) max) / 10.0);
@@ -205,21 +247,11 @@ public class CalculateAverage_shipilev {
while (true) {
Bucket cur = buckets[idx];
if (cur == null) {
- // No bucket yet, lucky us. Lookup the name and create the bucket with it.
- // We are checking the names on hot-path. Therefore, it is convenient
- // to keep allocation for names near the buckets.
- int len = end - begin;
- byte[] copy = new byte[len];
- name.get(begin, copy, 0, len);
-
- // Also pick up any tail to simplify future matches.
- long tail1 = (len < 8) ? 0 : name.getLong(begin + len - 8);
- long tail2 = (len < 16) ? 0 : name.getLong(begin + len - 16);
-
- buckets[idx] = new Bucket(copy, tail1, tail2, hash, temp);
+ // No bucket yet, lucky us. Create the bucket from the candidate name.
+ buckets[idx] = new Bucket(name, begin, end, hash, temp);
return;
}
- else if ((cur.hash == hash) && nameMatches(cur, name, begin, end)) {
+ else if ((cur.hash == hash) && cur.matches(name, begin, end)) {
// Same as bucket fastpath. Check for collision by checking the full hash
// first (since the index is truncated by map size), and then the exact name.
cur.merge(temp);
@@ -244,7 +276,7 @@ public class CalculateAverage_shipilev {
buckets[idx] = other;
break;
}
- else if ((cur.hash == other.hash) && Arrays.equals(cur.name, other.name)) {
+ else if ((cur.hash == other.hash) && cur.matches(other)) {
cur.merge(other);
break;
}
@@ -425,7 +457,7 @@ public class CalculateAverage_shipilev {
// Time to update!
Bucket bucket = buckets[nameHash & (MAP_SIZE - 1)];
- if ((bucket != null) && (nameHash == bucket.hash) && nameMatches(bucket, slice, nameBegin, nameEnd)) {
+ if ((bucket != null) && (nameHash == bucket.hash) && bucket.matches(slice, nameBegin, nameEnd)) {
// Lucky fast path, existing bucket hit. Most of the time we complete here.
bucket.merge(temp);
}
@@ -447,8 +479,8 @@ public class CalculateAverage_shipilev {
// a given mmap slice, while there is still other work to do. This allows
// us to unmap slices on the go.
public static final class RootTask extends CountedCompleter<Void> {
- public RootTask(CountedCompleter<Void> parent) {
- super(parent);
+ public RootTask() {
+ super(null);
}
@Override
@@ -517,8 +549,12 @@ public class CalculateAverage_shipilev {
// ========================= Invocation =========================
public static void main(String[] args) throws Exception {
+ // Instantiate a separate FJP to match the parallelism accurately, without
+ // relying on common pool defaults.
+ ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
// This little line carries the whole world
- new RootTask(null).fork();
+ pool.submit(new RootTask());
// While the root task is working, prepare what we need for the
// end of the run. Go and try to report something to prepare the