内容简介:go-ethereum所有的数据存储在levelDB这个Google开源的KeyValue文件数据库中,整个区块链的所有数据都存储在一个levelDB的数据库中,levelDB支持按照文件大小切分文件的功能,所以我们看到的区块链的数据都是一个一个小文件,其实这些小文件都是一个同一个levelDB实例。这里简单的看下levelDB的go封装代码。levelDB官方网站介绍的特点
go-ethereum所有的数据存储在levelDB这个Google开源的KeyValue文件数据库中,整个区块链的所有数据都存储在一个levelDB的数据库中,levelDB支持按照文件大小切分文件的功能,所以我们看到的区块链的数据都是一个一个小文件,其实这些小文件都是一个同一个levelDB实例。这里简单的看下levelDB的 go 封装代码。
levelDB官方网站介绍的特点
**特点** :
- key和value都是任意长度的字节数组;
- entry(即一条K-V记录)默认是按照key的字典顺序存储的,当然开发者也可以重载这个 排序 函数;
- 提供的基本操作接口:Put()、Delete()、Get()、Batch();
- 支持批量操作以原子操作进行;
- 可以创建数据全景的snapshot(快照),并允许在快照中查找数据;
- 可以通过前向(或后向)迭代器遍历数据(迭代器会隐含的创建一个snapshot);
- 自动使用Snappy压缩数据;
- 可移植性;
**限制** :
- 非关系型数据模型(NoSQL),不支持 sql 语句,也不支持索引;
- 一次只允许一个进程访问一个特定的数据库;
- 没有内置的C/S架构,但开发者可以使用LevelDB库自己封装一个server;
源码所在的目录在ethereum/ethdb目录。代码比较简单, 分为下面三个文件
- database.go levelDB的封装代码
- memory_database.go 供测试用的基于内存的数据库,不会持久化为文件,仅供测试
- interface.go 定义了数据库的接口
- database_test.go 测试案例
## interface.go
看下面的代码,基本上定义了KeyValue数据库的基本操作, Put, Get, Has, Delete等基本操作,levelDB是不支持SQL的,基本可以理解为数据结构里面的Map。
package ethdb
const IdealBatchSize = 100 * 1024
// Putter wraps the database write operation supported by both batches and regular databases.
//Putter接口定义了批量操作和普通操作的写入接口
type Putter interface {
Put(key []byte, value []byte) error
}
// Database wraps all database operations. All methods are safe for concurrent use.
//数据库接口定义了所有的数据库操作, 所有的方法都是多线程安全的。
type Database interface {
Putter
Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error)
Delete(key []byte) error
Close()
NewBatch() Batch
}
// Batch is a write-only database that commits changes to its host database
// when Write is called. Batch cannot be used concurrently.
//批量操作接口,不能多线程同时使用,当Write方法被调用的时候,数据库会提交写入的更改。
type Batch interface {
Putter
ValueSize() int // amount of data in the batch
Write() error
}
## memory_database.g
这个基本上就是封装了一个内存的Map结构。然后使用了一把锁来对多线程进行资源的保护。
type MemDatabase struct {
db map[string][]byte
lock sync.RWMutex
}
func NewMemDatabase() (*MemDatabase, error) {
return &MemDatabase{
db: make(map[string][]byte),
}, nil
}
func (db *MemDatabase) Put(key []byte, value []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
db.db[string(key)] = common.CopyBytes(value)
return nil
}
func (db *MemDatabase) Has(key []byte) (bool, error) {
db.lock.RLock()
defer db.lock.RUnlock()
_, ok := db.db[string(key)]
return ok, nil
}
然后是Batch的操作。也比较简单,一看便明白。
type kv struct{ k, v []byte }
type memBatch struct {
db *MemDatabase
writes []kv
size int
}
func (b *memBatch) Put(key, value []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value)})
b.size += len(value)
return nil
}
func (b *memBatch) Write() error {
b.db.lock.Lock()
defer b.db.lock.Unlock()
for _, kv := range b.writes {
b.db.db[string(kv.k)] = kv.v
}
return nil
}
##database.go
这个就是实际ethereum客户端使用的代码, 封装了levelDB的接口。
import (
"strconv"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
gometrics "github.com/rcrowley/go-metrics"
)
使用了github.com/syndtr/goleveldb/leveldb的leveldb的封装,所以一些使用的文档可以在那里找到。可以看到,数据结构主要增加了很多的Mertrics用来记录数据库的使用情况,增加了quitChan用来处理停止时候的一些情况,这个后面会分析。如果下面代码可能有疑问的地方应该再Filter: filter.NewBloomFilter(10)这个可以暂时不用关注,这个是levelDB里面用来进行性能优化的一个选项,可以不用理会。
type LDBDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
getTimer gometrics.Timer // Timer for measuring the database get request counts and latencies
putTimer gometrics.Timer // Timer for measuring the database put request counts and latencies
...metrics
quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
log log.Logger // Contextual logger tracking the database path
}
// NewLDBDatabase returns a LevelDB wrapped object.
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
logger := log.New("database", file)
// Ensure we have some minimal caching and file guarantees
if cache < 16 {
cache = 16
}
if handles < 16 {
handles = 16
}
logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)
// Open the db and recover any potential corruptions
db, err := leveldb.OpenFile(file, &opt.Options{
OpenFilesCacheCapacity: handles,
BlockCacheCapacity: cache / 2 * opt.MiB,
WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally
Filter: filter.NewBloomFilter(10),
})
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
db, err = leveldb.RecoverFile(file, nil)
}
// (Re)check for errors and abort if opening of the db failed
if err != nil {
return nil, err
}
return &LDBDatabase{
fn: file,
db: db,
log: logger,
}, nil
}
再看看下面的Put和Has的代码,因为github.com/syndtr/goleveldb/leveldb封装之后的代码是支持多线程同时访问的,所以下面这些代码是不用使用锁来保护的,这个可以注意一下。这里面大部分的代码都是直接调用leveldb的封装,所以不详细介绍了。 有一个比较有意思的地方是Metrics代码。
// Put puts the given key / value to the queue
func (db *LDBDatabase) Put(key []byte, value []byte) error {
// Measure the database put latency, if requested
if db.putTimer != nil {
defer db.putTimer.UpdateSince(time.Now())
}
// Generate the data to write to disk, update the meter and write
//value = rle.Compress(value)
if db.writeMeter != nil {
db.writeMeter.Mark(int64(len(value)))
}
return db.db.Put(key, value, nil)
}
func (db *LDBDatabase) Has(key []byte) (bool, error) {
return db.db.Has(key, nil)
}
###Metrics的处理
之前在创建NewLDBDatabase的时候,并没有初始化内部的很多Mertrics,这个时候Mertrics是为nil的。初始化Mertrics是在Meter方法中。外部传入了一个prefix参数,然后创建了各种Mertrics(具体如何创建Merter,会后续在Meter专题进行分析),然后创建了quitChan。 最后启动了一个线程调用了db.meter方法。
// Meter configures the database metrics collectors and
func (db *LDBDatabase) Meter(prefix string) {
// Short circuit metering if the metrics system is disabled
if !metrics.Enabled {
return
}
// Initialize all the metrics collector at the requested prefix
db.getTimer = metrics.NewTimer(prefix + "user/gets")
db.putTimer = metrics.NewTimer(prefix + "user/puts")
db.delTimer = metrics.NewTimer(prefix + "user/dels")
db.missMeter = metrics.NewMeter(prefix + "user/misses")
db.readMeter = metrics.NewMeter(prefix + "user/reads")
db.writeMeter = metrics.NewMeter(prefix + "user/writes")
db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
// Create a quit channel for the periodic collector and run it
db.quitLock.Lock()
db.quitChan = make(chan chan error)
db.quitLock.Unlock()
go db.meter(3 * time.Second)
}
这个方法每3秒钟获取一次leveldb内部的计数器,然后把他们公布到metrics子系统。 这是一个无限循环的方法, 直到quitChan收到了一个退出信号。
// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
// This is how a stats table look like (currently):
//下面的注释就是我们调用 db.db.GetProperty("leveldb.stats")返回的字符串,后续的代码需要解析这个字符串并把信息写入到Meter中。
// Compactions
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
// -------+------------+---------------+---------------+---------------+---------------
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
func (db *LDBDatabase) meter(refresh time.Duration) {
// Create the counters to store current and previous values
counters := make([][]float64, 2)
for i := 0; i < 2; i++ {
counters[i] = make([]float64, 3)
}
// Iterate ad infinitum and collect the stats
for i := 1; ; i++ {
// Retrieve the database stats
stats, err := db.db.GetProperty("leveldb.stats")
if err != nil {
db.log.Error("Failed to read database stats", "err", err)
return
}
// Find the compaction table, skip the header
lines := strings.Split(stats, "\n")
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
lines = lines[1:]
}
if len(lines) <= 3 {
db.log.Error("Compaction table not found")
return
}
lines = lines[3:]
// Iterate over all the table rows, and accumulate the entries
for j := 0; j < len(counters[i%2]); j++ {
counters[i%2][j] = 0
}
for _, line := range lines {
parts := strings.Split(line, "|")
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
db.log.Error("Compaction entry parsing failed", "err", err)
return
}
counters[i%2][idx] += value
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
}
// Sleep a bit, then repeat the stats collection
select {
case errc := <-db.quitChan:
// Quit requesting, stop hammering the database
errc <- nil
return
case <-time.After(refresh):
// Timeout, gather a new set of stats
}
}
}
网址:http://www.qukuailianxueyuan.io/
欲领取造币技术与全套虚拟机资料
区块链技术交流QQ群: 756146052 备注:CSDN
尹成学院微信:备注:CSDN
以上所述就是小编给大家介绍的《以太坊源码分析(36)ethdb源码分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
- [源码分析] kube-scheduler源码分析(五)之 PrioritizeNodes
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Seasoned Schemer
Daniel P. Friedman、Matthias Felleisen / The MIT Press / 1995-12-21 / USD 38.00
drawings by Duane Bibbyforeword and afterword by Guy L. Steele Jr.The notion that "thinking about computing is one of the most exciting things the human mind can do" sets both The Little Schemer (form......一起来看看 《The Seasoned Schemer》 这本书的介绍吧!