【原创】golang etcd v3版本分布式加锁

小豆丁 1年前 ⋅ 443 阅读

1.新建连接etcd客户端的代码:

package etcd

import (
    "errors"
    "fmt"
    "github.com/coreos/etcd/client"
    "github.com/coreos/etcd/clientv3"
    "google.golang.org/grpc"
    "strings"
    "sync"
    "time"
)

// ServiceRegister groups a client.KeysAPI handle (from the
// github.com/coreos/etcd/client package), a string-to-string service map and a
// mutex. None of the functions shown alongside it read or write these fields.
// NOTE(review): presumably lock guards serviceList — confirm against the code
// that actually uses this type.
type ServiceRegister struct {
    kApi        client.KeysAPI
    serviceList map[string]string
    lock        sync.Mutex
}

// NewClient builds an etcd v3 client for one or more hosts that share a port.
//
// ip is a comma-separated list of hosts. Whitespace around each host is
// trimmed and empty entries are skipped, so "a, b" and "a,,b" both yield
// exactly two endpoints. Every host becomes the endpoint "http://<host>:<port>".
// timeout is the dial timeout in seconds.
//
// An error is returned when ip contains no usable host, when port is not
// positive, or when clientv3.New fails.
func NewClient(ip string, port, timeout int) (*clientv3.Client, error) {
    if ip == "" {
        return nil, errors.New("etcd ip config is empty")
    }

    if port <= 0 {
        return nil, errors.New("etcd port config is zero")
    }

    hosts := strings.Split(ip, ",")
    endpoints := make([]string, 0, len(hosts))
    for _, host := range hosts {
        host = strings.TrimSpace(host)
        if host == "" {
            // Skip blanks left by stray or trailing commas instead of
            // producing a malformed endpoint such as "http://:2379".
            continue
        }
        endpoints = append(endpoints, fmt.Sprintf("http://%s:%d", host, port))
    }

    if len(endpoints) == 0 {
        return nil, errors.New("etcd ip config is empty")
    }

    return clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: time.Duration(timeout) * time.Second,
        // Works around a bug where DialTimeout has no effect; revisit after
        // upgrading etcd.
        // NOTE(review): with WithBlock and a non-positive timeout the dial may
        // wait indefinitely — confirm against the etcd version in use.
        DialOptions: []grpc.DialOption{grpc.WithBlock()},
    })
}
 
2.新建分布式加锁的类:
 
package etcd

import (
    "context"
    "errors"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "github.com/siddontang/go/log"
    "stream_engine_lslive_dispatch/base/config"
    "time"
)

// Locker groups the etcd resources that NewLock creates for one held lock and
// that Unlock later releases.
type Locker struct {
    // Client is the etcd client created by NewLock; Unlock closes it.
    Client     *clientv3.Client
    // LeaseId is set by NewLock only when isRefresh is true, to the ID of the
    // lease it granted explicitly.
    LeaseId    clientv3.LeaseID
    // Session is the concurrency session the mutex is bound to; Unlock closes it.
    Session    *concurrency.Session
    // Mutex is the etcd mutex created for the lock key.
    Mutex      *concurrency.Mutex
    // CancelFunc is not assigned by any code shown alongside this type.
    CancelFunc context.CancelFunc
}

// NewLock connects to etcd and acquires the distributed mutex stored under key.
//
// Parameters:
//   - key: etcd key prefix of the mutex.
//   - lockTimeout: TTL, in seconds, of the lease that backs the lock.
//   - isRefresh: when true the lease is granted explicitly (with a 10 second
//     operation timeout), recorded in Locker.LeaseId, and the session is created
//     on top of it; when false the session is created with WithTTL(lockTimeout).
//
// Acquiring the mutex is bounded by a 10 second timeout.
//
// On success the returned *Locker holds the lock and must be released with
// Unlock. On failure lock is nil, err describes the failing step (wrapping the
// underlying error), and every resource created so far has been closed.
//
// NOTE(review): connectRes is never assigned and is therefore always false;
// confirm what callers expect it to report.
func NewLock(key string, lockTimeout int, isRefresh bool) (lock *Locker, err error, connectRes bool) {
    // NOTE(review): the NewClient shown in this article takes (ip, port,
    // timeout); this argument-less call presumably had its configuration
    // arguments stripped — restore them before building.
    cli, err := NewClient()
    if err != nil {
        return nil, fmt.Errorf("etcd new lock client error: %w", err), false
    }

    if cli == nil {
        return nil, errors.New("etcd new client empty"), false
    }

    // Timeout for the individual etcd operations (lease grant, mutex lock).
    const actionTimeout = 10 * time.Second

    var session *concurrency.Session

    // On any failure below, release what was created (session first, then the
    // client) and make sure the caller never receives a half-built Locker.
    defer func() {
        if err == nil {
            return
        }
        if session != nil {
            if closeErr := session.Close(); closeErr != nil {
                log.Warn(fmt.Sprintf("etcd new lock error close session fail: %s", closeErr))
            }
        }
        if closeErr := cli.Close(); closeErr != nil {
            log.Warn(fmt.Sprintf("etcd new lock error close client fail: %s", closeErr))
        }
        lock = nil
    }()

    locker := &Locker{Client: cli}

    if isRefresh {
        // Grant the lease explicitly so its ID can be kept on the Locker.
        grantCtx, cancelGrant := context.WithTimeout(context.Background(), actionTimeout)
        response, grantErr := cli.Grant(grantCtx, int64(lockTimeout))
        cancelGrant()
        if grantErr != nil {
            return nil, fmt.Errorf("etcd new client get lease error: %w", grantErr), false
        }

        locker.LeaseId = response.ID

        // Create the session on top of the granted lease.
        session, err = concurrency.NewSession(cli, concurrency.WithLease(response.ID))
        if err != nil {
            return nil, fmt.Errorf("etcd new lock init lease session error: %w", err), false
        }
    } else {
        // Let the session create its own lease with the requested TTL.
        session, err = concurrency.NewSession(cli, concurrency.WithTTL(lockTimeout))
        if err != nil {
            return nil, fmt.Errorf("etcd new lock init session error: %w", err), false
        }
    }

    locker.Session = session
    locker.Mutex = concurrency.NewMutex(session, key)

    // Acquire the lock, giving up after actionTimeout.
    lockCtx, cancelLock := context.WithTimeout(context.Background(), actionTimeout)
    defer cancelLock()

    if lockErr := locker.Mutex.Lock(lockCtx); lockErr != nil {
        return nil, fmt.Errorf("etcd new lock error: %w", lockErr), false
    }

    log.Debug(fmt.Sprintf("etcd get lock success [key:%s, lockTimeout:%d, isRefresh:%v]", key, lockTimeout, isRefresh))

    return locker, nil, false
}

// Unlock releases the etcd mutex and then closes the session and the client,
// in that order.
//
// Releasing the mutex is bounded by a 10 second timeout. Failures while
// closing the session or the client are only logged; the returned error
// reports a nil receiver, a Locker without a mutex, or a failed mutex release
// (wrapping the underlying error). Nil Session or Client fields are skipped so
// that a partially initialised Locker cannot cause a panic.
func (l *Locker) Unlock() error {
    if l == nil {
        return errors.New("etcd unlock error: locker is nil")
    }

    // Deferred calls run last-in-first-out: the session is closed before the
    // client it depends on.
    if l.Client != nil {
        defer func() {
            if err := l.Client.Close(); err != nil {
                log.Warn(fmt.Sprintf("etcd unlock close client error: %s", err))
            }
        }()
    }

    if l.Session != nil {
        defer func() {
            if err := l.Session.Close(); err != nil {
                log.Warn(fmt.Sprintf("etcd unlock close session error: %s", err))
            }
        }()
    }

    if l.Mutex == nil {
        return errors.New("etcd unlock error: mutex is nil")
    }

    // Timeout for the release operation.
    const actionTimeout = 10 * time.Second
    ctx, cancelFunc := context.WithTimeout(context.Background(), actionTimeout)
    defer cancelFunc()

    if err := l.Mutex.Unlock(ctx); err != nil {
        return fmt.Errorf("etcd unlock error: %w", err)
    }

    return nil
}

 

3.使用方式:

// Acquire the lock.
// NOTE(review): etcdService.Lock is not defined in this article; the NewLock
// shown above takes (key, lockTimeout, isRefresh) and returns three values —
// confirm the wrapper's actual signature.
lock, err, _, _ := etcdService.Lock("testMyLock", 0)

if err != nil {
    logs.Error("testMyLock get lock error [err:%s]", err)
    return false
}

// Release the lock when the surrounding function returns.
defer lock.Unlock()

// The work that must run while the lock is held goes here.
fmt.Println("加锁中,正在处理...")
fmt.Println("加锁中,处理结束...")

全部评论: 0

    我有话说: