jjzjj

JavaSparkContext 不可序列化

coder 2024-03-16 原文

我将 spark 与 cassandra 一起使用,我有一个 JavaRDD<String>客户。对于每个客户,我想从 cassandra 中选择他这样的交互:

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() {
        @Override
        public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {               
            List<InteractionByMonthAndCustomer> b = javaFunctions(sc)
                    .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer")
                    .where("ctid =?", s)
                    .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() {
                        @Override
                        public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception {
                            return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"),
                                    cassandraRow.getString("motif"),
                                    cassandraRow.getDate("start"),
                                    cassandraRow.getDate("end"),
                                    cassandraRow.getString("ctid"),
                                    cassandraRow.getString("month")
                            );
                        }
                    }).collect();
            return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b);
        }
    });

为此,我使用了一个 JavaSparkContext sc .但是我得到了这个错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91)
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 14 more

我认为 JavaSparkContext 必须是可序列化的。但是我怎样才能让它序列化呢?

谢谢。

最佳答案

不,JavaSparkContext 不是可序列化的,也不应该是。它不能用于您发送给远程工作人员的功能。在这里您没有显式引用它,但无论如何都会对引用进行序列化,因为您的匿名内部类函数不是 static,因此具有对封闭类的引用。

尝试将此函数重写为static 独立对象的代码。

关于JavaSparkContext 不可序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27706813/

有关JavaSparkContext 不可序列化的更多相关文章

  1. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  2. ruby - 在 Ruby 中比较序列 - 2

    假设我必须(小型到中型)阵列:tokens=["aaa","ccc","xxx","bbb","ccc","yyy","zzz"]template=["aaa","bbb","ccc"]如何确定tokens是否以相同的顺序包含template的所有条目?(请注意,在上面的示例中,应忽略第一个“ccc”,从而由于最后一个“ccc”而导致匹配。) 最佳答案 这适用于您的示例数据。tokens=["aaa","ccc","xxx","bbb","ccc","yyy","zzz"]template=["aaa","bbb","ccc"]po

  3. ruby-on-rails - carrierwave:在序列化动态属性上安装 uploader - 2

    首先,我使用的是rails3.1.3和来自master的carrierwavegithub仓库的分支。我使用after_init钩子(Hook)来确定基于属性的字段页面模型实例并为这些字段定义属性访问器将值存储在序列化哈希中(希望它清楚我是什么谈论)。这是我正在做的事情的精简版:classPage省略mount_uploader命令让我可以访问我想要的属性。但是当我安装uploader时出现错误消息说“nil类的未定义新方法”我在源代码中读到有方法read_uploader和扩展模块中的write_uploader。我如何必须覆盖这些来制作mount_uploader命令使用我的“虚拟

  4. 机器学习——时间序列ARIMA模型(四):自相关函数ACF和偏自相关函数PACF用于判断ARIMA模型中p、q参数取值 - 2

    文章目录1、自相关函数ACF2、偏自相关函数PACF3、ARIMA(p,d,q)的阶数判断4、代码实现1、引入所需依赖2、数据读取与处理3、一阶差分与绘图4、ACF5、PACF1、自相关函数ACF自相关函数反映了同一序列在不同时序的取值之间的相关性。公式:ACF(k)=ρk=Cov(yt,yt−k)Var(yt)ACF(k)=\rho_{k}=\frac{Cov(y_{t},y_{t-k})}{Var(y_{t})}ACF(k)=ρk​=Var(yt​)Cov(yt​,yt−k​)​其中分子用于求协方差矩阵,分母用于计算样本方差。求出的ACF值为[-1,1]。但对于一个平稳的AR模型,求出其滞

  5. ruby-on-rails - Rails 编辑序列化的 JSON 数据 - 2

    我有一个存储JSON数据的列。当它处于编辑状态时,我不知道如何显示它。serialize:value,JSON=f.fields_for:valuedo|ff|.form-group=ff.label:short=ff.text_field:short,class:'form-control'.form-group=ff.label:long=ff.text_field:long,class:'form-control' 最佳答案 代替=f.fields_for:valuedo|ff|请使用以下代码:=f.fields_for:va

  6. ruby-on-rails - 序列化时无法将空数组保存到数据库 - 2

    在RubyonRails中,如果数组为空,则具有序列化数组字段的模型将不会在.save()上更新,而它之前有数据。我正在使用:ruby2.2.1rails4.2.1sqlite31.3.10我创建了一个字段设置为文本的新模型:railsgmodel用户名:stringexample:text在我添加的User.rb文件中:serialize:example,Array我实例化了User类的一个新实例:test=User.new然后我保存用户以确保它正确保存:test.save()(0.1ms)begintransactionSQL(0.4ms)INSERTINTO"users"("cr

  7. ruby - 使用 Ruby CSV 创建 Rails 记录,其中字符串字段不可查询 - 2

    我正在尝试将种子数据从CSV文件加载到我的Rails应用程序中。我最初安装了fastercsvgem,却发现从ruby​​1.9开始,fastercsv已被弃用,取而代之的是CSV库。所以在收到一个非常有用的错误告诉我切换后,我切换到CSV。然而,现在我遇到了最奇怪的现象,当我加载数据时一切看起来都很正常,但我似乎无法查询字符串字段。字符串字段由看似正确的字符串填充,但我无法访问它们。我可以查询任何数字字段,结果将返回,但不会返回字符串字段。我尝试使用引号的定界符,但无济于事。我什至从我的csv文件中删除了所有引号,但我仍然无法查询字符串字段。下面是我的代码,以及一些来自Rails控制

  8. ruby - IO::EAGAINWaitReadable:资源暂时不可用 - 读取会阻塞 - 2

    当我尝试使用“套接字”库中的方法“read_nonblock”时出现以下错误IO::EAGAINWaitReadable:Resourcetemporarilyunavailable-readwouldblock但是当我通过终端上的IRB尝试时它工作正常如何让它读取缓冲区? 最佳答案 IgetthefollowingerrorwhenItrytousethemethod"read_nonblock"fromthe"socket"library当缓冲区中的数据未准备好时,这是预期的行为。由于异常IO::EAGAINWaitReadab

  9. ruby - 加载使用 YAML 序列化的对象时调用初始化 - 2

    是否可以在使用YAML.load_file时强制Ruby调用初始化方法?我想调用该方法以便为我不序列化的实例变量提供值。我知道我可以将代码分解成一个单独的方法并在调用YAML.load_file之后调用该方法,但我想知道是否有更优雅的方法来处理这个问题。 最佳答案 我认为你做不到。由于您要添加的代码确实特定于要反序列化的类,因此您应该考虑在类中添加该功能。例如,让Foo成为您要反序列化的类,您可以添加一个类方法,例如:classFoodefself.from_yaml(yaml)foo=YAML::load(yaml)#editth

  10. ruby-on-rails - FactoryGirl工厂特征内的序列不使用主序列计数器 - 2

    我有以下工厂:FactoryGirl.definedofactory:foodosequence(:name){|n|"Foo#{n}"}trait:ydosequence(:name){|n|"Fooy#{n}"}endendend如果我跑create:foocreate:foocreate:foo,:y我得到Foo1,Foo2,Fooy1。但我想要Foo1,Foo2,Fooy3。我怎样才能做到这一点? 最佳答案 经过smile2day'sanswer的一些提示后和thisanswer,我得出以下解决方案:FactoryGirl.

随机推荐