jjzjj

PHP pthreads - 共享对象

coder 2024-01-05 原文

我正在寻找一种安全快捷的方式来使用共享对象。

我已经在这里问了这个问题:https://github.com/krakjoe/pthreads/issues/470
但显然这不是正确的地方。

试图与许多其他上下文(线程)共享一个对象(线程)。
所有线程都在更新这个分片对象——它们可以设置自己的请求,也必须响应其他人的请求。

现在 krakjoe 回应说在 7 中无法使用锁定/解锁,我遇到了问题。

我知道 :.synchronized 但不知道如何使用它来满足我的需求。

我如何使用::synchronized 来编写类似的方法

  • 锁()
  • 解锁()
  • is_locked() -- 检查是否已锁定,如果已锁定,请不要尝试 - 稍后再尝试

  • 编辑:

    我写了一个(imo)非常简单的测试脚本。

    此脚本包括 syc/lock/... 方法 atm。

    它应该只是显示我想要做什么。

    我仍在寻找一种使用::的方法来使这个共享安全。

    代码:
    <?php
    /*
    TEST:
        create n threads
        each will
            - Shared::set() its own ref
            - check if Shared::exists() its own ref
            - Shared::get() its ref back
            - call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context
    
    TODO:
        using ::synchronized to handle multi-context-access
    
    NOTES:
        every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"
            see: "Method Modifiers - Special Behaviour"
                at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
    */
    class Shared extends Threaded
    {
        public $data;
        public function exists($ident)
        {
            return isset($this->data[$ident]);
        }
        public function set($ident, $ref)
        {
            $return = false;
            if(!isset($this->data[$ident])){
                $data = $this->data;
                $data[$ident] = $ref;
                $this->data = $data;
                $return = $this->data[$ident];
            }
            #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
            return $return;
        }
        public function get($ident)
        {
            $return = false;
            if($this->exists($ident) === true){
                $data = $this->data;
                $return = $data[$ident];
                unset($data[$ident]);
                $this->data = $data;
            }
            #echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
            return $return;
        }
    }
    
    class T extends Thread
    {
        public $count;
        public function __construct(Shared $Shared, $ident)
        {
            $this->Shared = $Shared;
            $this->ident = $ident;
        }
        public function run()
        {
            $slowdown = true;
            $this->count = 0;
            while(true){
                if($slowdown){
                    // "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
                    //  loop a bit to simulate work:
                    $start = microtime(true);
                    $until = rand(1, 100000)/1000000;
                    while(microtime(true)-$start < $until){
                        // ...
                    }
                }
    
                if($this->Shared->exists($this->ident) === true){
                    $ref = $this->Shared->get($this->ident);
                }
                else{
                    $ref = $this->Shared->set($this->ident, $this);
                }
                // calling a method on $ref -- if not a ref we crash
                $ref->isRunning();
                unset($ref);
                $this->count++;
            }
        }
    }
    
    
    echo 'start ...' . PHP_EOL;
    
    $n = 8;
    $Shared = new Shared();
    for($i = 0, $refs = array(); $i < $n; $i++){
        $refs[$i] = new T($Shared, $i);
        $refs[$i]->start();
    }
    
    while(!empty($refs)){
        // print status:
        if(!isset($t)or microtime(true)-$t > 1){
            $t = microtime(true);
            echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
        }
    
        // join crashed threads:
        foreach($refs as $i => $thread){
            if($thread->isRunning() === false){
                echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
                if($thread->isJoined() === false){
                    $thread->join();
                }
                unset($refs[$i]);
            }
        }
    }
    
    echo 'no thread running anymore.' . PHP_EOL;
    
    /* output
    start ...
    status: 8 running atm ...
    
    Notice: Undefined offset: 6 in ...\shared_test.php on line 33
    
    Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
    T-6 stopped after 10
    status: 7 running atm ...
    
    Notice: Undefined offset: 4 in ...\shared_test.php on line 33
    
    Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
    T-4 stopped after 35
    status: 6 running atm ...
    
    Notice: Undefined offset: 7 in ...\shared_test.php on line 33
    
    Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
    T-7 stopped after 43
    status: 5 running atm ...
    status: 5 running atm ...
    status: 5 running atm ...
    
    [...]
    */
    ?>
    

    最佳答案

    Threaded对象已经是线程安全的,也就是说,任何时候你读、写、检查是否存在或删除(取消设置)一个成员,操作都是原子的——没有其他上下文可以执行任何上述操作,而第一个操作发生。对于用户不知道的引擎处理程序也是如此,低至最低级别的所有内容都是隐式安全的。

    然而,相当令人放心......当逻辑变得更复杂时,这有明显的限制,例如在设置之前检查成员的存在或对其进行其他操作,就像您正在做的那样:虽然对对象的操作是原子的,没有什么可以阻止另一个上下文 unset调用 isset 之间的成员以及读取属性/维度的调用。

    这适用于 PHP7 (pthreads v3+)

    安全和诚信在这里是两件不同的事情。当完整性很重要时,您可以使用 Threaded::synchronized在 PHP7 中正确保存它。在 PHP5 中,您也可以保留它,但代码会更复杂,解释也会更复杂。

    如果我理解它的逻辑,你的第二个例子应该无限期地运行。所以我使用这个假设来构建正确的代码,我将进一步假设您可能想要在这个无限循环中做什么,并在似乎需要的地方提供一些见解。

    <?php
    class Referee extends Threaded {
    
        public function find(string $ident, Threaded $reference) {
            return $this->synchronized(function () use($ident, $reference) {
                if (isset($this[$ident])) {
                    return $this[$ident];
                } else return ($this[$ident] = $reference);
            });
        }
    
        public function foreach(Closure $closure) {
            $this->synchronized(function() use($closure) {
                foreach ($this as $ident => $reference) {
                    $closure($ident, $reference);
                }
            });
        }
    }
    
    class Test extends Thread {
    
        public function __construct(Referee $referee, string $ident, bool $delay) {
            $this->referee = $referee;
            $this->ident   = $ident;
            $this->delay   = $delay;
        }
    
        public function run() {
            while (1) {
                if ($this->delay) {
                    $this->synchronized(function(){
                        $this->wait(1000000);
                    });
                }
    
                $reference = 
                    $this->referee->find($this->ident, $this);
    
                /* do something with reference here, I guess */         
    
                /* do something with all references here */
                $this->referee->foreach(function($ident, $reference){
                    var_dump(Thread::getCurrentThreadId(),
                            $reference->getIdent(), 
                            $reference->isRunning());
                });
            }
        }
    
        public function getIdent() {
            return $this->ident;
        }
    
        private $referee;
        private $ident;
        private $delay;
    }
    
    $referee = new Referee();
    $threads = [];
    $thread = 0;
    $idents = [
        "smelly",
        "dopey",
        "bashful",
        "grumpy",
        "sneezy",
        "sleepy",
        "happy",
        "naughty"
    ];
    
    while ($thread < 8) {
        $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
        $threads[$thread]->start();
        $thread++;
    }
    
    foreach ($threads as $thread)
        $thread->join();
    ?>
    

    所以我们会看看差异,我会告诉你为什么它们是这样的,以及你可以如何编写它们,你已经知道我们现在不是在谈论安全,而是完整性,你得到了(相当非凡)假设你写的任何东西都是“安全的”,正如解释的那样。

    第一个主要区别是:
    if ($this->delay) {
        $this->synchronized(function(){
            $this->wait(1000000);
        });
    }
    

    这只是制作 Thread 的合适方法等等,您不必使用 Thread本身来同步,你可以使用任何 Threaded目的。正确做事的好处是,如果不清楚,sleep 和 usleep 不会让线程处于接受状态,使用 ::wait做。

    在现实世界中,您真的应该只等待某些东西,这将是一个更复杂的块,它可能(并且应该)看起来更像:
    if ($this->delay) {
        $this->synchronized(function(){
            while ($this->condition) {
                $this->wait(1000000);
            }
        });
    }
    

    注意:从技术上讲,等待超时是在等待某些事情,但是,您可能会被超时以外的其他事情唤醒,并且应该为此准备代码。

    这样,另一个上下文能够通知 Thread它应该停止等待并优雅地关闭,或者立即执行一些其他重要的操作,只需同步、更改条件并通知 Thread .

    对于可预测的代码,熟悉同步、等待和通知的工作方式非常重要。

    接下来,我们有设置和/或获取引用的逻辑:
    $reference = 
        $this->referee->find($this->ident, $this);
    

    哪个调用这个:
    public function find(string $ident, Threaded $reference) {
        return $this->synchronized(function () use($ident, $reference) {
            if (isset($this[$ident])) {
                return $this[$ident];
            } else return ($this[$ident] = $reference);
        });
    }
    

    这是一个糟糕的命名,命名事情很困难,但是您可以看到,在这些分组操作发生时,通过同步保持了完整性。相同的方法也可用于获取对另一个对象的引用,只需稍作调整。

    我猜你对那个特定的引用做了一些事情(目前总是 $this)。我猜不出是什么。继续 ...

    我已经假设您希望对这些 Threads 中的每一个都做些什么,并且您希望在整个迭代发生时保持数据的完整性:
    $this->referee->foreach(function($ident, $reference){
        var_dump(Thread::getCurrentThreadId(),
                $reference->getIdent(), 
                $reference->isRunning());
    });
    

    其中调用:
    public function foreach(Closure $closure) {
        $this->synchronized(function() use($closure) {
            foreach ($this as $ident => $reference) {
                $closure($ident, $reference);
            }
        });
    }
    

    这就是你会如何做这样的事情。

    值得一提的是,这里并不一定需要synchronized;就像如果您从正在迭代的数组中删除成员不会发生任何坏事一样,如果您在迭代发生时取消设置或设置或对对象执行任何其他操作,也不会发生任何坏事。

    关于PHP pthreads - 共享对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32341744/

    有关PHP pthreads - 共享对象的更多相关文章

    1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

      总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

    2. ruby-on-rails - 按天对 Mongoid 对象进行分组 - 2

      在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev

    3. ruby-on-rails - 如何验证非模型(甚至非对象)字段 - 2

      我有一个表单,其中有很多字段取自数组(而不是模型或对象)。我如何验证这些字段的存在?solve_problem_pathdo|f|%>... 最佳答案 创建一个简单的类来包装请求参数并使用ActiveModel::Validations。#definedsomewhere,atthesimplest:require'ostruct'classSolvetrue#youcouldevencheckthesolutionwithavalidatorvalidatedoerrors.add(:base,"WRONG!!!")unlesss

    4. Ruby 写入和读取对象到文件 - 2

      好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

    5. ruby - 通过 ruby​​ 进程共享变量 - 2

      我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是

    6. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

      如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

    7. ruby-on-rails - 未在 Ruby 中初始化的对象 - 2

      我在Rails工作并有以下类(class):classPlayer当我运行时bundleexecrailsconsole然后尝试:a=Player.new("me",5.0,"UCLA")我回来了:=>#我不知道为什么Player对象不会在这里初始化。关于可能导致此问题的操作/解释的任何建议?谢谢,马里奥格 最佳答案 havenoideawhythePlayerobjectwouldn'tbeinitializedhere它没有初始化很简单,因为你还没有初始化它!您已经覆盖了ActiveRecord::Base初始化方法,但您没有调

    8. ruby - 如何在 Rails 4 中使用表单对象之前的验证回调? - 2

      我有一个服务模型/表及其注册表。在表单中,我几乎拥有服务的所有字段,但我想在验证服务对象之前自动设置其中一些值。示例:--服务Controller#创建Action:defcreate@service=Service.new@service_form=ServiceFormObject.new(@service)@service_form.validate(params[:service_form_object])and@service_form.saverespond_with(@service_form,location:admin_services_path)end在验证@ser

    9. ruby - 一个 YAML 对象可以引用另一个吗? - 2

      我想让一个yaml对象引用另一个,如下所示:intro:"Hello,dearuser."registration:$introThanksforregistering!new_message:$introYouhaveanewmessage!上面的语法只是它如何工作的一个例子(这也是它在thiscpanmodule中的工作方式。)我正在使用标准的ruby​​yaml解析器。这可能吗? 最佳答案 一些yaml对象确实引用了其他对象:irb>require'yaml'#=>trueirb>str="hello"#=>"hello"ir

    10. ruby - 更改 ActiveRecord 中对象的类 - 2

      假设我有一个FireNinja我的数据库中的对象,使用单表继承存储。后来才知道他真的是WaterNinja.将他更改为不同的子类的最干净的方法是什么?更好的是,我很想创建一个新的WaterNinja对象并替换旧的FireNinja在数据库中,保留ID。编辑我知道如何创建新的WaterNinja来self现有FireNinja的对象,我也知道我可以删除旧的并保存新的。我想做的是改变现有项目的类别。我是通过创建一个新对象并执行一些ActiveRecord魔法来替换行,还是通过对对象本身做一些疯狂的事情,或者甚至通过删除它并使用相同的ID重新插入来做到这一点,这是问题的一部分。

    随机推荐