一 背景
aviatorscript
每分钟处理几百万行日志,日志流速几十 MB/S;
每行日志可能需要执行多个计算任务,计算任务个数不好估计,几个到几千都有;
每个计算任务需要对一行日志进行切分/过滤,一般条件<10个;
Go call Java
使用 Java 重写这个应用
二 Go call Java
使用共享内存也无法避免需要将对象复制进出共享内存的开销;
1 JVM inside 原理
2 CGO 简介
hello world
3 JNI 简介
#include < stdio.h>
#include < stdlib.h>
#include "jni.h"
JavaVM *bootJvm() {
JavaVM *jvm;
JNIEnv *env;
JavaVMInitArgs jvm_args;
JavaVMOption options[4];
// 此处可以定制一些JVM属性
// 通过这种方式启动的JVM只能通过 -Djava.class.path= 来指定classpath
// 并且此处不支持*
options[0].optionString = "-Djava.class.path= -Dfoo=bar";
options[1].optionString = "-Xmx1g";
options[2].optionString = "-Xms1g";
options[3].optionString = "-Xmn256m";
jvm_args.options = options;
jvm_args.nOptions = sizeof(options) / sizeof(JavaVMOption);
jvm_args.version = JNI_VERSION_1_8; // Same as Java version
jvm_args.ignoreUnrecognized = JNI_FALSE; // For more error messages.
JavaVMAttachArgs aargs;
aargs.version = JNI_VERSION_1_8;
aargs.name = "TODO";
aargs.group = NULL;
JNI_CreateJavaVM(&jvm, (void **) &env, &jvm_args);
// 此处env对我们已经没用了, 所以detach掉.
// 否则默认情况下刚create完JVM, 会自动将当前线程Attach上去
(*jvm)->DetachCurrentThread(jvm);
return jvm;
}
int main() {
JavaVM *jvm = bootJvm();
JNIEnv *env;
if ((*jvm)->AttachCurrentThread(jvm, (void **) &env, NULL) != JNI_OK) {
printf("AttachCurrentThread error\n");
exit(1);
}
// 以下是 C 调用Java 执行 String.format("hello %s %s %d", "world", "haha", 2) 的例子
jclass String_class = (*env)->FindClass(env, "java/lang/String");
jclass Object_class = (*env)->FindClass(env, "java/lang/Object");
jclass Integer_class = (*env)->FindClass(env, "java/lang/Integer");
jmethodID format_method = (*env)->GetStaticMethodID(env, String_class, "format",
"(Ljava/lang/String;[Ljava/lang/Object;)Ljava/lang/String;");
jmethodID Integer_constructor = (*env)->GetMethodID(env, Integer_class, "< init>", "(I)V");
// string里不能包含中文 否则还需要额外的代码
jstring j_arg0 = (*env)->NewStringUTF(env, "world");
jstring j_arg1 = (*env)->NewStringUTF(env, "haha");
jobject j_arg2 = (*env)->NewObject(env, Integer_class, Integer_constructor, 2);
// args = new Object[3]
jobjectArray j_args = (*env)->NewObjectArray(env, 3, Object_class, NULL);
// args[0] = j_arg0
// args[1] = j_arg1
// args[2] = new Integer(2)
(*env)->SetObjectArrayElement(env, j_args, 0, j_arg0);
(*env)->SetObjectArrayElement(env, j_args, 1, j_arg1);
(*env)->SetObjectArrayElement(env, j_args, 2, j_arg2);
(*env)->DeleteLocalRef(env, j_arg0);
(*env)->DeleteLocalRef(env, j_arg1);
(*env)->DeleteLocalRef(env, j_arg2);
jstring j_format = (*env)->NewStringUTF(env, "hello %s %s %d");
// j_result = String.format("hello %s %s %d", jargs);
jobject j_result = (*env)->CallStaticObjectMethod(env, String_class, format_method, j_format, j_args);
(*env)->DeleteLocalRef(env, j_format);
// 异常处理
if ((*env)->ExceptionCheck(env)) {
(*env)->ExceptionDescribe(env);
printf("ExceptionCheck\n");
exit(1);
}
jint result_length = (*env)->GetStringUTFLength(env, j_result);
char *c_result = malloc(result_length + 1);
c_result[result_length] = 0;
(*env)->GetStringUTFRegion(env, j_result, 0, result_length, c_result);
(*env)->DeleteLocalRef(env, j_result);
printf("java result=%s\n", c_result);
free(c_result);
(*env)->DeleteLocalRef(env, j_args);
if ((*jvm)->DetachCurrentThread(jvm) != JNI_OK) {
printf("AttachCurrentThread error\n");
exit(1);
}
printf("done\n");
return 0;
}
/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/include/jni.h
/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre/lib/server/libjvm.dylib
java result=hello world haha 2
done
4 实现
三 性能优化
1 预处理
Groovy 等脚本语言的静态化和预编译;
正则表达式预编译;
使用字符串池减少重复的字符串实例;
提前解析数据为特定数据结构;
动态沙箱是通过拦截所有方法调用(以及一些其他行为)实现的,性能损失太大。
静态沙箱是通过静态分析,在编译阶段发现恶意调用,通过植入检测代码,避免方法长时间不返回,但由于 Groovy 的动态特性,静态分析很难分析出 Groovy 的真正行为( 比如方法的返回类型总是 Object,调用的方法本身是一个表达式,只有运行时才知道 ),因此有非常多的办法可以绕过静态分析调用恶意代码。
2 批量化
type Filter interface {
Filter(string) bool
}
type JavaFilter struct {
}
func (f *JavaFilter) Filter(content string) bool {
// call java
}
3 线程模型
4 Java 使用 ThreadLocal 优化
由于这些Java线程是由JNI在Attach时创建的,不受我们控制,因此无法定制Thread的实现类,否则可以使用类似Netty的FastThreadLocal再优化一把。
5 unsafe编程
大量 Go 风格的字符串要转成 C 风格的字符串,此处有 malloc,调用完之后记得 free 掉。
Go 风格字符串如果包含 '\0',会导致 C 风格字符串提前结束。
我们需要保证字符串在堆上分配而非栈上分配才行,Go 里一个简单的技巧是保证数据直接或间接跨goroutine引用就能保证分配到堆上。还可以参考 reflect.ValueOf() 里调用的 escape 方法。
Go的GC是非移动式GC,因此即使GC了对象地址也不会变化
Java 解码字符串:Java 收到指针之后将指针转成 DirectByteBuffer ,然后利用 CharsetDecoder 解码出 String。
Java返回数据给C:
考虑到返回的结构体比较复杂,将其 Protobuf 序列化成 byte[] 然后传递回去, 这样 C 只需要负责搬运几个数值。
此处我们注意到有很多临时的 malloc,结合我们的线程模型,每个线程使用了一块 ThreadLocal 的堆外内存存放 Protobuf 序列化结果,使用 writeTo(CodedOutputStream.newInstance(ByteBuffer))可以直接将序列化结果写入堆外, 而不用再将 byte[] 拷贝一次。
经过统计一般这块 Response 不会太大,现在大小是 10MB,超过这个大小就老老实实用 malloc&free了。
type SliceHeader struct {
// 底层字节数组的地址
Data uintptr
// 长度
Len int
// 容量
Cap int
}
type StringHeader struct {
// 底层字节数组的地址
Data uintptr
// 长度
Len int
}
public class DirectMemoryUtils {
private static final Unsafe unsafe;
private static final Class< ?> DIRECT_BYTE_BUFFER_CLASS;
private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET;
private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
static {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
throw new AssertionError(e);
}
try {
ByteBuffer directBuffer = ByteBuffer.allocateDirect(0);
Class<?> clazz = directBuffer.getClass();
DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
DIRECT_BYTE_BUFFER_CLASS = clazz;
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
public static long allocateMemory(long size) {
// 经过测试 JNA 的 Native.malloc 吞吐量是 unsafe.allocateMemory 的接近2倍
// return Native.malloc(size);
return unsafe.allocateMemory(size);
}
public static void freeMemory(long address) {
// Native.free(address);
unsafe.freeMemory(address);
}
/**
* @param address 用long表示一个来自C的指针, 指向一块内存区域
* @param len 内存区域长度
* @return
*/
public static ByteBuffer directBufferFor(long address, long len) {
if (len > Integer.MAX_VALUE || len < 0L) {
throw new IllegalArgumentException("invalid len " + len);
}
// 以下技巧来自OHC, 通过unsafe绕过构造器直接创建对象, 然后对几个内部字段进行赋值
try {
ByteBuffer bb = (ByteBuffer) unsafe.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
unsafe.putLong(bb, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address);
unsafe.putInt(bb, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, (int) len);
unsafe.putInt(bb, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, (int) len);
return bb;
} catch (Error e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
public static byte[] readAll(ByteBuffer bb) {
byte[] bs = new byte[bb.remaining()];
bb.get(bs);
return bs;
}
}
6 左起右至优化
字符串="a,b,c,d"
规则=("," , 1, ",")
结果="c"
规则=("b=", 0, " ")
结果="2"
7 动态GC优化
已用的堆内存达到 NextGC 时;
连续 2min 没有发生任何 GC;
用户手动调用 runtime.GC() 或 debug.FreeOSMemory();
根据公式,NextGC 可能会超过物理内存;
Go 并没有在内存不足时进行 GC 的机制(而 Java 就可以);
8 使用紧凑的数据结构
type Row struct {
Timestamp int64
StringArray []string
DataArray []Data
// 此处省略一些其他无用字段, 均已经设为nil
}
type Data interface {
// 省略一些方法
}
type Float64Data struct {
Value float64
}
9 字符串复用
四 后续
五 总结
提升代码质量的方法:领域模型、设计原则、设计模式
零基础入门必学!阿里云大学免费精华课程大盘点!


