卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章16333本站已运行3317

bufio-Reader 源码分析

bufio 库解析

通过分析bufio库的代码,来更好的作为我们的技术选型和使用它。

环境

go version go1.17.13 windows/amd64

包分析

在分析之前我们需要查看这个包对外提供的方法。

分析一个包的源码,最好的方式就是查看它对外提供的方法;再一层一层的深入。

下面就是对外提供的方法:

// NewReaderSize returns a new Reader whose buffer has at least the specified
// size. If the argument io.Reader is already a Reader with large enough
// size, it returns the underlying Reader.
// NewReaderSize 返回缓冲区至少具有指定大小的新Reader。
// 如果参数io.Reader已经是一个足够大的Reader,它将返回基础Reader。
func NewReaderSize(rd io.Reader, size int) *Reader 

// NewReader returns a new Reader whose buffer has the default size.
// NewReader 返回一个缓冲区具有默认大小的新 Reader。
func NewReader(rd io.Reader) *Reader 

// NewWriterSize returns a new Writer whose buffer has at least the specified
// size. If the argument io.Writer is already a Writer with large enough
// size, it returns the underlying Writer.
// NewWriterSize返回缓冲区至少具有指定大小的新Writer。
// 如果参数io.Writer已经是足够大的Writer,则返回基础Writer。
func NewWriterSize(w io.Writer, size int) *Writer 

// NewWriter returns a new Writer whose buffer has the default size.
// NewWriter返回缓冲区大小为默认值的新Writer。
func NewWriter(w io.Writer) *Writer 

// NewReadWriter allocates a new ReadWriter that dispatches to r and w.
// NewReadWriter分配一个新的ReadWriter,它分派给r和w。
func NewReadWriter(r *Reader, w *Writer) *ReadWriter

对于不清楚 io.Writer 和 io.Reader 接口的有必要去自己了解一下。

此包对外提供的结构体有下面3个:

// Reader implements buffering for an io.Reader object.
// Reader为io.Reader对象实现缓冲。
type Reader struct {
 buf          []byte
 rd           io.Reader // reader provided by the client
 r, w         int       // buf read and write positions
 err          error
 lastByte     int // last byte read for UnreadByte; -1 means invalid
 lastRuneSize int // size of last rune read for UnreadRune; -1 means invalid
}

// Writer implements buffering for an io.Writer object.
// If an error occurs writing to a Writer, no more data will be
// accepted and all subsequent writes, and Flush, will return the error.
// After all data has been written, the client should call the
// Flush method to guarantee all data has been forwarded to
// the underlying io.Writer.
// Writer为io.Writer对象实现缓冲。
// 如果写入Writer时发生错误,将不再接受数据,所有后续写入和刷新都将返回错误。
// 写入所有数据后,客户端应调用Flush方法以确保所有数据都已转发到基础io.Writer。
type Writer struct {
 err error
 buf []byte
 n   int
 wr  io.Writer
}

// ReadWriter stores pointers to a Reader and a Writer.
// It implements io.ReadWriter.
// ReadWriter存储指向Reader和Writer的指针。
// 它实现io.ReadWriter。
type ReadWriter struct {
 *Reader
 *Writer
}

现在你对这3个结构内部的字段不了解没有关系。但是通过注释你需要知道他们分别的作用是什么。如果你都不了解也可以跟着我一起来分析它的作用,相信看完之后对你有一定的作用,并能熟练的使用起来。

Reader 及其作用

先说结论:

读缓冲区,可以先把需要的数据缓冲一部分在这个对象中,你需要的时候可以直接取走,而不需要去真正的源读取。

举个例子:在你读取大文件的时候,不能一次读完;采用file.Read的方式,你每次读取内容都会产生一个系统调用(陷入内核,用户态切换);

如果这个时候你用使用 Reader来提供一个缓冲区,那么有可能会减少系统调用的次数(缓冲区需要比每次读取的数据长度大)。比如每次读取1K,缓冲区有2K,那么你读取2次内容,缓冲区才去调用 file.Read 一次。

对于这里的解释,说的是一种思路,它能做到这种效果;而不是说就是例子的效果。

NewReaderSize 分析

func NewReaderSize(rd io.Reader, size int) *Reader {
 // Is it already a Reader?
 // 如果已经是一个Reader 的对象,并且它的buf容量大于 size 直接返回原对象。
 b, ok := rd.(*Reader)
 if ok && len(b.buf) >= size {
  return b
 }
 
    // 最小 cap 判断
 if size < minReadBufferSize {
  size = minReadBufferSize
 }
 r := new(Reader)
 // 设置默认值,并给buf创建空间
 r.reset(make([]byte, size), rd)
 return r
}

func (b *Reader) reset(buf []byte, r io.Reader) {
 *b = Reader{
  buf:          buf,
  rd:           r,
  lastByte:     -1,
  lastRuneSize: -1,
 }
}

整个的创建方法比较简单,可以通过注释进行理解。

Read 分析

Read 主要是实现 io.Reader 的方法,大概率也是常用的方法,放在这里先分析。

// Read reads data into p.
// Read将数据读入p。
// It returns the number of bytes read into p.
// 它返回读入p的字节数。
// The bytes are taken from at most one Read on the underlying Reader,
// hence n may be less than len(p).
// 这些字节来自底层Reader上的最多一个Read,因此n可能小于len(p)。
// To read exactly len(p) bytes, use io.ReadFull(b, p).
// 要准确读取len(p)字节,请使用io.ReadFull(b,p)。
// At EOF, the count will be zero and err will be io.EOF.
// 对于EOF,计数将为零,错误将为io.EOF。
func (b *Reader) Read(p []byte) (n int, err error) {
 n = len(p)
 if n == 0 {
     // 读取0字节内容,如果存在内容,直接返回
  if b.Buffered() > 0 {
   return 0, nil
  }
  // 如果不存在内容,返回可能存在的错误。
  return 0, b.readErr()
 }
 if b.r == b.w {
     // 如果 r==w 说明这个buf中是空的,具体原理请了解 环形数组,方便内存重用
  if b.err != nil {
   return 0, b.readErr()
  }
  
        // 当前还没有缓存数据的情况下
        // 需要的数据大于当前的buf长度,会直接通过源进行读取。
        // 避免无效的复制
  if len(p) >= len(b.buf) {
   // Large read, empty buffer.
   // Read directly into p to avoid copy.
   n, b.err = b.rd.Read(p)
   if n < 0 {
    panic(errNegativeRead)
   }
   if n > 0 {
    b.lastByte = int(p[n-1])
    b.lastRuneSize = -1
   }
   return n, b.readErr()
  }
  
        // 如果需要读取的内容比 buf 长度小,那么进行一次填装buf的过程。
  // One read.
  // Do not use b.fill, which will loop.
  b.r = 0
  b.w = 0
  n, b.err = b.rd.Read(b.buf)
  if n < 0 {
   panic(errNegativeRead)
  }
  if n == 0 {
   return 0, b.readErr()
  }
  b.w += n
 }

 // copy as much as we can
 // 这里复制的内容存在3个情况
 // 1. len(p) > b.Buffered()  复制所有buf的内容,返回复制的长度
 // 2. len(p) == b.Buffered() 刚好复制完所有内容。
 // 当发生1,2的情况下,下次进入又会走上面的源读取一次。因为此时 r == w
 // 3. len(p) < b.Buffered() 复制一部分 buf,返回 len(p) ,下次继续读。
 n = copy(p, b.buf[b.r:b.w])
 b.r += n
 b.lastByte = int(b.buf[b.r-1])
 b.lastRuneSize = -1
 return n, nil
}

通过这个方法的代码实现,可以看出来:

  1. 如果读取的内容大于buf 的长度,那么就不会有性能的提升。因为每次都走源读取。
  2. 如果读取小于buf的长度,那么他会一次缓存尽可能满buf的情况。(存在下层b.rd.Read返回的限制。)

ReadSlice 分析

这个方法为 ReadLineReadBytes,ReadString 等其他的方法提供了基础。下面我会拿一个出来分析,剩下的希望各位自己分析出来;就当是学习后的实践吧。

// ReadSlice 读取直到输入中第一次出现delim,返回一个指向缓冲区中字节的切片。字节在下一次读取时停止有效。
// 如果ReadSlice在找到分隔符之前遇到错误,它将返回缓冲区中的所有数据和错误本身(通常为io.EOF)  
// 如果缓冲区在没有delim的情况下填充,ReadSlice将失败,并出现错误ErrBufferFull。
// 由于从ReadSlice返回的数据将被下一次IO操作覆盖,因此大多数客户端应改用ReadBytes或ReadString。
func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
    // 用于buf中不满的情况下,进行搜索优化。
 s := 0 // search start index
 for {
  // Search buffer.
        // 第一次 s = 0,进行存量的buf 搜索
    // 第二次,下面已经又填充了一些数据在buf 中,只需要搜索新加的内容。 
  if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
      // 搜索到指定的 delim,进行buf 的返回
   i += s
   line = b.buf[b.r : b.r+i+1]
   b.r += i + 1
   break
  }

  // Pending error?
  // 可能在上次的 b.fill 发生了错误,这里返回整个未读的buf,并返回错误。
  if b.err != nil {
   line = b.buf[b.r:b.w]
   b.r = b.w
   // 重置 r,w 的位置
   err = b.readErr()
   break
  }

  // Buffer full?
  // buf 被装满了,但是没有找到 delim ,返回所有数据,并返回一个 ErrBufferFull 错误。
  if b.Buffered() >= len(b.buf) {
   b.r = b.w
   line = b.buf
   err = ErrBufferFull
   break
  }

        // 处理上次查找的内容大小,下次就不在搜索了。
  s = b.w - b.r // do not rescan area we scanned before

        // 尽量装满整个buf,里面的内容我就不分析了。
  b.fill() // buffer is not full
 }

    // 设置最后因为byte,如果有数据。
 // Handle last byte, if any.
 if i := len(line) - 1; i >= 0 {
  b.lastByte = int(line[i])
  b.lastRuneSize = -1
 }

 return
}

通过源码分析+注释,可以预见我们在使用整个方法的时候,有几个情况需要处理:

  1. err 返回nil,找到指定的 delim,我们高兴的处理。
  2. err != ErrBufferFull ,需要保存返回的数据,并捕获错误或继续返回错误。
  3. err == ErrBufferFull 说明这次buf不存在 delim ,保存数据,继续找。

这里看着这几个情况,你自己实现可能觉得太麻烦了,所以官方帮我们解决了。

// 读取直到输入中第一次出现delim。
// 它返回
// fullBuffers: 完整缓冲区的切片,
// finalFragment: delim之前的剩余字节,
// totalLen: 前两个元素组合的总字节数,
// err: 错误
// 完整结果等于"bytes.Join(append(fullBuffers,finalFragment),nil)",
// 长度为"totalLen"。结果以这种方式构造,以允许调用者最小化分配和复制。
func (b *Reader) collectFragments(delim byte) (fullBuffers [][]byte, finalFragment []byte, totalLen int, err error) {
 var frag []byte
 // Use ReadSlice to look for delim, accumulating full buffers.
 for {
  var e error
  frag, e = b.ReadSlice(delim)
  if e == nil { // got final fragment
   break
  }
  if e != ErrBufferFull { // unexpected error
   err = e
   break
  }

  // Make a copy of the buffer.
  buf := make([]byte, len(frag))
  copy(buf, frag)
  fullBuffers = append(fullBuffers, buf)
  totalLen += len(buf)
 }

 totalLen += len(frag)
 return fullBuffers, frag, totalLen, err
}

不过很可惜,它不是包导出类型,我们无法直接使用,如果想使用它,你可以使用一个包装器,通过ReadSlice实现同样的方法。

ReadBytes 分析

那么我们来看一下 ReadBytes 是怎么通过collectFragments来实现的。

// 读取直到输入中第一次出现delim,返回一个包含数据的片段,直到并包括分隔符。
// 如果ReadBytes在找到分隔符之前遇到错误,它将返回错误之前读取的数据和错误本身(通常为io.EOF)。
// ReadBytes 返回 err != nil 仅仅是返回的数据未以delim结尾。
// 这句话的意思是:就算是最后发生了 io.EOF错误,那么也说明所有数据中没有 delim 结尾的字节。
// 对于简单的用途,Scanner 可能更方便;这个后面在分析。
func (b *Reader) ReadBytes(delim byte) ([]byte, error) {
    // 这个方法上面已经进行过说明了
    // 返回一个 [][]byte, []byte,前面2个的字节总长度,一个错误。
 full, frag, n, err := b.collectFragments(delim)
 // Allocate new buffer to hold the full pieces and the fragment.
    // 分配新的缓冲区来保存完整的片段和一部分片段。
 buf := make([]byte, n)
 n = 0
 // 复制完整的片段
 for i := range full {
  n += copy(buf[n:], full[i])
 }
 
    // 复制剩下的部分
 copy(buf[n:], frag)
 return buf, err
}

总结

对于包中的 ReadStringReadLinePeek 这些剩下未分析的方法,就靠你自己根据现有的注释和理解去分析了。

对于包中的 Writer,这篇幅有限,我就不在本章进行分析了。

有想法,有想知道其他标准库源码解析的,也可以留言,我会尽我自己的水平进行学习并分享。

卓越飞翔博客
上一篇: php怎么除去数组里的空值
下一篇: 如何使用Go语言进行代码错误监控与报警
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏