diff options
| author | Luck <git@lucko.me> | 2023-05-11 20:39:24 +0100 | 
|---|---|---|
| committer | Luck <git@lucko.me> | 2023-05-11 20:41:42 +0100 | 
| commit | c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1 (patch) | |
| tree | 5e8cf5def3a85bb37fdacd96e52ebffc05b2a8a6 /spark-common/src/main/java | |
| parent | a70ccb394839c63f13b3e6ff5539c0a042925d2f (diff) | |
| download | spark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.tar.gz spark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.tar.bz2 spark-c79f47c7dcd4cd2b0cc9266c0dd0b784f647bfa1.zip  | |
Fix only-ticks-over rejected execution error (#324)
Diffstat (limited to 'spark-common/src/main/java')
| -rw-r--r-- | spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java | 11 | 
1 files changed, 6 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index d537b96..08cb719 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -30,6 +30,7 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;  import java.lang.management.ThreadInfo;  import java.util.ArrayList;  import java.util.List; +import java.util.concurrent.Executor;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.TimeUnit; @@ -75,7 +76,7 @@ public class TickedDataAggregator extends JavaDataAggregator {      public SamplerMetadata.DataAggregator getMetadata() {          // push the current tick (so numberOfTicks is accurate)          synchronized (this.mutex) { -            pushCurrentTick(); +            pushCurrentTick(Runnable::run);              this.currentData = null;          } @@ -92,7 +93,7 @@ public class TickedDataAggregator extends JavaDataAggregator {          synchronized (this.mutex) {              int tick = this.tickHook.getCurrentTick();              if (this.currentTick != tick || this.currentData == null) { -                pushCurrentTick(); +                pushCurrentTick(this.workerPool);                  this.currentTick = tick;                  this.currentData = new TickList(this.expectedSize, window);              } @@ -102,7 +103,7 @@ public class TickedDataAggregator extends JavaDataAggregator {      }      // guarded by 'mutex' -    private void pushCurrentTick() { +    private void pushCurrentTick(Executor executor) {          TickList currentData = this.currentData;          if (currentData == null) {              return; @@ -116,7 +117,7 @@ public class TickedDataAggregator extends JavaDataAggregator {              return;          } -        this.workerPool.submit(currentData); +        executor.execute(currentData);          this.tickCounter.increment();      } @@ -124,7 +125,7 @@ public class TickedDataAggregator extends JavaDataAggregator {      public List<ThreadNode> exportData() {          // push the current tick          synchronized (this.mutex) { -            pushCurrentTick(); +            pushCurrentTick(Runnable::run);          }          return super.exportData();  | 
