redis源码阅读之rio

栏目: 数据库 · 发布时间: 4年前

内容简介:redis当中的rio的话就是在后端执行读写操作,bio相当于它的一个补充操作,它使用一个大结构体将操作全部封装起来,有点像linux当中驱动的写法,方便统一接口,具体结构体如下:由结构可知,rio会处理三种不同的read和write,分别为1. 内存buffer,内部由sds表示

redis当中的rio的话就是在后端执行读写操作,bio相当于它的一个补充操作,它使用一个大结构体将操作全部封装起来,有点像 linux 当中驱动的写法,方便统一接口,具体结构体如下:

点击( 此处 )折叠或打开

  1. struct _rio {
  2.      / * Backend functions .
  3.       * Since this functions do not tolerate short writes or reads the return
  4.       * value is simplified to : zero on error , non zero on complete success . * /
  5.     size_t ( * read ) ( struct _rio * , void * buf , size_t len ) ;
  6.     size_t ( * write ) ( struct _rio * , const void * buf , size_t len ) ;
  7.     off_t ( * tell ) ( struct _rio * ) ;
  8.      int ( * flush ) ( struct _rio * ) ;
  9.      / * The update_cksum method if not NULL is used to compute the checksum of
  10.       * all the data that was read or written so far . The method should be
  11.       * designed so that can be called with the current checksum , and the buf
  12.       * and len fields pointing to the new block of data to add to the checksum
  13.       * computation . * /
  14.     void ( * update_cksum ) ( struct _rio * , const void * buf , size_t len ) ;
  15.      / * The current checksum * /
  16.     uint64_t cksum ;
  17.      / * number of bytes read or written * /
  18.     size_t processed_bytes ;
  19.      / * maximum single read or write chunk size * /
  20.     size_t max_processing_chunk ;
  21.      / * Backend - specific vars . * /
  22.     union {
  23.          / * In - memory buffer target . * /
  24.         struct {
  25.             sds ptr ;
  26.             off_t pos ;
  27.          } buffer ;
  28.          / * Stdio file pointer target . * /
  29.         struct {
  30.             FILE * fp ;
  31.             off_t buffered ; / * Bytes written since last fsync . * /
  32.             off_t autosync ; / * fsync after ' autosync ' bytes written . * /
  33.          } file ;
  34.          / * Multiple FDs target ( used to write to N sockets ) . * /
  35.         struct {
  36.              int * fds ; / * File descriptors . * /
  37.              int * state ; / * Error state of each fd . 0 ( if ok ) or errno . * /
  38.              int numfds ;
  39.             off_t pos ;
  40.             sds buf ;
  41.          } fdset ;
  42.      } io ;
  43. } ;
  44. typedef struct _rio rio ;

由结构可知,rio会处理三种不同的read和write,分别为

1. 内存buffer,内部由sds表示

2. 文件指针,内部由FILE*表示

3. 文件描述符数组,内部由int *表示

rio结构 留给外部调用的接口有read,write,tell,flush和update_cksum,而整个rio留给外部调用的接口是将结构体的接口进一步封装了:

点击( 此处 )折叠或打开

  1. static inline size_t rioWrite ( rio * r , const void * buf , size_t len ) {
  2.      while ( len ) {
  3.         size_t bytes_to_write = ( r - > max_processing_chunk & & r - > max_processing_chunk < len ) ? r - > max_processing_chunk : len ;
  4.          if ( r - > update_cksum ) r - > update_cksum ( r , buf , bytes_to_write ) ;
  5.          if ( r - > write ( r , buf , bytes_to_write ) = = 0 ) / * 0表示失败;非0表示成功 * /
  6.             return 0 ;
  7.         buf = ( char * ) buf + bytes_to_write ;
  8.          len - = bytes_to_write ;
  9.         r - > processed_bytes + = bytes_to_write ;
  10.      }
  11.     return 1 ;
  12. }
  13. static inline size_t rioRead ( rio * r , void * buf , size_t len ) {
  14.      while ( len ) {
  15.         size_t bytes_to_read = ( r - > max_processing_chunk & & r - > max_processing_chunk < len ) ? r - > max_processing_chunk : len ;
  16.          if ( r - > read ( r , buf , bytes_to_read ) = = 0 ) / * 0表示失败;非0表示成功 * /
  17.             return 0 ;
  18.          if ( r - > update_cksum ) r - > update_cksum ( r , buf , bytes_to_read ) ;
  19.         buf = ( char * ) buf + bytes_to_read ;
  20.          len - = bytes_to_read ;
  21.         r - > processed_bytes + = bytes_to_read ;
  22.      }
  23.     return 1 ;
  24. }
  25. static inline off_t rioTell ( rio * r ) {
  26.     return r - > tell ( r ) ;
  27. }
  28. static inline int rioFlush ( rio * r ) {
  29.     return r - > flush ( r ) ;
  30. }

接下来聊聊内部实现,还是按照之前说过的顺序内存buffer-> 文件指针->文件描述符数组来进行,其总体思路就是自定义对应类别的io函数,然后实例化结构体,将对应io函数填入结构体

1. 内存buffer主要是针对sds的封装

点击( 此处 )折叠或打开

  1. / * Returns 1 or 0 for success / failure . * /
  2. static size_t rioBufferWrite ( rio * r , const void * buf , size_t len ) {
  3.     r - > io . buffer . ptr = sdscatlen ( r - > io . buffer . ptr , ( char * ) buf , len ) ;
  4.     r - > io . buffer . pos + = len ;
  5.     return 1 ;
  6. }
  7. / * Returns 1 or 0 for success / failure . * /
  8. static size_t rioBufferRead ( rio * r , void * buf , size_t len ) {
  9.      if ( sdslen ( r - > io . buffer . ptr ) - r - > io . buffer . pos < len )
  10.         return 0 ; / * not enough buffer to return len bytes . * /
  11.     memcpy ( buf , r - > io . buffer . ptr + r - > io . buffer . pos , len ) ;
  12.     r - > io . buffer . pos + = len ;
  13.     return 1 ;
  14. }
  15. / * Returns read / write position in buffer . * /
  16. static off_t rioBufferTell ( rio * r ) {
  17.     return r - > io . buffer . pos ;
  18. }
  19. / * Flushes any buffer to target device if applicable . Returns 1 on success
  20.   * and 0 on failures . * /
  21. static int rioBufferFlush ( rio * r ) {
  22.     UNUSED ( r ) ;
  23.     return 1 ; / * Nothing to do , our write just appends to the buffer . * /
  24. }
  25. static const rio rioBufferIO = {
  26.     rioBufferRead ,
  27.     rioBufferWrite ,
  28.     rioBufferTell ,
  29.     rioBufferFlush ,
  30.      NULL , / * update_checksum * /
  31.     0 , / * current checksum * /
  32.     0 , / * bytes read or written * /
  33.     0 , / * read / write chunk size * /
  34.      { { NULL , 0 } } / * union for io - specific vars * /
  35. } ;
  36. void rioInitWithBuffer ( rio * r , sds s ) {
  37.      * r = rioBufferIO ;
  38.     r - > io . buffer . ptr = s ;
  39.     r - > io . buffer . pos = 0 ;
  40. }

2. 文件指针操作的封装:

点击( 此处 )折叠或打开

  1. / * Returns 1 or 0 for success / failure . * /
  2. static size_t rioFileWrite ( rio * r , const void * buf , size_t len ) {
  3.     size_t retval ;
  4.     retval = fwrite ( buf , len , 1 , r - > io . file . fp ) ;
  5.     r - > io . file . buffered + = len ;
  6.      if ( r - > io . file . autosync & &
  7.         r - > io . file . buffered > = r - > io . file . autosync )
  8.      {
  9.         fflush ( r - > io . file . fp ) ;
  10.         aof_fsync ( fileno ( r - > io . file . fp ) ) ;
  11.         r - > io . file . buffered = 0 ;
  12.      }
  13.     return retval ;
  14. }
  15. / * Returns 1 or 0 for success / failure . * /
  16. static size_t rioFileRead ( rio * r , void * buf , size_t len ) {
  17.     return fread ( buf , len , 1 , r - > io . file . fp ) ;
  18. }
  19. / * Returns read / write position in file . * /
  20. static off_t rioFileTell ( rio * r ) {
  21.     return ftello ( r - > io . file . fp ) ;
  22. }
  23. / * Flushes any buffer to target device if applicable . Returns 1 on success
  24.   * and 0 on failures . * /
  25. static int rioFileFlush ( rio * r ) {
  26.     return ( fflush ( r - > io . file . fp ) = = 0 ) ? 1 : 0 ;
  27. }
  28. static const rio rioFileIO = {
  29.     rioFileRead ,
  30.     rioFileWrite ,
  31.     rioFileTell ,
  32.     rioFileFlush ,
  33.      NULL , / * update_checksum * /
  34.     0 , / * current checksum * /
  35.     0 , / * bytes read or written * /
  36.     0 , / * read / write chunk size * /
  37.      { { NULL , 0 } } / * union for io - specific vars * /
  38. } ;
  39. void rioInitWithFile ( rio * r , FILE * fp ) {
  40.      * r = rioFileIO ;
  41.     r - > io . file . fp = fp ;
  42.     r - > io . file . buffered = 0 ;
  43.     r - > io . file . autosync = 0 ;
  44. }

3. 文件描述符数组,这里要注意的是写操作,如果要写的buffer为空,会写入文件描述符数组自身的buffer,并且每个文件描述符最多写1024个字节,可以并行执行,代码如下:

点击( 此处 )折叠或打开

  1. / * Returns 1 or 0 for success / failure .
  2.   * The function returns success as long as we are able to correctly write
  3.   * to at least one file descriptor .
  4.   *
  5.   * When buf is NULL and len is 0 , the function performs a flush operation
  6.   * if there is some pending buffer , so this function is also used in order
  7.   * to implement rioFdsetFlush ( ) . * /
  8. static size_t rioFdsetWrite ( rio * r , const void * buf , size_t len ) {
  9.     ssize_t retval ;
  10.      int j ;
  11.     unsigned char * p = ( unsigned char * ) buf ;
  12.      int doflush = ( buf = = NULL & & len = = 0 ) ;
  13.      / * To start we always append to our buffer . If it gets larger than
  14.       * a given size , we actually write to the sockets . * /
  15.      if ( len ) {
  16.         r - > io . fdset . buf = sdscatlen ( r - > io . fdset . buf , buf , len ) ;
  17.          len = 0 ; / * Prevent entering the while below if we don ' t flush . * /
  18.          if ( sdslen ( r - > io . fdset . buf ) > PROTO_IOBUF_LEN ) doflush = 1 ;
  19.      }
  20.      if ( doflush ) {
  21.         p = ( unsigned char * ) r - > io . fdset . buf ;
  22.          len = sdslen ( r - > io . fdset . buf ) ;
  23.      }
  24.      / * Write in little chunchs so that when there are big writes we
  25.       * parallelize while the kernel is sending data in background to
  26.       * the TCP socket . * /
  27.      while ( len ) {
  28.         size_t count = len < 1024 ? len : 1024 ;
  29.          int broken = 0 ;
  30.          for ( j = 0 ; j < r - > io . fdset . numfds ; j + + ) {
  31.              if ( r - > io . fdset . state [ j ] ! = 0 ) {
  32.                  / * Skip FDs alraedy in error . * /
  33.                 broken + + ;
  34.                 continue ;
  35.              }
  36.              / * Make sure to write ' count ' bytes to the socket regardless
  37.               * of short writes . * /
  38.             size_t nwritten = 0 ;
  39.              while ( nwritten ! = count ) {
  40.                 retval = write ( r - > io . fdset . fds [ j ] , p + nwritten , count - nwritten ) ;
  41.                  if ( retval < = 0 ) {
  42.                      / * With blocking sockets , which is the sole user of this
  43.                       * rio target , EWOULDBLOCK is returned only because of
  44.                       * the SO_SNDTIMEO socket option , so we translate the error
  45.                       * into one more recognizable by the user . * /
  46.                      if ( retval = = - 1 & & errno = = EWOULDBLOCK ) errno = ETIMEDOUT ;
  47.                     break ;
  48.                  }
  49.                 nwritten + = retval ;
  50.              }
  51.              if ( nwritten ! = count ) {
  52.                  / * Mark this FD as broken . * /
  53.                 r - > io . fdset . state [ j ] = errno ;
  54.                  if ( r - > io . fdset . state [ j ] = = 0 ) r - > io . fdset . state [ j ] = EIO ;
  55.              }
  56.          }
  57.          if ( broken = = r - > io . fdset . numfds ) return 0 ; / * All the FDs in error . * /
  58.         p + = count ;
  59.         

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

IT大败局

IT大败局

Merrill R.Chapman、周良忠 / 周良忠 / 电子工业出版社 / 2004-8-1 / 35.00

这是一本由作者亲身经历写就的MBA式教案。通过作者那专业人士的敏锐、活泼流畅的文笔和美国人特有的幽默,本书为我们剖析了IT界十个有代表性且影响深远的愚蠢败局。这十个败局涉及企业经营的十个主要方面,它们是:产业标准的魔力,“缩水”产品的阴霾,产品定位的泥潭,市场关系的教训,巨型企业的困惑,企业并购的陷阱,品牌战略的迷茫,技术导向的失衡,企业公关的真谛和科技虚幻的诱惑。 书中有许多鲜为人......一起来看看 《IT大败局》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具