上下文
我最近发布了 timer class for review on Code Review .我有一种直觉,因为我曾经看到 1 个单元测试失败,但无法重现该失败。因此,我发布了代码审查。
我得到了一些很好的反馈,突出了代码中的各种竞争条件。 (我想)我了解问题和解决方案,但在进行任何修复之前,我想通过单元测试来暴露错误。当我尝试时,我意识到这很困难。各种堆栈交换答案表明我必须控制线程的执行以暴露错误,并且任何人为的时间不一定可以移植到不同的机器上。这似乎是我试图解决的问题之外的许多意外复杂性。
我尝试使用 the best static analysis (SA) tool for python ,PyLint,看看它是否会找出任何错误,但它不能。为什么人类可以通过代码审查(本质上是 SA)找到错误,而 SA 工具却不能?
不敢尝试get Valgrind working with python (这听起来像牦牛剃须),我决定在不先复制它们的情况下修复错误。现在我陷入了困境。
现在是代码。
from threading import Timer, Lock
from time import time
class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass
class KitchenTimer(object):
'''
Loosely models a clockwork kitchen timer with the following differences:
You can start the timer with arbitrary duration (e.g. 1.2 seconds).
The timer calls back a given function when time's up.
Querying the time remaining has 0.1 second accuracy.
'''
PRECISION_NUM_DECIMAL_PLACES = 1
RUNNING = "RUNNING"
STOPPED = "STOPPED"
TIMEUP = "TIMEUP"
def __init__(self):
self._stateLock = Lock()
with self._stateLock:
self._state = self.STOPPED
self._timeRemaining = 0
def start(self, duration=1, whenTimeup=None):
'''
Starts the timer to count down from the given duration and call whenTimeup when time's up.
'''
with self._stateLock:
if self.isRunning():
raise AlreadyRunningError
else:
self._state = self.RUNNING
self.duration = duration
self._userWhenTimeup = whenTimeup
self._startTime = time()
self._timer = Timer(duration, self._whenTimeup)
self._timer.start()
def stop(self):
'''
Stops the timer, preventing whenTimeup callback.
'''
with self._stateLock:
if self.isRunning():
self._timer.cancel()
self._state = self.STOPPED
self._timeRemaining = self.duration - self._elapsedTime()
else:
raise NotRunningError()
def isRunning(self):
return self._state == self.RUNNING
def isStopped(self):
return self._state == self.STOPPED
def isTimeup(self):
return self._state == self.TIMEUP
@property
def timeRemaining(self):
if self.isRunning():
self._timeRemaining = self.duration - self._elapsedTime()
return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)
def _whenTimeup(self):
with self._stateLock:
self._state = self.TIMEUP
self._timeRemaining = 0
if callable(self._userWhenTimeup):
self._userWhenTimeup()
def _elapsedTime(self):
return time() - self._startTime
问题
在此代码示例的上下文中,我如何公开竞争条件、修复它们并证明它们已修复?
加分
适用于其他实现和问题而不是专门针对此代码的测试框架的加分项。
外卖
我的结论是,重现已识别竞争条件的技术解决方案是控制两个线程的同步性,以确保它们按照会暴露错误的顺序执行。这里的重点是它们是已经确定的竞争条件。我发现识别竞争条件的最佳方法是将您的代码提交代码审查并鼓励更多专家对其进行分析。
最佳答案
传统上,在多线程代码中强制竞争条件是通过信号量完成的,因此您可以强制一个线程等到另一个线程达到某种边缘条件后再继续。
例如,您的对象有一些代码来检查如果对象已经在运行,是否没有调用 start。您可以通过执行以下操作强制此条件以确保其行为符合预期:
KitchenTimerAlreadyRunningError要做到这一点,您可能需要扩展 KitchenTimer 类。正式的单元测试通常会使用定义为在关键时刻阻塞的模拟对象。模拟对象是一个比我在这里可以解决的更大的话题,但是谷歌搜索“python 模拟对象”会发现很多文档和许多实现可供选择。
这是一种强制代码抛出 AlreadyRunningError 的方法:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def start(self, duration=1, whenTimeUp=None):
KitchenTimer.start(self, duration, whenTimeUp)
with self._runningLock:
print "waiting on _runningLock"
self._runningLock.wait()
def resume(self):
with self._runningLock:
self._runningLock.notify()
timer = TestKitchenTimer()
# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()
# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
thread_2 = threading.Thread(target = timer.start, args = (10, None))
thread_2.start()
except AlreadyRunningError:
print "AlreadyRunningError"
timer.resume()
timer.stop()
通读代码,确定您要测试的一些边界条件,然后考虑您需要在哪里暂停计时器以强制该条件出现,并添加条件、信号量、事件等来实现它发生。例如如果在计时器运行 whenTimeUp 回调时,另一个线程试图停止它会发生什么?您可以通过让计时器在输入_whenTimeUp 后立即等待来强制该条件:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def _whenTimeup(self):
with self._runningLock:
self._runningLock.wait()
KitchenTimer._whenTimeup(self)
def resume(self):
with self._runningLock:
self._runningLock.notify()
def TimeupCallback():
print "TimeupCallback was called"
timer = TestKitchenTimer()
# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)
# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()
# Now allow the countdown thread to resume.
timer.resume()
子类化你想测试的类并不是一个很好的方法来测试它:你必须重写基本上所有的方法来测试每个方法的竞争条件,在那个时候有一个很好的要提出的论点是您并没有真正测试原始代码。相反,您可能会发现将信号量直接放在 KitchenTimer 对象中但默认初始化为 None 更简洁,并让您的方法在获取或等待锁之前检查 if testRunningLock is not None:。然后,您可以强制对您提交的实际代码进行竞争。
一些关于 Python 模拟框架的阅读可能会有所帮助。事实上,我不确定模拟是否有助于测试这段代码:它几乎完全是独立的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过任何这些,但是这些文档是一个很好的开始:
关于python - 如何可靠地重现此 python 代码中的竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19602535/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby