aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2022-09-19 20:08:10 +0100
committerLuck <git@lucko.me>2022-09-19 20:09:07 +0100
commita42dda9eebdc8db6c310978d138708c367f95096 (patch)
tree0bd14e7f0485d3d323854beeea5c0be3b59fdc1c /spark-common/src/main/java/me/lucko/spark
parent7079484d428321c9b3db09394577efda4d591a4e (diff)
downloadspark-a42dda9eebdc8db6c310978d138708c367f95096.tar.gz
spark-a42dda9eebdc8db6c310978d138708c367f95096.tar.bz2
spark-a42dda9eebdc8db6c310978d138708c367f95096.zip
Fix issues with temporary files going missing (#225)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java30
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/TemporaryFiles.java81
5 files changed, 103 insertions, 25 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
index 1969206..2790a3c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
@@ -89,6 +89,7 @@ public class SparkPlatform {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH.mm.ss");
private final SparkPlugin plugin;
+ private final TemporaryFiles temporaryFiles;
private final Configuration configuration;
private final String viewerUrl;
private final BytebinClient bytebinClient;
@@ -109,6 +110,7 @@ public class SparkPlatform {
public SparkPlatform(SparkPlugin plugin) {
this.plugin = plugin;
+ this.temporaryFiles = new TemporaryFiles(this.plugin.getPluginDirectory().resolve("tmp"));
this.configuration = new Configuration(this.plugin.getPluginDirectory().resolve("config.json"));
this.viewerUrl = this.configuration.getString("viewerUrl", "https://spark.lucko.me/");
@@ -192,13 +194,17 @@ public class SparkPlatform {
SparkApi.unregister();
- TemporaryFiles.deleteTemporaryFiles();
+ this.temporaryFiles.deleteTemporaryFiles();
}
public SparkPlugin getPlugin() {
return this.plugin;
}
+ public TemporaryFiles getTemporaryFiles() {
+ return this.temporaryFiles;
+ }
+
public Configuration getConfiguration() {
return this.configuration;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 88b9d91..52a7387 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -98,7 +98,7 @@ public class SamplerBuilder {
Sampler sampler;
if (this.ticksOver != -1 && this.tickHook != null) {
sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
- } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
+ } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform)) {
sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
} else {
sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index ef2c035..abde21d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -22,9 +22,9 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
+import com.google.common.io.ByteStreams;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.util.TemporaryFiles;
import one.profiler.AsyncProfiler;
import one.profiler.Events;
@@ -32,19 +32,29 @@ import one.profiler.Events;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
import java.util.Locale;
+import java.util.Objects;
import java.util.logging.Level;
import java.util.stream.Collectors;
/**
* Provides a bridge between spark and async-profiler.
*/
-public enum AsyncProfilerAccess {
- INSTANCE;
+public class AsyncProfilerAccess {
+ private static AsyncProfilerAccess instance;
+
+ // singleton, needs a SparkPlatform for first init
+ public static synchronized AsyncProfilerAccess getInstance(SparkPlatform platform) {
+ if (instance == null) {
+ Objects.requireNonNull(platform, "platform");
+ instance = new AsyncProfilerAccess(platform);
+ }
+ return instance;
+ }
/** An instance of the async-profiler Java API. */
private final AsyncProfiler profiler;
@@ -55,13 +65,13 @@ public enum AsyncProfilerAccess {
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
- AsyncProfilerAccess() {
+ AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
Exception setupException = null;
try {
- profiler = load();
+ profiler = load(platform);
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -106,7 +116,7 @@ public enum AsyncProfilerAccess {
return this.profiler != null;
}
- private static AsyncProfiler load() throws Exception {
+ private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
@@ -135,10 +145,10 @@ public enum AsyncProfilerAccess {
throw new IllegalStateException("Could not find " + resource + " in spark jar file");
}
- Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
+ Path extractPath = platform.getTemporaryFiles().create("spark-", "-libasyncProfiler.so.tmp");
- try (InputStream in = profilerResource.openStream()) {
- Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
+ try (InputStream in = profilerResource.openStream(); OutputStream out = Files.newOutputStream(extractPath)) {
+ ByteStreams.copy(in, out);
}
// get an instance of async-profiler
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 37ccd96..7d9cb81 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -31,7 +31,6 @@ import me.lucko.spark.common.sampler.async.jfr.JfrReader;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
-import me.lucko.spark.common.util.TemporaryFiles;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import one.profiler.AsyncProfiler;
@@ -67,7 +66,7 @@ public class AsyncSampler extends AbstractSampler {
public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
super(platform, interval, threadDumper, endTime);
- this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
+ this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler();
this.dataAggregator = new AsyncDataAggregator(threadGrouper);
}
@@ -93,12 +92,12 @@ public class AsyncSampler extends AbstractSampler {
super.start();
try {
- this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp");
+ this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp");
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary output file", e);
}
- String command = "start,event=" + AsyncProfilerAccess.INSTANCE.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
if (this.threadDumper instanceof ThreadDumper.Specific) {
command += ",filter";
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/TemporaryFiles.java b/spark-common/src/main/java/me/lucko/spark/common/util/TemporaryFiles.java
index 8a4a621..91a474c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/util/TemporaryFiles.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/TemporaryFiles.java
@@ -20,10 +20,18 @@
package me.lucko.spark.common.util;
+import com.google.common.collect.ImmutableList;
+
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
@@ -32,23 +40,47 @@ import java.util.Set;
* Utility for handling temporary files.
*/
public final class TemporaryFiles {
- private TemporaryFiles() {}
- private static final Set<Path> DELETE_SET = Collections.synchronizedSet(new HashSet<>());
+ public static final FileAttribute<?>[] OWNER_ONLY_FILE_PERMISSIONS;
+
+ static {
+ boolean isPosix = FileSystems.getDefault().supportedFileAttributeViews().contains("posix");
+ if (isPosix) {
+ OWNER_ONLY_FILE_PERMISSIONS = new FileAttribute[]{PosixFilePermissions.asFileAttribute(EnumSet.of(
+ PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE
+ ))};
+ } else {
+ OWNER_ONLY_FILE_PERMISSIONS = new FileAttribute[0];
+ }
+ }
+
+ private final Path tmpDirectory;
+ private final Set<Path> files = Collections.synchronizedSet(new HashSet<>());
- public static Path create(String prefix, String suffix) throws IOException {
- return register(Files.createTempFile(prefix, suffix));
+ public TemporaryFiles(Path tmpDirectory) {
+ this.tmpDirectory = tmpDirectory;
}
- public static Path register(Path path) {
+ public Path create(String prefix, String suffix) throws IOException {
+ Path file;
+ if (ensureDirectoryIsReady()) {
+ String name = prefix + Long.toHexString(System.nanoTime()) + suffix;
+ file = Files.createFile(this.tmpDirectory.resolve(name), OWNER_ONLY_FILE_PERMISSIONS);
+ } else {
+ file = Files.createTempFile(prefix, suffix);
+ }
+ return register(file);
+ }
+
+ public Path register(Path path) {
path.toFile().deleteOnExit();
- DELETE_SET.add(path);
+ this.files.add(path);
return path;
}
- public static void deleteTemporaryFiles() {
- synchronized (DELETE_SET) {
- for (Iterator<Path> iterator = DELETE_SET.iterator(); iterator.hasNext(); ) {
+ public void deleteTemporaryFiles() {
+ synchronized (this.files) {
+ for (Iterator<Path> iterator = this.files.iterator(); iterator.hasNext(); ) {
Path path = iterator.next();
try {
Files.deleteIfExists(path);
@@ -60,4 +92,35 @@ public final class TemporaryFiles {
}
}
+ private boolean ensureDirectoryIsReady() {
+ if (Boolean.parseBoolean(System.getProperty("spark.useOsTmpDir", "false"))) {
+ return false;
+ }
+
+ if (Files.isDirectory(this.tmpDirectory)) {
+ return true;
+ }
+
+ try {
+ Files.createDirectories(this.tmpDirectory);
+
+ Files.write(this.tmpDirectory.resolve("about.txt"), ImmutableList.of(
+ "# What is this directory?",
+ "",
+ "* In order to perform certain functions, spark sometimes needs to write temporary data to the disk. ",
+ "* Previously, a temporary directory provided by the operating system was used for this purpose. ",
+ "* However, this proved to be unreliable in some circumstances, so spark now stores temporary data here instead!",
+ "",
+ "spark will automatically cleanup the contents of this directory. " ,
+ "(but if for some reason it doesn't, if the server is stopped, you can freely delete any files ending in .tmp)",
+ "",
+ "tl;dr: spark uses this folder to store some temporary data."
+ ), StandardCharsets.UTF_8);
+
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
}