aboutsummaryrefslogtreecommitdiff
path: root/spark-common
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common')
-rw-r--r--spark-common/build.gradle5
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java9
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java5
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java108
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java6
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java454
-rwxr-xr-xspark-common/src/main/resources/libasyncProfiler.sobin314098 -> 0 bytes
-rwxr-xr-xspark-common/src/main/resources/linux/libasyncProfiler.sobin0 -> 398099 bytes
-rwxr-xr-xspark-common/src/main/resources/macosx/libasyncProfiler.sobin0 -> 599568 bytes
12 files changed, 457 insertions, 144 deletions
diff --git a/spark-common/build.gradle b/spark-common/build.gradle
index 1243fd8..526c6e1 100644
--- a/spark-common/build.gradle
+++ b/spark-common/build.gradle
@@ -4,7 +4,7 @@ plugins {
dependencies {
api project(':spark-api')
- implementation 'com.github.jvm-profiling-tools:async-profiler:v2.0'
+ implementation 'com.github.jvm-profiling-tools:async-profiler:v2.5'
implementation 'org.ow2.asm:asm:9.1'
implementation 'com.google.protobuf:protobuf-javalite:3.15.6'
implementation 'com.squareup.okhttp3:okhttp:3.14.1'
@@ -32,7 +32,8 @@ dependencies {
processResources {
from(sourceSets.main.resources.srcDirs) {
- include 'libasyncProfiler.so'
+ include 'linux/libasyncProfiler.so'
+ include 'macosx/libasyncProfiler.so'
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
index aa5112d..f312916 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
@@ -29,6 +29,7 @@ import me.lucko.spark.common.tick.TickReporter;
import me.lucko.spark.common.util.ClassSourceLookup;
import java.nio.file.Path;
+import java.util.logging.Level;
import java.util.stream.Stream;
/**
@@ -72,6 +73,14 @@ public interface SparkPlugin {
void executeAsync(Runnable task);
/**
+ * Print to the plugin logger.
+ *
+ * @param level the log level
+ * @param msg the message
+ */
+ void log(Level level, String msg);
+
+ /**
* Gets the default {@link ThreadDumper} to be used by the plugin.
*
* @return the default thread dumper
diff --git a/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java b/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
index 5ac41fc..d0fcb0f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
@@ -25,10 +25,10 @@ import com.google.common.collect.ImmutableMap;
import me.lucko.spark.api.Spark;
import me.lucko.spark.api.SparkProvider;
import me.lucko.spark.api.gc.GarbageCollector;
+import me.lucko.spark.api.statistic.StatisticWindow;
import me.lucko.spark.api.statistic.misc.DoubleAverageInfo;
import me.lucko.spark.api.statistic.types.DoubleStatistic;
import me.lucko.spark.api.statistic.types.GenericStatistic;
-import me.lucko.spark.api.statistic.StatisticWindow;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.monitor.cpu.CpuMonitor;
import me.lucko.spark.common.monitor.memory.GarbageCollectorStatistics;
@@ -37,9 +37,7 @@ import me.lucko.spark.common.monitor.tick.TickStatistics;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.sql.Ref;
import java.util.HashMap;
import java.util.Map;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index 4147de3..d45c7af 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -217,7 +217,7 @@ public class SamplerModule implements CommandModule {
if (ticksOver != -1) {
builder.ticksOver(ticksOver, tickHook);
}
- Sampler sampler = this.activeSampler = builder.start();
+ Sampler sampler = this.activeSampler = builder.start(platform);
resp.broadcastPrefixed(text()
.append(text("Profiler now active!", GOLD))
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index f0ed533..88cf018 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -20,6 +20,7 @@
package me.lucko.spark.common.sampler;
+import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
import me.lucko.spark.common.sampler.async.AsyncSampler;
import me.lucko.spark.common.sampler.java.JavaSampler;
@@ -91,13 +92,13 @@ public class SamplerBuilder {
return this;
}
- public Sampler start() {
+ public Sampler start(SparkPlatform platform) {
int intervalMicros = (int) (this.samplingInterval * 1000d);
Sampler sampler;
if (this.ticksOver != -1 && this.tickHook != null) {
sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
- } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+ } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
} else {
sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index 2ed1aa4..2506db3 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -20,6 +20,10 @@
package me.lucko.spark.common.sampler.async;
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.Multimap;
+
+import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.util.TemporaryFiles;
import one.profiler.AsyncProfiler;
@@ -29,35 +33,72 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.util.Locale;
+import java.util.logging.Level;
/**
* Provides a bridge between spark and async-profiler.
*/
-public final class AsyncProfilerAccess {
+public enum AsyncProfilerAccess {
+ INSTANCE;
+
+ /** An instance of the async-profiler Java API. */
+ private final AsyncProfiler profiler;
+
+ /** If profiler is null, contains the reason why setup failed */
+ private final Exception setupException;
+
+ AsyncProfilerAccess() {
+ AsyncProfiler profiler;
+ Exception setupException = null;
+
+ try {
+ profiler = load();
+ ensureCpuEventSupported(profiler);
+ } catch (Exception e) {
+ profiler = null;
+ setupException = e;
+ }
- // Singleton
- public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess();
+ this.profiler = profiler;
+ this.setupException = setupException;
+ }
- // Only support Linux x86_64
- private static final String SUPPORTED_OS = "linux";
- private static final String SUPPORTED_ARCH = "amd64";
+ public AsyncProfiler getProfiler() {
+ if (this.profiler == null) {
+ throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
+ }
+ return this.profiler;
+ }
+
+ public boolean checkSupported(SparkPlatform platform) {
+ if (this.setupException != null) {
+ platform.getPlugin().log(Level.INFO, "async-profiler engine is not supported on your system: " + this.setupException.getMessage());
+ platform.getPlugin().log(Level.INFO, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
+ }
+ return this.profiler != null;
+ }
private static AsyncProfiler load() throws Exception {
// check compatibility
- String os = System.getProperty("os.name");
- if (!SUPPORTED_OS.equalsIgnoreCase(os)) {
- throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os);
- }
+ String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
+ String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
+
+ Multimap<String, String> supported = ImmutableSetMultimap.<String, String>builder()
+ .put("linux", "amd64")
+ .put("macosx", "amd64")
+ .put("macosx", "aarch64")
+ .build();
- String arch = System.getProperty("os.arch");
- if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) {
- throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os);
+ if (!supported.containsEntry(os, arch)) {
+ throw new UnsupportedOperationException("Not supported for your os/arch: " + os + '/' + arch);
}
// extract the profiler binary from the spark jar file
- URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so");
+ String resource = os + "/libasyncProfiler.so";
+ URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource(resource);
if (profilerResource == null) {
- throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file");
+ throw new IllegalStateException("Could not find " + resource + " in spark jar file");
}
Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
@@ -74,28 +115,6 @@ public final class AsyncProfilerAccess {
}
}
- /** An instance of the async-profiler Java API. */
- private final AsyncProfiler profiler;
-
- /** If profiler is null, contains the reason why setup failed */
- private final Exception setupException;
-
- private AsyncProfilerAccess() {
- AsyncProfiler profiler;
- Exception setupException = null;
-
- try {
- profiler = load();
- ensureCpuEventSupported(profiler);
- } catch (Exception e) {
- profiler = null;
- setupException = e;
- }
-
- this.profiler = profiler;
- this.setupException = setupException;
- }
-
/**
* Checks the {@code profiler} to ensure the CPU event is supported.
*
@@ -108,19 +127,4 @@ public final class AsyncProfilerAccess {
throw new UnsupportedOperationException("CPU event is not supported");
}
}
-
- public AsyncProfiler getProfiler() {
- if (this.profiler == null) {
- throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
- }
- return this.profiler;
- }
-
- public boolean isSupported() {
- if (this.setupException != null) {
- System.out.println("[spark] async-profiler engine is not supported on your system: " + this.setupException.getMessage());
- System.out.println("[spark] Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
- }
- return this.profiler != null;
- }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 4f3b3e4..1837cbc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -218,10 +218,10 @@ public class AsyncSampler extends AbstractSampler {
}
}
- private void readSegments(JfrReader reader, Predicate<String> threadFilter) {
- List<JfrReader.Sample> samples = reader.samples;
+ private void readSegments(JfrReader reader, Predicate<String> threadFilter) throws IOException {
+ List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
for (int i = 0; i < samples.size(); i++) {
- JfrReader.Sample sample = samples.get(i);
+ JfrReader.ExecutionSample sample = samples.get(i);
long duration;
if (i == 0) {
@@ -245,7 +245,7 @@ public class AsyncSampler extends AbstractSampler {
}
}
- private static ProfileSegment parseSegment(JfrReader reader, JfrReader.Sample sample, String threadName, long duration) {
+ private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
int len = stackTrace.methods.length;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
index 38d1b00..23223a2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
@@ -31,6 +31,12 @@ public class Dictionary<T> {
this.values = new Object[INITIAL_CAPACITY];
}
+ public void clear() {
+ keys = new long[INITIAL_CAPACITY];
+ values = new Object[INITIAL_CAPACITY];
+ size = 0;
+ }
+
public void put(long key, T value) {
if (key == 0) {
throw new IllegalArgumentException("Zero key not allowed");
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
index 95c9bad..a705f2d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
@@ -35,17 +35,19 @@ import java.util.Map;
* Parses JFR output produced by async-profiler.
*/
public class JfrReader implements Closeable {
+ private static final int BUFFER_SIZE = 2 * 1024 * 1024;
private static final int CHUNK_HEADER_SIZE = 68;
- private static final int CPOOL_OFFSET = 16;
- private static final int META_OFFSET = 24;
+ private static final int CHUNK_SIGNATURE = 0x464c5200;
private final FileChannel ch;
- private final ByteBuffer buf;
+ private ByteBuffer buf;
+ private long filePosition;
- public final long startNanos;
- public final long durationNanos;
- public final long startTicks;
- public final long ticksPerSec;
+ public boolean incomplete;
+ public long startNanos = Long.MAX_VALUE;
+ public long endNanos = Long.MIN_VALUE;
+ public long startTicks = Long.MAX_VALUE;
+ public long ticksPerSec;
public final Dictionary<JfrClass> types = new Dictionary<>();
public final Map<String, JfrClass> typesByName = new HashMap<>();
@@ -57,39 +59,156 @@ public class JfrReader implements Closeable {
public final Dictionary<AsyncStackTraceElement> stackFrames = new Dictionary<>(); // spark
public final Map<Integer, String> frameTypes = new HashMap<>();
public final Map<Integer, String> threadStates = new HashMap<>();
- public final List<Sample> samples = new ArrayList<>();
+
+ private int executionSample;
+ private int nativeMethodSample;
+ private int allocationInNewTLAB;
+ private int allocationOutsideTLAB;
+ private int allocationSample;
+ private int monitorEnter;
+ private int threadPark;
public JfrReader(Path path) throws IOException { // spark - Path instead of String
this.ch = FileChannel.open(path, StandardOpenOption.READ); // spark - Path instead of String
- this.buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
+ this.buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
+
+ buf.flip();
+ ensureBytes(CHUNK_HEADER_SIZE);
+ if (!readChunk(0)) {
+ throw new IOException("Incomplete JFR file");
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ ch.close();
+ }
+
+ public long durationNanos() {
+ return endNanos - startNanos;
+ }
+
+ public List<Event> readAllEvents() throws IOException {
+ return readAllEvents(null);
+ }
+
+ public <E extends Event> List<E> readAllEvents(Class<E> cls) throws IOException {
+ ArrayList<E> events = new ArrayList<>();
+ for (E event; (event = readEvent(cls)) != null; ) {
+ events.add(event);
+ }
+ Collections.sort(events);
+ return events;
+ }
+
+ public Event readEvent() throws IOException {
+ return readEvent(null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <E extends Event> E readEvent(Class<E> cls) throws IOException {
+ while (ensureBytes(CHUNK_HEADER_SIZE)) {
+ int pos = buf.position();
+ int size = getVarint();
+ int type = getVarint();
+
+ if (type == 'L' && buf.getInt(pos) == CHUNK_SIGNATURE) {
+ if (readChunk(pos)) {
+ continue;
+ }
+ break;
+ }
+
+ if (type == executionSample || type == nativeMethodSample) {
+ if (cls == null || cls == ExecutionSample.class) return (E) readExecutionSample();
+ } else if (type == allocationInNewTLAB) {
+ if (cls == null || cls == AllocationSample.class) return (E) readAllocationSample(true);
+ } else if (type == allocationOutsideTLAB || type == allocationSample) {
+ if (cls == null || cls == AllocationSample.class) return (E) readAllocationSample(false);
+ } else if (type == monitorEnter) {
+ if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(false);
+ } else if (type == threadPark) {
+ if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(true);
+ }
+
+ if ((pos += size) <= buf.limit()) {
+ buf.position(pos);
+ } else {
+ seek(filePosition + pos);
+ }
+ }
+ return null;
+ }
+
+ private ExecutionSample readExecutionSample() {
+ long time = getVarlong();
+ int tid = getVarint();
+ int stackTraceId = getVarint();
+ int threadState = getVarint();
+ return new ExecutionSample(time, tid, stackTraceId, threadState);
+ }
+
+ private AllocationSample readAllocationSample(boolean tlab) {
+ long time = getVarlong();
+ int tid = getVarint();
+ int stackTraceId = getVarint();
+ int classId = getVarint();
+ long allocationSize = getVarlong();
+ long tlabSize = tlab ? getVarlong() : 0;
+ return new AllocationSample(time, tid, stackTraceId, classId, allocationSize, tlabSize);
+ }
+
+ private ContendedLock readContendedLock(boolean hasTimeout) {
+ long time = getVarlong();
+ long duration = getVarlong();
+ int tid = getVarint();
+ int stackTraceId = getVarint();
+ int classId = getVarint();
+ if (hasTimeout) getVarlong();
+ long until = getVarlong();
+ long address = getVarlong();
+ return new ContendedLock(time, tid, stackTraceId, duration, classId);
+ }
- if (buf.getInt(0) != 0x464c5200) {
+ private boolean readChunk(int pos) throws IOException {
+ if (pos + CHUNK_HEADER_SIZE > buf.limit() || buf.getInt(pos) != CHUNK_SIGNATURE) {
throw new IOException("Not a valid JFR file");
}
- int version = buf.getInt(4);
+ int version = buf.getInt(pos + 4);
if (version < 0x20000 || version > 0x2ffff) {
throw new IOException("Unsupported JFR version: " + (version >>> 16) + "." + (version & 0xffff));
}
- this.startNanos = buf.getLong(32);
- this.durationNanos = buf.getLong(40);
- this.startTicks = buf.getLong(48);
- this.ticksPerSec = buf.getLong(56);
+ long cpOffset = buf.getLong(pos + 16);
+ long metaOffset = buf.getLong(pos + 24);
+ if (cpOffset == 0 || metaOffset == 0) {
+ incomplete = true;
+ return false;
+ }
- readMeta();
- readConstantPool();
- readEvents();
- }
+ startNanos = Math.min(startNanos, buf.getLong(pos + 32));
+ endNanos = Math.max(endNanos, buf.getLong(pos + 32) + buf.getLong(pos + 40));
+ startTicks = Math.min(startTicks, buf.getLong(pos + 48));
+ ticksPerSec = buf.getLong(pos + 56);
- @Override
- public void close() throws IOException {
- ch.close();
+ types.clear();
+ typesByName.clear();
+
+ long chunkStart = filePosition + pos;
+ readMeta(chunkStart + metaOffset);
+ readConstantPool(chunkStart + cpOffset);
+ cacheEventTypes();
+
+ seek(chunkStart + CHUNK_HEADER_SIZE);
+ return true;
}
- private void readMeta() {
- buf.position(buf.getInt(META_OFFSET + 4));
- getVarint();
+ private void readMeta(long metaOffset) throws IOException {
+ seek(metaOffset);
+ ensureBytes(5);
+
+ ensureBytes(getVarint() - buf.position());
getVarint();
getVarlong();
getVarlong();
@@ -136,15 +255,17 @@ public class JfrReader implements Closeable {
}
}
- private void readConstantPool() {
- int offset = buf.getInt(CPOOL_OFFSET + 4);
- while (true) {
- buf.position(offset);
- getVarint();
+ private void readConstantPool(long cpOffset) throws IOException {
+ long delta;
+ do {
+ seek(cpOffset);
+ ensureBytes(5);
+
+ ensureBytes(getVarint() - buf.position());
getVarint();
getVarlong();
getVarlong();
- long delta = getVarlong();
+ delta = getVarlong();
getVarint();
int poolCount = getVarint();
@@ -152,12 +273,7 @@ public class JfrReader implements Closeable {
int type = getVarint();
readConstants(types.get(type));
}
-
- if (delta == 0) {
- break;
- }
- offset += delta;
- }
+ } while (delta != 0 && (cpOffset += delta) > 0);
}
private void readConstants(JfrClass type) {
@@ -245,13 +361,15 @@ public class JfrReader implements Closeable {
int depth = getVarint();
long[] methods = new long[depth];
byte[] types = new byte[depth];
+ int[] locations = new int[depth];
for (int i = 0; i < depth; i++) {
methods[i] = getVarlong();
int line = getVarint();
int bci = getVarint();
+ locations[i] = line << 16 | (bci & 0xffff);
types[i] = buf.get();
}
- return new StackTrace(methods, types);
+ return new StackTrace(methods, types, locations);
}
private void readSymbols() {
@@ -298,36 +416,14 @@ public class JfrReader implements Closeable {
}
}
- private void readEvents() {
- int executionSample = getTypeId("jdk.ExecutionSample");
- int nativeMethodSample = getTypeId("jdk.NativeMethodSample");
-
- buf.position(CHUNK_HEADER_SIZE);
- while (buf.hasRemaining()) {
- int position = buf.position();
- int size = getVarint();
- int type = getVarint();
- if (type == executionSample || type == nativeMethodSample) {
- readExecutionSample();
- } else {
- buf.position(position + size);
- }
- }
-
- Collections.sort(samples);
- }
-
- private void readExecutionSample() {
- long time = getVarlong();
- int tid = getVarint();
- int stackTraceId = getVarint();
- int threadState = getVarint();
- samples.add(new Sample(time, tid, stackTraceId, threadState));
-
- StackTrace stackTrace = stackTraces.get(stackTraceId);
- if (stackTrace != null) {
- stackTrace.samples++;
- }
+ private void cacheEventTypes() {
+ executionSample = getTypeId("jdk.ExecutionSample");
+ nativeMethodSample = getTypeId("jdk.NativeMethodSample");
+ allocationInNewTLAB = getTypeId("jdk.ObjectAllocationInNewTLAB");
+ allocationOutsideTLAB = getTypeId("jdk.ObjectAllocationOutsideTLAB");
+ allocationSample = getTypeId("jdk.ObjectAllocationSample");
+ monitorEnter = getTypeId("jdk.JavaMonitorEnter");
+ threadPark = getTypeId("jdk.ThreadPark");
}
private int getTypeId(String typeName) {
@@ -386,6 +482,34 @@ public class JfrReader implements Closeable {
return bytes;
}
+ private void seek(long pos) throws IOException {
+ filePosition = pos;
+ ch.position(pos);
+ buf.rewind().flip();
+ }
+
+ private boolean ensureBytes(int needed) throws IOException {
+ if (buf.remaining() >= needed) {
+ return true;
+ }
+
+ filePosition += buf.position();
+
+ if (buf.capacity() < needed) {
+ ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+ newBuf.put(buf);
+ buf = newBuf;
+ } else {
+ buf.compact();
+ }
+
+ while (ch.read(buf) > 0 && buf.position() < needed) {
+ // keep reading
+ }
+ buf.flip();
+ return buf.limit() > 0;
+ }
+
public static class ClassRef {
public final long name;
@@ -452,33 +576,203 @@ public class JfrReader implements Closeable {
}
}
- public static class Sample implements Comparable<Sample> {
+ public static class StackTrace {
+ public final long[] methods;
+ public final byte[] types;
+ public final int[] locations;
+
+ public StackTrace(long[] methods, byte[] types, int[] locations) {
+ this.methods = methods;
+ this.types = types;
+ this.locations = locations;
+ }
+ }
+
+ public static abstract class Event implements Comparable<Event> {
public final long time;
public final int tid;
public final int stackTraceId;
- public final int threadState;
- public Sample(long time, int tid, int stackTraceId, int threadState) {
+ protected Event(long time, int tid, int stackTraceId) {
this.time = time;
this.tid = tid;
this.stackTraceId = stackTraceId;
- this.threadState = threadState;
}
@Override
- public int compareTo(Sample o) {
+ public int compareTo(Event o) {
return Long.compare(time, o.time);
}
+
+ @Override
+ public int hashCode() {
+ return stackTraceId;
+ }
+
+ public boolean sameGroup(Event o) {
+ return getClass() == o.getClass();
+ }
+
+ public long value() {
+ return 1;
+ }
}
- public static class StackTrace {
- public final long[] methods;
- public final byte[] types;
- public long samples;
+ public static class EventAggregator {
+ private static final int INITIAL_CAPACITY = 1024;
+
+ private final boolean threads;
+ private final boolean total;
+ private Event[] keys;
+ private long[] values;
+ private int size;
+
+ public EventAggregator(boolean threads, boolean total) {
+ this.threads = threads;
+ this.total = total;
+ this.keys = new Event[INITIAL_CAPACITY];
+ this.values = new long[INITIAL_CAPACITY];
+ }
+
+ public void collect(Event e) {
+ int mask = keys.length - 1;
+ int i = hashCode(e) & mask;
+ while (keys[i] != null) {
+ if (sameGroup(keys[i], e)) {
+ values[i] += total ? e.value() : 1;
+ return;
+ }
+ i = (i + 1) & mask;
+ }
- public StackTrace(long[] methods, byte[] types) {
- this.methods = methods;
- this.types = types;
+ keys[i] = e;
+ values[i] = total ? e.value() : 1;
+
+ if (++size * 2 > keys.length) {
+ resize(keys.length * 2);
+ }
+ }
+
+ public long getValue(Event e) {
+ int mask = keys.length - 1;
+ int i = hashCode(e) & mask;
+ while (keys[i] != null && !sameGroup(keys[i], e)) {
+ i = (i + 1) & mask;
+ }
+ return values[i];
+ }
+
+ public void forEach(Visitor visitor) {
+ for (int i = 0; i < keys.length; i++) {
+ if (keys[i] != null) {
+ visitor.visit(keys[i], values[i]);
+ }
+ }
+ }
+
+ private int hashCode(Event e) {
+ return e.hashCode() + (threads ? e.tid * 31 : 0);
+ }
+
+ private boolean sameGroup(Event e1, Event e2) {
+ return e1.stackTraceId == e2.stackTraceId && (!threads || e1.tid == e2.tid) && e1.sameGroup(e2);
+ }
+
+ private void resize(int newCapacity) {
+ Event[] newKeys = new Event[newCapacity];
+ long[] newValues = new long[newCapacity];
+ int mask = newKeys.length - 1;
+
+ for (int i = 0; i < keys.length; i++) {
+ if (keys[i] != null) {
+ for (int j = hashCode(keys[i]) & mask; ; j = (j + 1) & mask) {
+ if (newKeys[j] == null) {
+ newKeys[j] = keys[i];
+ newValues[j] = values[i];
+ break;
+ }
+ }
+ }
+ }
+
+ keys = newKeys;
+ values = newValues;
+ }
+
+ public interface Visitor {
+ void visit(Event event, long value);
+ }
+ }
+
+ public static class AllocationSample extends Event {
+ public final int classId;
+ public final long allocationSize;
+ public final long tlabSize;
+
+ public AllocationSample(long time, int tid, int stackTraceId, int classId, long allocationSize, long tlabSize) {
+ super(time, tid, stackTraceId);
+ this.classId = classId;
+ this.allocationSize = allocationSize;
+ this.tlabSize = tlabSize;
+ }
+
+ @Override
+ public int hashCode() {
+ return classId * 127 + stackTraceId + (tlabSize == 0 ? 17 : 0);
+ }
+
+ @Override
+ public boolean sameGroup(Event o) {
+ if (o instanceof AllocationSample) {
+ AllocationSample a = (AllocationSample) o;
+ return classId == a.classId && (tlabSize == 0) == (a.tlabSize == 0);
+ }
+ return false;
+ }
+
+ @Override
+ public long value() {
+ return tlabSize != 0 ? tlabSize : allocationSize;
}
}
+
+ public static class ContendedLock extends Event {
+ public final long duration;
+ public final int classId;
+
+ public ContendedLock(long time, int tid, int stackTraceId, long duration, int classId) {
+ super(time, tid, stackTraceId);
+ this.duration = duration;
+ this.classId = classId;
+ }
+
+ @Override
+ public int hashCode() {
+ return classId * 127 + stackTraceId;
+ }
+
+ @Override
+ public boolean sameGroup(Event o) {
+ if (o instanceof ContendedLock) {
+ ContendedLock c = (ContendedLock) o;
+ return classId == c.classId;
+ }
+ return false;
+ }
+
+ @Override
+ public long value() {
+ return duration;
+ }
+ }
+
+ public static class ExecutionSample extends Event {
+ public final int threadState;
+
+ public ExecutionSample(long time, int tid, int stackTraceId, int threadState) {
+ super(time, tid, stackTraceId);
+ this.threadState = threadState;
+ }
+ }
+
}
diff --git a/spark-common/src/main/resources/libasyncProfiler.so b/spark-common/src/main/resources/libasyncProfiler.so
deleted file mode 100755
index 4153f52..0000000
--- a/spark-common/src/main/resources/libasyncProfiler.so
+++ /dev/null
Binary files differ
diff --git a/spark-common/src/main/resources/linux/libasyncProfiler.so b/spark-common/src/main/resources/linux/libasyncProfiler.so
new file mode 100755
index 0000000..ddee900
--- /dev/null
+++ b/spark-common/src/main/resources/linux/libasyncProfiler.so
Binary files differ
diff --git a/spark-common/src/main/resources/macosx/libasyncProfiler.so b/spark-common/src/main/resources/macosx/libasyncProfiler.so
new file mode 100755
index 0000000..75daf6e
--- /dev/null
+++ b/spark-common/src/main/resources/macosx/libasyncProfiler.so
Binary files differ