jjzjj

PHP线程池

coder 2024-04-25 原文

编辑:为了澄清和简化:我正在寻找一种“好”的方式来在 Stackable 结束时向 Pool 提交更多 Stackable 对象(使用来自第一个 Stackable 的数据添加第二个一)。我的想法是轮询对象直到结束(低效且丑陋)并将引用传递给 Pool 对象(我无法使其工作)。 基本代码是这个:https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php

现在,完整的描述:

我正在使用 PHP 开发一个应用程序,该应用程序增长太多并且需要花费大量时间。因此,我正在尝试使用线程池对该应用程序进行多线程处理(我知道 PHP 不是最佳选择,但我不想,并且此时无法更改语言)。

问题是,应用程序有 2 个阶段,必须按顺序进行,每个阶段都有很多可以同时进行的子任务。所以,这是我脑海中的过程:

  • 第 1 阶段将有 N 个子任务,这些子任务将是 Stackable 对象。
  • 当子任务 i 结束时,必须通知“主要”(创建池、Stackable 等的那个),并使用来自子任务 i 的一些数据(不同的 Stackable 对象)执行子任务 i 的阶段 2 ).在这个阶段,第 1 阶段的每个子任务都会有 M 个子任务。

我想为第 1 阶段和第 2 阶段的线程使用相同的线程池,从第 1 阶段到第 2 阶段我能想到的唯一解决方案是轮询 N 个子任务中的每一个,直到其中一个子任务结束,然后为结束的那个调用阶段 2,并重复直到所有 N 个子任务结束。

我使用 Joe Watkins 的 pthreads 源代码中包含的线程池示例作为基本代码。

最佳答案

您应该先阅读:https://gist.github.com/krakjoe/6437782

<?php
/**
* Normal worker
*/
class PooledWorker extends Worker {
    public function run(){}
}


/**
* Don't descend from pthreads, normal objects should be used for pools
*/
class Pool {
    protected $size;
    protected $workers;

    /**
    * Construct a worker pool of the given size
    * @param integer $size
    */  
    public function __construct($size) {
        $this->size = $size;
    }

    /**
    * Start worker threads
    */
    public function start() {
        while (@$worker++ < $this->size) {
            $this->workers[$worker] = new PooledWorker();
            $this->workers[$worker]->start();
        }
        return count($this->workers);
    }

    /**
    * Submit a task to pool
    */
    public function submit(Stackable $task) {
        $this->workers[array_rand($this->workers)]
            ->stack($task);
        return $task;
    }

    /**
    * Shutdown worker threads
    */
    public function shutdown() {
        foreach ($this->workers as $worker)
            $worker->shutdown();
    }
}

class StageTwo extends Stackable {
    /**
    * Construct StageTwo from a part of StageOne data
    * @param int $data
    */
    public function __construct($data) {
        $this->data = $data;
    }

    public function run(){
        printf(
            "Thread %lu got data: %d\n", 
            $this->worker->getThreadId(), $this->data);
    }
}

class StageOne extends Stackable {
    protected $done;

    /**
    * Construct StageOne with suitable storage for data
    * @param StagingData $data
    */
    public function __construct(StagingData $data) {
        $this->data = $data;
    }

    public function run() {
        /* create dummy data array */
        while (@$i++ < 100) {
            $this->data[] = mt_rand(
                20, 1000);
        }
        $this->done = true;
    }
}

/**
* StagingData to hold data from StageOne
*/
class StagingData extends Stackable {
    public function run() {}
}

/* stage and data reference arrays */
$one = [];
$two = [];
$data = [];

$pool = new Pool(8);
$pool->start();

/* construct stage one */
while (count($one) < 10) {
    $staging = new StagingData();
    /* maintain reference counts by storing return value in normal array in local scope */
    $one[] = $pool
        ->submit(new StageOne($staging));
    /* maintain reference counts */
    $data[] = $staging;
}

/* construct stage two */
while (count($one)) {

    /* find completed StageOne objects */
    foreach ($one as $id => $job) {
        /* if done is set, the data from this StageOne can be used */
        if ($job->done) {
            /* use each element of data to create new tasks for StageTwo */
            foreach ($job->data as $chunk) {
                /* submit stage two */
                $two[] = $pool
                    ->submit(new StageTwo($chunk));
            }

            /* no longer required */
            unset($one[$id]);
        }
    }

    /* in the real world, it is unecessary to keep polling the array */
    /* you probably have some work you want to do ... do it :) */
    if (count($one)) {
        /* everyone likes sleep ... */
        usleep(1000000);
    }
}

/* all tasks stacked, the pool can be shutdown */
$pool->shutdown();
?>

将输出:

Thread 140012266239744 got data: 612
Thread 140012275222272 got data: 267
Thread 140012257257216 got data: 971
Thread 140012033140480 got data: 881
Thread 140012257257216 got data: 1000
Thread 140012016355072 got data: 261
Thread 140012257257216 got data: 510
Thread 140012016355072 got data: 148
Thread 140012016355072 got data: 501
Thread 140012257257216 got data: 767
Thread 140012024747776 got data: 504
Thread 140012033140480 got data: 401
Thread 140012275222272 got data: 20
<-- trimmed from 1000 lines -->
Thread 140012041533184 got data: 285
Thread 140012275222272 got data: 811
Thread 140012041533184 got data: 436
Thread 140012257257216 got data: 977
Thread 140012033140480 got data: 830
Thread 140012275222272 got data: 554
Thread 140012024747776 got data: 704
Thread 140012033140480 got data: 50
Thread 140012257257216 got data: 794
Thread 140012024747776 got data: 724
Thread 140012033140480 got data: 624
Thread 140012266239744 got data: 756
Thread 140012284204800 got data: 997
Thread 140012266239744 got data: 708
Thread 140012266239744 got data: 981

因为您想使用一个池,所以您别无选择,只能在创建池的主上下文中创建任务。我可以想象其他解决方案,但您特别要求这种解决方案。

根据我可以使用的硬件以及要处理的任务和数据的性质,我可能有多个[小]线程池,每个工作线程一个,这将允许 StageOne 创建 StageTwo 对象在执行它们的 Worker 上下文中,这可能是需要考虑的事情。

关于PHP线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21098433/

有关PHP线程池的更多相关文章

  1. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  2. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  3. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  4. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  5. ruby - Ruby 1.9.1 中的 native 线程,对我有什么好处? - 2

    所以,Ruby1.9.1现在是declaredstable.Rails应该与它一起工作,并且正在慢慢地将gem移植到它。它具有native线程和全局解释器锁(GIL)。自从GIL到位后,原生线程是否比1.9.1中的绿色线程有任何优势? 最佳答案 1.9中的线程是原生的,但它们被“放慢了速度”,一次只允许一个线程运行。这是因为如果线程真的并行运行,它会混淆现有代码。优点:IO现在在线程中是异步的。如果一个线程阻塞在IO上,那么另一个线程将继续执行直到IO完成。C扩展可以使用真正的线程。缺点:任何非线程安全的C扩展都可能存在使用Thre

  6. ruby - 使写入文件线程安全 - 2

    我在一个ruby​​文件中有一个函数可以像这样写入一个文件File.open("myfile",'a'){|f|f.puts("#{sometext}")}这个函数在不同的线程中被调用,使得像上面这样的文件写入不是线程安全的。有谁知道如何以最简单的方式使这个文件写入线程安全?更多信息:如果重要的话,我正在使用rspec框架。 最佳答案 您可以通过File#flock给锁File.open("myfile",'a'){|f|f.flock(File::LOCK_EX)f.puts("#{sometext}")}

  7. ruby-on-rails - 这个 C 和 PHP 程序员如何学习 Ruby 和 Rails? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我来自C、php和bash背景,很容易学习,因为它们都有相同的C结构,我可以将其与我已经知道的联系起来。然后2年前我学了Python并且学得很好,Python对我来说比Ruby更容易学。然后从去年开始,我一直在尝试学习Ruby,然后是Rails,我承认,直到现在我还是学不会,讽刺的是那些打着简单易学的烙印,但是对于我这样一个老练的程序员来说,我只是无法将它

  8. Ruby 线程与 Watir - 2

    我编写了几个类来控制我想如何处理多个网站,两者都使用类似的方法(即登录、刷新)。每个类都打开自己的WATIR浏览器实例。classSite1definitialize@ie=Watir::Browser.newenddeflogin@ie.goto"www.blah.com"endend无线程的main中的代码示例如下require'watir'require_relative'site1'agents=[]agents这工作正常,但在当前代理完成登录之前不会移动到下一个代理。我想合并多线程来处理这个问题,但似乎无法让它工作。require'watir'require_relative

  9. ruby - 在多个线程中引用类方法会导致自动加载循环依赖崩溃 - 2

    代码:threads=[]Thread.abort_on_exception=truebegin#throwexceptionsinthreadssowecanseethemthreadseputs"EXCEPTION:#{e.inspect}"puts"MESSAGE:#{e.message}"end崩溃:.rvm/gems/ruby-2.1.3@req/gems/activesupport-4.1.5/lib/active_support/dependencies.rb:478:inload_missing_constant':自动加载常量MyClass时检测到循环依赖稍加研究后,

  10. Ruby 多线程/多处理读物 - 2

    任何人都可以推荐任何详细介绍Ruby多线程/多处理的复杂性的好的多线程/处理书籍/网站吗?我尝试使用ruby​​线程,基本上在1.9vm上的无死锁代码中它在jruby中遇到了死锁。是的,我意识到差异很大(jruby没有GIL),但我想知道是否有用于ruby​​中多线程编程的策略或类集,我只需要继续阅读。旁注:从java到ruby​​必须定义是否需要重新输入锁,这有点奇怪。 最佳答案 如果你使用Ruby1.9,你可以试试Fiber,它是Ruby中线程的一大改进http://ruby-doc.org/core-1.9/classes/F

随机推荐