/*
* This file is part of spark.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package me.lucko.spark.common.sampler.async;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
import com.google.common.io.ByteStreams;

import me.lucko.spark.common.SparkPlatform;

import one.profiler.AsyncProfiler;
import one.profiler.Events;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Objects;
import java.util.logging.Level;
import java.util.stream.Collectors;
/**
* Provides a bridge between spark and async-profiler.
*/
/**
 * Provides a bridge between spark and async-profiler.
 *
 * <p>Responsible for extracting the bundled async-profiler native library from
 * the spark jar, loading it, and determining which profiling events are
 * supported on the current os/arch.</p>
 */
public class AsyncProfilerAccess {
    private static AsyncProfilerAccess instance;

    /**
     * Gets the singleton {@link AsyncProfilerAccess} instance, initialising it
     * on first use.
     *
     * @param platform the spark platform, required for the first initialisation
     * @return the singleton instance
     */
    public static synchronized AsyncProfilerAccess getInstance(SparkPlatform platform) {
        if (instance == null) {
            Objects.requireNonNull(platform, "platform");
            instance = new AsyncProfilerAccess(platform);
        }
        return instance;
    }

    /** An instance of the async-profiler Java API, or {@code null} if setup failed. */
    private final AsyncProfiler profiler;

    /** The event to use for profiling, or {@code null} if no event is supported. */
    private final ProfilingEvent profilingEvent;

    /** The event to use for allocation profiling, or {@code null} if unsupported. */
    private final ProfilingEvent allocationProfilingEvent;

    /** If {@link #profiler} is null, contains the reason why setup failed. */
    private final Exception setupException;

    AsyncProfilerAccess(SparkPlatform platform) {
        AsyncProfiler profiler;
        ProfilingEvent profilingEvent = null;
        ProfilingEvent allocationProfilingEvent = null;
        Exception setupException = null;

        try {
            profiler = load(platform);

            if (isEventSupported(profiler, ProfilingEvent.ALLOC, false)) {
                allocationProfilingEvent = ProfilingEvent.ALLOC;
            }

            // prefer CPU profiling; fall back to wall-clock, and raise if
            // neither is available (throwException=true on the last attempt)
            if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
                profilingEvent = ProfilingEvent.CPU;
            } else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
                profilingEvent = ProfilingEvent.WALL;
            }
        } catch (Exception e) {
            profiler = null;
            setupException = e;
        }

        this.profiler = profiler;
        this.profilingEvent = profilingEvent;
        this.allocationProfilingEvent = allocationProfilingEvent;
        this.setupException = setupException;
    }

    /**
     * Creates a new profiler job.
     *
     * @return the job
     * @throws UnsupportedOperationException if async-profiler is not supported,
     *         with the setup failure as its cause
     */
    public AsyncProfilerJob startNewProfilerJob() {
        if (this.profiler == null) {
            throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
        }
        return AsyncProfilerJob.createNew(this, this.profiler);
    }

    public ProfilingEvent getProfilingEvent() {
        return this.profilingEvent;
    }

    public ProfilingEvent getAllocationProfilingEvent() {
        return this.allocationProfilingEvent;
    }

    /**
     * Checks whether the async-profiler engine is usable, logging a
     * user-friendly explanation if it is not.
     *
     * @param platform the platform to log through
     * @return true if the profiler loaded successfully
     */
    public boolean checkSupported(SparkPlatform platform) {
        if (this.setupException != null) {
            if (this.setupException instanceof UnsupportedSystemException) {
                platform.getPlugin().log(Level.INFO, "The async-profiler engine is not supported for your os/arch (" +
                        this.setupException.getMessage() + "), so the built-in Java engine will be used instead.");
            } else if (this.setupException instanceof NativeLoadingException && isLibStdCxxIssue(this.setupException)) {
                platform.getPlugin().log(Level.WARNING, "Unable to initialise the async-profiler engine because libstdc++ is not installed.");
                platform.getPlugin().log(Level.WARNING, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler#install-libstdc");
            } else {
                platform.getPlugin().log(Level.WARNING, "Unable to initialise the async-profiler engine: " + this.setupException.getMessage());
                platform.getPlugin().log(Level.WARNING, "Please see here for more information: https://spark.lucko.me/docs/misc/Using-async-profiler");
                this.setupException.printStackTrace();
            }
        }
        return this.profiler != null;
    }

    /**
     * Returns true if the given setup failure was caused by a missing
     * libstdc++ library.
     *
     * <p>Null-safe: {@link Throwable#getMessage()} may return null for
     * {@link UnsatisfiedLinkError}s on some JVMs.</p>
     */
    private static boolean isLibStdCxxIssue(Exception setupException) {
        Throwable cause = setupException.getCause();
        String message = cause == null ? null : cause.getMessage();
        return message != null && message.contains("libstdc++");
    }

    /**
     * Checks whether the allocation profiling mode is usable, logging an
     * explanation if the profiler loaded but allocation events are unsupported.
     *
     * @param platform the platform to log through
     * @return true if allocation profiling is supported
     */
    public boolean checkAllocationProfilingSupported(SparkPlatform platform) {
        boolean supported = this.allocationProfilingEvent != null;
        if (!supported && this.profiler != null) {
            platform.getPlugin().log(Level.WARNING, "The allocation profiling mode is not supported on your system. This is most likely because Hotspot debug symbols are not available.");
            platform.getPlugin().log(Level.WARNING, "To resolve, try installing the 'openjdk-11-dbg' or 'openjdk-8-dbg' package using your OS package manager.");
        }
        return supported;
    }

    /**
     * Extracts the bundled async-profiler native library for the current
     * os/arch and loads it.
     *
     * @param platform the platform, used to create the temporary extract file
     * @return the loaded async-profiler instance
     * @throws UnsupportedSystemException if the os/arch combination is unsupported
     * @throws NativeLoadingException if the native library fails to link
     * @throws Exception if extraction fails for any other reason
     */
    private static AsyncProfiler load(SparkPlatform platform) throws Exception {
        // check compatibility
        String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
        String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);

        if (os.equals("linux") && arch.equals("amd64") && isLinuxMusl()) {
            arch = "amd64-musl";
        }

        // maps (os, arch) -> bundled library directory within the spark jar
        Table<String, String, String> supported = ImmutableTable.<String, String, String>builder()
                .put("linux", "amd64", "linux/amd64")
                .put("linux", "amd64-musl", "linux/amd64-musl")
                .put("linux", "aarch64", "linux/aarch64")
                .put("macosx", "amd64", "macos")
                .put("macosx", "aarch64", "macos")
                .build();

        String libPath = supported.get(os, arch);
        if (libPath == null) {
            throw new UnsupportedSystemException(os, arch);
        }

        // extract the profiler binary from the spark jar file
        String resource = "spark/" + libPath + "/libasyncProfiler.so";
        URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource(resource);
        if (profilerResource == null) {
            throw new IllegalStateException("Could not find " + resource + " in spark jar file");
        }

        Path extractPath = platform.getTemporaryFiles().create("spark-", "-libasyncProfiler.so.tmp");

        try (InputStream in = profilerResource.openStream(); OutputStream out = Files.newOutputStream(extractPath)) {
            ByteStreams.copy(in, out);
        }

        // get an instance of async-profiler
        try {
            return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString());
        } catch (UnsatisfiedLinkError e) {
            throw new NativeLoadingException(e);
        }
    }

    /**
     * Checks the {@code profiler} to ensure the given event is supported.
     *
     * @param profiler the profiler instance
     * @param event the event to check for
     * @param throwException if true, an unsupported event raises an exception
     *        instead of returning false
     * @return if the event is supported
     */
    private static boolean isEventSupported(AsyncProfiler profiler, ProfilingEvent event, boolean throwException) {
        try {
            String resp = profiler.execute("check,event=" + event).trim();
            if (resp.equalsIgnoreCase("ok")) {
                return true;
            } else if (throwException) {
                throw new IllegalArgumentException(resp);
            }
        } catch (Exception e) {
            if (throwException) {
                throw new RuntimeException("Event " + event + " is not supported", e);
            }
        }
        return false;
    }

    /** The async-profiler events spark knows how to use. */
    enum ProfilingEvent {
        CPU(Events.CPU),
        WALL(Events.WALL),
        ALLOC(Events.ALLOC);

        /** The async-profiler identifier for the event. */
        private final String id;

        ProfilingEvent(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return this.id;
        }
    }

    /** Thrown when the current os/arch combination is not supported. */
    private static final class UnsupportedSystemException extends UnsupportedOperationException {
        public UnsupportedSystemException(String os, String arch) {
            super(os + '/' + arch);
        }
    }

    /** Thrown when the native library fails to link. */
    private static final class NativeLoadingException extends RuntimeException {
        public NativeLoadingException(Throwable cause) {
            super("A runtime error occurred whilst loading the native library", cause);
        }
    }

    /**
     * Checks if the system is using musl instead of glibc, by inspecting the
     * dynamic library dependencies of the 'ls' binary.
     *
     * @return true if musl was detected; false on any failure
     */
    private static boolean isLinuxMusl() {
        try {
            Process process = new ProcessBuilder("sh", "-c", "ldd `which ls`").start();
            // close the reader (and underlying stream) to avoid leaking the pipe
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String output = reader.lines().collect(Collectors.joining());
                return output.contains("musl"); // shrug
            }
        } catch (Throwable e) {
            // best-effort check: if it fails for any reason, assume glibc
            return false;
        }
    }
}