流言算法--gossip算法
阅读原文时间:2021年04月20日阅读:1

一个基于pull的gossip算法的实现。最终确保状态一致。
该协议如下:
1)A发起者发送Hello(B唯一标识,nonce)消息到B远程节点(多个)。
2)B收到Hello消息后,发送SendDigest到A节点,其中包含B自己的数据摘要以及收到的nonce
3)A收到SendDigest,校验数据和nonce,把B作为待发送节点,并封装想要pull的数据SendReq到B节点
4)B收到SendReq后发送SendRes到A节点,数据为SendReq中所请求、且B自身持有的数据
    Other peer                                                 Initiator
        O     <-------- Hello <NONCE> -------------------------    O
       /|\    --------- Digest <[3,5,8, 10...], NONCE> -------->   /|\
        |     <-------- Request <[3,8], NONCE> -----------------    |
       / \    --------- Response <[item3, item8], NONCE> ------->  / \

一言不合就上源码

 1.  这个是需要实现的接口。除了用于选择目标节点的SelectPeers之外,还定义了上面协议中发送4种消息(Hello、SendDigest、SendReq、SendRes)的方法


// PullAdapter is needed by the PullEngine in order to
// send messages to the remote PullEngine instances.
// The PullEngine expects to be invoked with
// OnHello, OnDigest, OnReq, OnRes when the respective message arrives
// from a remote PullEngine.
type PullAdapter interface {
    // SelectPeers returns a slice of peers which the engine will initiate the protocol with.
    SelectPeers() []string

    // Hello sends a hello message to dest to initiate the protocol.
    // The method itself returns nothing; the supplied nonce is the value
    // the remote side is expected to echo back in its digest message.
    Hello(dest string, nonce uint64)

    // SendDigest sends a digest (a list of item identifiers) together with
    // the nonce to a remote PullEngine.
    // The context parameter specifies the remote engine to send to.
    SendDigest(digest []string, nonce uint64, context interface{})

    // SendReq sends an array of items, plus the nonce, to a certain remote
    // PullEngine identified by the string dest.
    SendReq(dest string, items []string, nonce uint64)

    // SendRes sends an array of items, plus the nonce, to a remote
    // PullEngine identified by a context.
    SendRes(items []string, context interface{}, nonce uint64)
}

2. 这个是总调度类(PullEngine),初始化时需要传入第一步中PullAdapter接口的实现

stopFlag 结束标识如果为1时,就代表结束
state 一个数据集合
item2owners 需要pull的数据,key代表想要pull的数据,value代表那些节点有这些数据
peers2nonces 一个节点对应一个nonce,key节点唯一标识,value - nonce
nonces2peers 跟peers2nonces相反key,value
acceptingDigests 是否正在接收数据,Digest时锁住
acceptingResponses 是否正在接收数据,Resp时锁住
incomingNONCES 接收到的所有nonce
outgoingNONCES 发送出去的也就是自己生成的所有nonce
注意:processIncomingDigests会为每一条想要pull的数据,从持有该数据的节点中随机挑选一个节点去pull

// NewPullEngine builds a PullEngine around the given adapter and launches a
// background goroutine that sleeps for sleepTime and then initiates a pull
// round, over and over, until engine.toDie() reports true.
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
    engine := &PullEngine{
        PullAdapter:        participant,
        state:              util.NewSet(),
        incomingNONCES:     util.NewSet(),
        outgoingNONCES:     util.NewSet(),
        item2owners:        map[string][]string{},
        peers2nonces:       map[string]uint64{},
        nonces2peers:       map[uint64]string{},
        stopFlag:           int32(0),
        acceptingDigests:   int32(0),
        acceptingResponses: int32(0),
    }

    // Periodic pull loop. The stop condition is checked both before and
    // after the sleep so that no pull starts once the engine was stopped
    // while this goroutine was sleeping.
    go func() {
        for {
            if engine.toDie() {
                return
            }
            time.Sleep(sleepTime)
            if engine.toDie() {
                return
            }
            engine.initiatePull()
        }
    }()

    return engine
}
// initiatePull starts one pull round. While holding the engine lock it calls
// acceptDigests, then for every peer returned by SelectPeers it generates a
// nonce, records it in outgoingNONCES and in both peer<->nonce maps, and
// sends that peer a Hello. Finally it schedules processIncomingDigests to run
// after digestWaitTime.
func (engine *PullEngine) initiatePull() {
    engine.lock.Lock()
    defer engine.lock.Unlock()

    engine.acceptDigests()

    targets := engine.SelectPeers()
    for _, dest := range targets {
        // Per the original note, newNONCE yields a random number.
        n := engine.newNONCE()
        engine.outgoingNONCES.Add(n)
        engine.peers2nonces[dest] = n
        engine.nonces2peers[n] = dest
        engine.Hello(dest, n)
    }

    time.AfterFunc(digestWaitTime, engine.processIncomingDigests)
}
// processIncomingDigests turns the collected item2owners table into one
// request per peer. It first calls ignoreDigests, then, under the engine
// lock, picks a random owner for every wanted item, groups the items by the
// chosen owner, calls acceptResponses, and sends each owner a SendReq carrying
// the nonce recorded for that peer. endPull is scheduled to run after
// responseWaitTime.
func (engine *PullEngine) processIncomingDigests() {
    engine.ignoreDigests()

    engine.lock.Lock()
    defer engine.lock.Unlock()

    itemsByOwner := map[string][]string{}
    for item, owners := range engine.item2owners {
        // Choose one of the peers advertising this item at random.
        chosen := owners[rand.Intn(len(owners))]
        // Appending to a missing key starts from a nil slice, so no
        // explicit initialisation of the entry is needed.
        itemsByOwner[chosen] = append(itemsByOwner[chosen], item)
    }

    engine.acceptResponses()

    for owner, wanted := range itemsByOwner {
        engine.SendReq(owner, wanted, engine.peers2nonces[owner])
    }

    time.AfterFunc(responseWaitTime, engine.endPull)
}
// endPull finishes a pull round and resets the per-round bookkeeping: under
// the engine lock it clears the acceptingResponses flag, empties
// outgoingNONCES and replaces item2owners, peers2nonces and nonces2peers
// with fresh empty maps.
func (engine *PullEngine) endPull() {
    engine.lock.Lock()
    defer engine.lock.Unlock()

    atomic.StoreInt32(&engine.acceptingResponses, 0)
    engine.outgoingNONCES.Clear()

    engine.item2owners = map[string][]string{}
    engine.peers2nonces = map[string]uint64{}
    engine.nonces2peers = map[uint64]string{}
}

 3. 接收到hello消息后立马返回一个Digest


// OnHello notifies the engine that a hello has arrived. The nonce is stored
// in incomingNONCES and scheduled for removal after requestWaitTime, and the
// engine answers straight away with a digest listing every item currently in
// its state, sent via SendDigest to the peer identified by context.
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
    engine.incomingNONCES.Add(nonce)
    // Forget the nonce once the window for the matching request has passed.
    time.AfterFunc(requestWaitTime, func() { engine.incomingNONCES.Remove(nonce) })

    held := engine.state.ToArray()
    digest := make([]string, 0, len(held))
    for _, entry := range held {
        digest = append(digest, entry.(string))
    }

    engine.SendDigest(digest, nonce, context)
}

4.  接收到Req立马返回一个Res


// OnReq notifies the engine that a request has arrived. Requests whose nonce
// is not present in incomingNONCES (i.e. no hello with that nonce is currently
// remembered) are dropped. Otherwise the requested items that exist in the
// local state are collected under the engine lock and sent back with SendRes
// on a separate goroutine.
func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
    if known := engine.incomingNONCES.Exists(nonce); !known {
        // We never received a hello carrying this nonce (or it expired).
        return
    }

    var available []string

    engine.lock.Lock()
    for i := range items {
        if id := items[i]; engine.state.Exists(id) {
            available = append(available, id)
        }
    }
    engine.lock.Unlock()

    go engine.SendRes(available, context, nonce)
}

5.  Res把数据加入自己的数据集    


// OnRes notifies the engine that a response has arrived. The items are added
// to the engine only when the nonce is one recorded in outgoingNONCES and the
// engine reports that it is currently accepting responses; any other response
// is ignored.
func (engine *PullEngine) OnRes(items []string, nonce uint64) {
    if engine.outgoingNONCES.Exists(nonce) && engine.isAcceptingResponses() {
        engine.Add(items...)
    }
}

6. 然而测试类也很重要,不然我写到这里还是一脸懵


package algo

import (
    "sync"
    "testing"
    "time"

    "fmt"
    "sync/atomic"

    "github.com/hyperledger/fabric/gossip/util"
    "github.com/stretchr/testify/assert"
)

// init overrides the package-level wait-time variables with millisecond-scale
// values used by the tests in this file.
func init() {
    const ms = time.Millisecond

    requestWaitTime = 200 * ms
    digestWaitTime = 100 * ms
    responseWaitTime = 200 * ms
}

// messageHook is a callback that receives a message (of any type) delivered
// to a pullTestInstance; handleMessage runs every registered hook before
// dispatching the message to the engine.
type messageHook func(interface{})

// pullTestInstance is an in-memory PullAdapter used by the tests: it embeds a
// PullEngine and delivers protocol messages to other instances through their
// msgQueue channels.
type pullTestInstance struct {
    // msgHooks are invoked, in order, for every message handled.
    msgHooks          []messageHook
    // peers maps an instance name to the instance; it is the map passed to
    // the constructor, which registers the new instance in it.
    peers             map[string]*pullTestInstance
    // name identifies this instance and is used as the message source.
    name              string
    // nextPeerSelection is what SelectPeers returns.
    nextPeerSelection []string
    // msgQueue holds incoming messages awaiting handleMessage.
    msgQueue          chan interface{}
    // lock guards msgHooks and nextPeerSelection.
    lock              sync.Mutex
    // stopChan tells the message-dispatch goroutine to exit.
    stopChan          chan struct{}
    *PullEngine
}

// In-memory representations of the four protocol messages exchanged between
// pullTestInstance peers.
type (
    // helloMsg opens a pull round; source names the sending instance.
    helloMsg struct {
        nonce  uint64
        source string
    }

    // digestMsg answers a hello with the item identifiers the sender holds.
    digestMsg struct {
        nonce  uint64
        digest []string
        source string
    }

    // reqMsg asks the receiver for specific items.
    reqMsg struct {
        items  []string
        nonce  uint64
        source string
    }

    // resMsg carries the items sent in reply to a reqMsg.
    resMsg struct {
        items []string
        nonce uint64
    }
)

// newPushPullTestInstance creates a test peer called name, gives it a
// PullEngine with a 500ms pull interval, registers it in peers, and starts a
// goroutine that feeds queued messages to handleMessage until a value is
// received on stopChan.
func newPushPullTestInstance(name string, peers map[string]*pullTestInstance) *pullTestInstance {
    inst := &pullTestInstance{
        name:              name,
        peers:             peers,
        msgHooks:          []messageHook{},
        nextPeerSelection: []string{},
        msgQueue:          make(chan interface{}, 100),
        stopChan:          make(chan struct{}, 1),
    }

    inst.PullEngine = NewPullEngine(inst, 500*time.Millisecond)
    peers[name] = inst

    // Message dispatch loop.
    go func() {
        for {
            select {
            case msg := <-inst.msgQueue:
                inst.handleMessage(msg)
            case <-inst.stopChan:
                return
            }
        }
    }()

    return inst
}

// hook registers f to be called with every message this instance handles.
// It is meant for tests that want to inspect, or assert on, the messages one
// peer sends to another.
func (p *pullTestInstance) hook(f messageHook) {
    p.lock.Lock()
    p.msgHooks = append(p.msgHooks, f)
    p.lock.Unlock()
}
// handleMessage processes one received message: it first runs every
// registered hook on it (while holding the instance lock) and then forwards
// it to the engine callback matching its concrete type. Messages of any other
// type are ignored after the hooks have run.
func (p *pullTestInstance) handleMessage(m interface{}) {
    p.lock.Lock()
    for _, fn := range p.msgHooks {
        fn(m)
    }
    p.lock.Unlock()

    switch msg := m.(type) {
    case *helloMsg:
        p.OnHello(msg.nonce, msg.source)
    case *digestMsg:
        p.OnDigest(msg.digest, msg.nonce, msg.source)
    case *reqMsg:
        p.OnReq(msg.items, msg.nonce, msg.source)
    case *resMsg:
        p.OnRes(msg.items, msg.nonce)
    }
}

// stop shuts the test instance down: it signals the message-dispatch
// goroutine through stopChan and then calls Stop on the embedded engine.
func (p *pullTestInstance) stop() {
    var signal struct{}
    p.stopChan <- signal
    p.Stop()
}

// setNextPeerSelection sets, under the instance lock, the peer list that
// SelectPeers will return from now on.
func (p *pullTestInstance) setNextPeerSelection(selection []string) {
    p.lock.Lock()
    p.nextPeerSelection = selection
    p.lock.Unlock()
}

// SelectPeers implements PullAdapter by returning the peer list most recently
// stored with setNextPeerSelection (read under the instance lock).
func (p *pullTestInstance) SelectPeers() []string {
    p.lock.Lock()
    selection := p.nextPeerSelection
    p.lock.Unlock()
    return selection
}

// Hello implements PullAdapter by queueing a helloMsg, tagged with this
// instance's name as source, on the destination peer's message queue.
func (p *pullTestInstance) Hello(dest string, nonce uint64) {
    target := p.peers[dest]
    msg := &helloMsg{source: p.name, nonce: nonce}
    target.msgQueue <- msg
}

// SendDigest implements PullAdapter by queueing a digestMsg on the message
// queue of the peer named by context, which must be a string.
func (p *pullTestInstance) SendDigest(digest []string, nonce uint64, context interface{}) {
    target := p.peers[context.(string)]
    msg := &digestMsg{digest: digest, nonce: nonce, source: p.name}
    target.msgQueue <- msg
}

// SendReq implements PullAdapter by queueing a reqMsg, tagged with this
// instance's name as source, on the destination peer's message queue.
func (p *pullTestInstance) SendReq(dest string, items []string, nonce uint64) {
    target := p.peers[dest]
    msg := &reqMsg{items: items, nonce: nonce, source: p.name}
    target.msgQueue <- msg
}

// SendRes implements PullAdapter by queueing a resMsg on the message queue
// of the peer named by context, which must be a string.
func (p *pullTestInstance) SendRes(items []string, context interface{}, nonce uint64) {
    target := p.peers[context.(string)]
    msg := &resMsg{nonce: nonce, items: items}
    target.msgQueue <- msg
}

// TestPullEngine_Add checks that an item added to the engine (twice, to
// exercise a duplicate add) is present in its state afterwards.
func TestPullEngine_Add(t *testing.T) {
    t.Parallel()
    peers := make(map[string]*pullTestInstance)
    inst1 := newPushPullTestInstance("p1", peers)
    // Use the instance-level stop so the message-dispatch goroutine is
    // signalled as well; calling only the engine's Stop would leave it
    // running after the test.
    defer inst1.stop()
    inst1.Add("0")
    inst1.Add("0")
    assert.True(t, inst1.PullEngine.state.Exists("0"))
}

// TestPullEngine_Remove checks that a removed item is no longer in the
// engine's state, and that removing it a second time is harmless.
func TestPullEngine_Remove(t *testing.T) {
    t.Parallel()
    peers := make(map[string]*pullTestInstance)
    inst1 := newPushPullTestInstance("p1", peers)
    // Use the instance-level stop so the message-dispatch goroutine is
    // signalled as well; calling only the engine's Stop would leave it
    // running after the test.
    defer inst1.stop()
    inst1.Add("0")
    assert.True(t, inst1.PullEngine.state.Exists("0"))
    inst1.Remove("0")
    assert.False(t, inst1.PullEngine.state.Exists("0"))
    inst1.Remove("0") // remove twice
    assert.False(t, inst1.PullEngine.state.Exists("0"))
}

// TestPullEngine_Stop checks that a stopped peer no longer takes part in the
// protocol: p2 pulls from p1 while p1 keeps receiving new items; once p1 is
// stopped, the number of items held by p2 must stay unchanged.
func TestPullEngine_Stop(t *testing.T) {
    t.Parallel()
    peers := make(map[string]*pullTestInstance)
    inst1 := newPushPullTestInstance("p1", peers)
    inst2 := newPushPullTestInstance("p2", peers)
    defer inst2.stop()
    inst2.setNextPeerSelection([]string{"p1"})
    go func() {
        for i := 0; i < 100; i++ {
            // Format the counter as decimal text. string(i) on an int
            // produces a single-rune string and is rejected by the vet
            // checks that `go test` runs.
            inst1.Add(fmt.Sprintf("%d", i))
            time.Sleep(time.Duration(10) * time.Millisecond)
        }
    }()

    time.Sleep(time.Duration(800) * time.Millisecond)
    len1 := len(inst2.state.ToArray())
    inst1.stop()
    time.Sleep(time.Duration(800) * time.Millisecond)
    len2 := len(inst2.state.ToArray())
    assert.Equal(t, len1, len2, "PullEngine was still active after Stop() was invoked!")
}

// TestPullEngineAll2AllWithIncrementalSpawning spawns ten peers 50ms apart,
// gives each one unique item, lets every peer pull from all the others, and
// expects every peer to end up holding all ten items.
func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) {
    t.Parallel()
    // Scenario: spawn 10 nodes, each 50 ms after the other
    // and have them transfer data between themselves.
    // Expected outcome: obviously, everything should succeed.
    // Isn't that's why we're here?
    instanceCount := 10
    peers := make(map[string]*pullTestInstance)

    for i := 0; i < instanceCount; i++ {
        inst := newPushPullTestInstance(fmt.Sprintf("p%d", i+1), peers)
        // Format the counter as decimal text. string(i + 1) on an int
        // produces a single-rune string and is rejected by the vet checks
        // that `go test` runs.
        inst.Add(fmt.Sprintf("%d", i+1))
        time.Sleep(time.Duration(50) * time.Millisecond)
    }
    for i := 0; i < instanceCount; i++ {
        pID := fmt.Sprintf("p%d", i+1)
        peers[pID].setNextPeerSelection(keySet(pID, peers))
    }
    time.Sleep(time.Duration(4000) * time.Millisecond)

    for i := 0; i < instanceCount; i++ {
        pID := fmt.Sprintf("p%d", i+1)
        assert.Equal(t, instanceCount, len(peers[pID].state.ToArray()))
    }
}

// TestByzantineResponder checks that an engine ignores digests and responses
// from a peer it did not address.
func TestByzantineResponder(t *testing.T) {
    t.Parallel()
    // Scenario: inst1 sends hello to inst2 but inst3 is byzantine so it attempts to send a digest and a response to inst1.
    // expected outcome is for inst1 not to process updates from inst3.
    peers := make(map[string]*pullTestInstance)
    inst1 := newPushPullTestInstance("p1", peers)
    inst2 := newPushPullTestInstance("p2", peers)
    inst3 := newPushPullTestInstance("p3", peers)
    defer inst1.stop()
    defer inst2.stop()
    defer inst3.stop()

    receivedDigestFromInst3 := int32(0)

    inst2.Add("1", "2", "3")
    inst3.Add("1", "6", "7")

    // As soon as inst2 sees inst1's hello, the byzantine inst3 pushes an
    // unsolicited digest to inst1. inst1 only greets inst2, so inst3 was
    // never handed a nonce and uses 0.
    inst2.hook(func(m interface{}) {
        if _, isHello := m.(*helloMsg); isHello {
            inst3.SendDigest([]string{"5", "6", "7"}, 0, "p1")
        }
    })

    // When inst1 receives that digest, record the fact (the assertion below
    // relies on it) and have inst3 follow up with an unsolicited response
    // while inst1 is still collecting responses.
    inst1.hook(func(m interface{}) {
        if dig, isDig := m.(*digestMsg); isDig && dig.source == "p3" {
            atomic.StoreInt32(&receivedDigestFromInst3, int32(1))
            time.AfterFunc(time.Duration(150)*time.Millisecond, func() {
                inst3.SendRes([]string{"5", "6", "7"}, "p1", 0)
            })
        }
    })

    inst1.setNextPeerSelection([]string{"p2"})

    time.Sleep(time.Duration(1000) * time.Millisecond)

    assert.Equal(t, int32(1), atomic.LoadInt32(&receivedDigestFromInst3), "inst1 hasn't received a digest from inst3")

    // inst1 must have synchronised with inst2 ...
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "1", Strcmp) != -1)
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "2", Strcmp) != -1)
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "3", Strcmp) != -1)

    // ... and must not have accepted anything offered by inst3.
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "5", Strcmp) == -1)
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "6", Strcmp) == -1)
    assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "7", Strcmp) == -1)
}

// Strcmp reports whether a and b hold equal strings. Both arguments must be
// strings; any other dynamic type makes the type assertion panic.
func Strcmp(a interface{}, b interface{}) bool {
    left := a.(string)
    right := b.(string)
    return left == right
}

// keySet returns the keys of m other than selfPeer, in map-iteration order
// (which Go leaves unspecified). The result is built with append so that an
// empty or nil map, or a selfPeer that is not a key of m, yields a correct
// slice instead of a negative-length or out-of-range panic.
func keySet(selfPeer string, m map[string]*pullTestInstance) []string {
    peers := make([]string, 0, len(m))
    for pID := range m {
        if pID != selfPeer {
            peers = append(peers, pID)
        }
    }

    return peers
}

 7.  此为超级账本源码中一个片段,拿出来跟大家一起分享。具体源码请下载超级账本源码

不要企图爱上哥,哥只是个传说!