aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2023-05-11 20:39:24 +0100
committerLuck <git@lucko.me>2023-05-11 20:41:42 +0100
commitc79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1 (patch)
tree5e8cf5def3a85bb37fdacd96e52ebffc05b2a8a6 /spark-common/src/main/java
parenta70ccb394839c63f13b3e6ff5539c0a042925d2f (diff)
downloadspark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.tar.gz
spark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.tar.bz2
spark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.zip
Fix only-ticks-over rejected execution error (#324)
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java11
1 files changed, 6 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
index d537b96..08cb719 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
@@ -30,6 +30,7 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -75,7 +76,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
public SamplerMetadata.DataAggregator getMetadata() {
// push the current tick (so numberOfTicks is accurate)
synchronized (this.mutex) {
- pushCurrentTick();
+ pushCurrentTick(Runnable::run);
this.currentData = null;
}
@@ -92,7 +93,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
synchronized (this.mutex) {
int tick = this.tickHook.getCurrentTick();
if (this.currentTick != tick || this.currentData == null) {
- pushCurrentTick();
+ pushCurrentTick(this.workerPool);
this.currentTick = tick;
this.currentData = new TickList(this.expectedSize, window);
}
@@ -102,7 +103,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
}
// guarded by 'mutex'
- private void pushCurrentTick() {
+ private void pushCurrentTick(Executor executor) {
TickList currentData = this.currentData;
if (currentData == null) {
return;
@@ -116,7 +117,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
return;
}
- this.workerPool.submit(currentData);
+ executor.execute(currentData);
this.tickCounter.increment();
}
@@ -124,7 +125,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
public List<ThreadNode> exportData() {
// push the current tick
synchronized (this.mutex) {
- pushCurrentTick();
+ pushCurrentTick(Runnable::run);
}
return super.exportData();