| author    | lucko <git@lucko.me>                     | 2022-06-09 22:13:58 +0100 |
|-----------|------------------------------------------|---------------------------|
| committer | GitHub <noreply@github.com>              | 2022-06-09 22:13:58 +0100 |
| commit    | 4a16a1a2f4eb09f706b4a541e3d31618de29420b |                           |
| tree      | cc320ee2e6551f2157a2d54968f8ba14f6713d08 | /spark-common/src/main/java/me/lucko/spark/common/sampler |
| parent    | 32ab78c71c5be97da7329a4f7c4035289a3490b1 |                           |
| parent    | ecc3714e6441ace0eb78156b2b4475ca050280db |                           |
Merge pull request #213 from embeddedt/forge-1.7.10
Align 1.7.10 with Spark 1.9
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
18 files changed, 282 insertions, 135 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index bae93b1..ce466a0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -20,6 +20,20 @@
 
 package me.lucko.spark.common.sampler;
 
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.command.sender.CommandSender;
+import me.lucko.spark.common.monitor.memory.GarbageCollectorStatistics;
+import me.lucko.spark.common.platform.serverconfig.ServerConfigProvider;
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
+import me.lucko.spark.common.sampler.node.MergeMode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.util.ClassSourceLookup;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -42,6 +56,9 @@ public abstract class AbstractSampler implements Sampler {
     /** A future to encapsulate the completion of this sampler instance */
     protected final CompletableFuture<Sampler> future = new CompletableFuture<>();
 
+    /** The garbage collector statistics when profiling started */
+    protected Map<String, GarbageCollectorStatistics> initialGcStats;
+
     protected AbstractSampler(int interval, ThreadDumper threadDumper, long endTime) {
         this.interval = interval;
         this.threadDumper = threadDumper;
@@ -65,4 +82,64 @@ public abstract class AbstractSampler implements Sampler {
     public CompletableFuture<Sampler> getFuture() {
         return this.future;
     }
+
+    protected void recordInitialGcStats() {
+        this.initialGcStats = GarbageCollectorStatistics.pollStats();
+    }
+
+    protected Map<String, GarbageCollectorStatistics> getInitialGcStats() {
+        return this.initialGcStats;
+    }
+
+    protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
+        SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
+                .setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
+                .setCreator(creator.toData().toProto())
+                .setStartTime(this.startTime)
+                .setEndTime(System.currentTimeMillis())
+                .setInterval(this.interval)
+                .setThreadDumper(this.threadDumper.getMetadata())
+                .setDataAggregator(dataAggregator.getMetadata());
+
+        if (comment != null) {
+            metadata.setComment(comment);
+        }
+
+        try {
+            metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats()));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        try {
+            metadata.setSystemStatistics(platform.getStatisticsProvider().getSystemStatistics());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        try {
+            ServerConfigProvider serverConfigProvider = platform.getPlugin().createServerConfigProvider();
+            metadata.putAllServerConfigurations(serverConfigProvider.exportServerConfigurations());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        proto.setMetadata(metadata);
+    }
+
+    protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+        List<ThreadNode> data = dataAggregator.exportData();
+        data.sort(outputOrder);
+
+        ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
+
+        for (ThreadNode entry : data) {
+            proto.addThreads(entry.toProto(mergeMode));
+            classSourceVisitor.visit(entry);
+        }
+
+        if (classSourceVisitor.hasMappings()) {
+            proto.putAllClassSources(classSourceVisitor.getMapping());
+        }
+    }
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index b71aaee..845043f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -20,15 +20,14 @@
 
 package me.lucko.spark.common.sampler;
 
+import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.command.sender.CommandSender;
-import me.lucko.spark.common.platform.PlatformInfo;
 import me.lucko.spark.common.sampler.node.MergeMode;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 import me.lucko.spark.common.util.ClassSourceLookup;
-import me.lucko.spark.proto.SparkProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
 
 import java.util.Comparator;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -68,6 +67,6 @@ public interface Sampler {
     CompletableFuture<Sampler> getFuture();
 
     // Methods used to export the sampler data to the web viewer.
-    SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+    SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
 
 }
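Note on the AbstractSampler change: recordInitialGcStats() snapshots garbage-collector statistics when profiling begins, so writeMetadataToProto() can report GC activity for the profiling window only, rather than cumulative totals since JVM startup. A minimal, self-contained sketch of that snapshot-and-delta idea using only the standard JDK MX beans (spark's GarbageCollectorStatistics carries more detail; the class and method names below are illustrative, not spark's):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;

public class GcDeltaSketch {
    // Poll the cumulative collection counts, keyed by collector name.
    static Map<String, Long> pollGcCounts() {
        Map<String, Long> counts = new HashMap<>();
        for (GarbageCollectorMXBean bean : ManagementFactory.getGarbageCollectorMXBeans()) {
            counts.put(bean.getName(), bean.getCollectionCount());
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> initial = pollGcCounts(); // analogous to recordInitialGcStats()
        System.gc(); // stand-in for activity during the profiling window
        // At export time, only the delta against the initial snapshot is reported.
        pollGcCounts().forEach((name, count) ->
                System.out.println(name + ": +" + (count - initial.getOrDefault(name, 0L))));
    }
}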
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index e99114a..9d54f50 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -1,7 +1,6 @@
 /*
  * This file is part of spark.
  *
- * Copyright (C) Albert Pham <http://www.sk89q.com>
  * Copyright (c) lucko (Luck) <luck@lucko.me>
  * Copyright (c) contributors
  *
@@ -22,7 +21,7 @@
 package me.lucko.spark.common.sampler;
 
 import me.lucko.spark.common.util.ThreadFinder;
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
 
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index e63ebc8..9ad84df 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -20,9 +20,11 @@
 
 package me.lucko.spark.common.sampler;
 
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -42,6 +44,11 @@ public interface ThreadGrouper {
         }
 
         @Override
+        public String getLabel(String group) {
+            return group;
+        }
+
+        @Override
         public SamplerMetadata.DataAggregator.ThreadGrouper asProto() {
             return SamplerMetadata.DataAggregator.ThreadGrouper.BY_NAME;
         }
@@ -55,14 +62,18 @@ public interface ThreadGrouper {
      * separated from the pool name with any of one or more of ' ', '-', or '#'.</p>
      */
    ThreadGrouper BY_POOL = new ThreadGrouper() {
+        private /* static */ final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
+
+        // thread id -> group
         private final Map<Long, String> cache = new ConcurrentHashMap<>();
-        private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
+        // group -> thread ids
+        private final Map<String, Set<Long>> seen = new ConcurrentHashMap<>();
 
         @Override
         public String getGroup(long threadId, String threadName) {
-            String group = this.cache.get(threadId);
-            if (group != null) {
-                return group;
+            String cached = this.cache.get(threadId);
+            if (cached != null) {
+                return cached;
             }
 
             Matcher matcher = this.pattern.matcher(threadName);
@@ -70,12 +81,22 @@ public interface ThreadGrouper {
                 return threadName;
             }
 
-            group = matcher.group(1).trim() + " (Combined)";
-            this.cache.put(threadId, group); // we don't care about race conditions here
+            String group = matcher.group(1).trim();
+            this.cache.put(threadId, group);
+            this.seen.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(threadId);
             return group;
         }
 
         @Override
+        public String getLabel(String group) {
+            int count = this.seen.getOrDefault(group, Collections.emptySet()).size();
+            if (count == 0) {
+                return group;
+            }
+            return group + " (x" + count + ")";
+        }
+
+        @Override
         public SamplerMetadata.DataAggregator.ThreadGrouper asProto() {
             return SamplerMetadata.DataAggregator.ThreadGrouper.BY_POOL;
         }
@@ -86,9 +107,17 @@ public interface ThreadGrouper {
      * the name "All".
      */
     ThreadGrouper AS_ONE = new ThreadGrouper() {
+        private final Set<Long> seen = ConcurrentHashMap.newKeySet();
+
         @Override
         public String getGroup(long threadId, String threadName) {
-            return "All";
+            this.seen.add(threadId);
+            return "root";
+        }
+
+        @Override
+        public String getLabel(String group) {
+            return "All (x" + this.seen.size() + ")";
         }
 
         @Override
@@ -106,6 +135,14 @@ public interface ThreadGrouper {
      */
     String getGroup(long threadId, String threadName);
 
+    /**
+     * Gets the label to use for a given group.
+     *
+     * @param group the group
+     * @return the label
+     */
+    String getLabel(String group);
+
     SamplerMetadata.DataAggregator.ThreadGrouper asProto();
 
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
index 4fa8ff4..adcedcd 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
@@ -23,20 +23,19 @@
 package me.lucko.spark.common.sampler;
 
 import me.lucko.spark.common.sampler.node.ThreadNode;
 
 import java.util.Comparator;
-import java.util.Map;
 
 /**
  * Methods of ordering {@link ThreadNode}s in the output data.
  */
-public enum ThreadNodeOrder implements Comparator<Map.Entry<String, ThreadNode>> {
+public enum ThreadNodeOrder implements Comparator<ThreadNode> {
 
     /**
      * Order by the name of the thread (alphabetically)
     */
    BY_NAME {
         @Override
-        public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) {
-            return o1.getKey().compareTo(o2.getKey());
+        public int compare(ThreadNode o1, ThreadNode o2) {
+            return o1.getThreadLabel().compareTo(o2.getThreadLabel());
         }
     },
 
@@ -45,8 +44,8 @@
      */
     BY_TIME {
         @Override
-        public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) {
-            return -Double.compare(o1.getValue().getTotalTime(), o2.getValue().getTotalTime());
+        public int compare(ThreadNode o1, ThreadNode o2) {
+            return -Double.compare(o1.getTotalTime(), o2.getTotalTime());
         }
     }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
index 7640d60..ad9dee4 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -23,6 +23,8 @@
 package me.lucko.spark.common.sampler.aggregator;
 
 import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -50,7 +52,11 @@ public abstract class AbstractDataAggregator implements DataAggregator {
     }
 
     @Override
-    public Map<String, ThreadNode> getData() {
-        return this.threadData;
+    public List<ThreadNode> exportData() {
+        List<ThreadNode> data = new ArrayList<>(this.threadData.values());
+        for (ThreadNode node : data) {
+            node.setThreadLabel(this.threadGrouper.getLabel(node.getThreadGroup()));
+        }
+        return data;
     }
 }
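Note on the ThreadGrouper change: BY_POOL now stores the bare pool name as the group key and defers the display suffix to getLabel(), which appends a live thread count (e.g. "Netty Server IO (x4)") in place of the old static "(Combined)" tag. The grouping itself still hinges on one regex that strips a trailing run of ' ', '-', or '#' plus digits. A standalone sketch of just that normalisation step (the class name is hypothetical):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PoolGroupingSketch {
    // Same pattern as BY_POOL: capture the name, drop a [-# ]+digits suffix.
    private static final Pattern PATTERN = Pattern.compile("^(.*?)[-# ]+\\d+$");

    static String group(String threadName) {
        Matcher matcher = PATTERN.matcher(threadName);
        return matcher.matches() ? matcher.group(1).trim() : threadName;
    }

    public static void main(String[] args) {
        System.out.println(group("Netty Server IO #3")); // "Netty Server IO"
        System.out.println(group("pool-4-thread-7"));    // "pool-4-thread"
        System.out.println(group("main"));               // unchanged: no numeric suffix
    }
}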
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 8b90639..5590a96 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -21,9 +21,9 @@
 package me.lucko.spark.common.sampler.aggregator;
 
 import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * Aggregates sampling data.
@@ -35,7 +35,7 @@ public interface DataAggregator {
      *
      * @return the output data
      */
-    Map<String, ThreadNode> getData();
+    List<ThreadNode> exportData();
 
     /**
      * Gets metadata about the data aggregator instance.
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 594d56e..3de3943 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -24,7 +24,7 @@
 import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
 import me.lucko.spark.common.sampler.node.StackTraceNode;
 import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
 
 /**
  * Data aggregator for {@link AsyncSampler}.
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index f1d7209..d642a53 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -20,13 +20,14 @@
 
 package me.lucko.spark.common.sampler.async;
 
-import com.google.common.collect.ImmutableSetMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Table;
 
 import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.util.TemporaryFiles;
 
 import one.profiler.AsyncProfiler;
+import one.profiler.Events;
 
 import java.io.InputStream;
 import java.net.URL;
@@ -45,22 +46,31 @@ public enum AsyncProfilerAccess {
     /** An instance of the async-profiler Java API. */
     private final AsyncProfiler profiler;
 
+    /** The event to use for profiling */
+    private final ProfilingEvent profilingEvent;
+
     /** If profiler is null, contains the reason why setup failed */
     private final Exception setupException;
 
     AsyncProfilerAccess() {
         AsyncProfiler profiler;
+        ProfilingEvent profilingEvent = null;
         Exception setupException = null;
 
         try {
             profiler = load();
-            ensureCpuEventSupported(profiler);
+            if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
+                profilingEvent = ProfilingEvent.CPU;
+            } else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
+                profilingEvent = ProfilingEvent.WALL;
+            }
         } catch (Exception e) {
             profiler = null;
             setupException = e;
         }
 
         this.profiler = profiler;
+        this.profilingEvent = profilingEvent;
         this.setupException = setupException;
     }
 
@@ -71,11 +81,18 @@ public enum AsyncProfilerAccess {
         return this.profiler;
     }
 
+    public ProfilingEvent getProfilingEvent() {
+        return this.profilingEvent;
+    }
+
     public boolean checkSupported(SparkPlatform platform) {
         if (this.setupException != null) {
             if (this.setupException instanceof UnsupportedSystemException) {
                 platform.getPlugin().log(Level.INFO, "The async-profiler engine is not supported for your os/arch (" + this.setupException.getMessage() + "), so the built-in Java engine will be used instead.");
+            } else if (this.setupException instanceof NativeLoadingException && this.setupException.getCause().getMessage().contains("libstdc++")) {
+                platform.getPlugin().log(Level.WARNING, "Unable to initialise the async-profiler engine because libstdc++ is not installed.");
+                platform.getPlugin().log(Level.WARNING, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler#install-libstdc");
             } else {
                 platform.getPlugin().log(Level.WARNING, "Unable to initialise the async-profiler engine: " + this.setupException.getMessage());
                 platform.getPlugin().log(Level.WARNING, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
@@ -91,18 +108,20 @@ public enum AsyncProfilerAccess {
         String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
         String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
 
-        Multimap<String, String> supported = ImmutableSetMultimap.<String, String>builder()
-                .put("linux", "amd64")
-                .put("macosx", "amd64")
-                .put("macosx", "aarch64")
+        Table<String, String, String> supported = ImmutableTable.<String, String, String>builder()
+                .put("linux", "amd64", "linux/amd64")
+                .put("linux", "aarch64", "linux/aarch64")
+                .put("macosx", "amd64", "macos")
+                .put("macosx", "aarch64", "macos")
                 .build();
 
-        if (!supported.containsEntry(os, arch)) {
+        String libPath = supported.get(os, arch);
+        if (libPath == null) {
             throw new UnsupportedSystemException(os, arch);
         }
 
         // extract the profiler binary from the spark jar file
-        String resource = os + "/libasyncProfiler.so";
+        String resource = "spark/" + libPath + "/libasyncProfiler.so";
         URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource(resource);
         if (profilerResource == null) {
             throw new IllegalStateException("Could not find " + resource + " in spark jar file");
@@ -118,7 +137,7 @@ public enum AsyncProfilerAccess {
         try {
             return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString());
         } catch (UnsatisfiedLinkError e) {
-            throw new RuntimeException("A runtime error occurred whilst loading the native library", e);
+            throw new NativeLoadingException(e);
         }
     }
 
@@ -126,12 +145,37 @@
      * Checks the {@code profiler} to ensure the CPU event is supported.
      *
      * @param profiler the profiler instance
-     * @throws Exception if the event is not supported
+     * @return if the event is supported
      */
-    private static void ensureCpuEventSupported(AsyncProfiler profiler) throws Exception {
-        String resp = profiler.execute("check,event=cpu").trim();
-        if (!resp.equalsIgnoreCase("ok")) {
-            throw new UnsupportedOperationException("CPU event is not supported");
+    private static boolean isEventSupported(AsyncProfiler profiler, ProfilingEvent event, boolean throwException) {
+        try {
+            String resp = profiler.execute("check,event=" + event).trim();
+            if (resp.equalsIgnoreCase("ok")) {
+                return true;
+            } else if (throwException) {
+                throw new IllegalArgumentException(resp);
+            }
+        } catch (Exception e) {
+            if (throwException) {
+                throw new RuntimeException("Event " + event + " is not supported", e);
+            }
+        }
+        return false;
+    }
+
+    enum ProfilingEvent {
+        CPU(Events.CPU),
+        WALL(Events.WALL);
+
+        private final String id;
+
+        ProfilingEvent(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return this.id;
+        }
     }
 
@@ -140,4 +184,10 @@ public enum AsyncProfilerAccess {
             super(os + '/' + arch);
         }
     }
+
+    private static final class NativeLoadingException extends RuntimeException {
+        public NativeLoadingException(Throwable cause) {
+            super("A runtime error occurred whilst loading the native library", cause);
+        }
+    }
 }
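Note on the AsyncProfilerAccess change: replacing the Multimap with a Guava Table lets each supported (os, arch) pair map to its own resource sub-path — linux/amd64 and linux/aarch64 ship separate binaries while both macOS architectures share one "macos" build — and a null lookup doubles as the unsupported-platform check. A sketch of that lookup in isolation (requires Guava on the classpath; the paths mirror the diff above):

import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;

import java.util.Locale;

public class LibraryLookupSketch {
    // (os, arch) -> bundled library sub-path, as in the diff.
    private static final Table<String, String, String> SUPPORTED =
            ImmutableTable.<String, String, String>builder()
                    .put("linux", "amd64", "linux/amd64")
                    .put("linux", "aarch64", "linux/aarch64")
                    .put("macosx", "amd64", "macos")
                    .put("macosx", "aarch64", "macos")
                    .build();

    public static void main(String[] args) {
        String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
        String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
        String libPath = SUPPORTED.get(os, arch); // null => unsupported platform
        System.out.println(libPath != null
                ? "would extract spark/" + libPath + "/libasyncProfiler.so"
                : "unsupported os/arch: " + os + '/' + arch);
    }
}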
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 1837cbc..5cb7fdc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -22,8 +22,8 @@
 package me.lucko.spark.common.sampler.async;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.command.sender.CommandSender;
-import me.lucko.spark.common.platform.PlatformInfo;
 import me.lucko.spark.common.sampler.AbstractSampler;
 import me.lucko.spark.common.sampler.ThreadDumper;
 import me.lucko.spark.common.sampler.ThreadGrouper;
@@ -32,7 +32,7 @@
 import me.lucko.spark.common.sampler.node.MergeMode;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 import me.lucko.spark.common.util.ClassSourceLookup;
 import me.lucko.spark.common.util.TemporaryFiles;
-import me.lucko.spark.proto.SparkProtos;
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
 
 import one.profiler.AsyncProfiler;
 
@@ -40,10 +40,8 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -100,7 +98,7 @@ public class AsyncSampler extends AbstractSampler {
             throw new RuntimeException("Unable to create temporary output file", e);
         }
 
-        String command = "start,event=cpu,interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+        String command = "start,event=" + AsyncProfilerAccess.INSTANCE.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
         if (this.threadDumper instanceof ThreadDumper.Specific) {
             command += ",filter";
         }
@@ -117,6 +115,7 @@ public class AsyncSampler extends AbstractSampler {
             }
         }
 
+        recordInitialGcStats();
         scheduleTimeout();
     }
 
@@ -145,7 +144,14 @@ public class AsyncSampler extends AbstractSampler {
      */
     @Override
     public void stop() {
-        this.profiler.stop();
+        try {
+            this.profiler.stop();
+        } catch (IllegalStateException e) {
+            if (!e.getMessage().equals("Profiler is not active")) { // ignore
+                throw e;
+            }
+        }
+
         if (this.timeoutExecutor != null) {
             this.timeoutExecutor.shutdown();
@@ -154,38 +160,11 @@ public class AsyncSampler extends AbstractSampler {
     }
 
     @Override
-    public SparkProtos.SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
-        final SparkProtos.SamplerMetadata.Builder metadata = SparkProtos.SamplerMetadata.newBuilder()
-                .setPlatformMetadata(platformInfo.toData().toProto())
-                .setCreator(creator.toData().toProto())
-                .setStartTime(this.startTime)
-                .setInterval(this.interval)
-                .setThreadDumper(this.threadDumper.getMetadata())
-                .setDataAggregator(this.dataAggregator.getMetadata());
-
-        if (comment != null) {
-            metadata.setComment(comment);
-        }
-
-        SparkProtos.SamplerData.Builder proto = SparkProtos.SamplerData.newBuilder();
-        proto.setMetadata(metadata.build());
-
+    public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+        SamplerData.Builder proto = SamplerData.newBuilder();
+        writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
         aggregateOutput();
-
-        List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
-        data.sort(outputOrder);
-
-        ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
-
-        for (Map.Entry<String, ThreadNode> entry : data) {
-            proto.addThreads(entry.getValue().toProto(mergeMode));
-            classSourceVisitor.visit(entry.getValue());
-        }
-
-        if (classSourceVisitor.hasMappings()) {
-            proto.putAllClassSources(classSourceVisitor.getMapping());
-        }
-
+        writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup);
         return proto.build();
     }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
index a705f2d..e0cc4e9 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
@@ -59,6 +59,7 @@ public class JfrReader implements Closeable {
     public final Dictionary<AsyncStackTraceElement> stackFrames = new Dictionary<>(); // spark
     public final Map<Integer, String> frameTypes = new HashMap<>();
     public final Map<Integer, String> threadStates = new HashMap<>();
+    public final Map<String, String> settings = new HashMap<>();
 
     private int executionSample;
     private int nativeMethodSample;
@@ -67,6 +68,8 @@ public class JfrReader implements Closeable {
     private int allocationSample;
     private int monitorEnter;
     private int threadPark;
+    private int activeSetting;
+    private boolean activeSettingHasStack;
 
     public JfrReader(Path path) throws IOException { // spark - Path instead of String
         this.ch = FileChannel.open(path, StandardOpenOption.READ); // spark - Path instead of String
@@ -129,6 +132,8 @@ public class JfrReader implements Closeable {
             if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(false);
         } else if (type == threadPark) {
             if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(true);
+        } else if (type == activeSetting) {
+            readActiveSetting();
         }
 
         if ((pos += size) <= buf.limit()) {
@@ -170,6 +175,17 @@ public class JfrReader implements Closeable {
         return new ContendedLock(time, tid, stackTraceId, duration, classId);
    }
 
+    private void readActiveSetting() {
+        long time = getVarlong();
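Note on the AsyncSampler stop() change above: it tolerates a profiler that has already stopped by catching only the specific "Profiler is not active" IllegalStateException and rethrowing anything else. A generic sketch of that narrow-catch pattern (Stoppable is a hypothetical stand-in for the async-profiler handle, not spark's API):

public class SafeStopSketch {
    interface Stoppable {
        void stop() throws IllegalStateException;
    }

    static void stopQuietlyIfInactive(Stoppable profiler) {
        try {
            profiler.stop();
        } catch (IllegalStateException e) {
            // ignore only the expected already-stopped case; rethrow the rest
            if (!"Profiler is not active".equals(e.getMessage())) {
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        // An already-stopped profiler no longer aborts the shutdown path.
        stopQuietlyIfInactive(() -> { throw new IllegalStateException("Profiler is not active"); });
        System.out.println("already-stopped profiler handled without error");
    }
}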
