jjzjj

c# - 加入 Rx 流

coder 2023-07-11 原文

我正在尝试对(对我来说)不重要的 Rx 查询建模:

  • 房间里有男人和女人。
  • 他们进出房间,在房间里时,他们有时会改变位置。
  • 每个男人在给定时间可以看一个(或零个)女人。
  • 每个人都有以下属性:

    class Man
    {
      public const int LookingAtNobody = 0;
      public int Id { get; set; }
      public double Location { get; set; }
      public int LookingAt { get; set; }
    }
    
  • 每个女人都有以下属性:

    class Woman
    {
      public int Id { get; set; }
      public double Location { get; set; }
    }
    
  • 为了代表男人,我们有 IObservable<IObservable<Man>> ,并代表我们拥有的女性IObservable<IObservable<Woman>> .

您将如何使用 Rx 生成从男性到他们正在看的女性的向量:IObservable<IObservable<Tuple<double,double>>>

为了提供帮助,这里有一些针对一些简单情况的单元测试:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(300));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle2()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(350),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle3()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
            OnCompleted<Man>(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle4()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
            OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
            OnCompleted<Man>(500));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var w2 = scheduler.CreateHotObservable(
            OnNext(155, new Woman { Id = 20, Location = 100.0 }),
            OnNext(255, new Woman { Id = 20, Location = 200.0 }),
            OnNext(355, new Woman { Id = 20, Location = 300.0 }),
            OnCompleted<Woman>(455));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        var expectedVector2 = new[]
                       {
                           OnNext(300, Tuple.Create(3.0, 200.0)),
                           OnNext(355, Tuple.Create(3.0, 300.0)),
                           OnNext(400, Tuple.Create(4.0, 300.0)),
                           OnCompleted<Tuple<double,double>>(455),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
        ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
    }

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
    {
        // assuming nested sequences are hot
        var vectors =
            from manDuration in men
            join womanDuration in women on manDuration equals womanDuration
            select from man in manDuration
                   join woman in womanDuration on manDuration equals womanDuration
                   where man.LookingAt == woman.Id
                   select Tuple.Create(man.Location, woman.Location);

        var query = vectors.Select(vectorDuration =>
        {
            var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
            vectorDuration.Subscribe(vectorResults);
            return vectorResults.Messages;
        });

        var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
        return results;
    }
}

(注意:这个问题交叉发布到 Rx 论坛:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

最佳答案

如果我对您的理解正确,目标是创建一个“跟随可观察对象”的可观察对象,其中“跟随可观察对象”从男人开始看女人时开始,到男人停止看女人时结束。 “follow observable”应该包含男人和女人最近位置的元组。

这里的想法是使用CombineLatest,它将接受两个可观察值,当它们中的任何一个产生一个值时,组合器将针对两个可观察值的最新值进行评估,从而产生一个组合可观察值中的值。但是,CombineLatest 仅在两个可观察对象都已完成时才完成。在这种情况下,我们希望在两个来源中的任何一个完成时完成可观察对象。为此,我们定义了以下扩展方法(我不认为这样的方法已经存在,但可能有更简单的解决方案):

public static IObservable<TSource>
  UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
                                       IObservable<TWhile> lifetime)
{
  return Observable.Create<TSource>(observer =>
  {
    var subscription = source.Subscribe(observer);
    var limiter = lifetime.Subscribe(next => { }, () =>
    {
      subscription.Dispose();
      observer.OnCompleted();
    });
    return new CompositeDisposable(subscription, limiter);
  });
}

此方法类似于TakeUntil,但它不是一直持续到lifetime 产生一个值,而是一直持续到lifetime 完成。我们还可以定义一个简单的扩展方法,它采用满足谓词的第一个连胜:

public static IObservable<TSource>
  Streak<TSource>(this IObservable<TSource> source,
                       Func<TSource, bool> predicate)
{
  return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate);
}

现在对于最终查询,我们使用 CombineLatest 将所有男性和所有女性组合在一起,并使用 UntilCompleted 尽早完成该可观察对象。为了获得“follow observables”,我们选择男人注视女人的那一连串。然后我们简单地将其映射到一个位置元组。

var vectors =
  from manDuration in men
  from womanDuration in women
  select manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streak(pair => pair.Man.LookingAt == pair.Woman.Id)
  .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location));

这通过了您的所有测试,但它无法处理男人看女人 10 一会儿,然后看 20 一会儿,然后再看 10 一会儿的场景;仅使用第一个 连胜。要观察所有条纹,我们可以使用以下扩展方法,它返回条纹的可观察值:

public static IObservable<IObservable<TSource>>
  Streaks<TSource>(this IObservable<TSource> source,
                        Func<TSource, bool> predicate)
{
  return Observable.Create<IObservable<TSource>>(observer =>
  {
    ReplaySubject<TSource> subject = null;
    bool previous = false;
    return source.Subscribe(x =>
    {
      bool current = predicate(x);
      if (!previous && current)
      {
        subject = new ReplaySubject<TSource>();
        observer.OnNext(subject);
      }
      if (previous && !current) subject.OnCompleted();
      if (current) subject.OnNext(x);
      previous = current;
    }, () =>
    {
      if (subject != null) subject.OnCompleted();
      observer.OnCompleted();
    });
  });
}

通过只订阅一次源流,并使用 ReplaySubject,此方法适用于热和冷可观察对象。现在对于最终查询,我们按如下方式选择所有条纹:

var vectors =
  from manDuration in men
  from womanDuration in women
  from streak in manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id)
  select streak.Select(pair =>
    Tuple.Create(pair.Man.Location, pair.Woman.Location));

关于c# - 加入 Rx 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9758083/

有关c# - 加入 Rx 流的更多相关文章

  1. ruby - 如何离开加入Arel? - 2

    Arel3.0.2提供了两个类来指定连接类型:Arel::Nodes::InnerJoin和Arel::Nodes::OuterJoin并使用InnerJoin默认。foo=Arel::Table.new('foo')bar=Arel::Table.new('bar')foo.join(bar,Arel::Nodes::InnerJoin)#innerfoo.join(bar,Arel::Nodes::OuterJoin)#outerfoo.join(bar,???)#left如果要生成左连接,如何连接两个表? 最佳答案 你可以使用

  2. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

  3. C# 到 Ruby sha1 base64 编码 - 2

    我正在尝试在Ruby中复制Convert.ToBase64String()行为。这是我的C#代码:varsha1=newSHA1CryptoServiceProvider();varpasswordBytes=Encoding.UTF8.GetBytes("password");varpasswordHash=sha1.ComputeHash(passwordBytes);returnConvert.ToBase64String(passwordHash);//returns"W6ph5Mm5Pz8GgiULbPgzG37mj9g="当我在Ruby中尝试同样的事情时,我得到了相同sha

  4. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  5. c# - C# 中的 Flatten Ruby 方法 - 2

    我如何做Ruby方法"Flatten"RubyMethod在C#中。此方法将锯齿状数组展平为一维数组。例如:s=[1,2,3]#=>[1,2,3]t=[4,5,6,[7,8]]#=>[4,5,6,[7,8]]a=[s,t,9,10]#=>[[1,2,3],[4,5,6,[7,8]],9,10]a.flatten#=>[1,2,3,4,5,6,7,8,9,10 最佳答案 递归解决方案:IEnumerableFlatten(IEnumerablearray){foreach(variteminarray){if(itemisIEnume

  6. ruby - 可以像在 C# 中使用#region 一样在 Ruby 中使用 begin/end 吗? - 2

    我最近从C#转向了Ruby,我发现自己无法制作可折叠的标记代码区域。我只是想到做这种事情应该没问题:classExamplebegin#agroupofmethodsdefmethod1..enddefmethod2..endenddefmethod3..endend...但是这样做真的可以吗?method1和method2最终与method3是同一种东西吗?还是有一些我还没有见过的用于执行此操作的Ruby惯用语? 最佳答案 正如其他人所说,这不会改变方法定义。但是,如果要标记方法组,为什么不使用Ruby语义来标记它们呢?您可以使用

  7. c# - Ruby 等效于 C# Linq 聚合方法 - 2

    什么是Linq聚合方法的ruby​​等价物。它的工作原理是这样的varfactorial=new[]{1,2,3,4,5}.Aggregate((acc,i)=>acc*i);每次将数组序列中的值传递给lambda时,变量acc都会累积。 最佳答案 这在数学以及几乎所有编程语言中通常称为折叠。它是更普遍的变形概念的一个实例。Ruby从Smalltalk中继承了这个特性的名称,它被称为inject:into:(像aCollectioninject:aStartValueinto:aBlock一样使用。)所以,在Ruby中,它称为inj

  8. c# - 先学什么? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭8年前。Improvethisquestion几年前我去学校学习编程,毕业后我找到了一份系统管理方面的工作,这就是我职业生涯的方向。我想重新开始某种开发,并且一直在“玩”C#和ASP.NET,但我已经听到很多关于其他"new"语言的讨论(新的意思是它们是新的)我)喜欢Ruby和F#。我想我想知道我是否在浪费时间学习主要的MS语言,而不是成为一名通才。很长一段时间没有离开开发社区(如果我曾经离开过的话)让我在潮流中挣扎,我不想落在时代的

  9. c# - 在 C# 中重现 Ruby OpenSSL private_encrypt 输出 - 2

    我有一个简单的Ruby脚本,我用它在某些HTTPheader上执行private_encrypt以签署要发送到ruby​​RESTAPI的Web请求,该API会根据Base64编码字符串测试Base64编码字符串生成而不是解码Base64和解密数据然后测试原始字符串。我使用的脚本是require"openssl"require"base64"path_to_cert=ARGV[0].dupplain_text=Base64.decode64(ARGV[1].dup)private_key=OpenSSL::PKey::RSA.new(File.read(path_to_cert))pu

  10. C# 的 LINQ 用于在 ruby​​ 中等效的集合操作 - 2

    我是ruby​​开发的新手,我目前正在使用rails2.3.11在ruby​​1.8.7中开发一个项目,我想知道这种语言是否有与C#的linq等效的集合操作,例如where子句。谢谢。 最佳答案 Ruby中Linq的where等价于find_all检查documentationfortheEnumerableModule用于其他功能。 关于C#的LINQ用于在ruby​​中等效的集合操作,我们在StackOverflow上找到一个类似的问题: https://

随机推荐