jjzjj

r - 使用 MongoDB 和 RStudio 的 SparkR 2.x 应用程序

coder 2023-11-04 原文

我正在尝试训练一个 Apache Spark 应用程序,它应该在 MongoDB 数据库上运行聚合查询并写回结果。我能够解决问题的 Java 版本,但现在需要使用 RStudio 将其移植到 R 语言。

有效的 Java 版本:-

public static void main(String args[]) {

SparkConf sparkConf = new SparkConf(true)
        .setMaster("local[*]")
        .setSparkHome(SPARK_HOME)
        .setAppName("SparklingMongoApp")
        .set("spark.ui.enabled", "false")
        .set("spark.app.id", APP)
        .set("spark.mongodb.input.uri", "mongodb://admin:password@host:27017/input_collection")
        .set("spark.mongodb.output.uri", "mongodb://admin:password@host:27017/output_collection");


JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaMongoRDD<Document> javaMongoRDD = MongoSpark.load(javaSparkContext);

Dataset<Row> dataset = javaMongoRDD.toDF();

dataset.createOrReplaceTempView(TEMP_VIEW);

// a valid spark sql QUERY
Dataset<Row> computedDataSet = dataset.sqlContext().sql(QUERY);
MongoSpark.save(computedDataSet);
javaSparkContext.close();

我正在尝试锻炼的等效 R/RStudio 版本:-

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

##PROBLEM - Is this correct way of setting configuration?
sparkConfig <- list("spark.driver.memory"="1g","spark.mongodb.input.uri"="mongodb://username:password@localhost:27017/price_subset?authSource=admin","spark.mongodb.output.uri"="mongodb://username:password@localhost:27017/price_subset_output?authSource=admin")

customSparkPackages <- c("org.mongodb.spark:mongo-spark1-connector_2.11:1.0.0");

##Starting Up: SparkSession
##PROBLEM-1 Is this correct way of initializing spark session ?
sparkSession <- sparkR.session(appName="MongoSparkConnectorTour",master = "local[*]",enableHiveSupport = FALSE,sparkConfig = sparkConfig,sparkPackages = customSparkPackages)


##PROBLEM-2 - This complains about being deprecated. How to fix this ?
sqlContext <- sparkRSQL.init(sparkSession)

## Save some data
charactersRdf <- data.frame(list(name=c("Bilbo Baggins", "Gandalf", "Thorin", "Balin", "Kili", "Dwalin", "Oin", "Gloin", "Fili", "Bombur"),
                                 age=c(50, 1000, 195, 178, 77, 169, 167, 158, 82, NA)))

charactersSparkdf <- createDataFrame(sqlContext, charactersRdf)
#PROBLEM-3 This throws an error - Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
#  java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
write.df(charactersSparkdf, "", source = "com.mongodb.spark.sql.DefaultSource", mode = "overwrite")

我尝试关注 SparkR文档,但仍然无法锻炼运行示例。

期望:-

  1. 在 RStudio 中初始化 spark session 的正确方法是什么。 MongoDB official sample不适用于我,因为它仅适用于 SparkShell(它卡在我的机器上)并且已弃用。我想要可以在 RStudio 中运行的代码片段。

  2. 如何修复 java.lang.NoClassDefFoundError。

任何有关 SparkR 2.x + MongoDB 3.x 代码的示例/引用都将受到高度赞赏。

版本:- Apache 星火 - 2.0.1 java - 1.8 MongoDB-3 R - 最新的

最佳答案

终于成功了。原来 MongoDB 文档有 Spark 1.6 的示例,而我运行的是 Spark 2.0.1。

无论如何,这就是我使用 RStudio 的方法:-

 ## Make sure you have SPARK_HOME environment variable set to your spark home director.
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

spark <- sparkR.session(master="local[*]", appName = "mongoSparkR",enableHiveSupport = FALSE,sparkPackages = c("org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0"),sparkConfig = list(spark.mongodb.input.uri="mongodb://username:password@hostname:27017/database.collection_name?authSource=admin",spark.mongodb.output.uri="mongodb://username:password@hostname:27017/database.collection_name_output?authSource=admin"))

pricing_df <- read.df(source = "com.mongodb.spark.sql.DefaultSource",x=10000)
head(pricing_df)
createOrReplaceTempView(pricing_df,"T_YOUR_TABLE")

 ## Obviously this is just a dummy SQL, replace with it yours.
result_df <- sql("SELECT year(price) as YEAR, month(price) as MONTH , SUM(midPrice) as SUM_PRICING_DATA FROM T_YOUR_TABLE GROUP BY year(price),month(price)  ORDER BY year(price),month(price)")


 ## stop instance when done.
sparkR.stop()

确保您的 SPARK_HOME/jars 文件夹中有依赖的 jar。

我放置的额外 jars(版本可能会随着时间的推移而变化):-

org.mongodb.spark_mongo-spark-connector_2.11-2.0.0-rc0.jar

org.mongodb_mongo-java-driver-3.2.2.jar

关于r - 使用 MongoDB 和 RStudio 的 SparkR 2.x 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40084644/

有关r - 使用 MongoDB 和 RStudio 的 SparkR 2.x 应用程序的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  8. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  9. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  10. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

随机推荐