jjzjj

C++ pthread阻塞队列死锁(我认为)

coder 2024-02-03 原文

我在使用 pthreads 时遇到问题,我认为我遇到了死锁。我创建了一个我认为有效的阻塞队列,但在进行更多测试后,我发现如果我尝试取消阻塞在 blocking_queue 上的多个线程,我似乎会遇到死锁。

阻塞队列很简单,看起来像这样:

template <class T> class Blocking_Queue
{
public:
    Blocking_Queue()
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    ~Blocking_Queue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

    void put(T t)
    {
        pthread_mutex_lock(&_lock);
        _queue.push(t);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_lock);
    }

     T pull()
     {
        pthread_mutex_lock(&_lock);
        while(_queue.empty())
        {
            pthread_cond_wait(&_cond, &_lock);
        }

        T t = _queue.front();
        _queue.pop();

        pthread_mutex_unlock(&_lock);

        return t;
     }

priavte:
    std::queue<T> _queue;
    pthread_cond_t _cond;
    pthread_mutex_t _lock;
}

为了测试,我创建了 4 个线程来处理这个阻塞队列。我向阻塞队列添加了一些打印语句,每个线程都进入了 pthread_cond_wait() 方法。但是,当我尝试在每个线程上调用 pthread_cancel() 和 pthread_join() 时,程序就会挂起。

我也只用一个线程测试过它,它工作得很好。

根据文档,pthread_cond_wait() 是一个取消点,因此在这些线程上调用取消应该会导致它们停止执行(这只适用于 1 个线程)。但是 pthread_mutex_lock 不是取消点。调用 pthread_cancel() 时是否会发生某些事情,被取消的线程在终止之前获取互斥锁并且不解锁它,然后当下一个线程被取消时它无法获取互斥锁和死锁?还是我做错了什么。

任何建议都会很好。谢谢:)

最佳答案

pthread_cancel() 最好避免。

您可以通过从那里抛出异常来解除阻塞在 Blocking_Queue::pull() 上阻塞的所有线程。

队列中的一个弱点是 T t = _queue.front(); 调用 T 的复制构造函数可能会抛出异常,从而使您的队列互斥体永远锁定。最好使用 C++ 作用域锁。

这里是一个优雅的线程终止的例子:

$ cat test.cc
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>

struct BlockingQueueTerminate
    : std::exception
{};

template<class T>
class BlockingQueue
{
private:
    boost::mutex mtx_;
    boost::condition_variable cnd_;
    std::list<T> q_;
    unsigned blocked_;
    bool stop_;

public:
    BlockingQueue()
        : blocked_()
        , stop_()
    {}

    ~BlockingQueue()
    {
        this->stop(true);
    }

    void stop(bool wait)
    {
        // tell threads blocked on BlockingQueue::pull() to leave
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = true;
        cnd_.notify_all();

        if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
            while(blocked_)
                cnd_.wait(lock);
    }

    void put(T t)
    {
        boost::mutex::scoped_lock lock(mtx_);
        q_.push_back(t);
        cnd_.notify_one();
    }

    T pull()
    {
        boost::mutex::scoped_lock lock(mtx_);

        ++blocked_;
        while(!stop_ && q_.empty())
            cnd_.wait(lock);
        --blocked_;

        if(stop_) {
            cnd_.notify_all(); // tell stop() this thread has left
            throw BlockingQueueTerminate();
        }

        T front = q_.front();
        q_.pop_front();
        return front;
    }
};

void sleep_ms(unsigned ms)
{
    // i am using old boost
    boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms));
    // with latest one you can do this
    //boost::thread::sleep(boost::posix_time::milliseconds(10));
}

void thread(int n, BlockingQueue<int>* q)
try
{
    for(;;) {
        int m = q->pull();
        printf("thread %u: pulled %d\n", n, m);
        sleep_ms(10);
    }
}
catch(BlockingQueueTerminate&)
{
    printf("thread %u: finished\n", n);
}

int main()
{
    BlockingQueue<int> q;

    // create two threads
    boost::thread_group tg;
    tg.create_thread(boost::bind(thread, 1, &q));
    tg.create_thread(boost::bind(thread, 2, &q));
    for(int i = 1; i < 10; ++i)
        q.put(i);
    sleep_ms(100); // let the threads do something
    q.stop(false); // tell the threads to stop
    tg.join_all(); // wait till they stop
}

$ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc

$ ./test
thread 2: pulled 1
thread 1: pulled 2
thread 1: pulled 3
thread 2: pulled 4
thread 1: pulled 5
thread 2: pulled 6
thread 1: pulled 7
thread 2: pulled 8
thread 1: pulled 9
thread 2: finished
thread 1: finished

关于C++ pthread阻塞队列死锁(我认为),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5018783/

有关C++ pthread阻塞队列死锁(我认为)的更多相关文章

  1. ruby-on-rails - 如果为空或不验证数值,则使属性默认为 0 - 2

    我希望我的UserPrice模型的属性在它们为空或不验证数值时默认为0。这些属性是tax_rate、shipping_cost和price。classCreateUserPrices8,:scale=>2t.decimal:tax_rate,:precision=>8,:scale=>2t.decimal:shipping_cost,:precision=>8,:scale=>2endendend起初,我将所有3列的:default=>0放在表格中,但我不想要这样,因为它已经填充了字段,我想使用占位符。这是我的UserPrice模型:classUserPrice回答before_val

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  4. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  5. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  6. ruby - 在好的 Ruby 代码中没有注释是否被认为是可以接受的? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭5年前。Improvethisquestion我审查了一些用Ruby编写的专业代码,没有发现任何评论。代码读起来相当清晰,但没有self记录。我应该期望专业编写的Ruby代码有注释吗?或者,是否有一些Ruby原则认为注释不是必需的?

  7. arrays - Ruby 数组 += vs 推送 - 2

    我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“

  8. += 的 Ruby 方法 - 2

    有没有办法让Ruby能够做这样的事情?classPlane@moved=0@x=0defx+=(v)#thisiserror@x+=v@moved+=1enddefto_s"moved#{@moved}times,currentxis#{@x}"endendplane=Plane.newplane.x+=5plane.x+=10putsplane.to_s#moved2times,currentxis15 最佳答案 您不能在Ruby中覆盖复合赋值运算符。任务在内部处理。您应该覆盖+,而不是+=。plane.a+=b与plane.a=

  9. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  10. ruby - Sinatra + Heroku + Datamapper 使用 dm-sqlite-adapter 部署问题 - 2

    出于某种原因,heroku尝试要求dm-sqlite-adapter,即使它应该在这里使用Postgres。请注意,这发生在我打开任何URL时-而不是在gitpush本身期间。我构建了一个默认的Facebook应用程序。gem文件:source:gemcuttergem"foreman"gem"sinatra"gem"mogli"gem"json"gem"httparty"gem"thin"gem"data_mapper"gem"heroku"group:productiondogem"pg"gem"dm-postgres-adapter"endgroup:development,:t

随机推荐