Okio源码分析

栏目: 数据库 · 发布时间: 7年前

okio 是Square开源框架之一,它对 java.iojava.nio 做了补充,使访问,存储和数据处理变得更加容易。它最早是 Okhttp 组件之一。

Okio源码分析

1、ByteString与Buffer

Okio 主要围绕 ByteStringBuffer 这两个类展开,其主要功能都封装在这两个类中:

  • ByteString :是一个类似 String 的不可变类,它可以很容易的在 byteString 之间进行转换。该类提供了编/解码为hex,md5,base64及UTF-8等方法。
  • Buffer :是一个可变的字节序列。 与 ArrayList 一样,无需提前调整缓冲区大小。 Buffer 内部维护了一个双向链表,从链表尾部写入数据,头部读取数据。

ByteStringBuffer 做了一些节省CPU和内存的操作。 如果将一个字符串编码为 ByteStringByteString 就会缓存对该字符串的引用(以空间换时间),这样如果以后对其进行编/解码等操作,则无需在 byteString 之间进行转换。

//字符串对应的字节数据,避免再一次转换
  final byte[] data;
  //字符串
  transient String utf8; // Lazily computed.
复制代码

Buffer 内部维护了一个以 Segment 为节点的双向链表。 当数据从一个 Buffer 移动到另一个 Buffer 时,仅需要进行一次数据拷贝,且它会重新分配 Segment 的所有权,而不是重新创建 Segment 对象。

2、Source与Sink

Okio 包含自己的流类型,称为 SourceSink ,其工作方式虽然类似 InputStreamOutputStream ,但它与Java I/O相比具有以下优势(参考自 Android学习笔记——Okio ):

  • Okio 实现了I/O读写的超时机制( Timeout ),防止读写出错从而导致一直阻塞。
  • N合一, OKio 精简了输入输出流的类个数
  • 低的CPU和内存消耗,引入 SegmentSegmentPool 复用机制
  • 使用方便。 ByteString 处理不变 byteBuffer 处理可变 byte
  • 提供了一系列的工具。 OKio 支持md5、sha、base64等数据处理

SourceSink 可以与 InputStreamOutputStream 互相操作。我们可以将任何 Source 视为 InputStream ,也可以将任何 InputStream 视为 Source 。同样适用于 SinkInputStream

3、Okio数据读写流程

前面简单介绍了 Okio ,下面就来看看如何使用。

//okio实现图片复制
    public void copyImage(File sinkFile, File sourceFile) throws IOException {
        //try里面的代码是Okio的标准写法,不能改变
        try (Sink sink = Okio.sink(sinkFile);
             BufferedSink bufferedSink = Okio.buffer(sink);
             //从文件读取数据
             Source source = Okio.source(sourceFile);
             BufferedSource bufferedSource = Okio.buffer(source)) {
            //图片复制
            bufferedSink.write(bufferedSource.readByteArray());
            //设置超时时间为1秒中,
            sink.timeout().deadline(1, TimeUnit.SECONDS);
            //写入数据,将字符串以UTF-8格式写入,Okio专门针对utf-8做了处理
            bufferedSink.writeUtf8(entry.getKey())
                     .writeUtf8("=")
                     .writeUtf8(entry.getValue())
                     .writeUtf8("\n");
            //读取数据
            String str=bufferedSource.readUtf8();
            //读取数据并返回一个ByteString
            ByteStringstr=bufferedSource.readByteString();

        }
    }
复制代码

正如前面所说的那样, Okio 使用起来非常方便。由于 Java 字符串采用的是UTF-16编码,而一般开发中使用的都是UTF-8编码,所以 Okio 对字符串编码做了特殊处理。

3.1、Okio读数据原理分析

Source 的意思是水源,它对应着输入流,在 Okio 中通过 Okio.source 方法来获得一个 Source 对象。

//在Okio这个类中关于source重载的方法还是蛮多的,这里以文件为例
  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }
  private static Source source(final InputStream in, final Timeout timeout) {
    ...
    //这里才是真正读去数据的地方
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        ...
        try {
          //每次写数据时都先检查是否超时,默认未设置超时
          timeout.throwIfReached();
          //获取链表的尾节点
          Segment tail = sink.writableSegment(1);
          //由于每个Segment的SIZE为8KB,所以每一次拷贝不能超过这个值
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          //通过InputStream读取数据
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          //数据读取完毕
          if (bytesRead == -1) return -1;
          //可写取位置往后移
          tail.limit += bytesRead;
          //读取的总字节数
          sink.size += bytesRead;
          //返回当前读取的字节数
          return bytesRead;
        } catch (AssertionError e) {
          ...
        }
      }
      ...
    };
  }
复制代码

可以发现,这个的 Source 是一个匿名对象。得到 Source 对象后,通过 Okio.buffer 方法将该对象传递给 BufferedSourceBufferedSource 是一个接口,它的具体实现类是 RealBufferedSource 。 在上面例子中是调用 RealBufferedSourcereadByteArray 方法来读取数据,下面就来看这个方法的实现。

//RealBufferedSource对应的Buffer 
  public final Buffer buffer = new Buffer();
  @Override public byte[] readByteArray() throws IOException {
    //将数据写入buffer
    buffer.writeAll(source);
    //将所有数据已字节数组形式返回
    return buffer.readByteArray();
  }
复制代码

readByteArray 方法中会首先将数据写入到 Buffer 中,并生成一个双向链表。

@Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    //这里的source就是前面在Okio中创建的匿名Source对象
    for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
    }
    return totalBytesRead;
  }
复制代码

将数据写入 Buffer 后,调用 BufferreadByteArray 方法生成一个字节数组并返回。

@Override
    public byte[] readByteArray() {
        try {
            //在读取数据时,就会得到size的大小
            return readByteArray(size);
        } catch (EOFException e) {
            throw new AssertionError(e);
        }
    }
    @Override
    public byte[] readByteArray(long byteCount) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        ...
        //创建一个大小为size的byte数组
        byte[] result = new byte[(int) byteCount];
        //将读取的数据写入这个数组中
        readFully(result);
        return result;
    }
    @Override
    public void readFully(byte[] sink) throws EOFException {
        int offset = 0;
        while (offset < sink.length) {
            //不断的将数据写入sink数组中
            int read = read(sink, offset, sink.length - offset);
            if (read == -1) throw new EOFException();
            offset += read;
        }
    }
    @Override
    public int read(byte[] sink, int offset, int byteCount) {
        checkOffsetAndCount(sink.length, offset, byteCount);

        Segment s = head;
        if (s == null) return -1;
        int toCopy = Math.min(byteCount, s.limit - s.pos);
        //进行数据拷贝
        System.arraycopy(s.data, s.pos, sink, offset, toCopy);

        s.pos += toCopy;
        size -= toCopy;
        //释放Segment并将其放入缓冲池
        if (s.pos == s.limit) {
            head = s.pop();
            SegmentPool.recycle(s);
        }

        return toCopy;
    }
复制代码

这样就将数据写入到一个新的数组中,并将链表中的所有 Segment 重新初始化并放入池中。

3.2、Okio写数据原理分析

Sink 的意思是水槽,它对应着输出流。通过 Okio.sink 来获取一个 Sink 对象。

public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return sink(new FileOutputStream(file));
  }
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    ...
    //创建一个匿名Sink对象
    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        //写入数据
        while (byteCount > 0) {
          //每次写数据时都先检查是否超时,默认未设置超时
          timeout.throwIfReached();
          //获取头结点
          Segment head = source.head;
          //能copy的最小字节
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          //通过OutputStream来写入数据
          out.write(head.data, head.pos, toCopy);
          //可读取的位置向后移动
          head.pos += toCopy;
          //减少可写入的字节数
          byteCount -= toCopy;
          //减少buffer中字节数
          source.size -= toCopy;
          //达到最大可写的位置
          if (head.pos == head.limit) {
            //释放节点
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      ...
    };
  }
复制代码

获得 Sink 对象后,将该对象传递给 BufferedSinkBufferedSink 是一个接口,它的具体实现是 RealBufferedSink

public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }
复制代码

在3.1节中讲了通过 InputStream 读取数据并返回一个字节数组。这里就将这个数组通过 RealBufferedSinkwrite 方法写入到新的文件中。

@Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }
复制代码

写入数据跟读取数据流程基本上一样,需要先将数据写入到 Buffer 中。

@Override
    public Buffer write(byte[] source) {
        if (source == null) throw new IllegalArgumentException("source == null");
        return write(source, 0, source.length);
    }
    @Override
    public Buffer write(byte[] source, int offset, int byteCount) {
        ...

        int limit = offset + byteCount;
        while (offset < limit) {
            Segment tail = writableSegment(1);

            int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
            //进行数据拷贝
            System.arraycopy(source, offset, tail.data, tail.limit, toCopy);

            offset += toCopy;
            tail.limit += toCopy;
        }

        size += byteCount;
        return this;
    }
复制代码

前面说过 Buffer 维护的是一个链表,所以这里也是将数据写入一个链表中,由于在数据读取完毕后会将 Segment 对象重新初始化并放入到池中,所以这里就不用创建新的 Segment 对象,直接从池中获取即可。在写入 Buffer 成功后,再调用 emitCompleteSegments 方法,该方法就是将数据从 Buffer 写入到新文件。

@Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }
复制代码

这里的 Sink 就是在 Okio 中创建的匿名对象,在 Sink 对象中通过 OutputStream 将数据写入到新文件。 总体流程如下。

Okio源码分析

4、Segment及SegmentPool

SegmentOkio 中非常重要的一环,它可以说是 Buffer 中数据的载体。容量是8kb,头结点为head。

final class Segment {
  //Segment的容量,最大为8kb
  static final int SIZE = 8192;

  //如果Segment中字节数 > SHARE_MINIMUM时(大Segment),就可以共享,不能添加到SegmentPool
  static final int SHARE_MINIMUM = 1024;
  //存储的数据
  final byte[] data;

  //下一次读取的开始位置
  int pos;

 //写入的开始位置
  int limit;

  //当前Segment是否可以共享
  boolean shared;

  //data是否仅当前Segment独有,不share
  boolean owner;

  //后继节点
  Segment next;

  //前驱节点
  Segment prev;

  ...

  //移除当前Segment
  public final @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  //在当前节点后添加一个新的节点
  public final Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

  //将当前Segment分裂成2个Segment结点。前面结点pos~limit数据范围是[pos..pos+byteCount),后面结点pos~limit数据范围是[pos+byteCount..limit)
  public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    //如果字节数大于SHARE_MINIMUM则拆分成共享节点
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

  //当前Segment结点和prev前驱结点合并成一个Segment,统一合并到prev,然后当前Segment结点从双向链表移除并添加到SegmentPool复用。当然合并的前提是:2个Segment的字节总和不超过8K。合并后可能会移动pos、limit
  public final void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

  //从当前节点移动byteCount个字节到sink中
  public final void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}
复制代码

SegmentPool 是一个 Segment 池,内部维护了一个 Segment 单向链表,容量为64kb(8个 Segment ),回收不用的 Segment 对象。

final class SegmentPool {
    //SegmentPool的最大容量
    static final long MAX_SIZE = 64 * 1024; // 64 KiB.

    //后继节点
    static Segment next;

    //当前池内的总字节数
    static long byteCount;

    private SegmentPool() {
    }
    //从池中获取一个Segment对象
    static Segment take() {
        synchronized (SegmentPool.class) {
            if (next != null) {
                Segment result = next;
                next = result.next;
                result.next = null;
                byteCount -= Segment.SIZE;
                return result;
            }
        }
        return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
    }
    //将Segment状态初始化并放入池中
    static void recycle(Segment segment) {
        if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
        if (segment.shared) return; // This segment cannot be recycled.
        synchronized (SegmentPool.class) {
            if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
            byteCount += Segment.SIZE;
            segment.next = next;
            segment.pos = segment.limit = 0;
            next = segment;
        }
    }
}
复制代码

当从 InputStream 中读数据时,读取的数据会写进以 Segment 为节点的双向链表中。如果 Segment 容量不够(容量大于8kb),就会从 SegmentPooltake 一个 Segment 对象并添加到双向链表尾部。 当通过 OutputStrem 写数据时,会从双向链表的 head 节点开始读取,当 Segment 中的数据读取完毕后,就会将该 Segment 从双向链表中移除,并回收到 SegmentPool 中,等待下次复用。

5、超时机制

Okio 的亮点之一就是增加了超时机制,防止因为意外导致I/O一直阻塞的问题,默认的超时机制是同步的。 AsyncTimeoutOkio 中异步超时机制的实现,它是一个单链表,结点按等待时间从小到大排序,head是一个头结点,起占位作用。使用了一个 WatchDog 的后台线程来不断的遍历所有节点,如果某个节点超时就会将该节点从链表中移除,并关闭 SocketAsyncTimeout 提供了3个方法 enterexittimeout ,分别用于流操作开始、结束、超时三种情况调用。

public class AsyncTimeout extends Timeout {
    //头结点,占位使用
    static
    AsyncTimeout head;

    //是否在链表中
    private boolean inQueue;

    //后继节点
    private
    AsyncTimeout next;

    //超时时间
    private long timeoutAt;
    //把当前AsyncTimeout对象加入节点
    public final void enter() {
        ...
        scheduleTimeout(this, timeoutNanos, hasDeadline);
    }

    private static synchronized void scheduleTimeout(
            AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
        //创建占位头结点并开启子线程
        if (head == null) {
            head = new AsyncTimeout();
            new Watchdog().start();
        }

        ...

        //插入到链表中,按照时间长短进行排序,等待事件越长越靠后
        for (AsyncTimeout prev = head; true; prev = prev.next) {
            if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
                node.next = prev.next;
                prev.next = node;
                if (prev == head) {
                    AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
                }
                break;
            }
        }
    }

    //从链表中移除节点
    public final boolean exit() {
        if (!inQueue) return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
    }

    //执行真正的移除操作
    private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
        // Remove the node from the linked list.
        for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
            if (prev.next == node) {
                prev.next = node.next;
                node.next = null;
                return false;
            }
        }

        // The node wasn't found in the linked list: it must have timed out!
        return true;
    }

    //在子类中重写了该方法,主要是进行socket的关闭
    protected void timedOut() {
    }

    //监听节点是否超时的子线程
    private static final class Watchdog extends Thread {
        Watchdog() {
            super("Okio Watchdog");
            setDaemon(true);
        }

        public void run() {
            while (true) {
                try {
                    AsyncTimeout timedOut;
                    synchronized (AsyncTimeout.class) {
                        timedOut = awaitTimeout();
                        //代表头结点的后继节点已超时,
                        if (timedOut == null) continue;
                        //除头结点外没有任何其他节点
                        if (timedOut == head) {
                            head = null;
                            return;
                        }
                    }

                    //关闭socket
                    timedOut.timedOut();
                } catch (InterruptedException ignored) {
                }
            }
        }
    }

    
    static AsyncTimeout awaitTimeout() throws InterruptedException {
        AsyncTimeout node = head.next;
        //除了头结点外没有任何其他节点
        if (node == null) {
            long startNanos = System.nanoTime();
            AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
            return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                    ? head  // The idle timeout elapsed.
                    : null; // The situation has changed.
        }

        long waitNanos = node.remainingNanos(System.nanoTime());

        //进行等待
        if (waitNanos > 0) {
            //等待
            long waitMillis = waitNanos / 1000000L;
            waitNanos -= (waitMillis * 1000000L);
            AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
            return null;
        }

        //代表node节点已超时
        head.next = node.next;
        node.next = null;
        return node;
    }
}
复制代码

默认都是未设置超时时间的,需要我们自己来设置,同步及异步的超时时间设置方式是一样的,通过下面代码即可。

sink.timeout().deadline(1, TimeUnit.SECONDS);
    source.timeout().deadline(1,TimeUnit.MILLISECONDS);
复制代码

6、生产者/消费者模型

Okio 中可以使用 Pipe 来实现一个生产者/消费者模型。 Pipe 维护了一个一定大小 Buffer 。当该 Buffer 容量达到最大时,线程就会等待直到该 Buffer 有剩余的空间。

public final class Pipe {
  //Pipe的最大容量
  final long maxBufferSize;
  //Pipe对应的Buffer
  final Buffer buffer = new Buffer();
  boolean sinkClosed;
  boolean sourceClosed;
  //写入流,对应着生产者
  private final Sink sink = new PipeSink();
  //读取流,对应着消费者
  private final Source source = new PipeSource();

  public Pipe(long maxBufferSize) {
    //最大容量不能小于1
    if (maxBufferSize < 1L) {
      throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
    }
    this.maxBufferSize = maxBufferSize;
  }
  ...
  //写入数据到Pipe中
  final class PipeSink implements Sink {
    final Timeout timeout = new Timeout();

    @Override public void write(Buffer source, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (byteCount > 0) {
          ...

          long bufferSpaceAvailable = maxBufferSize - buffer.size();
          if (bufferSpaceAvailable == 0) {
            //buffer中,没有剩余空间,等待消费者消费
            timeout.waitUntilNotified(buffer); // Wait until the source drains the buffer.
            continue;
          }

          long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
          buffer.write(source, bytesToWrite);
          byteCount -= bytesToWrite;
          //通知buffer,有新的数据了,
          buffer.notifyAll(); // Notify the source that it can resume reading.
        }
      }
    }

    ...
  }
  //从Pipe中读取数据
  final class PipeSource implements Source {
    final Timeout timeout = new Timeout();

    @Override public long read(Buffer sink, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (buffer.size() == 0) {
          if (sinkClosed) return -1L;
          //Pipe中没有数据,等待生产者写入
          timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.
        }

        long result = buffer.read(sink, byteCount);
        buffer.notifyAll(); // Notify the sink that it can resume writing.
        return result;
      }
    }
    ...
  }
}
复制代码

Pipe 的代码还是比较少的。下面就来如何使用 Pipe

public void pipe() throws IOException {
        //设置Pipe的容量为1024字节,即1kb
        Pipe pipe = new Pipe(1024);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSource bufferedSource = Okio.buffer(pipe.source())) {
                    //将Pipe中数据写入env4.txt这个文件中
                    bufferedSource.readAll(Okio.sink(new File("file/env4.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSink bufferedSink = Okio.buffer(pipe.sink())) {
                    //将env3.txt中数据写入到Pipe中
                    bufferedSink.writeAll(Okio.source(new File("file/env3.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Game Programming Patterns

Game Programming Patterns

Robert Nystrom / Genever Benning / 2014-11-2 / USD 39.95

The biggest challenge facing many game programmers is completing their game. Most game projects fizzle out, overwhelmed by the complexity of their own code. Game Programming Patterns tackles that exac......一起来看看 《Game Programming Patterns》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

SHA 加密
SHA 加密

SHA 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试