diff options
| author | lucko <git@lucko.me> | 2022-10-07 20:26:24 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-07 20:26:24 +0100 | 
| commit | d31f3c7bdf03c874ff9518d47d060adc18322d6b (patch) | |
| tree | 94843af32b019568bda755cdf48441efed9abbc8 /spark-common/src/main/java/me/lucko/spark/common/sampler | |
| parent | c4b1eccd9cd51e348983fab42ced78166f39cb0e (diff) | |
| download | spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.tar.gz spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.tar.bz2 spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.zip | |
Split profiler output into windows (#253)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
19 files changed, 974 insertions, 374 deletions
| diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 6fc5a10..c650738 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -30,7 +30,10 @@ import me.lucko.spark.common.sampler.node.MergeMode;  import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.common.sampler.source.ClassSourceLookup;  import me.lucko.spark.common.sampler.source.SourceMetadata; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;  import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.proto.SparkProtos;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -58,12 +61,12 @@ public abstract class AbstractSampler implements Sampler {      /** The time when sampling first began */      protected long startTime = -1; -    /** The game tick when sampling first began */ -    protected int startTick = -1; -      /** The unix timestamp (in millis) when this sampler should automatically complete. */      protected final long autoEndTime; // -1 for nothing +    /** Collects statistics for each window in the sample */ +    protected final WindowStatisticsCollector windowStatisticsCollector; +      /** A future to encapsulate the completion of this sampler instance */      protected final CompletableFuture<Sampler> future = new CompletableFuture<>(); @@ -75,6 +78,7 @@ public abstract class AbstractSampler implements Sampler {          this.interval = interval;          this.threadDumper = threadDumper;          this.autoEndTime = autoEndTime; +        this.windowStatisticsCollector = new WindowStatisticsCollector(platform);      }      @Override @@ -106,11 +110,11 @@ public abstract class AbstractSampler implements Sampler {      @Override      public void start() {          this.startTime = System.currentTimeMillis(); +    } -        TickHook tickHook = this.platform.getTickHook(); -        if (tickHook != null) { -            this.startTick = tickHook.getCurrentTick(); -        } +    @Override +    public void stop() { +        this.windowStatisticsCollector.stop();      }      protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { @@ -127,12 +131,9 @@ public abstract class AbstractSampler implements Sampler {              metadata.setComment(comment);          } -        if (this.startTick != -1) { -            TickHook tickHook = this.platform.getTickHook(); -            if (tickHook != null) { -                int numberOfTicks = tickHook.getCurrentTick() - this.startTick; -                metadata.setNumberOfTicks(numberOfTicks); -            } +        int totalTicks = this.windowStatisticsCollector.getTotalTicks(); +        if (totalTicks != -1) { +            metadata.setNumberOfTicks(totalTicks);          }          try { @@ -171,14 +172,23 @@ public abstract class AbstractSampler implements Sampler {          proto.setMetadata(metadata);      } -    protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { +    protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {          List<ThreadNode> data = dataAggregator.exportData(); -        data.sort(outputOrder); +        data.sort(Comparator.comparing(ThreadNode::getThreadLabel));          ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); +        ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data); +        int[] timeWindows = timeEncoder.getKeys(); +        for (int timeWindow : timeWindows) { +            proto.addTimeWindows(timeWindow); +        } + +        this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows); +        proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export()); +          for (ThreadNode entry : data) { -            proto.addThreads(entry.toProto(mergeMode)); +            proto.addThreads(entry.toProto(mergeMode, timeEncoder));              classSourceVisitor.visit(entry);          } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 98281de..e06cba6 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler;  import me.lucko.spark.common.SparkPlatform;  import me.lucko.spark.common.command.sender.CommandSender;  import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.common.sampler.source.ClassSourceLookup;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import java.util.Comparator;  import java.util.concurrent.CompletableFuture;  /** @@ -67,6 +65,6 @@ public interface Sampler {      CompletableFuture<Sampler> getFuture();      // Methods used to export the sampler data to the web viewer. -    SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); +    SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 3de3943..402330a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator {                  .build();      } -    public void insertData(ProfileSegment element) { +    public void insertData(ProfileSegment element, int window) {          try {              ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); -            node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime()); +            node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);          } catch (Exception e) {              e.printStackTrace();          } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index abde21d..1480650 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -87,11 +87,11 @@ public class AsyncProfilerAccess {          this.setupException = setupException;      } -    public AsyncProfiler getProfiler() { +    public AsyncProfilerJob startNewProfilerJob() {          if (this.profiler == null) {              throw new UnsupportedOperationException("async-profiler not supported", this.setupException);          } -        return this.profiler; +        return AsyncProfilerJob.createNew(this, this.profiler);      }      public ProfilingEvent getProfilingEvent() { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java new file mode 100644 index 0000000..7b123a7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -0,0 +1,264 @@ +/* + * This file is part of spark. + * + *  Copyright (c) lucko (Luck) <luck@lucko.me> + *  Copyright (c) contributors + * + *  This program is free software: you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation, either version 3 of the License, or + *  (at your option) any later version. + * + *  This program is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; + +/** + * Represents a profiling job within async-profiler. + * + * <p>Only one job can be running at a time. This is guarded by + * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.</p> + */ +public class AsyncProfilerJob { + +    /** +     * The currently active job. +     */ +    private static final AtomicReference<AsyncProfilerJob> ACTIVE = new AtomicReference<>(); + +    /** +     * Creates a new {@link AsyncProfilerJob}. +     * +     * <p>Will throw an {@link IllegalStateException} if another job is already active.</p> +     * +     * @param access the profiler access object +     * @param profiler the profiler +     * @return the job +     */ +    static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) { +        synchronized (ACTIVE) { +            AsyncProfilerJob existing = ACTIVE.get(); +            if (existing != null) { +                throw new IllegalStateException("Another profiler is already active: " + existing); +            } + +            AsyncProfilerJob job = new AsyncProfilerJob(access, profiler); +            ACTIVE.set(job); +            return job; +        } +    } + +    /** The async-profiler access object */ +    private final AsyncProfilerAccess access; +    /** The async-profiler instance */ +    private final AsyncProfiler profiler; + +    // Set on init +    /** The platform */ +    private SparkPlatform platform; +    /** The sampling interval in microseconds */ +    private int interval; +    /** The thread dumper */ +    private ThreadDumper threadDumper; +    /** The profiling window */ +    private int window; + +    /** The file used by async-profiler to output data */ +    private Path outputFile; + +    private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) { +        this.access = access; +        this.profiler = profiler; +    } + +    /** +     * Executes an async-profiler command. +     * +     * @param command the command +     * @return the output +     */ +    private String execute(String command) { +        try { +            return this.profiler.execute(command); +        } catch (IOException e) { +            throw new RuntimeException("Exception whilst executing profiler command", e); +        } +    } + +    /** +     * Checks to ensure that this job is still active. +     */ +    private void checkActive() { +        if (ACTIVE.get() != this) { +            throw new IllegalStateException("Profiler job no longer active!"); +        } +    } + +    // Initialise the job +    public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window) { +        this.platform = platform; +        this.interval = interval; +        this.threadDumper = threadDumper; +        this.window = window; +    } + +    /** +     * Starts the job. +     */ +    public void start() { +        checkActive(); + +        try { +            // create a new temporary output file +            try { +                this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); +            } catch (IOException e) { +                throw new RuntimeException("Unable to create temporary output file", e); +            } + +            // construct a command to send to async-profiler +            String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); +            if (this.threadDumper instanceof ThreadDumper.Specific) { +                command += ",filter"; +            } + +            // start the profiler +            String resp = execute(command).trim(); + +            if (!resp.equalsIgnoreCase("profiling started")) { +                throw new RuntimeException("Unexpected response: " + resp); +            } + +            // append threads to be profiled, if necessary +            if (this.threadDumper instanceof ThreadDumper.Specific) { +                ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; +                for (Thread thread : threadDumper.getThreads()) { +                    this.profiler.addThread(thread); +                } +            } + +        } catch (Exception e) { +            try { +                this.profiler.stop(); +            } catch (Exception e2) { +                // ignore +            } +            close(); + +            throw e; +        } +    } + +    /** +     * Stops the job. +     */ +    public void stop() { +        checkActive(); + +        try { +            this.profiler.stop(); +        } catch (IllegalStateException e) { +            if (!e.getMessage().equals("Profiler is not active")) { // ignore +                throw e; +            } +        } finally { +            close(); +        } +    } + +    /** +     * Aggregates the collected data. +     */ +    public void aggregate(AsyncDataAggregator dataAggregator) { + +        Predicate<String> threadFilter; +        if (this.threadDumper instanceof ThreadDumper.Specific) { +            ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper; +            threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase()); +        } else { +            threadFilter = n -> true; +        } + +        // read the jfr file produced by async-profiler +        try (JfrReader reader = new JfrReader(this.outputFile)) { +            readSegments(reader, threadFilter, dataAggregator, this.window); +        } catch (Exception e) { +            boolean fileExists; +            try { +                fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; +            } catch (IOException ex) { +                fileExists = false; +            } + +            if (fileExists) { +                throw new JfrParsingException("Error parsing JFR data from profiler output", e); +            } else { +                throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); +            } +        } + +        // delete the output file after reading +        try { +            Files.deleteIfExists(this.outputFile); +        } catch (IOException e) { +            // ignore +        } + +    } + +    private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException { +        List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class); +        for (int i = 0; i < samples.size(); i++) { +            JfrReader.ExecutionSample sample = samples.get(i); + +            long duration; +            if (i == 0) { +                // we don't really know the duration of the first sample, so just use the sampling +                // interval +                duration = this.interval; +            } else { +                // calculate the duration of the sample by calculating the time elapsed since the +                // previous sample +                duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); +            } + +            String threadName = reader.threads.get(sample.tid); +            if (!threadFilter.test(threadName)) { +                continue; +            } + +            // parse the segment and give it to the data aggregator +            ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration); +            dataAggregator.insertData(segment, window); +        } +    } + +    public int getWindow() { +        return this.window; +    } + +    private void close() { +        ACTIVE.compareAndSet(this, null); +    } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 7d9cb81..2c9bb5f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -27,61 +27,41 @@ import me.lucko.spark.common.command.sender.CommandSender;  import me.lucko.spark.common.sampler.AbstractSampler;  import me.lucko.spark.common.sampler.ThreadDumper;  import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.async.jfr.JfrReader;  import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.tick.TickHook;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import one.profiler.AsyncProfiler; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List;  import java.util.concurrent.Executors;  import java.util.concurrent.ScheduledExecutorService;  import java.util.concurrent.TimeUnit; -import java.util.function.Predicate;  /**   * A sampler implementation using async-profiler.   */  public class AsyncSampler extends AbstractSampler { -    private final AsyncProfiler profiler; +    private final AsyncProfilerAccess profilerAccess;      /** Responsible for aggregating and then outputting collected sampling data */      private final AsyncDataAggregator dataAggregator; -    /** Flag to mark if the output has been completed */ -    private boolean outputComplete = false; +    /** Mutex for the current profiler job */ +    private final Object[] currentJobMutex = new Object[0]; -    /** The temporary output file */ -    private Path outputFile; +    /** Current profiler job */ +    private AsyncProfilerJob currentJob; -    /** The executor used for timeouts */ -    private ScheduledExecutorService timeoutExecutor; +    /** The executor used for scheduling and management */ +    private ScheduledExecutorService scheduler;      public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {          super(platform, interval, threadDumper, endTime); -        this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler(); +        this.profilerAccess = AsyncProfilerAccess.getInstance(platform);          this.dataAggregator = new AsyncDataAggregator(threadGrouper); -    } - -    /** -     * Executes a profiler command. -     * -     * @param command the command to execute -     * @return the response -     */ -    private String execute(String command) { -        try { -            return this.profiler.execute(command); -        } catch (IOException e) { -            throw new RuntimeException("Exception whilst executing profiler command", e); -        } +        this.scheduler = Executors.newSingleThreadScheduledExecutor( +                new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() +        );      }      /** @@ -91,33 +71,58 @@ public class AsyncSampler extends AbstractSampler {      public void start() {          super.start(); -        try { -            this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); -        } catch (IOException e) { -            throw new RuntimeException("Unable to create temporary output file", e); +        TickHook tickHook = this.platform.getTickHook(); +        if (tickHook != null) { +            this.windowStatisticsCollector.startCountingTicks(tickHook);          } -        String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); -        if (this.threadDumper instanceof ThreadDumper.Specific) { -            command += ",filter"; -        } +        int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); -        String resp = execute(command).trim(); -        if (!resp.equalsIgnoreCase("profiling started")) { -            throw new RuntimeException("Unexpected response: " + resp); -        } +        AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); +        job.init(this.platform, this.interval, this.threadDumper, window); +        job.start(); +        this.currentJob = job; -        if (this.threadDumper instanceof ThreadDumper.Specific) { -            ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; -            for (Thread thread : threadDumper.getThreads()) { -                this.profiler.addThread(thread); -            } -        } +        // rotate the sampler job every minute to put data into a new window +        this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES);          recordInitialGcStats();          scheduleTimeout();      } +    private void rotateProfilerJob() { +        try { +            synchronized (this.currentJobMutex) { +                AsyncProfilerJob previousJob = this.currentJob; +                if (previousJob == null) { +                    return; +                } + +                try { +                    // stop the previous job +                    previousJob.stop(); + +                    // collect statistics for the window +                    this.windowStatisticsCollector.measureNow(previousJob.getWindow()); +                } catch (Exception e) { +                    e.printStackTrace(); +                } + +                // start a new job +                int window = previousJob.getWindow() + 1; +                AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob(); +                newJob.init(this.platform, this.interval, this.threadDumper, window); +                newJob.start(); +                this.currentJob = newJob; + +                // aggregate the output of the previous job +                previousJob.aggregate(this.dataAggregator); +            } +        } catch (Throwable e) { +            e.printStackTrace(); +        } +    } +      private void scheduleTimeout() {          if (this.autoEndTime == -1) {              return; @@ -128,11 +133,7 @@ public class AsyncSampler extends AbstractSampler {              return;          } -        this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor( -                new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build() -        ); - -        this.timeoutExecutor.schedule(() -> { +        this.scheduler.schedule(() -> {              stop();              this.future.complete(this);          }, delay, TimeUnit.MILLISECONDS); @@ -143,145 +144,27 @@ public class AsyncSampler extends AbstractSampler {       */      @Override      public void stop() { -        try { -            this.profiler.stop(); -        } catch (IllegalStateException e) { -            if (!e.getMessage().equals("Profiler is not active")) { // ignore -                throw e; -            } -        } +        super.stop(); +        synchronized (this.currentJobMutex) { +            this.currentJob.stop(); +            this.windowStatisticsCollector.measureNow(this.currentJob.getWindow()); +            this.currentJob.aggregate(this.dataAggregator); +            this.currentJob = null; +        } -        if (this.timeoutExecutor != null) { -            this.timeoutExecutor.shutdown(); -            this.timeoutExecutor = null; +        if (this.scheduler != null) { +            this.scheduler.shutdown(); +            this.scheduler = null;          }      }      @Override -    public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { +    public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {          SamplerData.Builder proto = SamplerData.newBuilder();          writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); -        aggregateOutput(); -        writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); +        writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);          return proto.build();      } -    private void aggregateOutput() { -        if (this.outputComplete) { -            return; -        } -        this.outputComplete = true; - -        Predicate<String> threadFilter; -        if (this.threadDumper instanceof ThreadDumper.Specific) { -            ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; -            threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); -        } else { -            threadFilter = n -> true; -        } - -        // read the jfr file produced by async-profiler -        try (JfrReader reader = new JfrReader(this.outputFile)) { -            readSegments(reader, threadFilter); -        } catch (Exception e) { -            boolean fileExists; -            try { -                fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; -            } catch (IOException ex) { -                fileExists = false; -            } - -            if (fileExists) { -                throw new JfrParsingException("Error parsing JFR data from profiler output", e); -            } else { -                throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); -            } -        } - -        // delete the output file after reading -        try { -            Files.deleteIfExists(this.outputFile); -        } catch (IOException e) { -            // ignore -        } -    } - -    private void readSegments(JfrReader reader, Predicate<String> threadFilter) throws IOException { -        List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class); -        for (int i = 0; i < samples.size(); i++) { -            JfrReader.ExecutionSample sample = samples.get(i); - -            long duration; -            if (i == 0) { -                // we don't really know the duration of the first sample, so just use the sampling -                // interval -                duration = this.interval; -            } else { -                // calculate the duration of the sample by calculating the time elapsed since the -                // previous sample -                duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); -            } - -            String threadName = reader.threads.get(sample.tid); -            if (!threadFilter.test(threadName)) { -                continue; -            } - -            // parse the segment and give it to the data aggregator -            ProfileSegment segment = parseSegment(reader, sample, threadName, duration); -            this.dataAggregator.insertData(segment); -        } -    } - -    private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { -        JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); -        int len = stackTrace.methods.length; - -        AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; -        for (int i = 0; i < len; i++) { -            stack[i] = parseStackFrame(reader, stackTrace.methods[i]); -        } - -        return new ProfileSegment(sample.tid, threadName, stack, duration); -    } - -    private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { -        AsyncStackTraceElement result = reader.stackFrames.get(methodId); -        if (result != null) { -            return result; -        } - -        JfrReader.MethodRef methodRef = reader.methods.get(methodId); -        JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); - -        byte[] className = reader.symbols.get(classRef.name); -        byte[] methodName = reader.symbols.get(methodRef.name); - -        if (className == null || className.length == 0) { -            // native call -            result = new AsyncStackTraceElement( -                    AsyncStackTraceElement.NATIVE_CALL, -                    new String(methodName, StandardCharsets.UTF_8), -                    null -            ); -        } else { -            // java method -            byte[] methodDesc = reader.symbols.get(methodRef.sig); -            result = new AsyncStackTraceElement( -                    new String(className, StandardCharsets.UTF_8).replace('/', '.'), -                    new String(methodName, StandardCharsets.UTF_8), -                    new String(methodDesc, StandardCharsets.UTF_8) -            ); -        } - -        reader.stackFrames.put(methodId, result); -        return result; -    } - -    private static final class JfrParsingException extends RuntimeException { -        public JfrParsingException(String message, Throwable cause) { -            super(message, cause); -        } -    }  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java new file mode 100644 index 0000000..6dab359 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java @@ -0,0 +1,27 @@ +/* + * This file is part of spark. + * + *  Copyright (c) lucko (Luck) <luck@lucko.me> + *  Copyright (c) contributors + * + *  This program is free software: you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation, either version 3 of the License, or + *  (at your option) any later version. + * + *  This program is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +public class JfrParsingException extends RuntimeException { +    public JfrParsingException(String message, Throwable cause) { +        super(message, cause); +    } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index 154e6fe..26debaf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -20,6 +20,10 @@  package me.lucko.spark.common.sampler.async; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import java.nio.charset.StandardCharsets; +  /**   * Represents a profile "segment".   * @@ -58,4 +62,50 @@ public class ProfileSegment {      public long getTime() {          return this.time;      } + +    public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { +        JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); +        int len = stackTrace.methods.length; + +        AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; +        for (int i = 0; i < len; i++) { +            stack[i] = parseStackFrame(reader, stackTrace.methods[i]); +        } + +        return new ProfileSegment(sample.tid, threadName, stack, duration); +    } + +    private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { +        AsyncStackTraceElement result = reader.stackFrames.get(methodId); +        if (result != null) { +            return result; +        } + +        JfrReader.MethodRef methodRef = reader.methods.get(methodId); +        JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); + +        byte[] className = reader.symbols.get(classRef.name); +        byte[] methodName = reader.symbols.get(methodRef.name); + +        if (className == null || className.length == 0) { +            // native call +            result = new AsyncStackTraceElement( +                    AsyncStackTraceElement.NATIVE_CALL, +                    new String(methodName, StandardCharsets.UTF_8), +                    null +            ); +        } else { +            // java method +            byte[] methodDesc = reader.symbols.get(methodRef.sig); +            result = new AsyncStackTraceElement( +                    new String(className, StandardCharsets.UTF_8).replace('/', '.'), +                    new String(methodName, StandardCharsets.UTF_8), +                    new String(methodDesc, StandardCharsets.UTF_8) +            ); +        } + +        reader.stackFrames.put(methodId, result); +        return result; +    } +  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java index 23223a2..60f6543 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -37,6 +37,10 @@ public class Dictionary<T> {          size = 0;      } +    public int size() { +        return this.size; +    } +      public void put(long key, T value) {          if (key == 0) {              throw new IllegalArgumentException("Zero key not allowed"); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index cc530d6..c51ec05 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -66,10 +66,11 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {       * Inserts sampling data into this aggregator       *       * @param threadInfo the thread info +     * @param window the window       */ -    public abstract void insertData(ThreadInfo threadInfo); +    public abstract void insertData(ThreadInfo threadInfo, int window); -    protected void writeData(ThreadInfo threadInfo) { +    protected void writeData(ThreadInfo threadInfo, int window) {          if (this.ignoreSleeping && isSleeping(threadInfo)) {              return;          } @@ -79,7 +80,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {          try {              ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); -            node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval); +            node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval, window);          } catch (Exception e) {              e.printStackTrace();          } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 0f73a9f..8c96fd3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -28,15 +28,17 @@ import me.lucko.spark.common.sampler.AbstractSampler;  import me.lucko.spark.common.sampler.ThreadDumper;  import me.lucko.spark.common.sampler.ThreadGrouper;  import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;  import me.lucko.spark.common.tick.TickHook;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; +import org.checkerframework.checker.units.qual.A; +  import java.lang.management.ManagementFactory;  import java.lang.management.ThreadInfo;  import java.lang.management.ThreadMXBean; -import java.util.Comparator;  import java.util.concurrent.Executors;  import java.util.concurrent.ScheduledExecutorService;  import java.util.concurrent.ScheduledFuture; @@ -62,6 +64,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {      /** Responsible for aggregating and then outputting collected sampling data */      private final JavaDataAggregator dataAggregator; + +    /** The last window that was profiled */ +    private final AtomicInteger lastWindow = new AtomicInteger();      public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {          super(platform, interval, threadDumper, endTime); @@ -76,12 +81,28 @@ public class JavaSampler extends AbstractSampler implements Runnable {      @Override      public void start() {          super.start(); + +        TickHook tickHook = this.platform.getTickHook(); +        if (tickHook != null) { +            if (this.dataAggregator instanceof TickedDataAggregator) { +                WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook); +                ((TickedDataAggregator) this.dataAggregator).setTickCounter(counter); +            } else { +                this.windowStatisticsCollector.startCountingTicks(tickHook); +            } +        } +          this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);      }      @Override      public void stop() { +        super.stop(); +          this.task.cancel(false); + +        // collect statistics for the final window +        this.windowStatisticsCollector.measureNow(this.lastWindow.get());      }      @Override @@ -89,27 +110,30 @@ public class JavaSampler extends AbstractSampler implements Runnable {          // this is effectively synchronized, the worker pool will not allow this task          // to concurrently execute.          try { -            if (this.autoEndTime != -1 && this.autoEndTime <= System.currentTimeMillis()) { -                this.future.complete(this); +            long time = System.currentTimeMillis(); + +            if (this.autoEndTime != -1 && this.autoEndTime <= time) {                  stop(); +                this.future.complete(this);                  return;              } +            int window = ProfilingWindowUtils.unixMillisToWindow(time);              ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); -            this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); +            this.workerPool.execute(new InsertDataTask(threadDumps, window));          } catch (Throwable t) { -            this.future.completeExceptionally(t);              stop(); +            this.future.completeExceptionally(t);          }      } -    private static final class InsertDataTask implements Runnable { -        private final JavaDataAggregator dataAggregator; +    private final class InsertDataTask implements Runnable {          private final ThreadInfo[] threadDumps; +        private final int window; -        InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) { -            this.dataAggregator = dataAggregator; +        InsertDataTask(ThreadInfo[] threadDumps, int window) {              this.threadDumps = threadDumps; +            this.window = window;          }          @Override @@ -118,16 +142,22 @@ public class JavaSampler extends AbstractSampler implements Runnable {                  if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {                      continue;                  } -                this.dataAggregator.insertData(threadInfo); +                JavaSampler.this.dataAggregator.insertData(threadInfo, this.window); +            } + +            // if we have just stepped over into a new window, collect statistics for the previous window +            int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window); +            if (previousWindow != 0 && previousWindow != this.window) { +                JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow);              }          }      }      @Override -    public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { +    public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {          SamplerData.Builder proto = SamplerData.newBuilder();          writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); -        writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); +        writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);          return proto.build();      } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java index 39e21aa..54173fe 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java @@ -44,8 +44,8 @@ public class SimpleDataAggregator extends JavaDataAggregator {      }      @Override -    public void insertData(ThreadInfo threadInfo) { -        writeData(threadInfo); +    public void insertData(ThreadInfo threadInfo, int window) { +        writeData(threadInfo, window);      }  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index e062f31..d537b96 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java;  import me.lucko.spark.common.sampler.ThreadGrouper;  import me.lucko.spark.common.sampler.aggregator.DataAggregator;  import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;  import me.lucko.spark.common.tick.TickHook;  import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -31,7 +32,6 @@ import java.util.ArrayList;  import java.util.List;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger;  /**   * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" @@ -48,14 +48,15 @@ public class TickedDataAggregator extends JavaDataAggregator {      /** The expected number of samples in each tick */      private final int expectedSize; -    /** The number of ticks aggregated so far */ -    private final AtomicInteger numberOfTicks = new AtomicInteger(); - -    private final Object mutex = new Object(); +    /** Counts the number of ticks aggregated */ +    private WindowStatisticsCollector.ExplicitTickCounter tickCounter;      // state      private int currentTick = -1; -    private TickList currentData = new TickList(0); +    private TickList currentData = null; + +    // guards currentData +    private final Object mutex = new Object();      public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {          super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); @@ -66,29 +67,34 @@ public class TickedDataAggregator extends JavaDataAggregator {          this.expectedSize = (int) ((50 / intervalMilliseconds) + 10);      } +    public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) { +        this.tickCounter = tickCounter; +    } +      @Override      public SamplerMetadata.DataAggregator getMetadata() {          // push the current tick (so numberOfTicks is accurate)          synchronized (this.mutex) {              pushCurrentTick(); +            this.currentData = null;          }          return SamplerMetadata.DataAggregator.newBuilder()                  .setType(SamplerMetadata.DataAggregator.Type.TICKED)                  .setThreadGrouper(this.threadGrouper.asProto())                  .setTickLengthThreshold(this.tickLengthThreshold) -                .setNumberOfIncludedTicks(this.numberOfTicks.get()) +                .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks())                  .build();      }      @Override -    public void insertData(ThreadInfo threadInfo) { +    public void insertData(ThreadInfo threadInfo, int window) {          synchronized (this.mutex) {              int tick = this.tickHook.getCurrentTick(); -            if (this.currentTick != tick) { +            if (this.currentTick != tick || this.currentData == null) {                  pushCurrentTick();                  this.currentTick = tick; -                this.currentData = new TickList(this.expectedSize); +                this.currentData = new TickList(this.expectedSize, window);              }              this.currentData.addData(threadInfo); @@ -98,6 +104,9 @@ public class TickedDataAggregator extends JavaDataAggregator {      // guarded by 'mutex'      private void pushCurrentTick() {          TickList currentData = this.currentData; +        if (currentData == null) { +            return; +        }          // approximate how long the tick lasted          int tickLengthMicros = currentData.getList().size() * this.interval; @@ -107,8 +116,8 @@ public class TickedDataAggregator extends JavaDataAggregator {              return;          } -        this.numberOfTicks.incrementAndGet();          this.workerPool.submit(currentData); +        this.tickCounter.increment();      }      @Override @@ -121,21 +130,19 @@ public class TickedDataAggregator extends JavaDataAggregator {          return super.exportData();      } -    public int getNumberOfTicks() { -        return this.numberOfTicks.get(); -    } -      private final class TickList implements Runnable {          private final List<ThreadInfo> list; +        private final int window; -        TickList(int expectedSize) { +        TickList(int expectedSize, int window) {              this.list = new ArrayList<>(expectedSize); +            this.window = window;          }          @Override          public void run() {              for (ThreadInfo data : this.list) { -                writeData(data); +                writeData(data, this.window);              }          } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index fd2be8d..fe1afcd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -20,6 +20,9 @@  package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.async.jfr.Dictionary; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; +  import java.util.ArrayList;  import java.util.Collection;  import java.util.Collections; @@ -27,62 +30,63 @@ import java.util.List;  import java.util.Map;  import java.util.concurrent.ConcurrentHashMap;  import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream;  /**   * Encapsulates a timed node in the sampling stack.   */  public abstract class AbstractNode { -    private static final int MAX_STACK_DEPTH = 300; +    protected static final int MAX_STACK_DEPTH = 300;      /** A map of the nodes children */      private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>();      /** The accumulated sample time for this node, measured in microseconds */ -    private final LongAdder totalTime = new LongAdder(); +    // long key = the window (effectively System.currentTimeMillis() / 60_000) +    // LongAdder value = accumulated time in microseconds +    private final Dictionary<LongAdder> times = new Dictionary<>();      /** -     * Gets the total sample time logged for this node in milliseconds. +     * Gets the time accumulator for a given window       * -     * @return the total time +     * @param window the window +     * @return the accumulator       */ -    public double getTotalTime() { -        return this.totalTime.longValue() / 1000d; +    protected LongAdder getTimeAccumulator(int window) { +        LongAdder adder = this.times.get(window); +        if (adder == null) { +            adder = new LongAdder(); +            this.times.put(window, adder); +        } +        return adder;      } -    public Collection<StackTraceNode> getChildren() { -        return this.children.values(); +    /** +     * Gets the time windows that have been logged for this node. +     * +     * @return the time windows +     */ +    public IntStream getTimeWindows() { +        IntStream.Builder keys = IntStream.builder(); +        this.times.forEach((key, value) -> keys.add((int) key)); +        return keys.build();      }      /** -     * Logs the given stack trace against this node and its children. +     * Gets the encoded total sample times logged for this node in milliseconds.       * -     * @param describer the function that describes the elements of the stack -     * @param stack the stack -     * @param time the total time to log -     * @param <T> the stack trace element type +     * @return the total times       */ -    public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time) { -        if (stack.length == 0) { -            return; -        } - -        this.totalTime.add(time); - -        AbstractNode node = this; -        T previousElement = null; - -        for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { -            T element = stack[(stack.length - 1) - offset]; - -            node = node.resolveChild(describer.describe(element, previousElement)); -            node.totalTime.add(time); +    protected double[] encodeTimesForProto(ProtoTimeEncoder encoder) { +        return encoder.encode(this.times); +    } -            previousElement = element; -        } +    public Collection<StackTraceNode> getChildren() { +        return this.children.values();      } -    private StackTraceNode resolveChild(StackTraceNode.Description description) { +    protected StackTraceNode resolveChild(StackTraceNode.Description description) {          StackTraceNode result = this.children.get(description); // fast path          if (result != null) {              return result; @@ -96,7 +100,7 @@ public abstract class AbstractNode {       * @param other the other node       */      protected void merge(AbstractNode other) { -        this.totalTime.add(other.totalTime.longValue()); +        other.times.forEach((key, value) -> getTimeAccumulator((int) key).add(value.longValue()));          for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) {              resolveChild(child.getKey()).merge(child.getValue());          } @@ -123,7 +127,7 @@ public abstract class AbstractNode {              list.add(child);          } -        list.sort(null); +        //list.sort(null);          return list;      } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java index b0d9237..ed938d5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java @@ -20,6 +20,7 @@  package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;  import me.lucko.spark.common.util.MethodDisambiguator;  import me.lucko.spark.proto.SparkSamplerProtos; @@ -30,7 +31,7 @@ import java.util.Objects;  /**   * Represents a stack trace element within the {@link AbstractNode node} structure.   */ -public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> { +public final class StackTraceNode extends AbstractNode {      /**       * Magic number to denote "no present" line number for a node. @@ -64,12 +65,16 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta          return this.description.parentLineNumber;      } -    public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode) { +    public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) {          SparkSamplerProtos.StackTraceNode.Builder proto = SparkSamplerProtos.StackTraceNode.newBuilder() -                .setTime(getTotalTime())                  .setClassName(this.description.className)                  .setMethodName(this.description.methodName); +        double[] times = encodeTimesForProto(timeEncoder); +        for (double time : times) { +            proto.addTimes(time); +        } +          if (this.description.lineNumber >= 0) {              proto.setLineNumber(this.description.lineNumber);          } @@ -87,26 +92,12 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta          }          for (StackTraceNode child : exportChildren(mergeMode)) { -            proto.addChildren(child.toProto(mergeMode)); +            proto.addChildren(child.toProto(mergeMode, timeEncoder));          }          return proto.build();      } -    @Override -    public int compareTo(StackTraceNode that) { -        if (this == that) { -            return 0; -        } - -        int i = -Double.compare(this.getTotalTime(), that.getTotalTime()); -        if (i != 0) { -            return i; -        } - -        return this.description.compareTo(that.description); -    } -      /**       * Function to construct a {@link StackTraceNode.Description} from a stack trace element       * of type {@code T}. @@ -129,7 +120,7 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta      /**       * Encapsulates the attributes of a {@link StackTraceNode}.       */ -    public static final class Description implements Comparable<Description> { +    public static final class Description {          private final String className;          private final String methodName; @@ -162,54 +153,6 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta              this.hash = Objects.hash(this.className, this.methodName, this.methodDescription);          } -        private static <T extends Comparable<T>> int nullCompare(T a, T b) { -            if (a == null && b == null) { -                return 0; -            } else if (a == null) { -                return -1; -            } else if (b == null) { -                return 1; -            } else { -                return a.compareTo(b); -            } -        } - -        @Override -        public int compareTo(Description that) { -            if (this == that) { -                return 0; -            } - -            int i = this.className.compareTo(that.className); -            if (i != 0) { -                return i; -            } - -            i = this.methodName.compareTo(that.methodName); -            if (i != 0) { -                return i; -            } - -            i = nullCompare(this.methodDescription, that.methodDescription); -            if (i != 0) { -                return i; -            } - -            if (this.methodDescription != null && that.methodDescription != null) { -                i = this.methodDescription.compareTo(that.methodDescription); -                if (i != 0) { -                    return i; -                } -            } - -            i = Integer.compare(this.lineNumber, that.lineNumber); -            if (i != 0) { -                return i; -            } - -            return Integer.compare(this.parentLineNumber, that.parentLineNumber); -        } -          @Override          public boolean equals(Object o) {              if (this == o) return true; diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index ed97443..1dce523 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -20,6 +20,7 @@  package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;  import me.lucko.spark.proto.SparkSamplerProtos;  /** @@ -53,13 +54,46 @@ public final class ThreadNode extends AbstractNode {          this.label = label;      } -    public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode) { +    /** +     * Logs the given stack trace against this node and its children. +     * +     * @param describer the function that describes the elements of the stack +     * @param stack the stack +     * @param time the total time to log +     * @param window the window +     * @param <T> the stack trace element type +     */ +    public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time, int window) { +        if (stack.length == 0) { +            return; +        } + +        getTimeAccumulator(window).add(time); + +        AbstractNode node = this; +        T previousElement = null; + +        for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { +            T element = stack[(stack.length - 1) - offset]; + +            node = node.resolveChild(describer.describe(element, previousElement)); +            node.getTimeAccumulator(window).add(time); + +            previousElement = element; +        } +    } + +    public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) {          SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder() -                .setName(getThreadLabel()) -                .setTime(getTotalTime()); +                .setName(getThreadLabel()); + +        double[] times = encodeTimesForProto(timeEncoder); +        for (double time : times) { +            proto.addTimes(time); +        }          for (StackTraceNode child : exportChildren(mergeMode)) { -            proto.addChildren(child.toProto(mergeMode)); +            proto.addChildren(child.toProto(mergeMode, timeEncoder));          }          return proto.build(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java index adcedcd..109adb3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java @@ -18,35 +18,19 @@   *  along with this program.  If not, see <http://www.gnu.org/licenses/>.   */ -package me.lucko.spark.common.sampler; +package me.lucko.spark.common.sampler.window; -import me.lucko.spark.common.sampler.node.ThreadNode; - -import java.util.Comparator; - -/** - * Methods of ordering {@link ThreadNode}s in the output data. - */ -public enum ThreadNodeOrder implements Comparator<ThreadNode> { +public enum ProfilingWindowUtils { +    ;      /** -     * Order by the name of the thread (alphabetically) +     * Gets the profiling window for the given time in unix-millis. +     * +     * @param time the time in milliseconds +     * @return the window       */ -    BY_NAME { -        @Override -        public int compare(ThreadNode o1, ThreadNode o2) { -            return o1.getThreadLabel().compareTo(o2.getThreadLabel()); -        } -    }, - -    /** -     * Order by the time taken by the thread (most time taken first) -     */ -    BY_TIME { -        @Override -        public int compare(ThreadNode o1, ThreadNode o2) { -            return -Double.compare(o1.getTotalTime(), o2.getTotalTime()); -        } +    public static int unixMillisToWindow(long time) { +        // one window per minute +        return (int) (time / 60_000);      } -  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java new file mode 100644 index 0000000..edb2309 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java @@ -0,0 +1,94 @@ +/* + * This file is part of spark. + * + *  Copyright (c) lucko (Luck) <luck@lucko.me> + *  Copyright (c) contributors + * + *  This program is free software: you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation, either version 3 of the License, or + *  (at your option) any later version. + * + *  This program is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.window; + +import me.lucko.spark.common.sampler.async.jfr.Dictionary; +import me.lucko.spark.common.sampler.node.AbstractNode; +import me.lucko.spark.common.sampler.node.ThreadNode; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream; + +/** + * Encodes a map of int->double into a double array. + */ +public class ProtoTimeEncoder { +    /** A sorted array of all possible keys to encode */ +    private final int[] keys; +    /** A map of key value -> index in the keys array */ +    private final Map<Integer, Integer> keysToIndex; + +    public ProtoTimeEncoder(List<ThreadNode> sourceData) { +        // get an array of all keys that show up in the source data +        this.keys = sourceData.stream() +                .map(AbstractNode::getTimeWindows) +                .reduce(IntStream.empty(), IntStream::concat) +                .distinct() +                .sorted() +                .toArray(); + +        // construct a reverse index lookup +        this.keysToIndex = new HashMap<>(this.keys.length); +        for (int i = 0; i < this.keys.length; i++) { +            this.keysToIndex.put(this.keys[i], i); +        } +    } + +    /** +     * Gets an array of the keys that could be encoded by this encoder. +     * +     * @return an array of keys +     */ +    public int[] getKeys() { +        return this.keys; +    } + +    /** +     * Encode a {@link Dictionary} (map) of times/durations into a double array. +     * +     * @param times a dictionary of times (unix-time millis -> duration in microseconds) +     * @return the times encoded as a double array +     */ +    public double[] encode(Dictionary<LongAdder> times) { +        // construct an array of values - length needs to exactly match the +        // number of keys, even if some values are zero. +        double[] array = new double[this.keys.length]; + +        times.forEach((key, value) -> { +            // get the index for the given key +            Integer idx = this.keysToIndex.get((int) key); +            if (idx == null) { +                throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet()); +            } + +            // convert the duration from microseconds -> milliseconds +            double durationInMilliseconds = value.longValue() / 1000d; + +            // store in the array +            array[idx] = durationInMilliseconds; +        }); + +        return array; +    } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java new file mode 100644 index 0000000..47f739d --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -0,0 +1,267 @@ +/* + * This file is part of spark. + * + *  Copyright (c) lucko (Luck) <luck@lucko.me> + *  Copyright (c) contributors + * + *  This program is free software: you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation, either version 3 of the License, or + *  (at your option) any later version. + * + *  This program is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.window; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.monitor.cpu.CpuMonitor; +import me.lucko.spark.common.monitor.tick.TickStatistics; +import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.util.RollingAverage; +import me.lucko.spark.proto.SparkProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Collects statistics for each profiling window. + */ +public class WindowStatisticsCollector { +    private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build(); + +    /** The platform */ +    private final SparkPlatform platform; + +    /** Map of profiling window -> statistics */ +    private final Map<Integer, SparkProtos.WindowStatistics> stats; + +    private TickCounter tickCounter; + +    public WindowStatisticsCollector(SparkPlatform platform) { +        this.platform = platform; +        this.stats = new ConcurrentHashMap<>(); +    } + +    /** +     * Indicates to the statistics collector that it should count the number +     * of ticks in each window using the provided {@link TickHook}. +     * +     * @param hook the tick hook +     */ +    public void startCountingTicks(TickHook hook) { +        this.tickCounter = new NormalTickCounter(this.platform, hook); +    } + +    /** +     * Indicates to the statistics collector that it should count the number +     * of ticks in each window, according to how many times the +     * {@link ExplicitTickCounter#increment()} method is called. +     * +     * @param hook the tick hook +     * @return the counter +     */ +    public ExplicitTickCounter startCountingTicksExplicit(TickHook hook) { +        ExplicitTickCounter counter = new ExplicitTickCounter(this.platform, hook); +        this.tickCounter = counter; +        return counter; +    } + +    public void stop() { +        if (this.tickCounter != null) { +            this.tickCounter.stop(); +        } +    } + +    /** +     * Gets the total number of ticks that have passed between the time +     * when the profiler started and stopped. +     * +     * <p>Importantly, note that this metric is different to the total number of ticks in a window +     * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()}) or the total number +     * of observed ticks if the 'only-ticks-over' aggregator is being used +     * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()} +     * and {@link ExplicitTickCounter#getTotalCountedTicks()}.</p> +     * +     * @return the total number of ticks in the profile +     */ +    public int getTotalTicks() { +        return this.tickCounter == null ? -1 : this.tickCounter.getTotalTicks(); +    } + +    /** +     * Measures statistics for the given window if none have been recorded yet. +     * +     * @param window the window +     */ +    public void measureNow(int window) { +        this.stats.computeIfAbsent(window, w -> measure()); +    } + +    /** +     * Ensures that the exported map has statistics (even if they are zeroed) for all windows. +     * +     * @param windows the expected windows +     */ +    public void ensureHasStatisticsForAllWindows(int[] windows) { +        for (int window : windows) { +            this.stats.computeIfAbsent(window, w -> ZERO); +        } +    } + +    public Map<Integer, SparkProtos.WindowStatistics> export() { +        return this.stats; +    } + +    /** +     * Measures current statistics, where possible averaging over the last minute. (1 min = 1 window) +     * +     * @return the current statistics +     */ +    private SparkProtos.WindowStatistics measure() { +        SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder(); + +        TickStatistics tickStatistics = this.platform.getTickStatistics(); +        if (tickStatistics != null) { +            builder.setTps(tickStatistics.tps1Min()); + +            RollingAverage mspt = tickStatistics.duration1Min(); +            if (mspt != null) { +                builder.setMsptMedian(mspt.median()); +                builder.setMsptMax(mspt.max()); +            } +        } + +        if (this.tickCounter != null) { +            int ticks = this.tickCounter.getCountedTicksThisWindowAndReset(); +            builder.setTicks(ticks); +        } + +        builder.setCpuProcess(CpuMonitor.processLoad1MinAvg()); +        builder.setCpuSystem(CpuMonitor.systemLoad1MinAvg()); + +        return builder.build(); +    } + +    /** +     * Responsible for counting the number of ticks in a profile/window. +     */ +    public interface TickCounter { + +        /** +         * Stop the counter. +         */ +        void stop(); + +        /** +         * Get the total number of ticks. +         * +         * <p>See {@link WindowStatisticsCollector#getTotalTicks()} for a longer explanation +         * of what this means exactly.</p> +         * +         * @return the total ticks +         */ +        int getTotalTicks(); + +        /** +         * Gets the total number of ticks counted in the last window, +         * and resets the counter to zero. +         * +         * @return the number of ticks counted since the last time this method was called +         */ +        int getCountedTicksThisWindowAndReset(); +    } + +    private static abstract class BaseTickCounter implements TickCounter { +        protected final SparkPlatform platform; +        protected final TickHook tickHook; + +        /** The game tick when sampling first began */ +        private final int startTick; + +        /** The game tick when sampling stopped */ +        private int stopTick = -1; + +        BaseTickCounter(SparkPlatform platform, TickHook tickHook) { +            this.platform = platform; +            this.tickHook = tickHook; +            this.startTick = this.tickHook.getCurrentTick(); +        } + +        @Override +        public void stop() { +            this.stopTick = this.tickHook.getCurrentTick(); +        } + +        @Override +        public int getTotalTicks() { +            if (this.startTick == -1) { +                throw new IllegalStateException("start tick not recorded"); +            } +            if (this.stopTick == -1) { +                throw new IllegalStateException("stop tick not recorded"); +            } + +            return this.stopTick - this.startTick; +        } +    } + +    /** +     * Counts the number of ticks in a window using a {@link TickHook}. +     */ +    public static final class NormalTickCounter extends BaseTickCounter { +        private int last; + +        NormalTickCounter(SparkPlatform platform, TickHook tickHook) { +            super(platform, tickHook); +            this.last = this.tickHook.getCurrentTick(); +        } + +        @Override +        public int getCountedTicksThisWindowAndReset() { +            synchronized (this) { +                int now = this.tickHook.getCurrentTick(); +                int ticks = now - this.last; +                this.last = now; +                return ticks; +            } +        } +    } + +    /** +     * Counts the number of ticks in a window according to the number of times +     * {@link #increment()} is called. +     * +     * Used by the {@link me.lucko.spark.common.sampler.java.TickedDataAggregator}. +     */ +    public static final class ExplicitTickCounter extends BaseTickCounter { +        private final AtomicInteger counted = new AtomicInteger(); +        private final AtomicInteger total = new AtomicInteger(); + +        ExplicitTickCounter(SparkPlatform platform, TickHook tickHook) { +            super(platform, tickHook); +        } + +        public void increment() { +            this.counted.incrementAndGet(); +            this.total.incrementAndGet(); +        } + +        public int getTotalCountedTicks() { +            return this.total.get(); +        } + +        @Override +        public int getCountedTicksThisWindowAndReset() { +            return this.counted.getAndSet(0); +        } +    } + +} | 
