jjzjj

java - Scala Actor 效率低下问题

coder 2024-03-31 原文

首先让我说我是 Scala 的新手;但是,我发现基于 Actor 的并发模型很有趣,并且我尝试将其用于一个相对简单的应用程序。我遇到的问题是,尽管我能够让应用程序运行,但结果(在实时、CPU 时间和内存使用方面)的效率远低于基于 Java 的等效解决方案使用从 ArrayBlockingQueue 中提取消息的线程。我想明白为什么。我怀疑这可能是我缺乏 Scala 知识,并且我造成了所有的低效率,但在多次尝试重新设计应用程序但没有成功之后,我决定向社区寻求帮助。

我的问题是: 我有一个包含许多行的 gzip 文件,格式为:

SomeID comma_separated_list_of_values

例如:

1234 12,45,82

我想解析每一行并获得逗号分隔列表中每个值出现次数的总计数。

此文件可能非常大(压缩了数 GB),但每个文件的唯一值数量非常少(最多 500 个)。我认为这将是尝试编写基于 Actor 的并发 Scala 应用程序的好机会。我的解决方案涉及一个创建解析器 Actors 池的主要驱动程序。然后主驱动程序从 stdin 读取行,将行传递给解析行并保留值的本地计数的 Actor。当主驱动程序读完最后一行时,它会向每个参与者传递一条消息,指示所有行都已读完。当 Actor 收到“完成”消息时,他们将他们的计数传递给聚合器,该聚合器将所有 Actor 的计数相加。汇总所有解析器的计数后,主驱动程序将打印出统计信息。

问题: 我遇到的主要问题是这个应用程序的效率低得令人难以置信。与使用线程和 ArrayBlockingQueue 的“等效”Java 应用程序相比,它使用的 CPU 和内存要多得多。为了正确看待这一点,这里是我为 1000 万行测试输入文件收集的一些统计数据:

Scala 1 Actor(解析器):

    real    9m22.297s
    user    235m31.070s
    sys     21m51.420s

Java 1 线程(解析器):

    real    1m48.275s
    user    1m58.630s
    sys     0m33.540s

Scala 5 Actor :

    real    2m25.267s
    user    63m0.730s
    sys     3m17.950s

Java 5 线程:

    real    0m24.961s
    user    1m52.650s
    sys     0m20.920s

此外,top 报告说 Scala 应用程序的常驻内存大小约为 10 倍。因此,我们在这里讨论的是数量级更多的 CPU 和内存,但性能却差了几个数量级,我只是想不通是什么原因造成的。这是 GC 问题,还是我以某种方式创建了比我意识到的多得多的对象副本?

可能重要也可能不重要的其他详细信息:

  • scala 应用程序由 Java 类包装,这样我就可以 交付一个独立的可执行 JAR 文件(我没有 Scala 我可能想要运行此应用程序的每台机器上的 jars)。
  • 应用程序被调用如下:gunzip -c gzFilename | java -jar StatParser.jar

代码如下:

主要驱动因素:

import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source

class StatCollector (numParsers : Int ) {
    private val parsers = new mutable.ArrayBuffer[StatParser]()
    private val aggregator = new StatAggregator()

    def generateParsers {
        for ( i <- 1 to numParsers ) {
            val parser = new StatParser( i, aggregator )
            parser.start
            parsers += parser
        }
    }


    def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
            parsers( nextParserIdx ) ! line
            nextParserIdx += 1
            if ( nextParserIdx >= numParsers ) {
                nextParserIdx = 0
            }
            lineNo += 1
        }
    }

    def informParsers {
        for ( parser <- parsers ) {
            parser ! true
        }
    }

    def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
            println( key + "," + countMap.getOrElse( key, 0 ) )
            //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
            case (key, value) =>
                println( key + "," + value )
        }
    }

    def processFromStdIn {
        aggregator.start

        generateParsers

        readStdin
        process
    }

    def process {

        informParsers

        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
            Thread.sleep( 250 )
            completedParserCount = aggregator.getNumParsersAggregated
        }

        printCounts
    }
}

解析器 Actor :

import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching

class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {

    private var countMap = new HashMap[String, Int]()
    private val sep1 = "\t"
    private val sep2 = ","


    def getCounts(): HashMap[String, Int] = {
        return countMap
    }

    def act() {
        loop {
            react {
                case line: String =>
                    {
                        val idx = line.indexOf( sep1 )
                        var currentCount = 0
                        if ( idx > 0 ) {
                            val tokens = line.substring( idx + 1 ).split( sep2 )
                            for ( token <- tokens ) {
                                if ( !token.equals( "" ) ) {
                                    currentCount = countMap.getOrElse( token, 0 )
                                    countMap( token ) = ( 1 + currentCount )
                                }
                            }

                        }
                    }
                case doneProcessing: Boolean =>
                    {
                        if ( doneProcessing ) {
                            // Send my stats to Aggregator
                            aggregator ! this
                        }
                    }
            }
        }
    }
}

聚合器参与者:

import scala.actors.Actor
import collection.mutable.HashMap

class StatAggregator extends Actor {
    private var countMap = new HashMap[String, Int]()
    private var parsersAggregated = 0

    def act() {
        loop {
            react {
                case parser: StatParser =>
                    {
                        val cm = parser.getCounts()
                        for ( key <- cm.keySet ) {
                            val currentCount = countMap.getOrElse( key, 0 )
                            val incAmt = cm.getOrElse( key, 0 )
                            countMap( key ) = ( currentCount + incAmt )
                        }
                        parsersAggregated += 1
                    }
            }
        }
    }

    def getNumParsersAggregated: Int = {
        return parsersAggregated
    }

    def getCounts(): HashMap[String, Int] = {
        return countMap
    }
}

如果能提供任何有助于理解这里发生的事情的帮助,我们将不胜感激。

提前致谢!

---- 编辑 ---

由于许多人响应并要求提供 Java 代码,这里是我创建的简单 Java 应用程序,用于比较目的。我意识到这不是很好的 Java 代码,但是当我看到 Scala 应用程序的性能时,我只是快速地想出了一些东西来查看基于 Java 线程的实现如何作为基线执行:

解析线程:

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class JStatParser extends Thread
{
    private ArrayBlockingQueue<String> queue;
    private Map<String, Integer> countMap;
    private boolean done;

    public JStatParser( ArrayBlockingQueue<String> q )
    {
        super( );
        queue = q;
        countMap = new Hashtable<String, Integer>( );
        done = false;
    }

    public Map<String, Integer> getCountMap( )
    {
        return countMap;
    }

    public void alldone( )
    {
        done = true;
    }

    @Override
    public void run( )
    {
        String line = null;
        while( !done || queue.size( ) > 0 )
        {
            try
            {
                // line = queue.take( );
                line = queue.poll( 100, TimeUnit.MILLISECONDS );
                if( line != null )
                {
                    int idx = line.indexOf( "\t" ) + 1;
                    for( String token : line.substring( idx ).split( "," ) )
                    {
                        if( !token.equals( "" ) )
                        {
                            if( countMap.containsKey( token ) )
                            {
                                Integer currentCount = countMap.get( token );
                                currentCount++;
                                countMap.put( token, currentCount );
                            }
                            else
                            {
                                countMap.put( token, new Integer( 1 ) );
                            }
                        }
                    }
                }
            }
            catch( InterruptedException e )
            {
                // TODO Auto-generated catch block
                System.err.println( "Failed to get something off the queue: "
                        + e.getMessage( ) );
                e.printStackTrace( );
            }
        }
    }
}

司机:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class JPS
{
    public static void main( String[] args )
    {
        if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
        {
            System.err.println( "Usage: JPS [filename]" );
            System.exit( -1 );
        }

        int numParsers = Integer.parseInt( args[0] );
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
        List<JStatParser> parsers = new ArrayList<JStatParser>( );

        BufferedReader reader = null;

        try
        {
            if( args.length == 2 )
            {
                reader = new BufferedReader( new FileReader( args[1] ) );
            }
            else
            {
                reader = new BufferedReader( new InputStreamReader( System.in ) );
            }

            for( int i = 0; i < numParsers; i++ )
            {
                JStatParser parser = new JStatParser( q );
                parser.start( );
                parsers.add( parser );
            }

            String line = null;
            while( (line = reader.readLine( )) != null )
            {
                try
                {
                    q.put( line );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to add line to q: "
                            + e.getMessage( ) );
                    e.printStackTrace( );
                }
            }

            // At this point, we've put everything on the queue, now we just
            // need to wait for it to be processed.
            while( q.size( ) > 0 )
            {
                try
                {
                    Thread.sleep( 250 );
                }
                catch( InterruptedException e )
                {
                }
            }

            Map<String,Integer> countMap = new Hashtable<String,Integer>( );
            for( JStatParser jsp : parsers )
            {
                jsp.alldone( );
                Map<String,Integer> cm = jsp.getCountMap( );
                for( String key : cm.keySet( ) )
                {
                    if( countMap.containsKey( key ))
                    {
                        Integer currentCount = countMap.get(  key );
                        currentCount += cm.get( key );
                        countMap.put( key, currentCount );
                    }
                    else
                    {
                        countMap.put(  key, cm.get( key ) );
                    }
                }
            }

            System.out.println( "ID,Count" );
            for( String key : new TreeSet<String>(countMap.keySet( ))  )
            {
                System.out.println( key + "," + countMap.get( key ) );
            }

            for( JStatParser parser : parsers )
            {
                try
                {
                    parser.join( 100 );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.exit(  0  );
        }
        catch( IOException e )
        {
            System.err.println( "Caught exception: " + e.getMessage( ) );
            e.printStackTrace( );
        }
    }
}

最佳答案

我不确定这对 Actor 来说是不是一个很好的测试用例。一方面, Actor 之间几乎没有互动。这是一个简单的 map/reduce,它需要并行性,而不是并发性。

actors 的开销也很重,我不知道实际分配了多少线程。根据您拥有的处理器数量,您的线程可能比 Java 程序少——这似乎是事实,因为加速是 4 倍而不是 5 倍。

并且您编写 actors 的方式针对 idle actors 进行了优化,这种情况下您有成百上千的 actors,但在任何时候只有少数人在做实际工作。如果您使用 while/receive 而不是 loop/react 编写 actors,它们的表现会更好。

现在,actor 可以很容易地将应用程序分发到多台计算机上,除非您违反了 actor 的一项原则:您正在调用 actor 对象的方法。你永远不应该对 Actor 这样做,事实上,Akka 会阻止你这样做。一种更类似于 Actor 的方式是聚合器向每个 Actor 询问他们的 key 集,计算他们的并集,然后对于每个 key ,要求所有 Actor 发送他们对该 key 的计数。

不过,我不确定头顶上的 actor 是否就是您所看到的。您没有提供有关 Java 实现的信息,但我敢说您使用了可变映射,甚至可能使用了一个并发可变映射——与您在 Scala 中所做的实现截然不同。

也没有关于如何读取文件(如此大的文件可能存在缓冲问题)或如何在 Java 中对其进行解析的信息。由于大部分工作是读取和解析文件,而不是计算 token ,因此实现中的差异可以轻松克服任何其他问题。

最后,关于驻留内存大小,Scala 有一个 9 MB 的库(除了 JVM 带来的),这可能就是您所看到的。当然,如果您在 Java 中使用单个并发映射与在 Scala 中使用 6 个不可变映射,那肯定会在内存使用模式上产生很大差异。

关于java - Scala Actor 效率低下问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11725237/

有关java - Scala Actor 效率低下问题的更多相关文章

  1. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  2. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  3. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  4. ruby - Fast-stemmer 安装问题 - 2

    由于fast-stemmer的问题,我很难安装我想要的任何ruby​​gem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=

  5. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  6. ruby - 安装 Ruby 时遇到问题(无法下载资源 "readline--patch") - 2

    当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub

  7. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  8. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐