about summary refs log tree commit diff
path: root/spark-common/src/main/java/me/lucko/spark/common
diff options
context:
space:
mode:
author    Luck <git@lucko.me> 2023-03-19 13:24:13 +0000
committer Luck <git@lucko.me> 2023-03-19 13:24:13 +0000
commit    6cdd78a0c57d751d3a44b319703c20b034f8d675 (patch)
tree      5907aeead9a748185049620f6c8b5d7171349ec7 /spark-common/src/main/java/me/lucko/spark/common
parent    f5d119ec5a2eec9f4595bffc7efee1e5ce5d3b03 (diff)
download  spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.tar.gz
          spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.tar.bz2
          spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.zip
Tidy up thread factories and async sampler regex thread filter
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common')
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java        |  5
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java            |  1
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java              | 71
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java    | 16
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java        |  6
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java          |  6
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java           | 15
7 files changed, 67 insertions, 53 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java b/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java
index 635ae20..cbacebf 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.monitor;
+import me.lucko.spark.common.util.SparkThreadFactory;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -29,7 +31,8 @@ public enum MonitoringExecutor {
/** The executor used to monitor & calculate rolling averages. */
public static final ScheduledExecutorService INSTANCE = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
- thread.setName("spark-monitor");
+ thread.setName("spark-monitoring-thread");
+ thread.setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER);
thread.setDaemon(true);
return thread;
});
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 7891a98..b6895ce 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -120,7 +120,6 @@ public class SamplerBuilder {
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
- !(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index 62e2dda..c68384b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
/**
@@ -50,6 +49,15 @@ public interface ThreadDumper {
ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
/**
+ * Gets if the given thread should be included in the output.
+ *
+ * @param threadId the thread id
+ * @param threadName the thread name
+ * @return if the thread should be included
+ */
+ boolean isThreadIncluded(long threadId, String threadName);
+
+ /**
* Gets metadata about the thread dumper instance.
*/
SamplerMetadata.ThreadDumper getMetadata();
@@ -82,6 +90,11 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ return true;
+ }
+
+ @Override
public SamplerMetadata.ThreadDumper getMetadata() {
return SamplerMetadata.ThreadDumper.newBuilder()
.setType(SamplerMetadata.ThreadDumper.Type.ALL)
@@ -116,7 +129,7 @@ public interface ThreadDumper {
}
public void setThread(Thread thread) {
- this.dumper = new Specific(new long[]{thread.getId()});
+ this.dumper = new Specific(thread);
}
}
@@ -132,10 +145,6 @@ public interface ThreadDumper {
this.ids = new long[]{thread.getId()};
}
- public Specific(long[] ids) {
- this.ids = ids;
- }
-
public Specific(Set<String> names) {
this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
@@ -164,6 +173,14 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ if (Arrays.binarySearch(this.ids, threadId) >= 0) {
+ return true;
+ }
+ return getThreadNames().contains(threadName.toLowerCase());
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
}
@@ -187,35 +204,31 @@ public interface ThreadDumper {
public Regex(Set<String> namePatterns) {
this.namePatterns = namePatterns.stream()
- .map(regex -> {
- try {
- return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- } catch (PatternSyntaxException e) {
- return null;
- }
- })
- .filter(Objects::nonNull)
+ .map(regex -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
.collect(Collectors.toSet());
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ Boolean result = this.cache.get(threadId);
+ if (result != null) {
+ return result;
+ }
+
+ for (Pattern pattern : this.namePatterns) {
+ if (pattern.matcher(threadName).matches()) {
+ this.cache.put(threadId, true);
+ return true;
+ }
+ }
+ this.cache.put(threadId, false);
+ return false;
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return this.threadFinder.getThreads()
- .filter(thread -> {
- Boolean result = this.cache.get(thread.getId());
- if (result != null) {
- return result;
- }
-
- for (Pattern pattern : this.namePatterns) {
- if (pattern.matcher(thread.getName()).matches()) {
- this.cache.put(thread.getId(), true);
- return true;
- }
- }
- this.cache.put(thread.getId(), false);
- return false;
- })
+ .filter(thread -> isThreadIncluded(thread.getId(), thread.getName()))
.map(thread -> threadBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE))
.filter(Objects::nonNull)
.toArray(ThreadInfo[]::new);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
index 039d4ba..2fd304c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -34,7 +34,6 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
/**
* Represents a profiling job within async-profiler.
@@ -203,18 +202,9 @@ public class AsyncProfilerJob {
* Aggregates the collected data.
*/
public void aggregate(AsyncDataAggregator dataAggregator) {
-
- Predicate<String> threadFilter;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper;
- threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase());
- } else {
- threadFilter = n -> true;
- }
-
// read the jfr file produced by async-profiler
try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, this.sampleCollector, threadFilter, dataAggregator);
+ readSegments(reader, this.sampleCollector, dataAggregator);
} catch (Exception e) {
boolean fileExists;
try {
@@ -241,7 +231,7 @@ public class AsyncProfilerJob {
}
}
- private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator) throws IOException {
+ private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, AsyncDataAggregator dataAggregator) throws IOException {
List<E> samples = reader.readAllEvents(collector.eventClass());
for (E sample : samples) {
String threadName = reader.threads.get((long) sample.tid);
@@ -249,7 +239,7 @@ public class AsyncProfilerJob {
continue;
}
- if (!threadFilter.test(threadName)) {
+ if (!this.threadDumper.isThreadIncluded(sample.tid, threadName)) {
continue;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index ec88677..961c3e9 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -28,6 +28,7 @@ import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
@@ -69,7 +70,10 @@ public class AsyncSampler extends AbstractSampler {
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
+ new ThreadFactoryBuilder()
+ .setNameFormat("spark-async-sampler-worker-thread")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 2e40406..e29619b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -29,6 +29,7 @@ import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
@@ -50,7 +51,10 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The worker pool for inserting stack nodes */
private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ 6, new ThreadFactoryBuilder()
+ .setNameFormat("spark-java-sampler-" + THREAD_ID.getAndIncrement() + "-%d")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
/** The main sampling task */
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java b/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java
index 156fa0d..42dca12 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java
@@ -23,7 +23,13 @@ package me.lucko.spark.common.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-public class SparkThreadFactory implements ThreadFactory, Thread.UncaughtExceptionHandler {
+public class SparkThreadFactory implements ThreadFactory {
+
+ public static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = (t, e) -> {
+ System.err.println("Uncaught exception thrown by thread " + t.getName());
+ e.printStackTrace();
+ };
+
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
@@ -36,14 +42,9 @@ public class SparkThreadFactory implements ThreadFactory, Thread.UncaughtExcepti
public Thread newThread(Runnable r) {
Thread t = new Thread(r, this.namePrefix + this.threadNumber.getAndIncrement());
- t.setUncaughtExceptionHandler(this);
+ t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
t.setDaemon(true);
return t;
}
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- System.err.println("Uncaught exception thrown by thread " + t.getName());
- e.printStackTrace();
- }
}