Diffstat (limited to 'spark-common')
12 files changed, 457 insertions, 144 deletions
diff --git a/spark-common/build.gradle b/spark-common/build.gradle
index 1243fd8..526c6e1 100644
--- a/spark-common/build.gradle
+++ b/spark-common/build.gradle
@@ -4,7 +4,7 @@ plugins {
 
 dependencies {
     api project(':spark-api')
-    implementation 'com.github.jvm-profiling-tools:async-profiler:v2.0'
+    implementation 'com.github.jvm-profiling-tools:async-profiler:v2.5'
     implementation 'org.ow2.asm:asm:9.1'
     implementation 'com.google.protobuf:protobuf-javalite:3.15.6'
     implementation 'com.squareup.okhttp3:okhttp:3.14.1'
@@ -32,7 +32,8 @@ dependencies {
 
 processResources {
     from(sourceSets.main.resources.srcDirs) {
-        include 'libasyncProfiler.so'
+        include 'linux/libasyncProfiler.so'
+        include 'macosx/libasyncProfiler.so'
     }
 }
 
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
index aa5112d..f312916 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlugin.java
@@ -29,6 +29,7 @@ import me.lucko.spark.common.tick.TickReporter;
 import me.lucko.spark.common.util.ClassSourceLookup;
 
 import java.nio.file.Path;
+import java.util.logging.Level;
 import java.util.stream.Stream;
 
 /**
@@ -72,6 +73,14 @@ public interface SparkPlugin {
     void executeAsync(Runnable task);
 
     /**
+     * Print to the plugin logger.
+     *
+     * @param level the log level
+     * @param msg the message
+     */
+    void log(Level level, String msg);
+
+    /**
      * Gets the default {@link ThreadDumper} to be used by the plugin.
      *
      * @return the default thread dumper
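The new log method lets the common module write to whichever logger the host platform provides, instead of printing to stdout. A minimal sketch of the delegation a platform module might use, assuming a java.util.logging backend (the class name here is hypothetical, not part of this commit):

    import java.util.logging.Level;
    import java.util.logging.Logger;

    // Hypothetical platform-side implementation: satisfies the new
    // SparkPlugin#log contract by forwarding to a java.util.logging.Logger.
    public final class JulLogBridge {
        private final Logger logger = Logger.getLogger("spark");

        public void log(Level level, String msg) {
            this.logger.log(level, msg); // same signature as SparkPlugin#log
        }
    }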
diff --git a/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java b/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
index 5ac41fc..d0fcb0f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/api/SparkApi.java
@@ -25,10 +25,10 @@ import com.google.common.collect.ImmutableMap;
 import me.lucko.spark.api.Spark;
 import me.lucko.spark.api.SparkProvider;
 import me.lucko.spark.api.gc.GarbageCollector;
+import me.lucko.spark.api.statistic.StatisticWindow;
 import me.lucko.spark.api.statistic.misc.DoubleAverageInfo;
 import me.lucko.spark.api.statistic.types.DoubleStatistic;
 import me.lucko.spark.api.statistic.types.GenericStatistic;
-import me.lucko.spark.api.statistic.StatisticWindow;
 import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.monitor.cpu.CpuMonitor;
 import me.lucko.spark.common.monitor.memory.GarbageCollectorStatistics;
@@ -37,9 +37,7 @@ import me.lucko.spark.common.monitor.tick.TickStatistics;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.sql.Ref;
 import java.util.HashMap;
 import java.util.Map;
 
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index 4147de3..d45c7af 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -217,7 +217,7 @@ public class SamplerModule implements CommandModule {
         if (ticksOver != -1) {
             builder.ticksOver(ticksOver, tickHook);
         }
-        Sampler sampler = this.activeSampler = builder.start();
+        Sampler sampler = this.activeSampler = builder.start(platform);
 
         resp.broadcastPrefixed(text()
                 .append(text("Profiler now active!", GOLD))
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index f0ed533..88cf018 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -20,6 +20,7 @@
 
 package me.lucko.spark.common.sampler;
 
+import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
 import me.lucko.spark.common.sampler.async.AsyncSampler;
 import me.lucko.spark.common.sampler.java.JavaSampler;
@@ -91,13 +92,13 @@ public class SamplerBuilder {
         return this;
     }
 
-    public Sampler start() {
+    public Sampler start(SparkPlatform platform) {
         int intervalMicros = (int) (this.samplingInterval * 1000d);
 
         Sampler sampler;
         if (this.ticksOver != -1 && this.tickHook != null) {
             sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
-        } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+        } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
             sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
         } else {
             sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
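start() now takes the SparkPlatform so that, when async-profiler is unavailable, the reason can be reported through the plugin logger. An illustrative call site (builder configuration abbreviated; the variable names are assumptions, not code from this commit):

    // Illustrative only: `platform` is whatever SparkPlatform instance the
    // calling command module has in scope; configuration calls are elided.
    SamplerBuilder builder = new SamplerBuilder();
    // ... configure interval, thread dumper, timeout, etc. ...
    Sampler sampler = builder.start(platform); // previously builder.start()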
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index 2ed1aa4..2506db3 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -20,6 +20,10 @@
 
 package me.lucko.spark.common.sampler.async;
 
+import com.google.common.collect.ImmutableSetMultimap;
+import com.google.common.collect.Multimap;
+
+import me.lucko.spark.common.SparkPlatform;
 import me.lucko.spark.common.util.TemporaryFiles;
 
 import one.profiler.AsyncProfiler;
@@ -29,35 +33,72 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.util.Locale;
+import java.util.logging.Level;
 
 /**
  * Provides a bridge between spark and async-profiler.
  */
-public final class AsyncProfilerAccess {
+public enum AsyncProfilerAccess {
+    INSTANCE;
+
+    /** An instance of the async-profiler Java API. */
+    private final AsyncProfiler profiler;
+
+    /** If profiler is null, contains the reason why setup failed */
+    private final Exception setupException;
+
+    AsyncProfilerAccess() {
+        AsyncProfiler profiler;
+        Exception setupException = null;
+
+        try {
+            profiler = load();
+            ensureCpuEventSupported(profiler);
+        } catch (Exception e) {
+            profiler = null;
+            setupException = e;
+        }
 
-    // Singleton
-    public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess();
+        this.profiler = profiler;
+        this.setupException = setupException;
+    }
 
-    // Only support Linux x86_64
-    private static final String SUPPORTED_OS = "linux";
-    private static final String SUPPORTED_ARCH = "amd64";
+    public AsyncProfiler getProfiler() {
+        if (this.profiler == null) {
+            throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
+        }
+        return this.profiler;
+    }
+
+    public boolean checkSupported(SparkPlatform platform) {
+        if (this.setupException != null) {
+            platform.getPlugin().log(Level.INFO, "async-profiler engine is not supported on your system: " + this.setupException.getMessage());
+            platform.getPlugin().log(Level.INFO, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
+        }
+        return this.profiler != null;
+    }
 
     private static AsyncProfiler load() throws Exception {
         // check compatibility
-        String os = System.getProperty("os.name");
-        if (!SUPPORTED_OS.equalsIgnoreCase(os)) {
-            throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os);
-        }
+        String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
+        String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
+
+        Multimap<String, String> supported = ImmutableSetMultimap.<String, String>builder()
+                .put("linux", "amd64")
+                .put("macosx", "amd64")
+                .put("macosx", "aarch64")
+                .build();
 
-        String arch = System.getProperty("os.arch");
-        if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) {
-            throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os);
+        if (!supported.containsEntry(os, arch)) {
+            throw new UnsupportedOperationException("Not supported for your os/arch: " + os + '/' + arch);
         }
 
         // extract the profiler binary from the spark jar file
-        URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so");
+        String resource = os + "/libasyncProfiler.so";
+        URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource(resource);
         if (profilerResource == null) {
-            throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file");
+            throw new IllegalStateException("Could not find " + resource + " in spark jar file");
        }
 
         Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
@@ -74,28 +115,6 @@ public final class AsyncProfilerAccess {
         }
     }
 
-    /** An instance of the async-profiler Java API. */
-    private final AsyncProfiler profiler;
-
-    /** If profiler is null, contains the reason why setup failed */
-    private final Exception setupException;
-
-    private AsyncProfilerAccess() {
-        AsyncProfiler profiler;
-        Exception setupException = null;
-
-        try {
-            profiler = load();
-            ensureCpuEventSupported(profiler);
-        } catch (Exception e) {
-            profiler = null;
-            setupException = e;
-        }
-
-        this.profiler = profiler;
-        this.setupException = setupException;
-    }
-
     /**
      * Checks the {@code profiler} to ensure the CPU event is supported.
      *
@@ -108,19 +127,4 @@ public final class AsyncProfilerAccess {
             throw new UnsupportedOperationException("CPU event is not supported");
         }
     }
-
-    public AsyncProfiler getProfiler() {
-        if (this.profiler == null) {
-            throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
-        }
-        return this.profiler;
-    }
-
-    public boolean isSupported() {
-        if (this.setupException != null) {
-            System.out.println("[spark] async-profiler engine is not supported on your system: " + this.setupException.getMessage());
-            System.out.println("[spark] Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
-        }
-        return this.profiler != null;
-    }
 }
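With the singleton now an enum, callers probe availability via checkSupported(platform), which logs the failure reason and returns false, before obtaining the API from getProfiler(), which throws instead. A hedged sketch of the calling pattern, assuming a SparkPlatform instance is in scope (the start(event, interval) call is part of the async-profiler Java API, not this diff):

    // Sketch: guard native profiler use behind the new support check.
    if (AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
        AsyncProfiler profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
        profiler.start("cpu", 1_000_000); // CPU sampling, interval in nanoseconds
    }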
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 4f3b3e4..1837cbc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -218,10 +218,10 @@ public class AsyncSampler extends AbstractSampler {
         }
     }
 
-    private void readSegments(JfrReader reader, Predicate<String> threadFilter) {
-        List<JfrReader.Sample> samples = reader.samples;
+    private void readSegments(JfrReader reader, Predicate<String> threadFilter) throws IOException {
+        List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
         for (int i = 0; i < samples.size(); i++) {
-            JfrReader.Sample sample = samples.get(i);
+            JfrReader.ExecutionSample sample = samples.get(i);
 
             long duration;
             if (i == 0) {
@@ -245,7 +245,7 @@ public class AsyncSampler extends AbstractSampler {
         }
     }
 
-    private static ProfileSegment parseSegment(JfrReader reader, JfrReader.Sample sample, String threadName, long duration) {
+    private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
         JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
 
         int len = stackTrace.methods.length;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
index 38d1b00..23223a2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
@@ -31,6 +31,12 @@ public class Dictionary<T> {
         this.values = new Object[INITIAL_CAPACITY];
     }
 
+    public void clear() {
+        keys = new long[INITIAL_CAPACITY];
+        values = new Object[INITIAL_CAPACITY];
+        size = 0;
+    }
+
     public void put(long key, T value) {
         if (key == 0) {
             throw new IllegalArgumentException("Zero key not allowed");
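The JfrReader rewrite below parses events incrementally with getVarint()/getVarlong() helpers that this diff leaves untouched. For context, JFR encodes most integers as LEB128-style varints; a minimal standalone sketch of the decoding (the real helpers are instance methods over the reader's internal buffer):

    import java.nio.ByteBuffer;

    // Sketch of LEB128-style varint decoding as used throughout the JFR format:
    // 7 payload bits per byte, with the high bit set on every byte except the last.
    static int getVarint(ByteBuffer buf) {
        int result = 0;
        for (int shift = 0; ; shift += 7) {
            byte b = buf.get();            // next encoded byte
            result |= (b & 0x7f) << shift; // accumulate the low 7 bits
            if (b >= 0) {                  // high bit clear -> final byte
                return result;
            }
        }
    }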
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
index 95c9bad..a705f2d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
@@ -35,17 +35,19 @@ import java.util.Map;
  * Parses JFR output produced by async-profiler.
  */
 public class JfrReader implements Closeable {
+    private static final int BUFFER_SIZE = 2 * 1024 * 1024;
     private static final int CHUNK_HEADER_SIZE = 68;
-    private static final int CPOOL_OFFSET = 16;
-    private static final int META_OFFSET = 24;
+    private static final int CHUNK_SIGNATURE = 0x464c5200;
 
     private final FileChannel ch;
-    private final ByteBuffer buf;
+    private ByteBuffer buf;
+    private long filePosition;
 
-    public final long startNanos;
-    public final long durationNanos;
-    public final long startTicks;
-    public final long ticksPerSec;
+    public boolean incomplete;
+    public long startNanos = Long.MAX_VALUE;
+    public long endNanos = Long.MIN_VALUE;
+    public long startTicks = Long.MAX_VALUE;
+    public long ticksPerSec;
 
     public final Dictionary<JfrClass> types = new Dictionary<>();
     public final Map<String, JfrClass> typesByName = new HashMap<>();
@@ -57,39 +59,156 @@ public class JfrReader implements Closeable {
     public final Dictionary<AsyncStackTraceElement> stackFrames = new Dictionary<>(); // spark
     public final Map<Integer, String> frameTypes = new HashMap<>();
     public final Map<Integer, String> threadStates = new HashMap<>();
-    public final List<Sample> samples = new ArrayList<>();
+
+    private int executionSample;
+    private int nativeMethodSample;
+    private int allocationInNewTLAB;
+    private int allocationOutsideTLAB;
+    private int allocationSample;
+    private int monitorEnter;
+    private int threadPark;
 
     public JfrReader(Path path) throws IOException { // spark - Path instead of String
         this.ch = FileChannel.open(path, StandardOpenOption.READ); // spark - Path instead of String
-        this.buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
+        this.buf = ByteBuffer.allocateDirect(BUFFER_SIZE);
+
+        buf.flip();
+        ensureBytes(CHUNK_HEADER_SIZE);
+        if (!readChunk(0)) {
+            throw new IOException("Incomplete JFR file");
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        ch.close();
+    }
+
+    public long durationNanos() {
+        return endNanos - startNanos;
+    }
+
+    public List<Event> readAllEvents() throws IOException {
+        return readAllEvents(null);
+    }
+
+    public <E extends Event> List<E> readAllEvents(Class<E> cls) throws IOException {
+        ArrayList<E> events = new ArrayList<>();
+        for (E event; (event = readEvent(cls)) != null; ) {
+            events.add(event);
+        }
+        Collections.sort(events);
+        return events;
+    }
+
+    public Event readEvent() throws IOException {
+        return readEvent(null);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <E extends Event> E readEvent(Class<E> cls) throws IOException {
+        while (ensureBytes(CHUNK_HEADER_SIZE)) {
+            int pos = buf.position();
+            int size = getVarint();
+            int type = getVarint();
+
+            if (type == 'L' && buf.getInt(pos) == CHUNK_SIGNATURE) {
+                if (readChunk(pos)) {
+                    continue;
+                }
+                break;
+            }
+
+            if (type == executionSample || type == nativeMethodSample) {
+                if (cls == null || cls == ExecutionSample.class) return (E) readExecutionSample();
+            } else if (type == allocationInNewTLAB) {
+                if (cls == null || cls == AllocationSample.class) return (E) readAllocationSample(true);
+            } else if (type == allocationOutsideTLAB || type == allocationSample) {
+                if (cls == null || cls == AllocationSample.class) return (E) readAllocationSample(false);
+            } else if (type == monitorEnter) {
+                if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(false);
+            } else if (type == threadPark) {
+                if (cls == null || cls == ContendedLock.class) return (E) readContendedLock(true);
+            }
+
+            if ((pos += size) <= buf.limit()) {
+                buf.position(pos);
+            } else {
+                seek(filePosition + pos);
+            }
+        }
+        return null;
+    }
+
+    private ExecutionSample readExecutionSample() {
+        long time = getVarlong();
+        int tid = getVarint();
+        int stackTraceId = getVarint();
+        int threadState = getVarint();
+        return new ExecutionSample(time, tid, stackTraceId, threadState);
+    }
+
+    private AllocationSample readAllocationSample(boolean tlab) {
+        long time = getVarlong();
+        int tid = getVarint();
+        int stackTraceId = getVarint();
+        int classId = getVarint();
+        long allocationSize = getVarlong();
+        long tlabSize = tlab ? getVarlong() : 0;
+        return new AllocationSample(time, tid, stackTraceId, classId, allocationSize, tlabSize);
+    }
+
+    private ContendedLock readContendedLock(boolean hasTimeout) {
+        long time = getVarlong();
+        long duration = getVarlong();
+        int tid = getVarint();
+        int stackTraceId = getVarint();
+        int classId = getVarint();
+        if (hasTimeout) getVarlong();
+        long until = getVarlong();
+        long address = getVarlong();
+        return new ContendedLock(time, tid, stackTraceId, duration, classId);
+    }
 
-        if (buf.getInt(0) != 0x464c5200) {
+    private boolean readChunk(int pos) throws IOException {
+        if (pos + CHUNK_HEADER_SIZE > buf.limit() || buf.getInt(pos) != CHUNK_SIGNATURE) {
             throw new IOException("Not a valid JFR file");
         }
 
-        int version = buf.getInt(4);
+        int version = buf.getInt(pos + 4);
         if (version < 0x20000 || version > 0x2ffff) {
             throw new IOException("Unsupported JFR version: " + (version >>> 16) + "." + (version & 0xffff));
         }
 
-        this.startNanos = buf.getLong(32);
-        this.durationNanos = buf.getLong(40);
-        this.startTicks = buf.getLong(48);
-        this.ticksPerSec = buf.getLong(56);
+        long cpOffset = buf.getLong(pos + 16);
+        long metaOffset = buf.getLong(pos + 24);
+        if (cpOffset == 0 || metaOffset == 0) {
+            incomplete = true;
+            return false;
+        }
 
-        readMeta();
-        readConstantPool();
-        readEvents();
-    }
+        startNanos = Math.min(startNanos, buf.getLong(pos + 32));
+        endNanos = Math.max(endNanos, buf.getLong(pos + 32) + buf.getLong(pos + 40));
+        startTicks = Math.min(startTicks, buf.getLong(pos + 48));
+        ticksPerSec = buf.getLong(pos + 56);
 
-    @Override
-    public void close() throws IOException {
-        ch.close();
+        types.clear();
+        typesByName.clear();
+
+        long chunkStart = filePosition + pos;
+        readMeta(chunkStart + metaOffset);
+        readConstantPool(chunkStart + cpOffset);
+        cacheEventTypes();
+
+        seek(chunkStart + CHUNK_HEADER_SIZE);
+        return true;
     }
 
-    private void readMeta() {
-        buf.position(buf.getInt(META_OFFSET + 4));
-        getVarint();
+    private void readMeta(long metaOffset) throws IOException {
+        seek(metaOffset);
+        ensureBytes(5);
+
+        ensureBytes(getVarint() - buf.position());
         getVarint();
         getVarlong();
         getVarlong();
@@ -136,15 +255,17 @@
         }
     }
 
-    private void readConstantPool() {
-        int offset = buf.getInt(CPOOL_OFFSET + 4);
-        while (true) {
-            buf.position(offset);
-            getVarint();
+    private void readConstantPool(long cpOffset) throws IOException {
+        long delta;
+        do {
+            seek(cpOffset);
+            ensureBytes(5);
+
+            ensureBytes(getVarint() - buf.position());
             getVarint();
             getVarlong();
             getVarlong();
-            long delta = getVarlong();
+            delta = getVarlong();
             getVarint();
 
             int poolCount = getVarint();
@@ -152,12 +273,7 @@
             int type = getVarint();
             readConstants(types.get(type));
         }
-
-            if (delta == 0) {
-                break;
-            }
-            offset += delta;
-        }
+        } while (delta != 0 && (cpOffset += delta) > 0);
     }
 
     private void readConstants(JfrClass type) {
@@ -245,13 +361,15 @@
         int depth = getVarint();
         long[] methods = new long[depth];
         byte[] types = new byte[depth];
+        int[] locations = new int[depth];
         for (int i = 0; i < depth; i++) {
             methods[i] = getVarlong();
             int line = getVarint();
             int bci = getVarint();
+            locations[i] = line << 16 | (bci & 0xffff);
             types[i] = buf.get();
         }
-        return new StackTrace(methods, types);
+        return new StackTrace(methods, types, locations);
     }
 
     private void readSymbols() {
@@ -298,36 +416,14 @@
         }
     }
 
-    private void readEvents() {
-        int executionSample = getTypeId("jdk.ExecutionSample");
-        int nativeMethodSample = getTypeId("jdk.NativeMethodSample");
-
-        buf.position(CHUNK_HEADER_SIZE);
-        while (buf.hasRemaining()) {
-            int position = buf.position();
-            int size = getVarint();
-            int type = getVarint();
-            if (type == executionSample || type == nativeMethodSample) {
-                readExecutionSample();
-            } else {
-                buf.position(position + size);
-            }
-        }
-
-        Collections.sort(samples);
-    }
-
-    private void readExecutionSample() {
-        long time = getVarlong();
-        int tid = getVarint();
-        int stackTraceId = getVarint();
-        int threadState = getVarint();
-        samples.add(new Sample(time, tid, stackTraceId, threadState));
-
-        StackTrace stackTrace = stackTraces.get(stackTraceId);
-        if (stackTrace != null) {
-            stackTrace.samples++;
-        }
+    private void cacheEventTypes() {
+        executionSample = getTypeId("jdk.ExecutionSample");
+        nativeMethodSample = getTypeId("jdk.NativeMethodSample");
+        allocationInNewTLAB = getTypeId("jdk.ObjectAllocationInNewTLAB");
+        allocationOutsideTLAB = getTypeId("jdk.ObjectAllocationOutsideTLAB");
+        allocationSample = getTypeId("jdk.ObjectAllocationSample");
+        monitorEnter = getTypeId("jdk.JavaMonitorEnter");
+        threadPark = getTypeId("jdk.ThreadPark");
     }
 
     private int getTypeId(String typeName) {
@@ -386,6 +482,34 @@
         return bytes;
     }
 
+    private void seek(long pos) throws IOException {
+        filePosition = pos;
+        ch.position(pos);
+        buf.rewind().flip();
+    }
+
+    private boolean ensureBytes(int needed) throws IOException {
+        if (buf.remaining() >= needed) {
+            return true;
+        }
+
+        filePosition += buf.position();
+
+        if (buf.capacity() < needed) {
+            ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+            newBuf.put(buf);
+            buf = newBuf;
+        } else {
+            buf.compact();
+        }
+
+        while (ch.read(buf) > 0 && buf.position() < needed) {
+            // keep reading
+        }
+        buf.flip();
+        return buf.limit() > 0;
+    }
+
     public static class ClassRef {
         public final long name;
 
@@ -452,33 +576,203 @@
         }
     }
 
-    public static class Sample implements Comparable<Sample> {
+    public static class StackTrace {
+        public final long[] methods;
+        public final byte[] types;
+        public final int[] locations;
+
+        public StackTrace(long[] methods, byte[] types, int[] locations) {
+            this.methods = methods;
+            this.types = types;
+            this.locations = locations;
+        }
+    }
+
+    public static abstract class Event implements Comparable<Event> {
         public final long time;
         public final int tid;
         public final int stackTraceId;
-        public final int threadState;
 
-        public Sample(long time, int tid, int stackTraceId, int threadState) {
+        protected Event(long time, int tid, int stackTraceId) {
             this.time = time;
             this.tid = tid;
             this.stackTraceId = stackTraceId;
-            this.threadState = threadState;
         }
 
         @Override
-        public int compareTo(Sample o) {
+        public int compareTo(Event o) {
             return Long.compare(time, o.time);
         }
+
+        @Override
+        public int hashCode() {
+            return stackTraceId;
+        }
+
+        public boolean sameGroup(Event o) {
+            return getClass() == o.getClass();
+        }
+
+        public long value() {
+            return 1;
+        }
     }
 
-    public static class StackTrace {
-        public final long[] methods;
-        public final byte[] types;
-        public long samples;
+    public static class EventAggregator {
+        private static final int INITIAL_CAPACITY = 1024;
+
+        private final boolean threads;
+        private final boolean total;
+        private Event[] keys;
+        private long[] values;
+        private int size;
+
+        public EventAggregator(boolean threads, boolean total) {
+            this.threads = threads;
+            this.total = total;
+            this.keys = new Event[INITIAL_CAPACITY];
+            this.values = new long[INITIAL_CAPACITY];
+        }
+
+        public void collect(Event e) {
+            int mask = keys.length - 1;
+            int i = hashCode(e) & mask;
+            while (keys[i] != null) {
+                if (sameGroup(keys[i], e)) {
+                    values[i] += total ? e.value() : 1;
+                    return;
+                }
+                i = (i + 1) & mask;
+            }
 
-        public StackTrace(long[] methods, byte[] types) {
-            this.methods = methods;
-            this.types = types;
+            keys[i] = e;
+            values[i] = total ? e.value() : 1;
+
+            if (++size * 2 > keys.length) {
+                resize(keys.length * 2);
+            }
+        }
+
+        public long getValue(Event e) {
+            int mask = keys.length - 1;
+            int i = hashCode(e) & mask;
+            while (keys[i] != null && !sameGroup(keys[i], e)) {
+                i = (i + 1) & mask;
+            }
+            return values[i];
+        }
+
+        public void forEach(Visitor visitor) {
+            for (int i = 0; i < keys.length; i++) {
+                if (keys[i] != null) {
+                    visitor.visit(keys[i], values[i]);
+                }
+            }
+        }
+
+        private int hashCode(Event e) {
+            return e.hashCode() + (threads ? e.tid * 31 : 0);
+        }
+
+        private boolean sameGroup(Event e1, Event e2) {
+            return e1.stackTraceId == e2.stackTraceId && (!threads || e1.tid == e2.tid) && e1.sameGroup(e2);
+        }
+
+        private void resize(int newCapacity) {
+            Event[] newKeys = new Event[newCapacity];
+            long[] newValues = new long[newCapacity];
+            int mask = newKeys.length - 1;
+
+            for (int i = 0; i < keys.length; i++) {
+                if (keys[i] != null) {
+                    for (int j = hashCode(keys[i]) & mask; ; j = (j + 1) & mask) {
+                        if (newKeys[j] == null) {
+                            newKeys[j] = keys[i];
+                            newValues[j] = values[i];
+                            break;
+                        }
+                    }
+                }
+            }
+
+            keys = newKeys;
+            values = newValues;
+        }
+
+        public interface Visitor {
+            void visit(Event event, long value);
+        }
+    }
+
+    public static class AllocationSample extends Event {
+        public final int classId;
+        public final long allocationSize;
+        public final long tlabSize;
+
+        public AllocationSample(long time, int tid, int stackTraceId, int classId, long allocationSize, long tlabSize) {
+            super(time, tid, stackTraceId);
+            this.classId = classId;
+            this.allocationSize = allocationSize;
+            this.tlabSize = tlabSize;
+        }
+
+        @Override
+        public int hashCode() {
+            return classId * 127 + stackTraceId + (tlabSize == 0 ? 17 : 0);
+        }
+
+        @Override
+        public boolean sameGroup(Event o) {
+            if (o instanceof AllocationSample) {
+                AllocationSample a = (AllocationSample) o;
+                return classId == a.classId && (tlabSize == 0) == (a.tlabSize == 0);
+            }
+            return false;
+        }
+
+        @Override
+        public long value() {
+            return tlabSize != 0 ? tlabSize : allocationSize;
         }
     }
+
+    public static class ContendedLock extends Event {
+        public final long duration;
+        public final int classId;
+
+        public ContendedLock(long time, int tid, int stackTraceId, long duration, int classId) {
+            super(time, tid, stackTraceId);
+            this.duration = duration;
+            this.classId = classId;
+        }
+
+        @Override
+        public int hashCode() {
+            return classId * 127 + stackTraceId;
+        }
+
+        @Override
+        public boolean sameGroup(Event o) {
+            if (o instanceof ContendedLock) {
+                ContendedLock c = (ContendedLock) o;
+                return classId == c.classId;
+            }
+            return false;
+        }
+
+        @Override
+        public long value() {
+            return duration;
+        }
+    }
+
+    public static class ExecutionSample extends Event {
+        public final int threadState;
+
+        public ExecutionSample(long time, int tid, int stackTraceId, int threadState) {
+            super(time, tid, stackTraceId);
+            this.threadState = threadState;
+        }
+    }
 }
diff --git a/spark-common/src/main/resources/libasyncProfiler.so b/spark-common/src/main/resources/libasyncProfiler.so
deleted file mode 100755
index 4153f52..0000000
--- a/spark-common/src/main/resources/libasyncProfiler.so
+++ /dev/null
Binary files differ
diff --git a/spark-common/src/main/resources/linux/libasyncProfiler.so b/spark-common/src/main/resources/linux/libasyncProfiler.so
new file mode 100755
index 0000000..ddee900
--- /dev/null
+++ b/spark-common/src/main/resources/linux/libasyncProfiler.so
Binary files differ
diff --git a/spark-common/src/main/resources/macosx/libasyncProfiler.so b/spark-common/src/main/resources/macosx/libasyncProfiler.so
new file mode 100755
index 0000000..75daf6e
--- /dev/null
+++ b/spark-common/src/main/resources/macosx/libasyncProfiler.so
Binary files differ
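Taken together, the JfrReader changes replace the eagerly populated samples list with a streaming, multi-chunk event API. A hedged end-to-end sketch of reading a recording with it (the class name and file path are placeholders; readAllEvents, ExecutionSample and EventAggregator are the additions above):

    import me.lucko.spark.common.sampler.async.jfr.JfrReader;

    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.List;

    // Illustrative usage only: stream execution samples out of a JFR file and
    // aggregate them per stack trace with the new nested EventAggregator.
    public final class JfrDumpExample {
        public static void main(String[] args) throws IOException {
            try (JfrReader reader = new JfrReader(Paths.get("profile.jfr"))) {
                List<JfrReader.ExecutionSample> samples =
                        reader.readAllEvents(JfrReader.ExecutionSample.class);

                // Group by stack trace only (threads=false), counting one unit
                // per sample rather than summing event values (total=false).
                JfrReader.EventAggregator agg = new JfrReader.EventAggregator(false, false);
                for (JfrReader.ExecutionSample sample : samples) {
                    agg.collect(sample);
                }

                agg.forEach((event, value) ->
                        System.out.println("stackTraceId=" + event.stackTraceId + " -> " + value));
            }
        }
    }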