jjzjj

Python小案例(十)利用PySpark循环写入数据

HsuHeinrich 2024-07-14 原文

Python小案例(十)利用PySpark循环写入数据

在做数据分析的时候,往往需要回溯历史数据。但有时候构建历史数据时需要变更参数重复跑数,公司的数仓调度系统往往只支持日期这一个参数,而且为临时数据生产调度脚本显得有点浪费。这个时候就可以结合python的字符串格式化和PySpark的Hive写入,就可以完成循环写入临时数据。

⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接企业hive集群的

案例一:多参数循环写入临时表

案例背景:写入每天的热搜数据,热搜类型分为当日、近1日、近2日、近3日。这里为了方便,简化了循环的力度。

from pyspark.sql import *
# spark配置
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

# 导入其他相关库
import pandas as pd
from datetime import datetime
# sql创建临时表
sql_create = '''
CREATE TABLE temp.loop_write_example
    (
        cnt string comment "近n日cnt"
    )
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''

spark.sql(sql_create)
DataFrame[]

构造日期'{dt}'和热搜类型{num}两个参数

# sql写入临时表
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})


select
    sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
    temp.loop_write_example_fake_data
where
    dt between date_add('{dt}',-4) and '{dt}'
'''
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # 日期范围
# 循环写入临时表
for point_date in dates:
    if point_date>='2021-01-01' and point_date<'2021-01-03':
        for dtype in range(0,4):
            start_time = datetime.now()
            spark.sql(sql_insert.format(dt=point_date, num=dtype))
            end_time=datetime.now()
            print (point_date, dtype, "succeed", '耗时'+str((end_time-start_time).seconds)+'秒')
2021-01-01 0 succeed 耗时8秒
2021-01-01 1 succeed 耗时7秒
2021-01-01 2 succeed 耗时8秒
2021-01-01 3 succeed 耗时8秒
2021-01-02 0 succeed 耗时8秒
2021-01-02 1 succeed 耗时8秒
2021-01-02 2 succeed 耗时8秒
2021-01-02 3 succeed 耗时8秒

案例二:并发批量写入hdfs

案例背景:将2亿+题目按规则分批写入hdfs,供研发通过接口查询,每个hdfs要求最大1000w。

from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.executor.instances", "20") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .enableHiveSupport() \
    .getOrCreate()

import math
import pandas as pd
from datetime import datetime
import time
import os
# 为了方便,通过规则生成的数据存入临时表temp.hh_qids中,规则细节无需了解
# 查看数据量级
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # 获取数据量级
print(N)
273230858
# 创建表,通过参数i生成表后缀
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
    (
        questionid string comment "题目ID"
    )
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
# 写入表,写入上述创建的临时表
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}

select
    questionid
from
    temp.hh_qids
where
    ceil(rn/10000000)={i}
order by
    questionid
limit 100000000
'''
  • 循环写入
%%time

# 通过循环创建多个临时表并写入
for i in range(1,math.ceil(N/10000000)+1):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # 创建表
    spark.sql(insert_sql.format(i=i)) # 写入表
    
    end_time=datetime.now()
    
    print(f"成功写入hh_mult_write_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒')
成功写入hh_mult_write_1,耗时38秒
成功写入hh_mult_write_2,耗时59秒
成功写入hh_mult_write_3,耗时36秒
成功写入hh_mult_write_4,耗时34秒
成功写入hh_mult_write_5,耗时29秒
成功写入hh_mult_write_6,耗时26秒
成功写入hh_mult_write_7,耗时44秒
成功写入hh_mult_write_8,耗时43秒
成功写入hh_mult_write_9,耗时32秒
成功写入hh_mult_write_10,耗时49秒
成功写入hh_mult_write_11,耗时33秒
成功写入hh_mult_write_12,耗时34秒
成功写入hh_mult_write_13,耗时38秒
成功写入hh_mult_write_14,耗时24秒
成功写入hh_mult_write_15,耗时40秒
成功写入hh_mult_write_16,耗时34秒
成功写入hh_mult_write_17,耗时39秒
成功写入hh_mult_write_18,耗时45秒
成功写入hh_mult_write_19,耗时50秒
成功写入hh_mult_write_20,耗时35秒
成功写入hh_mult_write_21,耗时46秒
成功写入hh_mult_write_22,耗时38秒
成功写入hh_mult_write_23,耗时29秒
成功写入hh_mult_write_24,耗时31秒
成功写入hh_mult_write_25,耗时28秒
成功写入hh_mult_write_26,耗时36秒
成功写入hh_mult_write_27,耗时32秒
成功写入hh_mult_write_28,耗时17秒
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s

这次通过大量级数据实战演示,可以发现效率还可以,写入28个文件仅需17min 15s。但日常业务中可能存在更复杂的写入或者更大的量级,那有没有办法提高效率呢?

大家都知道python的循环是单线程的,在一次循环结束前是不会调起下次循环的。而调度系统一般也可以支持并发,那python是不是也能通过并发实现多线程呢?当然可以了,方法有不少,但我实验后发现还是joblib好用。

这里通过一个简单的小case演示joblib的效果

# 查看集群服务器cpu数量
print(os.cpu_count())

48

%%time

# 查看简单循环的执行时间:15s
for i in range(5):
    for j in range(3):
        time.sleep(1)
        print(i*j)

0
0
0
0
1
2
0
2
4
0
3
6
0
4
8
CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms
Wall time: 15 s

%%time

# 查看多线程下的执行时间:1.35s(好家伙,快了10倍多!)
from joblib import Parallel, delayed

def product2(x,y):
    time.sleep(1)
    return x*y

# n_jobs=-1表示使用全部cpu
Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))

CPU times: user 111 ms, sys: 233 ms, total: 344 ms
Wall time: 1.35 s

[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]

大家可以看到,提速效果还是杠杠滴,那实际应用会不会也如此优秀呢?

  • 并发写入
# 构造函数-将单次循环的主要过程包装成函数以便Parallel调用
def creat_insert(i):
    start_time = datetime.now()
    
    spark.sql(creat_sql.format(i=i)) # 创建表
    spark.sql(insert_sql.format(i=i)) # 写入表
    
    end_time=datetime.now()
    
    print_str = f"成功写入hh_mult_test_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒'
    return print_str
%%time

# 并发写入
from joblib import Parallel, delayed

# 集群服务器大家都在用,在做大任务处理时,不建议使用全部cpu,这里使用一半足矣
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000)+1))
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s

['成功写入hh_mult_test_1,耗时44秒',
 '成功写入hh_mult_test_2,耗时41秒',
 '成功写入hh_mult_test_3,耗时83秒',
 '成功写入hh_mult_test_4,耗时49秒',
 '成功写入hh_mult_test_5,耗时89秒',
 '成功写入hh_mult_test_6,耗时71秒',
 '成功写入hh_mult_test_7,耗时89秒',
 '成功写入hh_mult_test_8,耗时72秒',
 '成功写入hh_mult_test_9,耗时83秒',
 '成功写入hh_mult_test_10,耗时77秒',
 '成功写入hh_mult_test_11,耗时80秒',
 '成功写入hh_mult_test_12,耗时65秒',
 '成功写入hh_mult_test_13,耗时53秒',
 '成功写入hh_mult_test_14,耗时109秒',
 '成功写入hh_mult_test_15,耗时81秒',
 '成功写入hh_mult_test_16,耗时73秒',
 '成功写入hh_mult_test_17,耗时41秒',
 '成功写入hh_mult_test_18,耗时78秒',
 '成功写入hh_mult_test_19,耗时84秒',
 '成功写入hh_mult_test_20,耗时93秒',
 '成功写入hh_mult_test_21,耗时68秒',
 '成功写入hh_mult_test_22,耗时78秒',
 '成功写入hh_mult_test_23,耗时48秒',
 '成功写入hh_mult_test_24,耗时88秒',
 '成功写入hh_mult_test_25,耗时54秒',
 '成功写入hh_mult_test_26,耗时59秒',
 '成功写入hh_mult_test_27,耗时62秒',
 '成功写入hh_mult_test_28,耗时37秒']

可以看到,每个文件的写入时间与循环差不多,都是在60秒左右。但整体只花了1min 49s,快了10倍以上。

  • 删除测试数据
%%time

# 测试数据量较大,无端占用公司资源是不对的,所以需要删除下。
# 但要我手动一个个删除那也是不可能的,做个简单的for循环即可
for i in range(1,29):
    drop_sql='''
    DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
    '''
    
    spark.sql(drop_sql.format(i=i)) # 删除表
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms

总结

至此,python小案例系列也结束了,案例基本来源于我的日常业务。在处理复杂需求,提升工作效率方面,Python还是有一席之地的。不知道大家有没有什么实用的python处理日常需求的小案例呢?

共勉~

有关Python小案例(十)利用PySpark循环写入数据的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby - 树顶语法无限循环 - 2

    我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He

  3. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  4. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  5. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  6. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  7. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  8. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  9. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  10. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

随机推荐