jjzjj

DataFrame

全部标签

java - 在 Java 的 Spark Dataframe 中将 CSV 值转换为 Vector

我有一个包含两列的CSV文件id,featuresid列是一个字符串,features列是以逗号分隔的机器学习算法的特征值列表,即。“[1,4,5]”我基本上只需要在值上调用Vectors.parse()来获取vector,但我不想先转换为RDD。我想将其放入SparkDataframe,其中features列是org.apache.spark.mllib.linalg.Vector我正在使用databrickscsvapi将其读入数据框,并尝试将特征列转换为vector。有人知道如何在Java中执行此操作吗? 最佳答案 我找到了一

python - 过滤器生成的 PySpark DataFrame - 它存储在哪里?

对于任何软件架构师来说,这可能是一个基本问题,但我对这个概念感到困惑。假设我有一个存储在hdfs上的大型SparkDataFrame。我现在做这样的过滤操作:df_new=my_big_hdfs_df.where("my_column='testvalue'")print(type(df_new))class'pyspark.sql.dataframe.DataFrame'>df_new到底存储在哪里?如果这是普通的python,我会猜测在内存中的某个地方。但PySpark也是如此吗?或者它只是某种引用?它是否保存在hdfs中某处的磁盘上? 最佳答案

r - 在 sparklyr 中断开连接后,spark 数据帧是否会自动删除?如果没有,我们该怎么做?

在关闭连接时,以下列方式复制到spark的数据帧会发生什么情况?library(sparklyr)library(dplyr)sc如果它们没有被自动删除,除了按以下方式删除每个数据帧之外,是否有任何简单的方法可以删除session期间创建的所有数据帧?sc%>%spark_session()%>%invoke("catalog")%>%invoke("dropTempView","iris")即使它是自动完成的,当spark看到有必要清理临时View时,它是立即完成还是延迟完成?我有一个脚本,它不断调用spark并将临时数据帧复制到spark中以进行一些操作。如果最终没有删除,我担心那

scala - 尝试从 UDF 执行 spark sql 查询

我正在尝试使用scala在spark框架中编写一个内联函数,它将接受一个字符串输入,执行一个sql语句并返回一个字符串值valtestfunc:(String=>String)=(arg1:String)=>{valk=sqlContext.sql("""selectc_codefromr_c_tblwherex_nm="something"""")k.head().getString(0)}我正在将此Scala函数注册为UDFvaltestFunc_test=udf(testFunc)我在配置单元表上有一个数据框valdf=sqlContext.table("some_table")

python - 使用 .csv 格式的 HDFS 文件创建 Pandas DataFrame

我正在尝试通过从hadoop集群获取.csv数据并将其放入PandasDataFrame来创建Spark工作流。我能够从HDFS中提取数据并将其放入RDD中,但无法将其处理到PandasDataframe中。以下是我的代码:importpandasaspdimportnumpyasnmA=sc.textFile("hdfs://localhost:9000/sales_ord_univ.csv")#thiscreatestheRDDB=pd.DataFrame(A)#thisgivesmethefollowingerror:pandas.core.common.PandasError:

java - 使用 Spark Dataframe 的 Hive 分区中缺少日期前导零

我正在向SparkDataframe添加一个分区列。新列包含年月日。我的数据框中有一个时间戳列。DataFramedfPartition=df.withColumn("year",df.col("date").substr(0,4));dfPartition=dfPartition.withColumn("month",dfPartition.col("date").substr(6,2));dfPartition=dfPartition.withColumn("day",dfPartition.col("date").substr(9,2));当我输出数据帧时,我可以看到列的正确值,

python - 来自 Hive 查询的持久 PySpark Dataframe

我正在从Hive表中获取一些数据:df=sqlContext.sql('selectshubiru,datefromthebigtablebtwherebt.num>10')df.show()#herethequeryisprocessedandtheresultsshown而且一切正常。现在我想对df进行操作,但是每次我对df进行操作时,它都会再次运行针对Hive的查询:importpyspark.sql.functionsasfuncfromdatetimeimportdatetimefrompyspark.sql.typesimportTimestampTypedt_udt=fu

scala - Spark DataFrame 并行性

下面是我使用ApacheSpark的用例1)我在HDFS上有大约2500个Parquet文件,文件大小因文件而异。2)我需要处理每个parquet文件并构建一个新的DataFrame并将一个新的DataFrame写入orc文件格式。3)我的Spark驱动程序是这样的。我正在迭代每个文件,处理单个Parquet文件,创建一个新的DataFrame并将一个新的DataFrame编写为ORC,下面是代码片段。valfs=FileSystem.get(newConfiguration())valparquetDFMap=fs.listStatus(newPath(inputFilePath))

python - Hadoop MapReduce(使用 Python)在 Pandas DataFrame 上启动 KeyError

我正在尝试使用MapReduce处理数据帧。我最初为映射器创建了脚本并尝试从本地终端运行它,它工作正常:映射器.pyimportsysimportstringimportpandasaspddf=pd.read_csv(sys.stdin)#cleaningrelevantfieldsdf['Time']=pd.to_datetime(df['Time'],unit='s').apply(lambdax:x.year)df['Summary']=df['Summary'].str.lower()df['Summary']=df['Summary'].str.replace('[{}]'

scala - 在 Spark (HDFS) 中写入 CSV 文件时选择哪个选项?

我必须比较CSV文件,然后我必须删除所有重复的行。所以,我的情况就像我有一个文件夹,我必须将每个过滤结果放在该文件夹中,当一些新文件出现时,我必须将文件夹中的现有文件与新文件进行比较,最后,我必须把将结果返回到同一文件夹。eg:/data/ingestion/file1.csva1b1c1a2b2c2a3b3c3/data/ingestion/file2.csva4b4c4a5b5c5a6b6c6newupcomingfile(upcoming_file.csv):a1b1c1a5b5c5a7b7c7现在我的方法是从/data/ingestion/*中存在的所有文件创建一个数据帧。然后