diff options
author | Luck <git@lucko.me> | 2022-09-19 20:08:10 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2022-09-19 20:09:07 +0100 |
commit | a42dda9eebdc8db6c310978d138708c367f95096 (patch) | |
tree | 0bd14e7f0485d3d323854beeea5c0be3b59fdc1c /spark-common/src/main/java/me/lucko/spark/common/sampler | |
parent | 7079484d428321c9b3db09394577efda4d591a4e (diff) | |
download | spark-a42dda9eebdc8db6c310978d138708c367f95096.tar.gz spark-a42dda9eebdc8db6c310978d138708c367f95096.tar.bz2 spark-a42dda9eebdc8db6c310978d138708c367f95096.zip |
Fix issues with temporary files going missing (#225)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
3 files changed, 24 insertions, 15 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 88b9d91..52a7387 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -98,7 +98,7 @@ public class SamplerBuilder { Sampler sampler; if (this.ticksOver != -1 && this.tickHook != null) { sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); - } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) { + } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform)) { sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); } else { sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index ef2c035..abde21d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -22,9 +22,9 @@ package me.lucko.spark.common.sampler.async; import com.google.common.collect.ImmutableTable; import com.google.common.collect.Table; +import com.google.common.io.ByteStreams; import me.lucko.spark.common.SparkPlatform; -import me.lucko.spark.common.util.TemporaryFiles; import one.profiler.AsyncProfiler; import one.profiler.Events; @@ -32,19 +32,29 @@ import one.profiler.Events; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardCopyOption; import java.util.Locale; +import java.util.Objects; import java.util.logging.Level; import java.util.stream.Collectors; /** * Provides a bridge between spark and async-profiler. */ -public enum AsyncProfilerAccess { - INSTANCE; +public class AsyncProfilerAccess { + private static AsyncProfilerAccess instance; + + // singleton, needs a SparkPlatform for first init + public static synchronized AsyncProfilerAccess getInstance(SparkPlatform platform) { + if (instance == null) { + Objects.requireNonNull(platform, "platform"); + instance = new AsyncProfilerAccess(platform); + } + return instance; + } /** An instance of the async-profiler Java API. */ private final AsyncProfiler profiler; @@ -55,13 +65,13 @@ public enum AsyncProfilerAccess { /** If profiler is null, contains the reason why setup failed */ private final Exception setupException; - AsyncProfilerAccess() { + AsyncProfilerAccess(SparkPlatform platform) { AsyncProfiler profiler; ProfilingEvent profilingEvent = null; Exception setupException = null; try { - profiler = load(); + profiler = load(platform); if (isEventSupported(profiler, ProfilingEvent.CPU, false)) { profilingEvent = ProfilingEvent.CPU; } else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) { @@ -106,7 +116,7 @@ public enum AsyncProfilerAccess { return this.profiler != null; } - private static AsyncProfiler load() throws Exception { + private static AsyncProfiler load(SparkPlatform platform) throws Exception { // check compatibility String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", ""); String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT); @@ -135,10 +145,10 @@ public enum AsyncProfilerAccess { throw new IllegalStateException("Could not find " + resource + " in spark jar file"); } - Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp"); + Path extractPath = platform.getTemporaryFiles().create("spark-", "-libasyncProfiler.so.tmp"); - try (InputStream in = profilerResource.openStream()) { - Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING); + try (InputStream in = profilerResource.openStream(); OutputStream out = Files.newOutputStream(extractPath)) { + ByteStreams.copy(in, out); } // get an instance of async-profiler diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 37ccd96..7d9cb81 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -31,7 +31,6 @@ import me.lucko.spark.common.sampler.async.jfr.JfrReader; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; -import me.lucko.spark.common.util.TemporaryFiles; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import one.profiler.AsyncProfiler; @@ -67,7 +66,7 @@ public class AsyncSampler extends AbstractSampler { public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { super(platform, interval, threadDumper, endTime); - this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler(); + this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler(); this.dataAggregator = new AsyncDataAggregator(threadGrouper); } @@ -93,12 +92,12 @@ public class AsyncSampler extends AbstractSampler { super.start(); try { - this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp"); + this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); } catch (IOException e) { throw new RuntimeException("Unable to create temporary output file", e); } - String command = "start,event=" + AsyncProfilerAccess.INSTANCE.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); if (this.threadDumper instanceof ThreadDumper.Specific) { command += ",filter"; } |