about | summary | refs | log | tree | commit | diff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
author: Luck <git@lucko.me> 2023-03-19 13:24:13 +0000
committer: Luck <git@lucko.me> 2023-03-19 13:24:13 +0000
commit: 6cdd78a0c57d751d3a44b319703c20b034f8d675 (patch)
tree: 5907aeead9a748185049620f6c8b5d7171349ec7 /spark-common/src/main/java/me/lucko/spark/common/sampler
parent: f5d119ec5a2eec9f4595bffc7efee1e5ce5d3b03 (diff)
download: spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.tar.gz
spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.tar.bz2
spark-6cdd78a0c57d751d3a44b319703c20b034f8d675.zip
Tidy up thread factories and async sampler regex thread filter
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java | 1
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java | 71
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java | 16
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java | 6
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java | 6
5 files changed, 55 insertions, 45 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 7891a98..b6895ce 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -120,7 +120,6 @@ public class SamplerBuilder {
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
- !(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index 62e2dda..c68384b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
/**
@@ -50,6 +49,15 @@ public interface ThreadDumper {
ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
/**
+ * Gets if the given thread should be included in the output.
+ *
+ * @param threadId the thread id
+ * @param threadName the thread name
+ * @return if the thread should be included
+ */
+ boolean isThreadIncluded(long threadId, String threadName);
+
+ /**
* Gets metadata about the thread dumper instance.
*/
SamplerMetadata.ThreadDumper getMetadata();
@@ -82,6 +90,11 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ return true;
+ }
+
+ @Override
public SamplerMetadata.ThreadDumper getMetadata() {
return SamplerMetadata.ThreadDumper.newBuilder()
.setType(SamplerMetadata.ThreadDumper.Type.ALL)
@@ -116,7 +129,7 @@ public interface ThreadDumper {
}
public void setThread(Thread thread) {
- this.dumper = new Specific(new long[]{thread.getId()});
+ this.dumper = new Specific(thread);
}
}
@@ -132,10 +145,6 @@ public interface ThreadDumper {
this.ids = new long[]{thread.getId()};
}
- public Specific(long[] ids) {
- this.ids = ids;
- }
-
public Specific(Set<String> names) {
this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
@@ -164,6 +173,14 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ if (Arrays.binarySearch(this.ids, threadId) >= 0) {
+ return true;
+ }
+ return getThreadNames().contains(threadName.toLowerCase());
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
}
@@ -187,35 +204,31 @@ public interface ThreadDumper {
public Regex(Set<String> namePatterns) {
this.namePatterns = namePatterns.stream()
- .map(regex -> {
- try {
- return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- } catch (PatternSyntaxException e) {
- return null;
- }
- })
- .filter(Objects::nonNull)
+ .map(regex -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
.collect(Collectors.toSet());
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ Boolean result = this.cache.get(threadId);
+ if (result != null) {
+ return result;
+ }
+
+ for (Pattern pattern : this.namePatterns) {
+ if (pattern.matcher(threadName).matches()) {
+ this.cache.put(threadId, true);
+ return true;
+ }
+ }
+ this.cache.put(threadId, false);
+ return false;
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return this.threadFinder.getThreads()
- .filter(thread -> {
- Boolean result = this.cache.get(thread.getId());
- if (result != null) {
- return result;
- }
-
- for (Pattern pattern : this.namePatterns) {
- if (pattern.matcher(thread.getName()).matches()) {
- this.cache.put(thread.getId(), true);
- return true;
- }
- }
- this.cache.put(thread.getId(), false);
- return false;
- })
+ .filter(thread -> isThreadIncluded(thread.getId(), thread.getName()))
.map(thread -> threadBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE))
.filter(Objects::nonNull)
.toArray(ThreadInfo[]::new);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
index 039d4ba..2fd304c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -34,7 +34,6 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
/**
* Represents a profiling job within async-profiler.
@@ -203,18 +202,9 @@ public class AsyncProfilerJob {
* Aggregates the collected data.
*/
public void aggregate(AsyncDataAggregator dataAggregator) {
-
- Predicate<String> threadFilter;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper;
- threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase());
- } else {
- threadFilter = n -> true;
- }
-
// read the jfr file produced by async-profiler
try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, this.sampleCollector, threadFilter, dataAggregator);
+ readSegments(reader, this.sampleCollector, dataAggregator);
} catch (Exception e) {
boolean fileExists;
try {
@@ -241,7 +231,7 @@ public class AsyncProfilerJob {
}
}
- private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator) throws IOException {
+ private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, AsyncDataAggregator dataAggregator) throws IOException {
List<E> samples = reader.readAllEvents(collector.eventClass());
for (E sample : samples) {
String threadName = reader.threads.get((long) sample.tid);
@@ -249,7 +239,7 @@ public class AsyncProfilerJob {
continue;
}
- if (!threadFilter.test(threadName)) {
+ if (!this.threadDumper.isThreadIncluded(sample.tid, threadName)) {
continue;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index ec88677..961c3e9 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -28,6 +28,7 @@ import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
@@ -69,7 +70,10 @@ public class AsyncSampler extends AbstractSampler {
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
+ new ThreadFactoryBuilder()
+ .setNameFormat("spark-async-sampler-worker-thread")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 2e40406..e29619b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -29,6 +29,7 @@ import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
@@ -50,7 +51,10 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The worker pool for inserting stack nodes */
private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ 6, new ThreadFactoryBuilder()
+ .setNameFormat("spark-java-sampler-" + THREAD_ID.getAndIncrement() + "-%d")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
/** The main sampling task */