okio是square公司开源的一款便于io处理的底层库。代码不长,很实用, 例如可以对输入输出流进行gzip压缩,支持socket,文件等类型的输入输出,方便计算流的MD5/SHA等信息。github地址为:okio
重要的类和字段功能 Sink, Source, BufferedSink, BufferedSource 这几个类是okio中非常重要的interface,其中Sink,BufferedSink一组用于写,Source,BufferedSource一组用于读。okio主要采用的是装饰模式,将输入或者输出分层进行装饰,类似于java中的inputStream,BufferedInputStream等 下面贴出这几个接口的代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Sink extends Closeable , Flushable {
void write (Buffer source, long byteCount) throws IOException ;
@Override void flush () throws IOException ;
Timeout timeout () ;
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/
@Override void close () throws IOException ;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Source extends Closeable {
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/
long read (Buffer sink, long byteCount) throws IOException ;
Timeout timeout () ;
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/
@Override void close () throws IOException ;
}
segment segment作为buffer中的一个单位,保存一小段byte数组。segment保存有prev,next指针,支持实现单项链表,和双向链表。同时segment支持共享。当处于共享时,不能修改内部byte数组,下面给出segment核心字段1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final byte [] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
SegmentPool 该类非常简单,用来保存被回收的segment,下面给出其代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static Segment take () {
synchronized (SegmentPool.class) {
if (next != null ) {
Segment result = next;
next = result.next;
result.next = null ;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment();
}
static void recycle (Segment segment) {
if (segment.next != null || segment.prev != null ) throw new IllegalArgumentException();
if (segment.shared) return ;
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return ;
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0 ;
next = segment;
}
}
Timeout, AsyncTimeout
Timeout 可以通过deadline,或者timeout时间控制超时。内部最终是通过object的wait,notify来实现线程等待和唤醒。因此该Timeout是同步的
1
2
3
4
5
6
7
8
9
10
11
12
long elapsedNanos = 0L ;
if (waitNanos > 0L ) {
long waitMillis = waitNanos / 1000000L ;
monitor.wait(waitMillis, (int ) (waitNanos - waitMillis * 1000000L ));
elapsedNanos = System.nanoTime() - start;
}
if (elapsedNanos >= waitNanos) {
throw new InterruptedIOException("timeout" );
}
AsyncTimeout 通过一个有序的队列进行管理AsyncTimeout节点。异步线程Watchdog,用于监视AsycTimeout列表的超时情况。在watchdog中,通过对AsyncTimeout.class.wait方式进行等待,当发生超时时,调用对应AsyncTimeout的timeOut()方法进行回调 AsyncTimeout还提供了sink和source的包装返回类
Okio 该类定义了一些类公共静态方法,方便用户将InputStream/OutputStream/File/Socket转化为Sink/Source. 转化Socket时,最终都是通过AsyncTimeout包装的Sink/Source进行返回1
2
3
4
5
6
public static Source source (Socket socket) throws IOException {
if (socket == null ) throw new IllegalArgumentException("socket == null" );
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}
Buffer Buffer是okio的一个非常关键的实现类。考虑到BufferedSource和BufferedSink接口的一些实现共性,因此把这些实现放到一个类中,同时集成这两个接口,这就是Buffer Buffer内部使用Segment组成环形双向列表管理字节数组,在copy buffer to buffer过程中巧妙的使用了segment的共享机制,效率上会非常快1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Buffer copyTo (Buffer out, long offset, long byteCount) {
if (out == null ) throw new IllegalArgumentException("out == null" );
checkOffsetAndCount(size, offset, byteCount);
if (byteCount == 0 ) return this ;
out.size += byteCount;
Segment s = head;
for (; offset >= (s.limit - s.pos); s = s.next) {
offset -= (s.limit - s.pos);
}
for (; byteCount > 0 ; s = s.next) {
Segment copy = new Segment(s);
copy.pos += offset;
copy.limit = Math.min(copy.pos + (int ) byteCount, copy.limit);
if (out.head == null ) {
out.head = copy.next = copy.prev = copy;
} else {
out.head.prev.push(copy);
}
byteCount -= copy.limit - copy.pos;
offset = 0 ;
}
return this ;
}
Buffer的实现中提供了大量的read/write接口,这些接口都是在双向segment列表上对byte数组进行操作,就不一一列举了 Buffer还提供了md5,SHA等计算接口
RealBufferedSink/RealBufferedSource 这两类分别实现了BufferedSink/BufferedSource接口,实现方式类似。都是内部通过一个Buffer对读写数据进行缓冲,然后再读写到对应的目标Sink/Source之中 下面给出部分代码片段1
2
3
4
5
6
7
8
9
10
11
12
13
@Override public void write (Buffer source, long byteCount)
throws IOException {
if (closed) throw new IllegalStateException("closed" );
buffer.write(source, byteCount);
emitCompleteSegments();
}
@Override public BufferedSink emitCompleteSegments () throws IOException {
if (closed) throw new IllegalStateException("closed" );
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0 ) sink.write(buffer, byteCount);
return this ;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override public byte [] readByteArray(long byteCount) throws IOException {
require(byteCount);
return buffer.readByteArray(byteCount);
}
@Override public void require (long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
@Override public boolean request (long byteCount) throws IOException {
if (byteCount < 0 ) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed" );
while (buffer.size < byteCount) {
if (source.read(buffer, Segment.SIZE) == -1 ) return false ;
}
return true ;
}
Pipe 该类内部实现了两个内部类PipeSink/PipeSource, 该两个内部类通过一个Buffer字段用于缓冲数据从PipeSink到PipeSource
GzipSink/DeflaterSink GzipSink组合DeflaterSink可以将输出流进行gzip压缩 初始化时,GzipSink,写入gzip头部1
2
3
4
5
6
7
8
9
10
private void writeHeader () {
Buffer buffer = this .sink.buffer();
buffer.writeShort(0x1f8b );
buffer.writeByte(0x08 );
buffer.writeByte(0x00 );
buffer.writeInt(0x00 );
buffer.writeByte(0x00 );
buffer.writeByte(0x00 );
}
写入数据时,GzipSink负责CRC32的计算,而输入数据交给DeflaterSink进行压缩1
2
3
4
5
6
7
@Override public void write (Buffer source, long byteCount) throws IOException {
if (byteCount < 0 ) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0 ) return ;
updateCrc(source, byteCount);
deflaterSink.write(source, byteCount);
}
最后,在close阶段,再由GzipSink写入footer1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override public void close () throws IOException {
if (closed) return ;
Throwable thrown = null ;
try {
deflaterSink.finishDeflate();
writeFooter();
} catch (Throwable e) {
thrown = e;
}
try {
deflater.end();
} catch (Throwable e) {
if (thrown == null ) thrown = e;
}
try {
sink.close();
} catch (Throwable e) {
if (thrown == null ) thrown = e;
}
closed = true ;
if (thrown != null ) Util.sneakyRethrow(thrown);
}
GzipSource/InflaterSource 类似GzipSink/DeflaterSink的模式,GzipSource和InflaterSource是一组,用于将gzip压缩流进行解压
ForwardingSink/ForwardingSource ForwardingSink/ForwardSource内部实现非常简单,几乎什么都没做,就是单纯的调用内部组合的Sink/Source对应的方法
HashingSink/HashSource 其中HashingSink继承ForwardingSink,而HashSource继承ForwardingSource。这两个类实现也很简单,在输出或者输入数据的同时,计算MessageDigest。