aboutsummaryrefslogtreecommitdiff
path: root/spark-common
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2022-11-16 22:20:50 +0000
committerLuck <git@lucko.me>2022-11-16 22:20:50 +0000
commitf5ed027fcda244f1a1fea7f7bc7f9f4324f95db9 (patch)
tree51df9f9c0c85aaba30c33f1aee89eefb98432cdd /spark-common
parente52ea7dbac9df3d610aef2ab3924fa9410d167e3 (diff)
downloadspark-f5ed027fcda244f1a1fea7f7bc7f9f4324f95db9.tar.gz
spark-f5ed027fcda244f1a1fea7f7bc7f9f4324f95db9.tar.bz2
spark-f5ed027fcda244f1a1fea7f7bc7f9f4324f95db9.zip
Fix concurrency errors in java sampler (#267)
Diffstat (limited to 'spark-common')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java3
2 files changed, 2 insertions, 3 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 95c3508..42a457d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -144,7 +144,7 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
// if we have just stepped over into a new window...
- int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window);
+ int previousWindow = JavaSampler.this.lastWindow.getAndUpdate(previous -> Math.max(this.window, previous));
if (previousWindow != 0 && previousWindow != this.window) {
// collect statistics for the previous window
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index 2e4b055..163365c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -25,7 +25,6 @@ import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,7 +45,7 @@ public abstract class AbstractNode {
/** The accumulated sample time for this node, measured in microseconds */
// Integer key = the window (effectively System.currentTimeMillis() / 60_000)
// LongAdder value = accumulated time in microseconds
- private final Map<Integer, LongAdder> times = new HashMap<>();
+ private final Map<Integer, LongAdder> times = new ConcurrentHashMap<>();
/**
* Gets the time accumulator for a given window