Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
10 files changed, 123 insertions, 127 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index a02eec5..1fb4d2d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -22,22 +22,20 @@ package me.lucko.spark.common.sampler;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.gson.Gson;
-import com.google.gson.stream.JsonWriter;
 
 import me.lucko.spark.common.CommandSender;
 import me.lucko.spark.common.sampler.aggregator.DataAggregator;
 import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator;
 import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator;
 import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerData;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.io.OutputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -165,56 +163,31 @@ public class Sampler implements Runnable {
         }
     }
 
-    private void writeMetadata(JsonWriter writer) throws IOException {
-        writer.name("startTime").value(this.startTime);
-        writer.name("interval").value(this.interval);
-
-        writer.name("threadDumper").beginObject();
-        this.threadDumper.writeMetadata(writer);
-        writer.endObject();
-
-        writer.name("dataAggregator").beginObject();
-        this.dataAggregator.writeMetadata(writer);
-        writer.endObject();
-    }
-
-    private void writeOutput(JsonWriter writer, CommandSender creator) throws IOException {
-        writer.beginObject();
-
-        writer.name("type").value("sampler");
-
-        writer.name("metadata").beginObject();
-        writeMetadata(writer);
-
-        writer.name("user");
-        new Gson().toJson(creator.toData().serialize(), writer);
-
-        writer.endObject();
-
-        writer.name("threads").beginArray();
+    private SamplerData toProto(CommandSender creator) {
+        SamplerData.Builder proto = SamplerData.newBuilder();
+        proto.setMetadata(SamplerMetadata.newBuilder()
+                .setUser(creator.toData().toProto())
+                .setStartTime(this.startTime)
+                .setInterval(this.interval)
+                .setThreadDumper(this.threadDumper.getMetadata())
+                .setDataAggregator(this.dataAggregator.getMetadata())
+                .build()
+        );
 
         List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
         data.sort(Map.Entry.comparingByKey());
 
         for (Map.Entry<String, ThreadNode> entry : data) {
-            writer.beginObject();
-            writer.name("threadName").value(entry.getKey());
-            writer.name("totalTime").value(entry.getValue().getTotalTime());
-            writer.name("rootNode");
-            entry.getValue().serializeTo(writer);
-            writer.endObject();
+            proto.addThreads(entry.getValue().toProto());
        }
 
-        writer.endArray();
-        writer.endObject();
+        return proto.build();
     }
 
     public byte[] formCompressedDataPayload(CommandSender creator) {
         ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
-        try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(byteOut), StandardCharsets.UTF_8)) {
-            try (JsonWriter jsonWriter = new JsonWriter(writer)) {
-                writeOutput(jsonWriter, creator);
-            }
+        try (OutputStream out = new GZIPOutputStream(byteOut)) {
+            toProto(creator).writeTo(out);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
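Note: a consumer of formCompressedDataPayload would reverse the two steps above: gunzip, then parse. A minimal sketch, assuming only the standard parseFrom(InputStream) method that protobuf generates for every message; the PayloadReader class and readPayload name are hypothetical, not part of this change:

import me.lucko.spark.proto.SparkProtos.SamplerData;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public final class PayloadReader {

    // Inverts formCompressedDataPayload: un-gzip the bytes, then parse the
    // protobuf message from the decompressed stream.
    public static SamplerData readPayload(byte[] bytes) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            return SamplerData.parseFrom(in);
        }
    }
}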
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index fee130e..8ae2613 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -21,12 +21,12 @@
 
 package me.lucko.spark.common.sampler;
 
-import com.google.gson.stream.JsonWriter;
 import me.lucko.spark.common.util.ThreadFinder;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
 
-import java.io.IOException;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -50,12 +50,9 @@ public interface ThreadDumper {
     ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
 
     /**
-     * Writes metadata about the thread dumper instance to the given {@code writer}.
-     *
-     * @param writer the writer
-     * @throws IOException if thrown by the writer
+     * Gets metadata about the thread dumper instance.
      */
-    void writeMetadata(JsonWriter writer) throws IOException;
+    SamplerMetadata.ThreadDumper getMetadata();
 
     /**
      * Implementation of {@link ThreadDumper} that generates data for all threads.
@@ -67,8 +64,10 @@
         }
 
         @Override
-        public void writeMetadata(JsonWriter writer) throws IOException {
-            writer.name("type").value("all");
+        public SamplerMetadata.ThreadDumper getMetadata() {
+            return SamplerMetadata.ThreadDumper.newBuilder()
+                    .setType(SamplerMetadata.ThreadDumper.Type.ALL)
+                    .build();
         }
     };
 
@@ -96,13 +95,11 @@
         }
 
         @Override
-        public void writeMetadata(JsonWriter writer) throws IOException {
-            writer.name("type").value("specific");
-            writer.name("ids").beginArray();
-            for (long id : this.ids) {
-                writer.value(id);
-            }
-            writer.endArray();
+        public SamplerMetadata.ThreadDumper getMetadata() {
+            return SamplerMetadata.ThreadDumper.newBuilder()
+                    .setType(SamplerMetadata.ThreadDumper.Type.SPECIFIC)
+                    .addAllIds(Arrays.stream(this.ids).boxed().collect(Collectors.toList()))
+                    .build();
         }
     }
 
@@ -151,13 +148,11 @@
         }
 
         @Override
-        public void writeMetadata(JsonWriter writer) throws IOException {
-            writer.name("type").value("regex");
-            writer.name("patterns").beginArray();
-            for (Pattern pattern : this.namePatterns) {
-                writer.value(pattern.pattern());
-            }
-            writer.endArray();
+        public SamplerMetadata.ThreadDumper getMetadata() {
+            return SamplerMetadata.ThreadDumper.newBuilder()
+                    .setType(SamplerMetadata.ThreadDumper.Type.REGEX)
+                    .addAllPatterns(this.namePatterns.stream().map(Pattern::pattern).collect(Collectors.toList()))
+                    .build();
         }
     }
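Note: the new getMetadata() implementations reference Collectors, while this hunk only adds an import for java.util.Arrays, so java.util.stream.Collectors is presumably imported by the file already. The boxing in the Specific dumper is needed because protobuf's generated addAllIds accepts an Iterable<? extends Long>, not a primitive long[]. A self-contained sketch of that idiom:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class BoxingExample {

    public static void main(String[] args) {
        long[] ids = {1L, 12L, 37L};

        // LongStream has no collect(Collector) overload, so boxed() first
        // converts it to a Stream<Long>, which can be collected into a List.
        List<Long> boxed = Arrays.stream(ids).boxed().collect(Collectors.toList());

        System.out.println(boxed); // prints [1, 12, 37]
    }
}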
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index 3023676..d70ba1a 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -20,6 +20,8 @@
 
 package me.lucko.spark.common.sampler;
 
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
@@ -89,4 +91,17 @@
      */
     public abstract String getGroup(long threadId, String threadName);
 
+    public static SamplerMetadata.DataAggregator.ThreadGrouper asProto(ThreadGrouper threadGrouper) {
+        switch (threadGrouper) {
+            case BY_NAME:
+                return SamplerMetadata.DataAggregator.ThreadGrouper.BY_NAME;
+            case BY_POOL:
+                return SamplerMetadata.DataAggregator.ThreadGrouper.BY_POOL;
+            case AS_ONE:
+                return SamplerMetadata.DataAggregator.ThreadGrouper.AS_ONE;
+            default:
+                throw new AssertionError();
+        }
+    }
+
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 6fc62d4..2024f97 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -20,10 +20,9 @@
 
 package me.lucko.spark.common.sampler.aggregator;
 
-import com.google.gson.stream.JsonWriter;
 import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -47,11 +46,8 @@ public interface DataAggregator {
     void insertData(long threadId, String threadName, StackTraceElement[] stack);
 
     /**
-     * Writes metadata about the data aggregator instance to the given {@code writer}.
-     *
-     * @param writer the writer
-     * @throws IOException if thrown by the writer
+     * Gets metadata about the data aggregator instance.
      */
-    void writeMetadata(JsonWriter writer) throws IOException;
+    SamplerMetadata.DataAggregator getMetadata();
 
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
index 52e4912..7df3b9e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -20,11 +20,10 @@
 
 package me.lucko.spark.common.sampler.aggregator;
 
-import com.google.gson.stream.JsonWriter;
 import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -89,8 +88,10 @@
     }
 
     @Override
-    public void writeMetadata(JsonWriter writer) throws IOException {
-        writer.name("type").value("simple");
-        writer.name("threadGrouper").value(this.threadGrouper.name().toLowerCase());
+    public SamplerMetadata.DataAggregator getMetadata() {
+        return SamplerMetadata.DataAggregator.newBuilder()
+                .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
+                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+                .build();
     }
 }
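Note: the explicit switch in ThreadGrouper.asProto keeps the API enum and the generated protobuf enum formally decoupled, with the AssertionError default surfacing any future unmapped constant. Because the constant names currently match one-to-one, a terser but more fragile alternative would be a name-based lookup; a sketch, assuming the names stay in sync (the ThreadGrouperMapping class is hypothetical):

import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;

public final class ThreadGrouperMapping {

    // Alternative to the explicit switch: resolve the protobuf constant by
    // name. Throws IllegalArgumentException if the names ever diverge,
    // whereas the switch surfaces a missing mapping via its AssertionError.
    public static SamplerMetadata.DataAggregator.ThreadGrouper asProto(ThreadGrouper grouper) {
        return SamplerMetadata.DataAggregator.ThreadGrouper.valueOf(grouper.name());
    }
}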
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
index 3b9bcd2..2437de8 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -20,13 +20,12 @@
 
 package me.lucko.spark.common.sampler.aggregator;
 
-import com.google.gson.stream.JsonWriter;
 import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.TickCounter;
 import me.lucko.spark.common.sampler.node.AbstractNode;
 import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -153,10 +152,12 @@
     }
 
     @Override
-    public void writeMetadata(JsonWriter writer) throws IOException {
-        writer.name("type").value("ticked");
-        writer.name("threadGrouper").value(this.threadGrouper.name().toLowerCase());
-        writer.name("tickLengthThreshold").value(this.tickLengthThreshold);
+    public SamplerMetadata.DataAggregator getMetadata() {
+        return SamplerMetadata.DataAggregator.newBuilder()
+                .setType(SamplerMetadata.DataAggregator.Type.TICKED)
+                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+                .setTickLengthThreshold(this.tickLengthThreshold)
+                .build();
     }
 
     private final class TickList implements Runnable {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index 221b9cb..c5dde1c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -21,11 +21,7 @@
 
 package me.lucko.spark.common.sampler.node;
 
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -103,7 +99,7 @@
             child.log(elements, offset + 1, time, includeLineNumbers);
     }
 
-    private Collection<? extends AbstractNode> getChildren() {
+    protected List<StackTraceNode> getChildren() {
         if (this.children.isEmpty()) {
             return Collections.emptyList();
         }
@@ -113,28 +109,4 @@
         return list;
     }
 
-    public void serializeTo(JsonWriter writer) throws IOException {
-        writer.beginObject();
-
-        // append metadata about this node
-        appendMetadata(writer);
-
-        // include the total time recorded for this node
-        writer.name("t").value(getTotalTime());
-
-        // append child nodes, if any are present
-        Collection<? extends AbstractNode> childNodes = getChildren();
-        if (!childNodes.isEmpty()) {
-            writer.name("c").beginArray();
-            for (AbstractNode child : childNodes) {
-                child.serializeTo(writer);
-            }
-            writer.endArray();
-        }
-
-        writer.endObject();
-    }
-
-    protected abstract void appendMetadata(JsonWriter writer) throws IOException;
-
 }
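Note: with serializeTo and appendMetadata gone from AbstractNode, serialization no longer flows through a template method in the base class; each concrete type builds its own message, and consumers query the result through protobuf's generated accessors. A viewer-side sketch, assuming the standard generated getter names for the metadata fields set above (getDataAggregator(), getType(), getTickLengthThreshold()); the MetadataInspector class is hypothetical:

import me.lucko.spark.proto.SparkProtos.SamplerMetadata;

public final class MetadataInspector {

    // Branches on the aggregator type recorded in the metadata message.
    // Getter names follow protobuf's generated-code conventions for the
    // fields populated by the getMetadata() implementations above.
    public static String describe(SamplerMetadata metadata) {
        SamplerMetadata.DataAggregator aggregator = metadata.getDataAggregator();
        if (aggregator.getType() == SamplerMetadata.DataAggregator.Type.TICKED) {
            return "ticked, threshold=" + aggregator.getTickLengthThreshold();
        }
        return "simple, grouper=" + aggregator.getThreadGrouper();
    }
}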
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
index 80338e1..5b83c22 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
@@ -21,9 +21,7 @@
 
 package me.lucko.spark.common.sampler.node;
 
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
+import me.lucko.spark.proto.SparkProtos;
 
 /**
  * Represents a stack trace element within the {@link AbstractNode node} structure.
@@ -60,13 +58,21 @@ public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
         this.lineNumber = lineNumber;
     }
 
-    @Override
-    protected void appendMetadata(JsonWriter writer) throws IOException {
-        writer.name("cl").value(this.className);
-        writer.name("m").value(this.methodName);
+    public SparkProtos.StackTraceNode toProto() {
+        SparkProtos.StackTraceNode.Builder proto = SparkProtos.StackTraceNode.newBuilder()
+                .setTime(getTotalTime())
+                .setClassName(this.className)
+                .setMethodName(this.methodName);
+
         if (this.lineNumber >= 0) {
-            writer.name("ln").value(this.lineNumber);
+            proto.setLineNumber(this.lineNumber);
         }
+
+        for (StackTraceNode child : getChildren()) {
+            proto.addChildren(child.toProto());
+        }
+
+        return proto.build();
     }
 
     private String key() {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
index 4e8714c..471d2c1 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
@@ -20,9 +20,7 @@
 
 package me.lucko.spark.common.sampler.node;
 
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
+import me.lucko.spark.proto.SparkProtos;
 
 /**
  * The root of a sampling stack for a given thread / thread group.
@@ -38,7 +36,15 @@ public final class ThreadNode extends AbstractNode {
         this.threadName = threadName;
     }
 
-    protected void appendMetadata(JsonWriter writer) throws IOException {
-        writer.name("name").value(this.threadName);
+    public SparkProtos.ThreadNode toProto() {
+        SparkProtos.ThreadNode.Builder proto = SparkProtos.ThreadNode.newBuilder()
+                .setName(this.threadName)
+                .setTime(getTotalTime());
+
+        for (StackTraceNode child : getChildren()) {
+            proto.addChildren(child.toProto());
+        }
+
+        return proto.build();
     }
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
new file mode 100644
index 0000000..1995a58
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
@@ -0,0 +1,31 @@
+/*
+ * This file is part of spark.
+ *
+ *  Copyright (c) lucko (Luck) <luck@lucko.me>
+ *  Copyright (c) contributors
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.protocol;
+
+public interface ProtocolConstants {
+
+    int VERSION = 1;
+    int SAMPLER_TYPE = 0;
+
+    byte THREAD_NODE_TYPE = 0;
+    byte STACK_TRACE_NODE_TYPE = 1;
+
+}
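Note: since ThreadNode.toProto and StackTraceNode.toProto recurse through getChildren(), the encoded message mirrors the in-memory call tree, and a consumer can walk it with the getChildrenList() accessors that protobuf generates for the repeated children fields. A minimal walker sketch (the NodeWalker class is hypothetical, not part of this change):

import me.lucko.spark.proto.SparkProtos;

public final class NodeWalker {

    // Counts every stack trace node beneath a thread root by walking the
    // repeated 'children' fields built by the toProto() methods above.
    public static int countNodes(SparkProtos.ThreadNode thread) {
        int total = 0;
        for (SparkProtos.StackTraceNode child : thread.getChildrenList()) {
            total += count(child);
        }
        return total;
    }

    private static int count(SparkProtos.StackTraceNode node) {
        int total = 1;
        for (SparkProtos.StackTraceNode child : node.getChildrenList()) {
            total += count(child);
        }
        return total;
    }
}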