okio是square公司开源的一款便于io处理的底层库。代码不长,很实用, 例如可以对输入输出流进行gzip压缩,支持socket,文件等类型的输入输出,方便计算流的MD5/SHA等信息。github地址为:okio

重要的类和字段功能

Sink, Source, BufferedSink, BufferedSource

这几个类是okio中非常重要的interface,其中Sink,BufferedSink一组用于写,Source,BufferedSource一组用于读。okio主要采用的是装饰模式,将输入或者输出分层进行装饰,类似于java中的inputStream,BufferedInputStream等
下面贴出这几个接口的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
void write(Buffer source, long byteCount) throws IOException;
/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;
/** Returns the timeout for this sink. */
Timeout timeout();
/**
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/
@Override void close() throws IOException;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/
long read(Buffer sink, long byteCount) throws IOException;
/** Returns the timeout for this source. */
Timeout timeout();
/**
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/
@Override void close() throws IOException;
}

segment

segment作为buffer中的一个单位,保存一小段byte数组。segment保存有prev,next指针,支持实现单项链表,和双向链表。同时segment支持共享。当处于共享时,不能修改内部byte数组,下面给出segment核心字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final byte[] data;
/** The next byte of application data byte to read in this segment. */
int pos;
/** The first byte of available data ready to be written to. */
int limit;
/** True if other segments or byte strings use the same byte array. */
boolean shared;
/** True if this segment owns the byte array and can append to it, extending {@code limit}. */
boolean owner;
/** Next segment in a linked or circularly-linked list. */
Segment next;
/** Previous segment in a circularly-linked list. */
Segment prev;

SegmentPool

该类非常简单,用来保存被回收的segment,下面给出其代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}

Timeout, AsyncTimeout

  1. Timeout
    可以通过deadline,或者timeout时间控制超时。内部最终是通过object的wait,notify来实现线程等待和唤醒。因此该Timeout是同步的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //部分实现核心代码
    long elapsedNanos = 0L;
    if (waitNanos > 0L) {
    long waitMillis = waitNanos / 1000000L;
    monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
    elapsedNanos = System.nanoTime() - start;
    }
    // Throw if the timeout elapsed before the monitor was notified.
    if (elapsedNanos >= waitNanos) {
    throw new InterruptedIOException("timeout");
    }
  2. AsyncTimeout
    通过一个有序的队列进行管理AsyncTimeout节点。异步线程Watchdog,用于监视AsycTimeout列表的超时情况。在watchdog中,通过对AsyncTimeout.class.wait方式进行等待,当发生超时时,调用对应AsyncTimeout的timeOut()方法进行回调
    AsyncTimeout还提供了sink和source的包装返回类

Okio

该类定义了一些类公共静态方法,方便用户将InputStream/OutputStream/File/Socket转化为Sink/Source.
转化Socket时,最终都是通过AsyncTimeout包装的Sink/Source进行返回

1
2
3
4
5
6
public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}

Buffer

Buffer是okio的一个非常关键的实现类。考虑到BufferedSource和BufferedSink接口的一些实现共性,因此把这些实现放到一个类中,同时集成这两个接口,这就是Buffer
Buffer内部使用Segment组成环形双向列表管理字节数组,在copy buffer to buffer过程中巧妙的使用了segment的共享机制,效率上会非常快

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Buffer copyTo(Buffer out, long offset, long byteCount) {
if (out == null) throw new IllegalArgumentException("out == null");
checkOffsetAndCount(size, offset, byteCount);
if (byteCount == 0) return this;
out.size += byteCount;
// Skip segments that we aren't copying from.
Segment s = head;
for (; offset >= (s.limit - s.pos); s = s.next) {
offset -= (s.limit - s.pos);
}
// Copy one segment at a time.
for (; byteCount > 0; s = s.next) {
Segment copy = new Segment(s);
copy.pos += offset;
copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
if (out.head == null) {
out.head = copy.next = copy.prev = copy;
} else {
out.head.prev.push(copy);
}
byteCount -= copy.limit - copy.pos;
offset = 0;
}
return this;
}

Buffer的实现中提供了大量的read/write接口,这些接口都是在双向segment列表上对byte数组进行操作,就不一一列举了
Buffer还提供了md5,SHA等计算接口

RealBufferedSink/RealBufferedSource

这两类分别实现了BufferedSink/BufferedSource接口,实现方式类似。都是内部通过一个Buffer对读写数据进行缓冲,然后再读写到对应的目标Sink/Source之中
下面给出部分代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override public void write(Buffer source, long byteCount)
throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, byteCount);
emitCompleteSegments();
}
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override public byte[] readByteArray(long byteCount) throws IOException {
require(byteCount);
return buffer.readByteArray(byteCount);
}
@Override public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
@Override public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}

Pipe

该类内部实现了两个内部类PipeSink/PipeSource, 该两个内部类通过一个Buffer字段用于缓冲数据从PipeSink到PipeSource

GzipSink/DeflaterSink

GzipSink组合DeflaterSink可以将输出流进行gzip压缩
初始化时,GzipSink,写入gzip头部

1
2
3
4
5
6
7
8
9
10
private void writeHeader() {
// Write the Gzip header directly into the buffer for the sink to avoid handling IOException.
Buffer buffer = this.sink.buffer();
buffer.writeShort(0x1f8b); // Two-byte Gzip ID.
buffer.writeByte(0x08); // 8 == Deflate compression method.
buffer.writeByte(0x00); // No flags.
buffer.writeInt(0x00); // No modification time.
buffer.writeByte(0x00); // No extra flags.
buffer.writeByte(0x00); // No OS.
}

写入数据时,GzipSink负责CRC32的计算,而输入数据交给DeflaterSink进行压缩

1
2
3
4
5
6
7
@Override public void write(Buffer source, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return;
updateCrc(source, byteCount);
deflaterSink.write(source, byteCount);
}

最后,在close阶段,再由GzipSink写入footer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override public void close() throws IOException {
if (closed) return;
// This method delegates to the DeflaterSink for finishing the deflate process
// but keeps responsibility for releasing the deflater's resources. This is
// necessary because writeFooter needs to query the processed byte count which
// only works when the deflater is still open.
Throwable thrown = null;
try {
deflaterSink.finishDeflate();
writeFooter();
} catch (Throwable e) {
thrown = e;
}
try {
deflater.end();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
try {
sink.close();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
closed = true;
if (thrown != null) Util.sneakyRethrow(thrown);
}

GzipSource/InflaterSource

类似GzipSink/DeflaterSink的模式,GzipSource和InflaterSource是一组,用于将gzip压缩流进行解压

ForwardingSink/ForwardingSource

ForwardingSink/ForwardSource内部实现非常简单,几乎什么都没做,就是单纯的调用内部组合的Sink/Source对应的方法

HashingSink/HashSource

其中HashingSink继承ForwardingSink,而HashSource继承ForwardingSource。这两个类实现也很简单,在输出或者输入数据的同时,计算MessageDigest。