jjzjj

concurrency - 戈朗 : Producer/Consumer concurrency model but with serialized results

coder 2024-07-08 原文

func main() {
    jobs := []Job{job1, job2, job3}
    numOfJobs := len(jobs)
    resultsChan := make(chan *Result, numOfJobs)
    jobChan := make(chan *job, numOfJobs)
    go consume(numOfJobs, jobChan, resultsChan)
    for i := 0; i < numOfJobs; i++ {
        jobChan <- jobs[i]
    }
    close(jobChan)

    for i := 0; i < numOfJobs; i++ {
        <-resultsChan
    }
    close(resultsChan)
}

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
    for i := 0; i < num; i++ {
        go func() {
            job := <-jobChan
            resultsChan <- doJob(job)
        }()
    }
}

在上面的示例中,作业被推送到 jobChan 中,goroutines 将其从 jobChan 中拉出并并发执行作业并将结果推送到 resultsChan 中。然后我们将从 resultsChan 中提取结果。

问题一:

在我的代码中,没有序列化/线性化的结果。虽然jobs的顺序是job1, job2, job3。结果可能会显示为 job3、job1、job2,具体取决于哪个花费的时间最长。

我仍然希望同时执行这些作业,但是,我需要确保结果从 resultsChan 中以与作为作业进入时相同的顺序出现。

问题2:

我有大约 30 万个作业,这意味着代码将生成多达 30 万个 goroutine。拥有如此多的 goroutine 是否有效,或者我最好将这些作业分成 100 个左右的片段,让每个 goroutine 处理 100 个而不是 1 个。

最佳答案

这是我处理序列化的一种方式(并且还设置了有限数量的工作人员)。我设置了一些带有输入和输出字段以及同步 channel 的工作对象,然后我循环遍历它们,挑选它们完成的所有工作并给它们一个新的工作。然后我最后一次通过它们以拾取所有遗留下来的已完成工作。请注意,您可能希望工作人员数量稍微超过您的核心数量,这样即使有一个异常长的工作,您也可以让所有资源保持忙碌一段时间。代码位于 http://play.golang.org/p/PM9y4ieMxw及以下。

这是毛茸茸的(比我记得在坐下来写一个例子之前毛茸茸的!)——很想看看其他人有什么,要么只是更好的实现,要么是一种完全不同的方式来实现你的目标。

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "time"
)

type Worker struct {
    in     int
    out    int
    inited bool

    jobReady chan bool
    done     chan bool
}

func (w *Worker) work() {
    time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
    w.out = w.in + 1000
}
func (w *Worker) listen() {
    for <-w.jobReady {
        w.work()
        w.done <- true
    }
}
func doSerialJobs(in chan int, out chan int) {
    concurrency := 23
    workers := make([]Worker, concurrency)
    i := 0
    // feed in and get out items
    for workItem := range in {
        w := &workers[i%
            concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        } else {
            w.jobReady = make(chan bool)
            w.done = make(chan bool)
            w.inited = true
            go w.listen()
        }
        w.in = workItem
        w.jobReady <- true
        i++
    }
    // get out any job results left over after we ran out of input
    for n := 0; n < concurrency; n++ {
        w := &workers[i%concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        }
        close(w.jobReady)
        i++
    }
    close(out)
}
func main() {
    runtime.GOMAXPROCS(10)
    in, out := make(chan int), make(chan int)
    allFinished := make(chan bool)
    go doSerialJobs(in, out)
    go func() {
        for result := range out {
            fmt.Println(result)
        }
        allFinished <- true
    }()
    for i := 0; i < 100; i++ {
        in <- i
    }
    close(in)
    <-allFinished
}

请注意,此示例中只有 inout 携带实际数据——所有其他 channel 仅用于同步。

关于concurrency - 戈朗 : Producer/Consumer concurrency model but with serialized results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20978778/

有关concurrency - 戈朗 : Producer/Consumer concurrency model but with serialized results的更多相关文章

  1. ruby-on-rails - Rails add_index 算法 : :concurrently still causes database lock up during migration - 2

    为了防止在迁移到生产站点期间出现数据库事务错误,我们遵循了https://github.com/LendingHome/zero_downtime_migrations中列出的建议。(具体由https://robots.thoughtbot.com/how-to-create-postgres-indexes-concurrently-in概述),但在特别大的表上创建索引期间,即使是索引创建的“并发”方法也会锁定表并导致该表上的任何ActiveRecord创建或更新导致各自的事务失败有PG::InFailedSqlTransaction异常。下面是我们运行Rails4.2(使用Acti

  2. javascript - 从 Chrome 扩展 : concurrency issue? 在页面中注入(inject) Javascript - 2

    我正在参与Chrome扩展程序的开发。在某些时候,我们需要运行一些静态的非生成代码,这些代码必须在页面而不是扩展的上下文中运行。对于简单的脚本,没有问题,使用$.getScript(chrome.extension.getURL(....))或者script=document.createElement('script');...document.body.appendChild(脚本);对于更复杂的脚本,我们有时需要包含jquery本身或其他一些脚本定义(因为依赖关系)。尽管在后一种情况下,尽管Javascript应该是单线程的,但似乎在运行依赖脚本时JQuery并未完全解析,导致以

  3. javascript - 异步/等待分配给对象键 : is it concurrent? - 2

    我知道这样做:constresultA=awaita()constresultB=awaitb()//codehere有效a().then(resultA=>{b().then(resultB=>{//codehere})})基本上,a()运行然后b()运行。我嵌套它们以表明resultA和resultB都在我们的范围内;但是这两个功能都没有同时运行。但是这个呢:constobj={result1:awaita(),result2:awaitb()}a()和b()是否同时运行?供引用:constasyncFunc=async(func)=>awaitfunc.call()constre

  4. javascript - jQuery when/then/fail with concurrent ajax requests : Which request failed? - 2

    想象这样一种场景,我们想要在对“foo”和“bar”的并发请求成功完成后做一些事情,或者如果其中一个或两个失败则报告错误:$.when($.getJSON('foo'),$.getJSON('bar')).then(function(foo,bar){console.log('IfireifBOTHrequestsaresuccessful!');}).fail(function(){console.log('Ifireifoneormorerequestsfailed.');});我如何确定1)对“foo”的请求是否失败,或者2)对“bar”的请求是否失败,或者3)如果两者都失败了?

  5. 戈朗 "Log in to the site and download the xls file"? - 2

    关闭。这个问题需要更多focused.它目前不接受答案。想改进这个问题吗?更新问题,使其只关注一个问题editingthispost.关闭3年前。Improvethisquestion告诉我如何使用Golang登录网站。下载xls文件是得到了,但是为了在Excel表格中有数据,需要登录网站。该站点位于公司的服务器上。如果你能告诉你怎么做。例如,我用来执行此操作的VBA代码。SetoFields=CreateObject("Scripting.Dictionary")WithoFields.Add"login","sdiscor".Add"password","sdiscor"EndWi

  6. arrays - 戈朗 : Could not understand how below code is executing - 2

    下面是我查询的代码:我有一个单维数组a当我打印a[0][0]时,我不明白为什么它返回字符a的ascii值:packagemainimport("fmt")funcmain(){a:=[3]string{"a","b","c"}fmt.Println(a[0][0])}输出:97 最佳答案 下面是如何打印ascii的代码示例a:=[3]string{"a","b","c"}for_,rune:=rangea{fmt.Println(rune)//Itwillprinta,b,c}因为你在你的代码中使用了[0][0],所以它是等价的fo

  7. 戈朗 :which way is more efficient about using "for range" - 2

    typepath[]bytefunc(ppath)ToUpper(){fori,b:=rangep{if'a'在上面(这个例子是从“TheGoBlog”复制过来的),如果ToUpper变成这样:func(ppath)ToUpper(){fori,_:=rangep{if'a'哪个会更有效率为什么?“TheGoBlog”对前一个说:“这里的ToUpper方法在forrange构造中使用两个变量来捕获索引和slice元素。这种形式的循环避免了在主体中多次写入p[i]。”什么意思? 最佳答案 前者有更多的内存操作,即在b上:它在循环的第一

  8. 戈朗 : go command inside script? - 2

    我有一个用Golang编写的脚本,我不太明白。我想知道他为什么要在脚本里面写goserver.Start()?为什么不简单地编写server.Start?packagemainimport("github.com/miekg/dns""testing""time")constTEST_ADDR="127.0.0.1:9953"funcTestDNSResponse(t*testing.T){server:=NewDNSServer(&Config{dnsAddr:TEST_ADDR,})goserver.Start()//Allowsometimeforservertostarttim

  9. 戈朗 : Is there a way to modify one of the multi-value return parameters in one line? - 2

    我正在尝试在Go中做一些相对简单的事情——将字符串转换为整数,然后将其加倍:myInt,_:=strconv.Atoi(args[1])doubleArg:=myInt*2由于Atoi()返回两个参数(整数和err),我使用myInt,_:=来检索值的整数。我想将它加倍(因此是第二行)但不能在一行中完成所有操作:myInt,_:=strconv.Atoi(args[1])*2给我:multiple-valuestrconv.Atoi()insingle-valuecontext但是,根据我使用大多数其他语言的经验,必须在两行中执行此操作似乎有很多样板。这只是我必须处理的一个限制,还是有

  10. elasticsearch - 戈朗错误 "not enough arguments in call" - 2

    我刚接触golang。尝试通过golang实现批量上传到Elasticsearch。我正在使用golang库->https://github.com/olivere/elastic用于与Elasticsearch通信。此外,我正在尝试一段示例代码,但出现以下错误...suresh@BLR-245:~/Desktop/tools/golang/src$goinstallgithub.com/crazyheart/elastic-bulk-upload#github.com/crazyheart/elastic-bulk-uploadgithub.com/crazyheart/elasti

随机推荐