在做数据分析的时候,往往需要回溯历史数据。但有时候构建历史数据时需要变更参数重复跑数,公司的数仓调度系统往往只支持日期这一个参数,而且为临时数据生产调度脚本显得有点浪费。这个时候就可以结合python的字符串格式化和PySpark的Hive写入,就可以完成循环写入临时数据。
⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接企业hive集群的
案例背景:写入每天的热搜数据,热搜类型分为当日、近1日、近2日、近3日。这里为了方便,简化了循环的力度。
from pyspark.sql import *
# spark配置
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances", "20") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.enableHiveSupport() \
.getOrCreate()
# 导入其他相关库
import pandas as pd
from datetime import datetime
# sql创建临时表
sql_create = '''
CREATE TABLE temp.loop_write_example
(
cnt string comment "近n日cnt"
)
PARTITIONED BY (`point_date` string, `dtype` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
spark.sql(sql_create)
DataFrame[]
构造日期'{dt}'和热搜类型{num}两个参数
# sql写入临时表
sql_insert = '''
insert overwrite table temp.loop_write_example partition (point_date = '{dt}',dtype={num})
select
sum(if(dt between date_add('{dt}',-{num}) and '{dt}',cnt,null)) as cnt
from
temp.loop_write_example_fake_data
where
dt between date_add('{dt}',-4) and '{dt}'
'''
dates = pd.date_range('2021-01-01','2021-01-10').strftime("%Y-%m-%d").to_list() # 日期范围
# 循环写入临时表
for point_date in dates:
if point_date>='2021-01-01' and point_date<'2021-01-03':
for dtype in range(0,4):
start_time = datetime.now()
spark.sql(sql_insert.format(dt=point_date, num=dtype))
end_time=datetime.now()
print (point_date, dtype, "succeed", '耗时'+str((end_time-start_time).seconds)+'秒')
2021-01-01 0 succeed 耗时8秒
2021-01-01 1 succeed 耗时7秒
2021-01-01 2 succeed 耗时8秒
2021-01-01 3 succeed 耗时8秒
2021-01-02 0 succeed 耗时8秒
2021-01-02 1 succeed 耗时8秒
2021-01-02 2 succeed 耗时8秒
2021-01-02 3 succeed 耗时8秒
案例背景:将2亿+题目按规则分批写入hdfs,供研发通过接口查询,每个hdfs要求最大1000w。
from pyspark.sql import *
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.executor.instances", "20") \
.config("spark.executor.cores", "2") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.enableHiveSupport() \
.getOrCreate()
import math
import pandas as pd
from datetime import datetime
import time
import os
# 为了方便,通过规则生成的数据存入临时表temp.hh_qids中,规则细节无需了解
# 查看数据量级
df_cnt = spark.sql('select count(1) as cnt from temp.hh_qids').toPandas()
N = df_cnt['cnt'].loc[0] # 获取数据量级
print(N)
273230858
# 创建表,通过参数i生成表后缀
creat_sql = '''
CREATE TABLE IF NOT EXISTS temp.hh_mult_write_{i}
(
questionid string comment "题目ID"
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
'''
# 写入表,写入上述创建的临时表
insert_sql = '''
insert overwrite table temp.hh_mult_write_{i}
select
questionid
from
temp.hh_qids
where
ceil(rn/10000000)={i}
order by
questionid
limit 100000000
'''
%%time
# 通过循环创建多个临时表并写入
for i in range(1,math.ceil(N/10000000)+1):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # 创建表
spark.sql(insert_sql.format(i=i)) # 写入表
end_time=datetime.now()
print(f"成功写入hh_mult_write_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒')
成功写入hh_mult_write_1,耗时38秒
成功写入hh_mult_write_2,耗时59秒
成功写入hh_mult_write_3,耗时36秒
成功写入hh_mult_write_4,耗时34秒
成功写入hh_mult_write_5,耗时29秒
成功写入hh_mult_write_6,耗时26秒
成功写入hh_mult_write_7,耗时44秒
成功写入hh_mult_write_8,耗时43秒
成功写入hh_mult_write_9,耗时32秒
成功写入hh_mult_write_10,耗时49秒
成功写入hh_mult_write_11,耗时33秒
成功写入hh_mult_write_12,耗时34秒
成功写入hh_mult_write_13,耗时38秒
成功写入hh_mult_write_14,耗时24秒
成功写入hh_mult_write_15,耗时40秒
成功写入hh_mult_write_16,耗时34秒
成功写入hh_mult_write_17,耗时39秒
成功写入hh_mult_write_18,耗时45秒
成功写入hh_mult_write_19,耗时50秒
成功写入hh_mult_write_20,耗时35秒
成功写入hh_mult_write_21,耗时46秒
成功写入hh_mult_write_22,耗时38秒
成功写入hh_mult_write_23,耗时29秒
成功写入hh_mult_write_24,耗时31秒
成功写入hh_mult_write_25,耗时28秒
成功写入hh_mult_write_26,耗时36秒
成功写入hh_mult_write_27,耗时32秒
成功写入hh_mult_write_28,耗时17秒
CPU times: user 124 ms, sys: 31.8 ms, total: 156 ms
Wall time: 17min 15s
这次通过大量级数据实战演示,可以发现效率还可以,写入28个文件仅需17min 15s。但日常业务中可能存在更复杂的写入或者更大的量级,那有没有办法提高效率呢?
大家都知道python的循环是单线程的,在一次循环结束前是不会调起下次循环的。而调度系统一般也可以支持并发,那python是不是也能通过并发实现多线程呢?当然可以了,方法有不少,但我实验后发现还是
joblib好用。
这里通过一个简单的小case演示
joblib的效果# 查看集群服务器cpu数量 print(os.cpu_count())48
%%time # 查看简单循环的执行时间:15s for i in range(5): for j in range(3): time.sleep(1) print(i*j)0
0
0
0
1
2
0
2
4
0
3
6
0
4
8
CPU times: user 12.2 ms, sys: 6.18 ms, total: 18.3 ms
Wall time: 15 s%%time # 查看多线程下的执行时间:1.35s(好家伙,快了10倍多!) from joblib import Parallel, delayed def product2(x,y): time.sleep(1) return x*y # n_jobs=-1表示使用全部cpu Parallel(n_jobs=-1)(delayed(product2)(i,j) for i in range(5) for j in range(3))CPU times: user 111 ms, sys: 233 ms, total: 344 ms
Wall time: 1.35 s[0, 0, 0, 0, 1, 2, 0, 2, 4, 0, 3, 6, 0, 4, 8]
大家可以看到,提速效果还是杠杠滴,那实际应用会不会也如此优秀呢?
# 构造函数-将单次循环的主要过程包装成函数以便Parallel调用
def creat_insert(i):
start_time = datetime.now()
spark.sql(creat_sql.format(i=i)) # 创建表
spark.sql(insert_sql.format(i=i)) # 写入表
end_time=datetime.now()
print_str = f"成功写入hh_mult_test_{i},"+'耗时'+str((end_time-start_time).seconds)+'秒'
return print_str
%%time
# 并发写入
from joblib import Parallel, delayed
# 集群服务器大家都在用,在做大任务处理时,不建议使用全部cpu,这里使用一半足矣
Parallel(n_jobs=24, prefer="threads")(delayed(creat_insert)(i) for i in range(1,math.ceil(N/10000000)+1))
CPU times: user 87.6 ms, sys: 18.8 ms, total: 106 ms
Wall time: 1min 49s
['成功写入hh_mult_test_1,耗时44秒',
'成功写入hh_mult_test_2,耗时41秒',
'成功写入hh_mult_test_3,耗时83秒',
'成功写入hh_mult_test_4,耗时49秒',
'成功写入hh_mult_test_5,耗时89秒',
'成功写入hh_mult_test_6,耗时71秒',
'成功写入hh_mult_test_7,耗时89秒',
'成功写入hh_mult_test_8,耗时72秒',
'成功写入hh_mult_test_9,耗时83秒',
'成功写入hh_mult_test_10,耗时77秒',
'成功写入hh_mult_test_11,耗时80秒',
'成功写入hh_mult_test_12,耗时65秒',
'成功写入hh_mult_test_13,耗时53秒',
'成功写入hh_mult_test_14,耗时109秒',
'成功写入hh_mult_test_15,耗时81秒',
'成功写入hh_mult_test_16,耗时73秒',
'成功写入hh_mult_test_17,耗时41秒',
'成功写入hh_mult_test_18,耗时78秒',
'成功写入hh_mult_test_19,耗时84秒',
'成功写入hh_mult_test_20,耗时93秒',
'成功写入hh_mult_test_21,耗时68秒',
'成功写入hh_mult_test_22,耗时78秒',
'成功写入hh_mult_test_23,耗时48秒',
'成功写入hh_mult_test_24,耗时88秒',
'成功写入hh_mult_test_25,耗时54秒',
'成功写入hh_mult_test_26,耗时59秒',
'成功写入hh_mult_test_27,耗时62秒',
'成功写入hh_mult_test_28,耗时37秒']
可以看到,每个文件的写入时间与循环差不多,都是在60秒左右。但整体只花了1min 49s,快了10倍以上。
%%time
# 测试数据量较大,无端占用公司资源是不对的,所以需要删除下。
# 但要我手动一个个删除那也是不可能的,做个简单的for循环即可
for i in range(1,29):
drop_sql='''
DROP TABLE IF EXISTS temp.hh_mult_test_1{i};
'''
spark.sql(drop_sql.format(i=i)) # 删除表
CPU times: user 3.94 ms, sys: 1.96 ms, total: 5.91 ms
Wall time: 148 ms
至此,python小案例系列也结束了,案例基本来源于我的日常业务。在处理复杂需求,提升工作效率方面,Python还是有一席之地的。不知道大家有没有什么实用的python处理日常需求的小案例呢?
共勉~
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_