jjzjj

singleflight 使用记录以及源码阅读

huageyiyangdewo 2023-03-28 原文

singleflight 使用方法以及源码阅读

1、简介

安装方式:

go get -u golang.org/x/sync/singleflight

singleflight 是Go官方扩展同步包的一个库。通过给每次函数调用分配一个key,相同key的函数并发调用时,在函数执行期间,相同函数的调用,只会被执行一次,返回相同的结果。其本质是对函数调用的结果进行复用

2、使用方法

2.1 使用Do获取函数执行结果

Do方法是同步返回函数执行结果

package main

import (
	"fmt"
	"golang.org/x/sync/singleflight"
	"runtime"
	"sync"
	"time"
)

func main()  {
	var sg singleflight.Group
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func(j int) {
			defer wg.Done()
			v, err, shared := sg.Do("testDo", testDo)

			fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, v, err, shared)
		}(i)
	}

	wg.Wait()
}

func testDo() (interface{}, error) {
	// 模拟函数执行需要的时间
	time.Sleep(time.Millisecond)

	return "testDo", nil
}

2.2 使用DoChan获取函数执行结果

DoChan返回一个 channel,函数执行的结果通过 channel 来进行传递。

package main

import (
	"fmt"
	"golang.org/x/sync/singleflight"
	"runtime"
	"sync"
	"time"
)

func main()  {
	var sg singleflight.Group
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func(j int) {
			defer wg.Done()
			ch := sg.DoChan("testDoChan", testDoChan)
			select {
			case ret := <- ch:
				fmt.Printf("i: %v, v:%v, err:%v, shared:%v\n", j, ret.Val, ret.Err, ret.Shared)

			}

		}(i)
	}

	wg.Wait()
}

func testDoChan() (interface{}, error) {
	// 模拟函数执行需要的时间
	time.Sleep(time.Millisecond)

	return "testDoChan", nil
}

3、源码解读

3.1 Group

//Group 整个库的核心结构体
type Group struct {
	mu sync.Mutex       // 并发时,保护 m 
	m  map[string]*call // 使用 懒加载 方式进行初始化
}

3.2 call

//call m中的value
type call struct {
	wg sync.WaitGroup

	//相同key,fn执行的返回结果
	val interface{}
	err error

	//fn执行期间,相同 key 添加的次数,第一次添加不算
	dups  int
	chans []chan<- Result // DoChan 返回fn执行的结果
}

3.3 Group.Do

//Do 执行函数的地方,key: 给函数自定义的标识
//fn: 需要执行的函数,fn开始运行后,未运行结果前,这个期间对相同key的调用,都会返回第一次执行fn返回的结果
//v:fn执行返回的结果,err:fn执行返回的err
//shared:fn执行结果是否会共享,fn运行期间,是否有相同的key被调用,有则返回true,反之返回false
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {//懒加载
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {//fn执行期间,又有相同的key添加进来执行
		c.dups++ //fn执行期间,有相同的key添加进来
		g.mu.Unlock()
		c.wg.Wait() //等待fn执行结果(fn函数里面,会调用c.wg.Done)

		//-------
		//判断fn执行过程中,是否有 panic 或者 runtime.Goexit()
		//感觉主要是为了 DoChan 函数,DoChan 返回的是channel,防止fn函数执行期间出现问题,导致无法往 chan 里面写入结果。
		//从而导致 外面需要获取 fn 执行结果的协程一直在等待
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	
	//---- 以下是key 第一次添加到 m 中时,执行的代码---
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	
	// 执行 fn 的地方
	g.doCall(c, key, fn) // 没有新开一个协程,和DoChan不同。
	return c.val, c.err, c.dups > 0
}

3.4 Group.DoChan

//DoChan 和Do 十分类似,只不过返回的结果通过 chan 来传递
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {//懒加载
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {//fn执行期间,又有相同的key添加进来执行
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	
	
	//---- 以下是key 第一次添加到 m 中时,执行的代码---
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn) // 新开启了一个协程,和Do不同

	return ch
}

3.5 Group.doCall

  • 双defer+normalReturn+recovered 判断fn执行是panic还是runtime.Goexit
//doCall 真正运行fn的地方,需要重点理解
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false //是否正常返回,默认false
	recovered := false //是否recover,默认false

	// use double-defer to distinguish panic from runtime.Goexit,
	//使用双 defer 来区分 panic和runtime.Goexit
	//是需要结合 normalReturn 和 recovere 的值来进行判断,从而区分是panic还是runtime.Goexit
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			//既没有正常返回,又没有被 recover,所以是fn执行期间,调用了 runtime.Goexit()
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done()
		// 走到这里,fn函数已经执行过了
		if g.m[key] == c { 
			delete(g.m, key) //fn函数执行完毕,好让后续的key可以继续进来执行fn函数
		}

		if e, ok := c.err.(*panicError); ok { // recover住的错误
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 { //通过使用DoChan来执行 fn,发生的错误
				go panic(e) // recover只能够 recover住同一个协程里的panic,不是同一个协程的无法捕获。
				select {} // 保证协程不退出,错误会直接暴露出去
			} else {  //通过使用 Do来执行fn,发生的错误
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
			//第一个调用的fn函数的协程已经退出,相同key的函数因为 chan 接收不到数据,会发生死锁()
			//fatal error: all goroutines are asleep - deadlock!
		} else {
			// Normal return
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {//fn执行期间,发生了panic
				if r := recover(); r != nil {
					c.err = newPanicError(r)  // 标识为panic错误,Do函数中判断时,好做区分 e, ok := c.err.(*panicError)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true  //fn执行期间,没有panic
	}()

	if !normalReturn {
		recovered = true  //fn执行期间,发生了panic,并且被 recover住了,注意:调用runtime.Goexit()时,是无法recover的
	}
}

3.6 Group.Forget

//Forget 使用Do执行fn时,可以手动删除 g.m 中的key
func (g *Group) Forget(key string){
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
}

4、执行流程

菜鸟一枚,文中难免有错误的地方,如有,恳请大佬指出。

5、参考资料

绝对详尽的singleflight讲解

有关singleflight 使用记录以及源码阅读的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐