author     Luck <git@lucko.me>    2022-09-19 20:08:10 +0100
committer  Luck <git@lucko.me>    2022-09-19 20:09:07 +0100
commit     a42dda9eebdc8db6c310978d138708c367f95096 (patch)
tree       0bd14e7f0485d3d323854beeea5c0be3b59fdc1c /spark-common/src/main/java/me/lucko/spark/common/sampler
parent     7079484d428321c9b3db09394577efda4d591a4e (diff)
Fix issues with temporary files going missing (#225)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java            |  2
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java | 30
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java        |  7
3 files changed, 24 insertions, 15 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 88b9d91..52a7387 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -98,7 +98,7 @@ public class SamplerBuilder {
Sampler sampler;
if (this.ticksOver != -1 && this.tickHook != null) {
sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
- } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
+ } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform)) {
sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
} else {
sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
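The hunk above swaps the eager enum singleton (AsyncProfilerAccess.INSTANCE) for a lazily built one, since constructing the accessor now requires a SparkPlatform. A minimal sketch of that lazy, synchronized singleton pattern follows; "Platform" is only a placeholder for SparkPlatform, and the real constructor additionally loads the native async-profiler library.

import java.util.Objects;

final class ProfilerAccess {
    private static ProfilerAccess instance;

    // The first caller must supply the platform; later callers simply
    // receive the instance that was already built.
    static synchronized ProfilerAccess getInstance(Platform platform) {
        if (instance == null) {
            Objects.requireNonNull(platform, "platform");
            instance = new ProfilerAccess(platform);
        }
        return instance;
    }

    private final Platform platform;

    private ProfilerAccess(Platform platform) {
        this.platform = platform;
    }

    // Stand-in type so the sketch is self-contained.
    interface Platform {
    }
}

Synchronizing getInstance keeps concurrent callers from racing to construct two instances during the first, platform-supplying call.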
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index ef2c035..abde21d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -22,9 +22,9 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
+import com.google.common.io.ByteStreams;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.util.TemporaryFiles;
import one.profiler.AsyncProfiler;
import one.profiler.Events;
@@ -32,19 +32,29 @@ import one.profiler.Events;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
import java.util.Locale;
+import java.util.Objects;
import java.util.logging.Level;
import java.util.stream.Collectors;
/**
* Provides a bridge between spark and async-profiler.
*/
-public enum AsyncProfilerAccess {
- INSTANCE;
+public class AsyncProfilerAccess {
+ private static AsyncProfilerAccess instance;
+
+ // singleton, needs a SparkPlatform for first init
+ public static synchronized AsyncProfilerAccess getInstance(SparkPlatform platform) {
+ if (instance == null) {
+ Objects.requireNonNull(platform, "platform");
+ instance = new AsyncProfilerAccess(platform);
+ }
+ return instance;
+ }
/** An instance of the async-profiler Java API. */
private final AsyncProfiler profiler;
@@ -55,13 +65,13 @@ public enum AsyncProfilerAccess {
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
- AsyncProfilerAccess() {
+ AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
Exception setupException = null;
try {
- profiler = load();
+ profiler = load(platform);
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -106,7 +116,7 @@ public enum AsyncProfilerAccess {
return this.profiler != null;
}
- private static AsyncProfiler load() throws Exception {
+ private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
@@ -135,10 +145,10 @@ public enum AsyncProfilerAccess {
throw new IllegalStateException("Could not find " + resource + " in spark jar file");
}
- Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
+ Path extractPath = platform.getTemporaryFiles().create("spark-", "-libasyncProfiler.so.tmp");
- try (InputStream in = profilerResource.openStream()) {
- Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
+ try (InputStream in = profilerResource.openStream(); OutputStream out = Files.newOutputStream(extractPath)) {
+ ByteStreams.copy(in, out);
}
// get an instance of async-profiler
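The last hunk above is the core of the temporary-file fix: the bundled libasyncProfiler.so is now extracted into a temp file obtained from the platform's own TemporaryFiles manager and written through an output stream, rather than replaced via Files.copy(..., REPLACE_EXISTING), which replaces (deletes and recreates) the target file. A self-contained sketch of that streaming extraction; ByteStreams.copy comes from the diff, while the class, method, and parameter names here are illustrative.

import com.google.common.io.ByteStreams;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;

final class ResourceExtraction {
    // Streams a bundled resource into an already-created temporary file,
    // so the file tracked by the temporary-file manager stays in place.
    static Path extract(URL resource, Path tempFile) throws IOException {
        try (InputStream in = resource.openStream();
             OutputStream out = Files.newOutputStream(tempFile)) {
            ByteStreams.copy(in, out);
        }
        return tempFile;
    }
}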
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 37ccd96..7d9cb81 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -31,7 +31,6 @@ import me.lucko.spark.common.sampler.async.jfr.JfrReader;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
-import me.lucko.spark.common.util.TemporaryFiles;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import one.profiler.AsyncProfiler;
@@ -67,7 +66,7 @@ public class AsyncSampler extends AbstractSampler {
public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
super(platform, interval, threadDumper, endTime);
- this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
+ this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler();
this.dataAggregator = new AsyncDataAggregator(threadGrouper);
}
@@ -93,12 +92,12 @@ public class AsyncSampler extends AbstractSampler {
super.start();
try {
- this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp");
+ this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp");
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary output file", e);
}
- String command = "start,event=" + AsyncProfilerAccess.INSTANCE.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
if (this.threadDumper instanceof ThreadDumper.Specific) {
command += ",filter";
}
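For context, the start command assembled above follows async-profiler's agent option syntax. An illustrative sketch of building such a command string; only the option format (start,event=...,interval=...us,threads,jfr,file=..., plus the optional ,filter used for ThreadDumper.Specific) is taken from the diff, and the method and parameter names are assumptions.

import java.nio.file.Path;

final class ProfilerCommand {
    static String startCommand(String event, int intervalMicros, Path outputFile, boolean filterThreads) {
        String command = "start,event=" + event
                + ",interval=" + intervalMicros + "us"
                + ",threads,jfr,file=" + outputFile;
        if (filterThreads) {
            // Mirrors the ThreadDumper.Specific case above, which appends ",filter"
            // so only explicitly registered threads are sampled.
            command += ",filter";
        }
        return command;
    }
}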