1.新建连接etcd客户端的代码:
package etcd
import (
"errors"
"fmt"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"google.golang.org/grpc"
"strings"
"sync"
"time"
)
type ServiceRegister struct {
kApi client.KeysAPI
serviceList map[string]string
lock sync.Mutex
}
func NewClient(ip string, port, timeout int) (*clientv3.Client, error) {
if ip == "" {
return nil, errors.New("etcd ip config is empty")
}
if port <= 0 {
return nil, errors.New("etcd port config is zero")
}
serverList := make([]string, 0)
ipList := strings.Split(ip, ",")
for _, ip := range ipList {
serverList = append(serverList, fmt.Sprintf("http://%s:%d", ip, port))
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: serverList,
DialTimeout: time.Duration(timeout) * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()}, //解决DialTimeout不起效果的bug,待更新etcd版本
})
return cli, err
}
2.新建分布式加锁的类:
package etcd
import (
"context"
"errors"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/siddontang/go/log"
"stream_engine_lslive_dispatch/base/config"
"time"
)
type Locker struct {
Client *clientv3.Client
LeaseId clientv3.LeaseID
Session *concurrency.Session
Mutex *concurrency.Mutex
CancelFunc context.CancelFunc
}
// NewLock 加锁
func NewLock(key string, lockTimeout int, isRefresh bool) (lock *Locker, err error, connectRes bool) {
cli, err := NewClient()
if err != nil {
err = errors.New(fmt.Sprintf("etcd new lock client error: %s", err))
return
}
if cli == nil {
err = errors.New("etcd new client empty")
return
}
defer func() {
if err != nil {
closeErr := cli.Close()
if closeErr != nil {
log.Warn(fmt.Sprintf("etcd new lock error close client fail: %s", err))
}
}
}()
lock = new(Locker)
lock.Client = cli
//加锁等操作超时
actionTimeout := 10
// 获取租约
if isRefresh {
var response *clientv3.LeaseGrantResponse
//创建租约
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(actionTimeout)*time.Second)
defer cancelFunc()
response, err = cli.Grant(ctx, int64(lockTimeout))
if err != nil {
err = errors.New(fmt.Sprintf("etcd new client get lease error: %s", err))
return
}
lock.LeaseId = response.ID
//创建session
lock.Session, err = concurrency.NewSession(cli, concurrency.WithLease(response.ID))
if err != nil {
err = errors.New(fmt.Sprintf("etcd new lock init lease session error: %s", err))
return
}
} else {
//创建session
lock.Session, err = concurrency.NewSession(cli, concurrency.WithTTL(lockTimeout))
if err != nil {
err = errors.New(fmt.Sprintf("etcd new lock init session error: %s", err))
return
}
}
defer func() {
if err != nil {
closeErr := lock.Session.Close()
if closeErr != nil {
log.Warn(fmt.Sprintf("etcd new lock error close session fail: %s", closeErr))
}
}
}()
//创建锁
lock.Mutex = concurrency.NewMutex(lock.Session, key)
//加锁
lockCtx, lockCancelFunc := context.WithTimeout(context.Background(), time.Duration(actionTimeout)*time.Second)
defer lockCancelFunc()
err = lock.Mutex.Lock(lockCtx)
if err != nil {
err = errors.New(fmt.Sprintf("etcd new lock error: %s", err))
return
}
log.Debug(fmt.Sprintf("etcd get lock success [key:%s, lockTimeout:%d, isRefresh:%v]", key, lockTimeout, isRefresh))
return
}
// Unlock 解锁
func (l *Locker) Unlock() error {
defer func() {
err := l.Client.Close()
if err != nil {
log.Warn(fmt.Sprintf("etcd unlock close client error: %s", err))
}
}()
defer func() {
err := l.Session.Close()
if err != nil {
log.Warn(fmt.Sprintf("etcd unlock close session error: %s", err))
}
}()
// 释放锁
actionTimeout := 10 //操作超时
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(actionTimeout)*time.Second)
defer cancelFunc()
err := l.Mutex.Unlock(ctx)
if err != nil {
err = errors.New(fmt.Sprintf("etcd unlock error: %s", err))
return err
}
return nil
}
3.使用方式:
//加锁
lock, err, _, _ := etcdService.Lock("testMyLock", 0)
if err != nil {
logs.Error("testMyLock get lock error [err:%s]", err)
return false
}
//解锁
defer lock.Unlock()
//编写加锁过程中的代码
fmt.Pringln("加锁中,正在处理...")
fmt.Pringln("加锁中,处理结束...")
注意:本文归作者所有,未经作者允许,不得转载