grpc源码分析1-context

栏目: 服务器 · 发布时间: 5年前

内容简介:Context本身是Immutable的,但是它保存的状态不一定是。Context中保存数据使用的是KV映射。

io.grpc.Context 表示上下文,用来在一次grpc请求链路中传递用户登录信息、tracing信息等。

Context本身是Immutable的,但是它保存的状态不一定是。

Context中保存数据使用的是KV映射。

Key定义如下, 通过这种方式,而不是使用String可以有多个相同name的不同Key,而name则只用来做debug信息。

public static final class Key<T>{
   private final String name;
   private final T defaultValue;
}

获取Key对应的value即从Context中找到对应的映射,不过Key也提供了get这个简便方法,从当前context获取对应的value

/**
* Get the value from the {@link#current()} context for this key.
*/
    @SuppressWarnings("unchecked")
    public T get() {
      return get(Context.current());
    }

    /**
* Get the value from the specified context for this key.
*/
    @SuppressWarnings("unchecked")
    public T get(Context context) {
      T value = (T) context.lookup(this);
      return value == null ? defaultValue : value;
    }

那么Context中是如果保存这个KV映射的呢, 我们通常会使用HashMap来进行保存。

不过HashMap是一个比较全能的Map实现,有对put、delete、get等操作都有优化,因此内部有很多辅助结构,这就增加了内存消耗。(HashMap实现分析可以参考我的另一篇文章)

在Context的使用场景中,只需要put和get操作。

例如在当前Context基础上,增加一些KV对,是put操作;查询当前某个key对应的value,是get操作。

最开始的版本实现里,Context使用的是二维数组Object[][],第一维是Key,第二维是Value。

这样的缺点是查询的时间复杂度是线性的(其实大多数场景的key的数量都比较少)。

所以在HashMap和数组间的时间复杂度和占用空间的权衡就是 Hash Array Mapped Trie 数据结构了。

在grpc中的实现是 io.grpc.PersistentHashArrayMappedTrie

下面使用 SizeOf 工具简单通过deepSize对比一下 HashMapPersistentHashArrayMappedTrie 的内存占用情况。

private static final Context.Key<String> userName = Context.key("userName");

    private static final Map<Context.Key<?>, Object> cache = new HashMap<>();

    private static PersistentHashArrayMappedTrie<Context.Key<?>, Object> persistentHashArrayMappedTrie =
            new PersistentHashArrayMappedTrie<>();

    public static void main(String[] args) {
        cache.put(userName, "hello");
        persistentHashArrayMappedTrie = persistentHashArrayMappedTrie.put(userName, "hello");
        SizeOf sizeOf = SizeOf.newInstance();
        System.out.println(sizeOf.deepSizeOf(cache));
        System.out.println(sizeOf.deepSizeOf(persistentHashArrayMappedTrie));
    }

可以看到内存占用差别还是比较大的,因为HashMap里面的Node保存了key、hash、value、next等字段,并且默认使用一定大小的capacity数量作为起始capacity来避免反复的resize。

Context常用用法如下。首先获取当前context,这个一般是作为参数传过来的,或通过current()获取当前的已有context。

然后通过attach方法,绑定到当前线程上,并且返回当前线程

Context current = xxx.
Context previous = current.attach();
try {
    // do something in context
} finally {
    current.detach(previous);
}

Context的主要方法如下

  • attach() attach Context自己,从而进入到一个新的scope中,新的scope以此Context实例作为current,并且返回之前的current context
  • detach(Context toDetach) attach()方法的反向方法,退出当前Context并且detach到toDetachContext,每个attach方法要对应一个detach,所以一般通过try finally代码块或wrap模板方法来使用。
  • static storage() 获取storage,Storage是用来attach和detach当前context用的。
public class Context {
  public static final Context ROOT = new Context(null, EMPTY_ENTRIES);
  public static Context current(){
    Context current = storage().current();
    if (current == null) {
      return ROOT;
    }
    return current;
  }
  private Context(PersistentHashArrayMappedTrie<Key<?>, Object> keyValueEntries,int generation){
    cancellableAncestor = null;
    this.keyValueEntries = keyValueEntries;
    this.generation = generation;
    validateGeneration(generation);
  }
  public Context attach(){
    Context prev = storage().doAttach(this);
    if (prev == null) {
      return ROOT;
    }
    return prev;
  }
  public void detach(Context toAttach){
    checkNotNull(toAttach, "toAttach");
    storage().detach(this, toAttach);
  }
  // For testing
  static Storage storage(){
    Storage tmp = storage.get();
    if (tmp == null) {
      tmp = createStorage();
    }
    return tmp;
  }

上面提到的wrap模板方法如下,可以wrap一个Runnable(不需要返回值的使用)或Callable(要返回一个值)

public Runnablewrap(final Runnable r){
  return new Runnable() {
    @Override
    public void run(){
      Context previous = attach();
      try {
        r.run();
      } finally {
        detach(previous);
      }
    }
  };
}
public <C> Callable<C> wrap(final Callable<C> c) {
  return new Callable<C>() {
    @Override
    public Ccall()throws Exception {
      Context previous = attach();
      try {
        return c.call();
      } finally {
        detach(previous);
      }
    }
  };
}

Context的attach、detach方法都调用了Storage对应的方法。

grpc的默认的Storage实现是使用ThreadLocal的 ThreadLocalContextStorage 。ThreadLocal是实现可以参考 ThreadLocal使用和源码分析

final class ThreadLocalContextStorageextends Context.Storage{
  // ThreadLocal保存的是当前的Context
  static final ThreadLocal<Context> localContext = new ThreadLocal<>();

  @Override
  public Context doAttach(Context toAttach) {
    Context current = current();
    localContext.set(toAttach);
    return current;
  }

  @Override
  public void detach(Context toDetach, Context toRestore) {
    if (current() != toDetach) {
      log.log(Level.SEVERE, "Context was not attached when detaching",
          new Throwable().fillInStackTrace());
    }
    if (toRestore != Context.ROOT) {
      localContext.set(toRestore);
    } else {
      // Avoid leaking our ClassLoader via ROOT if this Thread is reused across multiple
      // ClassLoaders, as is common for Servlet Containers. The ThreadLocal is weakly referenced by
      // the Thread, but its current value is strongly referenced and only lazily collected as new
      // ThreadLocals are created.
      //
      // Use set(null) instead of remove() since remove() deletes the entry which is then re-created
      // on the next get() (because of initialValue() handling). set(null) has same performance as
      // set(toRestore).
      detach时清除到toDetach的引用,避免发生引用泄露,虽然ThreadLocal本身是Thread里面weak reference的,但是value确是强引用的,所以要通过null要主动去掉引用
      localContext.set(null);
    }
  }

  // 获得当前Context,通过ThreadLocal获取,如果ThreadLocal中没有值,说明当前Context为root context
  @Override
  public Context current() {
    Context current = localContext.get();
    if (current == null) {
      return Context.ROOT;
    }
    return current;
  }
}

ThreadLocal在线程池场景的问题

熟悉ThreadLocal的朋友知道,ThreadLocal存储在线程的ThreadLocalMap中,如果从一个线程提交一个Runnable或Callable到线程池,那么

在线程池中则获取不到提交任务的线程的ThreadLocal了。那么如何解决这个问题呢。

grpc中同样提供了一些对Executor的委托封装。

public static Executor currentContextExecutor(final Executor e){
    class CurrentContextExecutorimplements Executor{
      @Override
      public void execute(Runnable r){
        e.execute(Context.current().wrap(r));
      }
    }

    return new CurrentContextExecutor();
  }

这里的Executor执行execute时,会先调用Context.current()获取当前Context对象,然后使用Context.wrap(Runnable)方法wrap提交的Runnable。

public Runnablewrap(final Runnable r){
    return new Runnable() {
      @Override
      public void run(){
        Context previous = attach();
        try {
          r.run();
        } finally {
          detach(previous);
        }
      }
    };
  }

再看一下wrap方法,由于它是Context类的方法,所以new Runnable()这个匿名内部类的attach()调用实际是Context.this.attach()所以这个方法

对应到上面是Context.current这个提交任务的当前Context来执行attach(),这样就完成了Context从提交线程到实际执行线程的传递。

题外话,通用的ThreadLocal解决方案可以参考 transimitable-thread-local

可以解决一些tracing框架的例如上下文tracing信息丢失问题。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

遗传算法

遗传算法

王小平 / 西安交通大学出版社 / 2002-1 / 40.00元

《遗传算法:理论应用与软件实现》全面系统地介绍了遗传算法的基本理论,重点介绍了遗传算法的经典应用和国内外的新发展。全书共分11章。第1章概述了遗传算法的产生与发展、基本思想、基本操作以及应用情况;第2章介绍了基本遗传算法;第3章论述了遗传算法的数学基础;第4章分析了遗传算法的多种改进方法;第5章初步介绍了进化计算理论体系;第6章介绍了遗传算法应用于数值优化问题;第7章介绍了遗传算法应用于组合优化问......一起来看看 《遗传算法》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码