我希望能够处理从必须在页面中访问的源读取的 java 流。作为第一种方法,我实现了一个分页迭代器,它仅在当前页面用完项目时请求页面,然后使用 StreamSupport.stream(iterator, false) 获取迭代器上的流句柄。
由于我发现我的页面获取起来非常昂贵,因此我想通过并行流来访问页面。在这一点上,我发现由于 java 直接从迭代器提供的拆分器实现,我的幼稚方法提供的并行性是不存在的。因为我实际上对我想遍历的元素了解很多(我知道请求第一页后的总结果数,并且源支持偏移量和限制)我认为应该可以实现我自己的拆分器来实现真正的并发(在页面元素上完成的工作和页面查询中)。
我已经能够很容易地实现“在元素上完成的工作”并发,但在我最初的实现中,页面的查询仅由最顶层的拆分器完成,因此无法从fork-join 实现提供的工作分工。
我怎样才能编写实现这两个目标的拆分器?
作为引用,我将提供我到目前为止所做的事情(我知道它没有适本地划分查询)。
public final class PagingSourceSpliterator<T> implements Spliterator<T> {
public static final long DEFAULT_PAGE_SIZE = 100;
private Page<T> result;
private Iterator<T> results;
private boolean needsReset = false;
private final PageProducer<T> generator;
private long offset = 0L;
private long limit = DEFAULT_PAGE_SIZE;
public PagingSourceSpliterator(PageProducer<T> generator) {
this.generator = generator;
}
public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
this.generator = generator;
this.limit = pageSize;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (hasAnotherElement()) {
if (!results.hasNext()) {
loadPageAndPrepareNextPaging();
}
if (results.hasNext()) {
action.accept(results.next());
return true;
}
}
return false;
}
@Override
public Spliterator<T> trySplit() {
// if we know there's another page, go ahead and hand off whatever
// remains of this spliterator as a new spliterator for other
// threads to work on, and then mark that next time something is
// requested from this spliterator it needs to be reset to the head
// of the next page
if (hasAnotherPage()) {
Spliterator<T> other = result.getPage().spliterator();
needsReset = true;
return other;
} else {
return null;
}
}
@Override
public long estimateSize() {
if(limit == 0) {
return 0;
}
ensureStateIsUpToDateEnoughToAnswerInquiries();
return result.getTotalResults();
}
@Override
public int characteristics() {
return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
}
private boolean hasAnotherElement() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (results.hasNext() || hasAnotherPage());
}
private boolean hasAnotherPage() {
ensureStateIsUpToDateEnoughToAnswerInquiries();
return isBound() && (result.getTotalResults() > offset);
}
private boolean isBound() {
return Objects.nonNull(results) && Objects.nonNull(result);
}
private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
ensureBound();
ensureResetIfNecessary();
}
private void ensureBound() {
if (!isBound()) {
loadPageAndPrepareNextPaging();
}
}
private void ensureResetIfNecessary() {
if(needsReset) {
loadPageAndPrepareNextPaging();
needsReset = false;
}
}
private void loadPageAndPrepareNextPaging() {
// keep track of the overall result so that we can reference the original list and total size
this.result = generator.apply(offset, limit);
// make sure that the iterator we use to traverse a single page removes
// results from the underlying list as we go so that we can simply pass
// off the list spliterator for the trySplit rather than constructing a
// new kind of spliterator for what remains.
this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
@Override
public T next() {
T next = super.next();
this.remove();
return next;
}
};
// update the paging for the next request and inquiries prior to the next request
// we use the page of the actual result set instead of the limit in case the limit
// was not respected exactly.
this.offset += result.getPage().size();
}
public static class DelegatingIterator<T> implements Iterator<T> {
private final Iterator<T> iterator;
public DelegatingIterator(Iterator<T> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public T next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super T> action) {
iterator.forEachRemaining(action);
}
}
}
以及我的网页来源:
public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {
}
还有一个页面:
public final class Page<T> {
private long totalResults;
private final List<T> page = new ArrayList<>();
public long getTotalResults() {
return totalResults;
}
public List<T> getPage() {
return page;
}
public Page setTotalResults(long totalResults) {
this.totalResults = totalResults;
return this;
}
public Page setPage(List<T> results) {
this.page.clear();
this.page.addAll(results);
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Page)) {
return false;
}
Page<?> page1 = (Page<?>) o;
return totalResults == page1.totalResults && Objects.equals(page, page1.page);
}
@Override
public int hashCode() {
return Objects.hash(totalResults, page);
}
}
以及获取带有“慢”分页的流进行测试的示例
private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
PageProducer<T> producer = (offset, limit) -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int beginIndex = offset.intValue();
int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
return new Page<T>().setTotalResults(things.size())
.setPage(things.subList(beginIndex, endIndex));
};
return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}
最佳答案
拆分器没有让您更接近目标的主要原因是它试图拆分页面,而不是源元素空间。如果您知道元素的总数并且有一个源允许通过偏移量和限制来获取页面,那么拆分器最自然的形式是在这些元素中封装一个范围,例如通过偏移和限制或结束。然后,拆分意味着仅拆分该 范围,将拆分器的偏移量调整到拆分位置并创建一个表示前缀的新拆分器,从“旧偏移量”到拆分位置。
Before splitting:
this spliterator: offset=x, end=y
After splitting:
this spliterator: offset=z, end=y
returned spliterator: offset=x, end=z
x <= z <= y
而在最好的情况下,z 正好在 x 和 y 之间,以产生平衡的分割,但在我们的例子中,我们会稍微调整它以生成页面大小的倍数的工作集。
这个逻辑不需要抓取页面,所以如果你推迟抓取页面,框架想要开始遍历,即在拆分之后,抓取操作可以并行运行.最大的障碍是您需要获取第一页才能了解元素的总数。下面的解决方案将第一次提取与其他提取分开,简化了实现。当然,它必须传递第一个页面获取的结果,该结果将在第一次遍历时消耗(在顺序情况下)或作为第一个拆分前缀返回,此时接受一个不平衡拆分,但没有以后再处理。
public class PagingSpliterator<T> implements Spliterator<T> {
public interface PageFetcher<T> {
List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
}
public static final long DEFAULT_PAGE_SIZE = 100;
public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
}
public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
long pageSize, boolean parallel) {
if(pageSize<=0) throw new IllegalArgumentException();
return StreamSupport.stream(() -> {
PagingSpliterator<T> pgSp
= new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
pgSp.danglingFirstPage
=spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
return pgSp;
}, CHARACTERISTICS, parallel);
}
private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;
private final PageFetcher<T> supplier;
long start, end, pageSize;
Spliterator<T> currentPage, danglingFirstPage;
PagingSpliterator(PageFetcher<T> supplier,
long start, long end, long pageSize) {
this.supplier = supplier;
this.start = start;
this.end = end;
this.pageSize = pageSize;
}
public boolean tryAdvance(Consumer<? super T> action) {
for(;;) {
if(ensurePage().tryAdvance(action)) return true;
if(start>=end) return false;
currentPage=null;
}
}
public void forEachRemaining(Consumer<? super T> action) {
do {
ensurePage().forEachRemaining(action);
currentPage=null;
} while(start<end);
}
public Spliterator<T> trySplit() {
if(danglingFirstPage!=null) {
Spliterator<T> fp=danglingFirstPage;
danglingFirstPage=null;
start=fp.getExactSizeIfKnown();
return fp;
}
if(currentPage!=null)
return currentPage.trySplit();
if(end-start>pageSize) {
long mid=(start+end)>>>1;
mid=mid/pageSize*pageSize;
if(mid==start) mid+=pageSize;
return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
}
return ensurePage().trySplit();
}
/**
* Fetch data immediately before traversing or sub-page splitting.
*/
private Spliterator<T> ensurePage() {
if(danglingFirstPage!=null) {
Spliterator<T> fp=danglingFirstPage;
danglingFirstPage=null;
currentPage=fp;
start=fp.getExactSizeIfKnown();
return fp;
}
Spliterator<T> sp = currentPage;
if(sp==null) {
if(start>=end) return Spliterators.emptySpliterator();
sp = spliterator(supplier.fetchPage(
start, Math.min(end-start, pageSize), l->{}));
start += sp.getExactSizeIfKnown();
currentPage=sp;
}
return sp;
}
/**
* Ensure that the sub-spliterator provided by the List is compatible with
* ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
* the spliterators are, so the costs of dumping into an intermediate array
* in the other case is irrelevant.
*/
private static <E> Spliterator<E> spliterator(List<E> list) {
Spliterator<E> sp = list.spliterator();
if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
sp=Spliterators.spliterator(
StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
return sp;
}
public long estimateSize() {
if(currentPage!=null) return currentPage.estimateSize();
return end-start;
}
public int characteristics() {
return CHARACTERISTICS;
}
}
它使用一个专门的 PageFetcher 功能接口(interface),可以通过调用回调的 accept 方法来实现总大小并返回项目列表。分页拆分器将简单地委托(delegate)给列表的拆分器进行遍历,如果并发性明显高于生成的页面数,它甚至可以从拆分这些页面拆分器中受益,这意味着随机访问列表,如 ArrayList,是这里的首选列表类型。
调整您的示例代码以适应
private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
totalSizeSink.accept(things.size());
if(offset>things.size()) return Collections.emptyList();
int beginIndex = (int)offset;
assert beginIndex==offset;
int endIndex = Math.min(beginIndex+(int)limit, things.size());
System.out.printf("Page %6d-%6d:\t%s%n",
beginIndex, endIndex, Thread.currentThread());
// artificial slowdown
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return things.subList(beginIndex, endIndex);
}, pageSize, true);
}
你可以像这样测试它
List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList());
if(!samples.equals(result))
throw new AssertionError();
如果有足够的空闲 CPU 内核,它将演示如何同时获取页面,因此是无序的,而结果将正确地按遇到顺序。您还可以测试页面较少时适用的子页面并发:
Set<Thread> threads=ConcurrentHashMap.newKeySet();
List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result=asSlowPagedSource(500_000, samples)
.peek(x -> threads.add(Thread.currentThread()))
.collect(Collectors.toList());
if(!samples.equals(result))
throw new AssertionError();
System.out.println("Concurrency: "+threads.size());
关于java - 如何创建通用分页拆分器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38128274/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/