jjzjj

c# - Reactive Extensions 看起来很慢——我做错了什么吗?

coder 2024-05-24 原文

我正在为一个每秒需要处理数千条消息的交易平台项目评估 Rx。现有平台有一个复杂的事件路由系统(多播委托(delegate))响应这些消息并进行大量后续处理。

我查看了 Reactive Extensions 的明显好处,但注意到它有点慢,通常慢 100 倍。

我创建了单元测试来演示这一点,它运行一个简单的增量 100 万次,使用各种 Rx 风格和直接开箱即用的委托(delegate)“控制”测试。

结果如下:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range()                       - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread          - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate          - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher         - (1000000) - 00:00:03.0360000

如您所见,所有 Rx 方法都比委托(delegate)等效方法慢约 100 倍。显然,Rx 在幕后做了很多工作,这些工作将在更复杂的示例中使用,但这看起来非常慢。

这是正常现象还是我的测试假设无效?上面的 Nunit 代码如下 -

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
    [TestFixture]
    class ReactiveExtensionsBenchmark_Tests
    {
        private int counter = 0;

        [Test]
        public void ReactiveExtensionsPerformanceComparisons()
        {
            int iterations = 1000000;

            Action<int> a = (i) => { counter++; };

            DelegateSmokeTest(iterations, a);
            ObservableRangeTest(iterations, a);
            SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
            SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
            SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
            SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
            SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
        }

        public void ObservableRangeTest(int iterations, Action<int> action)
        {
            counter = 0;

            long start = DateTime.Now.Ticks;

            Observable.Range(0, iterations).Subscribe(action);

            OutputTestDuration("Observable.Range()", start);
        }


        public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
        {
            counter = 0;

            var eventSubject = new Subject<int>();
            var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
            events.Subscribe(action);

            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => eventSubject.OnNext(1)
                );

            OutputTestDuration("Subject.Subscribe() - " + mode, start);
        }

        public void DelegateSmokeTest(int iterations, Action<int> action)
        {
            counter = 0;
            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => action(1)
                );

            OutputTestDuration("Delegate", start);
        }


        /// <summary>
        /// Output helper
        /// </summary>
        /// <param name="test"></param>
        /// <param name="duration"></param>
        public void OutputTestDuration(string test, long duration)
        {
            Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
        }

        /// <summary>
        /// Test timing helper
        /// </summary>
        /// <param name="elapsedTicks"></param>
        /// <returns></returns>
        public string ElapsedDuration(long elapsedTicks)
        {
            return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
        }

    }
}

最佳答案

我的猜测是 Rx 团队首先专注于构建功能,而不关心性能优化。

使用分析器确定瓶颈并用您自己的优化版本替换慢速 Rx 类。

下面是两个例子。

结果:

Delegate                                 - (1000000) - 00:00:00.0368748

Simple - NewThread                       - (1000000) - 00:00:00.0207676
Simple - CurrentThread                   - (1000000) - 00:00:00.0214599
Simple - Immediate                       - (1000000) - 00:00:00.0162026
Simple - ThreadPool                      - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

First of all, it seems to matter a lot how the observable is implemented. Here's an observable that cannot be unsubscribed from, but it's fast:

private IObservable<int> CreateFastObservable(int iterations)
{
    return Observable.Create<int>(observer =>
    {
        new Thread(_ =>
        {
            for (int i = 0; i < iterations; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }).Start();
        return () => { };
    });
}

测试:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = CreateFastObservable(iterations);

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("Simple - " + mode, start);
}

主题增加了很多开销。这是一个主题,它被剥夺了主题的大部分预期功能,但速度很快:

class FastSubject<T> : ISubject<T>
{
    private event Action onCompleted;
    private event Action<Exception> onError;
    private event Action<T> onNext;

    public FastSubject()
    {
        onCompleted += () => { };
        onError += error => { };
        onNext += value => { };
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(T value)
    {
        this.onNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.onCompleted += observer.OnCompleted;
        this.onError += observer.OnError;
        this.onNext += observer.OnNext;

        return Disposable.Create(() =>
        {
            this.onCompleted -= observer.OnCompleted;
            this.onError -= observer.OnError;
            this.onNext -= observer.OnNext;
        });
    }
}

测试:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}

关于c# - Reactive Extensions 看起来很慢——我做错了什么吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4272354/

有关c# - Reactive Extensions 看起来很慢——我做错了什么吗?的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  3. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  4. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  5. ruby - 为什么 4.1%2 使用 Ruby 返回 0.0999999999999996?但是 4.2%2==0.2 - 2

    为什么4.1%2返回0.0999999999999996?但是4.2%2==0.2。 最佳答案 参见此处:WhatEveryProgrammerShouldKnowAboutFloating-PointArithmetic实数是无限的。计算机使用的位数有限(今天是32位、64位)。因此计算机进行的浮点运算不能代表所有的实数。0.1是这些数字之一。请注意,这不是与Ruby相关的问题,而是与所有编程语言相关的问题,因为它来自计算机表示实数的方式。 关于ruby-为什么4.1%2使用Ruby返

  6. ruby - ruby 中的 TOPLEVEL_BINDING 是什么? - 2

    它不等于主线程的binding,这个toplevel作用域是什么?此作用域与主线程中的binding有何不同?>ruby-e'putsTOPLEVEL_BINDING===binding'false 最佳答案 事实是,TOPLEVEL_BINDING始终引用Binding的预定义全局实例,而Kernel#binding创建的新实例>Binding每次封装当前执行上下文。在顶层,它们都包含相同的绑定(bind),但它们不是同一个对象,您无法使用==或===测试它们的绑定(bind)相等性。putsTOPLEVEL_BINDINGput

  7. ruby - Infinity 和 NaN 的类型是什么? - 2

    我可以得到Infinity和NaNn=9.0/0#=>Infinityn.class#=>Floatm=0/0.0#=>NaNm.class#=>Float但是当我想直接访问Infinity或NaN时:Infinity#=>uninitializedconstantInfinity(NameError)NaN#=>uninitializedconstantNaN(NameError)什么是Infinity和NaN?它们是对象、关键字还是其他东西? 最佳答案 您看到打印为Infinity和NaN的只是Float类的两个特殊实例的字符串

  8. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  9. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  10. ruby - 当使用::指定模块时,为什么 Ruby 不在更高范围内查找类? - 2

    我刚刚被困在这个问题上一段时间了。以这个基地为例:moduleTopclassTestendmoduleFooendend稍后,我可以通过这样做在Foo中定义扩展Test的类:moduleTopmoduleFooclassSomeTest但是,如果我尝试通过使用::指定模块来最小化缩进:moduleTop::FooclassFailure这失败了:NameError:uninitializedconstantTop::Foo::Test这是一个错误,还是仅仅是Ruby解析变量名的方式的逻辑结果? 最佳答案 Isthisabug,or

随机推荐