我正在尝试训练一个 Apache Spark 应用程序,它应该在 MongoDB 数据库上运行聚合查询并写回结果。我能够解决问题的 Java 版本,但现在需要使用 RStudio 将其移植到 R 语言。
有效的 Java 版本:-
public static void main(String args[]) {
SparkConf sparkConf = new SparkConf(true)
.setMaster("local[*]")
.setSparkHome(SPARK_HOME)
.setAppName("SparklingMongoApp")
.set("spark.ui.enabled", "false")
.set("spark.app.id", APP)
.set("spark.mongodb.input.uri", "mongodb://admin:password@host:27017/input_collection")
.set("spark.mongodb.output.uri", "mongodb://admin:password@host:27017/output_collection");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaMongoRDD<Document> javaMongoRDD = MongoSpark.load(javaSparkContext);
Dataset<Row> dataset = javaMongoRDD.toDF();
dataset.createOrReplaceTempView(TEMP_VIEW);
// a valid spark sql QUERY
Dataset<Row> computedDataSet = dataset.sqlContext().sql(QUERY);
MongoSpark.save(computedDataSet);
javaSparkContext.close();
我正在尝试锻炼的等效 R/RStudio 版本:-
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
##PROBLEM - Is this correct way of setting configuration?
sparkConfig <- list("spark.driver.memory"="1g","spark.mongodb.input.uri"="mongodb://username:password@localhost:27017/price_subset?authSource=admin","spark.mongodb.output.uri"="mongodb://username:password@localhost:27017/price_subset_output?authSource=admin")
customSparkPackages <- c("org.mongodb.spark:mongo-spark1-connector_2.11:1.0.0");
##Starting Up: SparkSession
##PROBLEM-1 Is this correct way of initializing spark session ?
sparkSession <- sparkR.session(appName="MongoSparkConnectorTour",master = "local[*]",enableHiveSupport = FALSE,sparkConfig = sparkConfig,sparkPackages = customSparkPackages)
##PROBLEM-2 - This complains about being deprecated. How to fix this ?
sqlContext <- sparkRSQL.init(sparkSession)
## Save some data
charactersRdf <- data.frame(list(name=c("Bilbo Baggins", "Gandalf", "Thorin", "Balin", "Kili", "Dwalin", "Oin", "Gloin", "Fili", "Bombur"),
age=c(50, 1000, 195, 178, 77, 169, 167, 158, 82, NA)))
charactersSparkdf <- createDataFrame(sqlContext, charactersRdf)
#PROBLEM-3 This throws an error - Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
# java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
write.df(charactersSparkdf, "", source = "com.mongodb.spark.sql.DefaultSource", mode = "overwrite")
我尝试关注 SparkR文档,但仍然无法锻炼运行示例。
期望:-
在 RStudio 中初始化 spark session 的正确方法是什么。 MongoDB official sample不适用于我,因为它仅适用于 SparkShell(它卡在我的机器上)并且已弃用。我想要可以在 RStudio 中运行的代码片段。
如何修复 java.lang.NoClassDefFoundError。
任何有关 SparkR 2.x + MongoDB 3.x 代码的示例/引用都将受到高度赞赏。
版本:- Apache 星火 - 2.0.1 java - 1.8 MongoDB-3 R - 最新的
最佳答案
终于成功了。原来 MongoDB 文档有 Spark 1.6 的示例,而我运行的是 Spark 2.0.1。
无论如何,这就是我使用 RStudio 的方法:-
## Make sure you have SPARK_HOME environment variable set to your spark home director.
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
spark <- sparkR.session(master="local[*]", appName = "mongoSparkR",enableHiveSupport = FALSE,sparkPackages = c("org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0"),sparkConfig = list(spark.mongodb.input.uri="mongodb://username:password@hostname:27017/database.collection_name?authSource=admin",spark.mongodb.output.uri="mongodb://username:password@hostname:27017/database.collection_name_output?authSource=admin"))
pricing_df <- read.df(source = "com.mongodb.spark.sql.DefaultSource",x=10000)
head(pricing_df)
createOrReplaceTempView(pricing_df,"T_YOUR_TABLE")
## Obviously this is just a dummy SQL, replace with it yours.
result_df <- sql("SELECT year(price) as YEAR, month(price) as MONTH , SUM(midPrice) as SUM_PRICING_DATA FROM T_YOUR_TABLE GROUP BY year(price),month(price) ORDER BY year(price),month(price)")
## stop instance when done.
sparkR.stop()
确保您的 SPARK_HOME/jars 文件夹中有依赖的 jar。
我放置的额外 jars(版本可能会随着时间的推移而变化):-
org.mongodb.spark_mongo-spark-connector_2.11-2.0.0-rc0.jar
org.mongodb_mongo-java-driver-3.2.2.jar
关于r - 使用 MongoDB 和 RStudio 的 SparkR 2.x 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40084644/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl