在我的一个 Java 6 应用程序中,我有一个线程向主线程提供数据,同时还从数据库中预取更多记录。它使用 ArrayBlockingQueue queue 作为一个 FIFO 缓冲区,它的主循环是这样的:
while (!Thread.interrupted()) {
if (source.hasNext()) {
try {
queue.put(source.next())
} catch (InterruptedException e) {
break;
}
} else {
break;
}
}
有些代码会在循环终止后进行一些清理,例如污染队列和释放任何资源,但这几乎就是全部。
就目前而言,没有从主线程到供给线程的直接通信:供给线程使用适当的选项设置,然后使用阻塞队列自行离开控制数据流。
当队列满时主线程需要关闭feeder时出现问题。由于没有直接控制 channel ,关闭方法使用Thread interrupt() 的接口(interface)馈线。不幸的是,在大多数情况下,馈线在 put() 中仍然被阻塞。 ,尽管被打断了——没有抛出异常。
通过对 interrupt() 的简要阅读文档和队列实现源代码,在我看来,put() 经常阻塞而不使用 JVM 的任何可中断设施。更具体地说,在我当前的 JVM (OpenJDK 1.6b22) 上,它会阻塞 sun.misc.Unsafe.park() native 方法。也许它使用自旋锁或其他东西,但无论如何,这似乎属于 the following case。 :
If none of the previous conditions hold then this thread's interrupt status will be set.
状态标志已设置,但线程仍阻塞在 put() 中并且不会进一步迭代以便检查标志。结果?一个不会死的僵尸线程!
我对这个问题的理解是否正确,还是我遗漏了什么?
解决此问题的可能方法有哪些?目前我只能想到两个解决方案:
一个。在队列中多次调用 poll() 以解除对馈线线程的阻塞:从我所见来看,这很丑陋且不太可靠,但它大部分都有效。 p>
使用带超时的 offer() 方法而不是 put() 以允许线程在可接受的时间范围内检查其中断状态。
除非我遗漏了什么,否则这是 Java 中 BlockingQueue 实现的一个文档不足的警告。当文档例如建议中毒队列以关闭工作线程,但我找不到任何明确的引用。
编辑:
好的,上面的解决方案 (a) 有一个更,呃,更剧烈的变化:ArrayBlockingQueue.clear() .我认为这应该始终有效,即使它不完全是优雅的定义......
最佳答案
我认为您的问题可能有两个原因。
如 The Law of the Sabotaged Doorbell 中所述您可能没有正确处理中断。在那里你会发现:
What should we do when we call code that may cause an InterruptedException? Don't immediately yank out the batteries! Typically there are two answers to that question:
Rethrow the InterruptedException from your method. This is usually the easiest and best approach. It is used by the new java.util.concurrent.* package, which explains why we are now constantly coming into contact with this exception.
Catch it, set interrupted status, return. If you are running in a loop that calls code which may cause the exception, you should set the status back to being interrupted.
For example:while (!Thread.currentThread().isInterrupted()) { // do something try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } }
source.hasNext() 或 source.next() 正在消耗和丢弃中断状态。请参阅下面的已添加,了解我是如何解决这个问题的。
我相信在 ArrayBlockingqueue.put() 处中断线程 是一种有效的解决方案。
已添加
我使用可以从阅读器端关闭的 CloseableBlockingQueue 解决了问题 2。这样一来,一旦关闭,所有的put调用都会快捷。然后,您可以从编写器检查队列的 closed 标志。
// A blocking queue I can close from the pull end.
// Please only use put because offer does not shortcut on close.
// <editor-fold defaultstate="collapsed" desc="// Exactly what it says on the tin.">
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;
// All blocked threads. Actually this is all threads that are in the process
// of invoking a put but if put doesn't block then they disappear pretty fast.
// NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
private final Container<Thread> blocked;
// Limited size.
public CloseableBlockingQueue(int queueLength) {
super(queueLength);
blocked = new Container<Thread>(queueLength);
}
/**
* *
* Shortcut to do nothing if closed.
*
* Track blocked threads.
*/
@Override
public void put(E e) throws InterruptedException {
if (!closed) {
Thread t = Thread.currentThread();
// Hold my node on the stack so removal can be trivial.
Container.Node<Thread> n = blocked.add(t);
try {
super.put(e);
} finally {
// Not blocked anymore.
blocked.remove(n, t);
}
}
}
/**
*
* Shortcut to do nothing if closed.
*/
@Override
public E poll() {
E it = null;
// Do nothing when closed.
if (!closed) {
it = super.poll();
}
return it;
}
/**
*
* Shortcut to do nothing if closed.
*/
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
E it = null;
// Do nothing when closed.
if (!closed) {
it = super.poll(l, tu);
}
return it;
}
/**
*
* isClosed
*/
boolean isClosed() {
return closed;
}
/**
*
* Close down everything.
*/
void close() {
// Stop all new queue entries.
closed = true;
// Must unblock all blocked threads.
// Walk all blocked threads and interrupt them.
for (Thread t : blocked) {
//log("! Interrupting " + t.toString());
// Interrupt all of them.
t.interrupt();
}
}
@Override
public String toString() {
return blocked.toString();
}
}
您还需要无锁的 Container 和 O(1) put/get(虽然它不是严格意义上的集合)。它在幕后使用了一个Ring。
public class Container<T> implements Iterable<T> {
// The capacity of the container.
final int capacity;
// The list.
AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
// Constructor
public Container(int capacity) {
this.capacity = capacity;
// Construct the list.
Node<T> h = new Node<T>();
Node<T> it = h;
// One created, now add (capacity - 1) more
for (int i = 0; i < capacity - 1; i++) {
// Add it.
it.next = new Node<T>();
// Step on to it.
it = it.next;
}
// Make it a ring.
it.next = h;
// Install it.
head.set(h);
}
// Empty ... NOT thread safe.
public void clear() {
Node<T> it = head.get();
for (int i = 0; i < capacity; i++) {
// Trash the element
it.element = null;
// Mark it free.
it.free.set(true);
it = it.next;
}
// Clear stats.
resetStats();
}
// Add a new one.
public Node<T> add(T element) {
// Get a free node and attach the element.
return getFree().attach(element);
}
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
if (skipped < capacity) {
// Put the head as next.
// Doesn't matter if it fails. That would just mean someone else was doing the same.
head.set(freeNode.next);
} else {
// We hit the end! No more free nodes.
throw new IllegalStateException("Capacity exhausted.");
}
return freeNode;
}
// Mark it free.
public void remove(Node<T> it, T element) {
// Remove the element first.
it.detach(element);
// Mark it as free.
if (!it.free.compareAndSet(false, true)) {
throw new IllegalStateException("Freeing a freed node.");
}
}
// The Node class. It is static so needs the <T> repeated.
public static class Node<T> {
// The element in the node.
private T element;
// Are we free?
private AtomicBoolean free = new AtomicBoolean(true);
// The next reference in whatever list I am in.
private Node<T> next;
// Construct a node of the list
private Node() {
// Start empty.
element = null;
}
// Attach the element.
public Node<T> attach(T element) {
// Sanity check.
if (this.element == null) {
this.element = element;
} else {
throw new IllegalArgumentException("There is already an element attached.");
}
// Useful for chaining.
return this;
}
// Detach the element.
public Node<T> detach(T element) {
// Sanity check.
if (this.element == element) {
this.element = null;
} else {
throw new IllegalArgumentException("Removal of wrong element.");
}
// Useful for chaining.
return this;
}
@Override
public String toString() {
return element != null ? element.toString() : "null";
}
}
// Provides an iterator across all items in the container.
public Iterator<T> iterator() {
return new UsedNodesIterator<T>(this);
}
// Iterates across used nodes.
private static class UsedNodesIterator<T> implements Iterator<T> {
// Where next to look for the next used node.
Node<T> it;
int limit = 0;
T next = null;
public UsedNodesIterator(Container<T> c) {
// Snapshot the head node at this time.
it = c.head.get();
limit = c.capacity;
}
public boolean hasNext() {
if (next == null) {
// Scan to the next non-free node.
while (limit > 0 && it.free.get() == true) {
it = it.next;
// Step down 1.
limit -= 1;
}
if (limit != 0) {
next = it.element;
}
}
return next != null;
}
public T next() {
T n = null;
if ( hasNext () ) {
// Give it to them.
n = next;
next = null;
// Step forward.
it = it.next;
limit -= 1;
} else {
// Not there!!
throw new NoSuchElementException ();
}
return n;
}
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
Separator comma = new Separator(",");
// Keep counts too.
int usedCount = 0;
int freeCount = 0;
// I will iterate the list myself as I want to count free nodes too.
Node<T> it = head.get();
int count = 0;
s.append("[");
// Scan to the end.
while (count < capacity) {
// Is it in-use?
if (it.free.get() == false) {
// Grab its element.
T e = it.element;
// Is it null?
if (e != null) {
// Good element.
s.append(comma.sep()).append(e.toString());
// Count them.
usedCount += 1;
} else {
// Probably became free while I was traversing.
// Because the element is detached before the entry is marked free.
freeCount += 1;
}
} else {
// Free one.
freeCount += 1;
}
// Next
it = it.next;
count += 1;
}
// Decorate with counts "]used+free".
s.append("]").append(usedCount).append("+").append(freeCount);
if (usedCount + freeCount != capacity) {
// Perhaps something was added/freed while we were iterating.
s.append("?");
}
return s.toString();
}
}
关于java - 当线程被中断时,BlockingQueue 方法是否总是抛出 InterruptedException?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9254109/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
我正在尝试设置一个puppet节点,但rubygems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由rubygems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我想了解Ruby方法methods()是如何工作的。我尝试使用“ruby方法”在Google上搜索,但这不是我需要的。我也看过ruby-doc.org,但我没有找到这种方法。你能详细解释一下它是如何工作的或者给我一个链接吗?更新我用methods()方法做了实验,得到了这样的结果:'labrat'代码classFirstdeffirst_instance_mymethodenddefself.first_class_mymethodendendclassSecond使用类#returnsavailablemethodslistforclassandancestorsputsSeco
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
设置:狂欢ruby1.9.2高线(1.6.13)描述:我已经相当习惯在其他一些项目中使用highline,但已经有几个月没有使用它了。现在,在Ruby1.9.2上全新安装时,它似乎不允许在同一行回答提示。所以以前我会看到类似的东西:require"highline/import"ask"Whatisyourfavoritecolor?"并得到:Whatisyourfavoritecolor?|现在我看到类似的东西:Whatisyourfavoritecolor?|竖线(|)符号是我的终端光标。知道为什么会发生这种变化吗? 最佳答案
我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby1.9+ 关于ruby-主要:Objectwhenrun
这个问题在这里已经有了答案:Checktoseeifanarrayisalreadysorted?(8个答案)关闭9年前。我只是想知道是否有办法检查数组是否在增加?这是我的解决方案,但我正在寻找更漂亮的方法:n=-1@arr.flatten.each{|e|returnfalseife