diff options
Diffstat (limited to 'spark-common/src/main')
7 files changed, 67 insertions, 53 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java b/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java index 635ae20..cbacebf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java +++ b/spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java @@ -20,6 +20,8 @@ package me.lucko.spark.common.monitor; +import me.lucko.spark.common.util.SparkThreadFactory; + import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -29,7 +31,8 @@ public enum MonitoringExecutor { /** The executor used to monitor & calculate rolling averages. */ public static final ScheduledExecutorService INSTANCE = Executors.newSingleThreadScheduledExecutor(r -> { Thread thread = Executors.defaultThreadFactory().newThread(r); - thread.setName("spark-monitor"); + thread.setName("spark-monitoring-thread"); + thread.setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER); thread.setDaemon(true); return thread; }); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 7891a98..b6895ce 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -120,7 +120,6 @@ public class SamplerBuilder { boolean canUseAsyncProfiler = this.useAsyncProfiler && !onlyTicksOverMode && !(this.ignoreSleeping || this.ignoreNative) && - !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform); if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java index 62e2dda..c68384b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java @@ -32,7 +32,6 @@ import java.util.Objects; import java.util.Set; import java.util.function.Supplier; import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; /** @@ -50,6 +49,15 @@ public interface ThreadDumper { ThreadInfo[] dumpThreads(ThreadMXBean threadBean); /** + * Gets if the given thread should be included in the output. + * + * @param threadId the thread id + * @param threadName the thread name + * @return if the thread should be included + */ + boolean isThreadIncluded(long threadId, String threadName); + + /** * Gets metadata about the thread dumper instance. */ SamplerMetadata.ThreadDumper getMetadata(); @@ -82,6 +90,11 @@ public interface ThreadDumper { } @Override + public boolean isThreadIncluded(long threadId, String threadName) { + return true; + } + + @Override public SamplerMetadata.ThreadDumper getMetadata() { return SamplerMetadata.ThreadDumper.newBuilder() .setType(SamplerMetadata.ThreadDumper.Type.ALL) @@ -116,7 +129,7 @@ public interface ThreadDumper { } public void setThread(Thread thread) { - this.dumper = new Specific(new long[]{thread.getId()}); + this.dumper = new Specific(thread); } } @@ -132,10 +145,6 @@ public interface ThreadDumper { this.ids = new long[]{thread.getId()}; } - public Specific(long[] ids) { - this.ids = ids; - } - public Specific(Set<String> names) { this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); this.ids = new ThreadFinder().getThreads() @@ -164,6 +173,14 @@ public interface ThreadDumper { } @Override + public boolean isThreadIncluded(long threadId, String threadName) { + if (Arrays.binarySearch(this.ids, threadId) >= 0) { + return true; + } + return getThreadNames().contains(threadName.toLowerCase()); + } + + @Override public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) { return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE); } @@ -187,35 +204,31 @@ public interface ThreadDumper { public Regex(Set<String> namePatterns) { this.namePatterns = namePatterns.stream() - .map(regex -> { - try { - return Pattern.compile(regex, Pattern.CASE_INSENSITIVE); - } catch (PatternSyntaxException e) { - return null; - } - }) - .filter(Objects::nonNull) + .map(regex -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE)) .collect(Collectors.toSet()); } @Override + public boolean isThreadIncluded(long threadId, String threadName) { + Boolean result = this.cache.get(threadId); + if (result != null) { + return result; + } + + for (Pattern pattern : this.namePatterns) { + if (pattern.matcher(threadName).matches()) { + this.cache.put(threadId, true); + return true; + } + } + this.cache.put(threadId, false); + return false; + } + + @Override public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) { return this.threadFinder.getThreads() - .filter(thread -> { - Boolean result = this.cache.get(thread.getId()); - if (result != null) { - return result; - } - - for (Pattern pattern : this.namePatterns) { - if (pattern.matcher(thread.getName()).matches()) { - this.cache.put(thread.getId(), true); - return true; - } - } - this.cache.put(thread.getId(), false); - return false; - }) + .filter(thread -> isThreadIncluded(thread.getId(), thread.getName())) .map(thread -> threadBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE)) .filter(Objects::nonNull) .toArray(ThreadInfo[]::new); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java index 039d4ba..2fd304c 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -34,7 +34,6 @@ import java.nio.file.Path; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; /** * Represents a profiling job within async-profiler. @@ -203,18 +202,9 @@ public class AsyncProfilerJob { * Aggregates the collected data. */ public void aggregate(AsyncDataAggregator dataAggregator) { - - Predicate<String> threadFilter; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper; - threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase()); - } else { - threadFilter = n -> true; - } - // read the jfr file produced by async-profiler try (JfrReader reader = new JfrReader(this.outputFile)) { - readSegments(reader, this.sampleCollector, threadFilter, dataAggregator); + readSegments(reader, this.sampleCollector, dataAggregator); } catch (Exception e) { boolean fileExists; try { @@ -241,7 +231,7 @@ public class AsyncProfilerJob { } } - private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator) throws IOException { + private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, AsyncDataAggregator dataAggregator) throws IOException { List<E> samples = reader.readAllEvents(collector.eventClass()); for (E sample : samples) { String threadName = reader.threads.get((long) sample.tid); @@ -249,7 +239,7 @@ public class AsyncProfilerJob { continue; } - if (!threadFilter.test(threadName)) { + if (!this.threadDumper.isThreadIncluded(sample.tid, threadName)) { continue; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index ec88677..961c3e9 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -28,6 +28,7 @@ import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.util.SparkThreadFactory; import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; @@ -69,7 +70,10 @@ public class AsyncSampler extends AbstractSampler { this.profilerAccess = AsyncProfilerAccess.getInstance(platform); this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper()); this.scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() + new ThreadFactoryBuilder() + .setNameFormat("spark-async-sampler-worker-thread") + .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER) + .build() ); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 2e40406..e29619b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -29,6 +29,7 @@ import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.util.SparkThreadFactory; import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; @@ -50,7 +51,10 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** The worker pool for inserting stack nodes */ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() + 6, new ThreadFactoryBuilder() + .setNameFormat("spark-java-sampler-" + THREAD_ID.getAndIncrement() + "-%d") + .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER) + .build() ); /** The main sampling task */ diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java b/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java index 156fa0d..42dca12 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java +++ b/spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java @@ -23,7 +23,13 @@ package me.lucko.spark.common.util; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; -public class SparkThreadFactory implements ThreadFactory, Thread.UncaughtExceptionHandler { +public class SparkThreadFactory implements ThreadFactory { + + public static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = (t, e) -> { + System.err.println("Uncaught exception thrown by thread " + t.getName()); + e.printStackTrace(); + }; + private static final AtomicInteger poolNumber = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; @@ -36,14 +42,9 @@ public class SparkThreadFactory implements ThreadFactory, Thread.UncaughtExcepti public Thread newThread(Runnable r) { Thread t = new Thread(r, this.namePrefix + this.threadNumber.getAndIncrement()); - t.setUncaughtExceptionHandler(this); + t.setUncaughtExceptionHandler(EXCEPTION_HANDLER); t.setDaemon(true); return t; } - @Override - public void uncaughtException(Thread t, Throwable e) { - System.err.println("Uncaught exception thrown by thread " + t.getName()); - e.printStackTrace(); - } } |