aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling
diff options
context:
space:
mode:
authorRichard Startin <richardstartin@apache.org>2024-01-04 22:11:28 +0000
committerGunnar Morling <gunnar.morling@googlemail.com>2024-01-04 23:48:54 +0100
commitb2cd84c6bc2b6ba0b84ea52f1ef2f2d5d8592c24 (patch)
treeb668d051c84fea0bc9a1667e4581dbea4a7d64be /src/main/java/dev/morling
parentb467319e58599d42df151e57731604804d2ad416 (diff)
make aggregation state grow dynamically
Diffstat (limited to 'src/main/java/dev/morling')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_richardstartin.java44
1 files changed, 26 insertions, 18 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_richardstartin.java b/src/main/java/dev/morling/onebrc/CalculateAverage_richardstartin.java
index 0d5d2fa..b619c25 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_richardstartin.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_richardstartin.java
@@ -248,12 +248,18 @@ public class CalculateAverage_richardstartin {
return Arrays.copyOf(PAGE_PROTOTYPE, PAGE_PROTOTYPE.length);
}
- static void update(double[][] pages, int position, double value) {
+ static void update(List<double[]> pages, int position, double value) {
// find the page
int pageIndex = position >>> PAGE_SHIFT;
- double[] page = pages[pageIndex];
+ if (pageIndex >= pages.size()) {
+ for (int i = pages.size(); i <= pageIndex; i++) {
+ pages.add(null);
+ }
+ }
+ double[] page = pages.get(pageIndex);
if (page == null) {
- pages[pageIndex] = page = newPage();
+ page = newPage();
+ pages.set(pageIndex, page);
}
// update local aggregates
@@ -263,8 +269,8 @@ public class CalculateAverage_richardstartin {
page[(position & PAGE_MASK) * 4 + 3] += value; // sum
}
- static ResultRow toResultRow(double[][] pages, int position) {
- double[] page = pages[position >>> PAGE_SHIFT];
+ static ResultRow toResultRow(List<double[]> pages, int position) {
+ double[] page = pages.get(position >>> PAGE_SHIFT);
double count = page[(position & PAGE_MASK) * 4];
double min = page[(position & PAGE_MASK) * 4 + 1];
double max = page[(position & PAGE_MASK) * 4 + 2];
@@ -273,7 +279,7 @@ public class CalculateAverage_richardstartin {
}
}
- private static class AggregationTask extends RecursiveTask<double[][]> {
+ private static class AggregationTask extends RecursiveTask<List<double[]>> {
private final Dictionary dictionary;
private final List<ByteBuffer> slices;
@@ -291,7 +297,7 @@ public class CalculateAverage_richardstartin {
this.max = max;
}
- private void computeSlice(ByteBuffer slice, double[][] pages) {
+ private void computeSlice(ByteBuffer slice, List<double[]> pages) {
for (int offset = 0; offset < slice.limit();) {
int nextSeparator = findIndexOf(slice, offset, DELIMITER);
ByteBuffer key = slice.slice(offset, nextSeparator - offset).order(ByteOrder.LITTLE_ENDIAN);
@@ -310,14 +316,17 @@ public class CalculateAverage_richardstartin {
}
}
- private static void merge(double[][] contribution, double[][] aggregate) {
- for (int i = 0; i < contribution.length; i++) {
- if (aggregate[i] == null) {
- aggregate[i] = contribution[i];
+ private static void merge(List<double[]> contribution, List<double[]> aggregate) {
+ for (int i = aggregate.size(); i < contribution.size(); i++) {
+ aggregate.add(null);
+ }
+ for (int i = 0; i < contribution.size(); i++) {
+ if (aggregate.get(i) == null) {
+ aggregate.set(i, contribution.get(i));
}
- else if (contribution[i] != null) {
- double[] to = aggregate[i];
- double[] from = contribution[i];
+ else if (contribution.get(i) != null) {
+ double[] to = aggregate.get(i);
+ double[] from = contribution.get(i);
// todo won't vectorise - consider separating aggregates into distinct regions and apply
// loop fission (if this shows up in the profile)
for (int j = 0; j < to.length; j += 4) {
@@ -331,10 +340,9 @@ public class CalculateAverage_richardstartin {
}
@Override
- protected double[][] compute() {
+ protected List<double[]> compute() {
if (min == max) {
- // fixme - hardcoded to problem size
- var pages = new double[600][];
+ var pages = new ArrayList<double[]>();
var slice = slices.get(min);
computeSlice(slice, pages);
return pages;
@@ -388,7 +396,7 @@ public class CalculateAverage_richardstartin {
var fjp = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Dictionary dictionary = new Dictionary();
- double[][] aggregates = fjp.submit(new AggregationTask(dictionary, slices)).join();
+ var aggregates = fjp.submit(new AggregationTask(dictionary, slices)).join();
var map = new TreeMap<String, ResultRow>();
dictionary.forEach((key, index) -> map.put(key, Page.toResultRow(aggregates, index)));
System.out.println(map);