目录
最近项目上有个日志采集,我作为接收端接收udp发送过来的报文数据缓存到es上,然后查询es上的数据分页展示。但是之后我发现es对分页支持很不友好,它分为深分页与浅分页,浅分页就是MySQL里的limit,但是他最大展示长度只能到10000,也就是说当每页100条数据的话,只能翻100页,超过会报错。 所以你要么做限制,尽可能的把数据控制在10000条以内,要么对前端翻页进行限制。
下面我们针对es提供的search after深分页来完成小幅跳页的操作, 所谓的小幅跳页就是虽然我不能直接从第一页到最后一页,但是我也可以通过缓存游标的方式实现几页几页的跳,search after深分页的方式只能一直往后翻,scroll我不太了解,但是应该原理差不多。
jdk8, es7.6.1, maven3.3.9, springboot2.3.2
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 朝花不迟暮
* @version 1.0
* @date 2020/9/26 9:08
*/
@Configuration
public class ElasticSearchClientConfig
{
@Bean
public RestHighLevelClient restHighLevelClient()
{
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("119.29.10.76", 9200, "http"))
);
return client;
}
}
索引类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Document {
/**
* es中的唯一id
*/
private Long id;
/**
* 文档标题
*/
private String title;
/**
* 文档内容
*/
private String content;
/**
* 创建时间
*/
private Date createTime;
/**
* 当前时间
*/
private Long currentTime;
}
传输层,当然有冗余设计,各位取其精华去其糟粕吧
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class DocumentDTO {
private Integer pageNum = 1;
private Integer pageSize = 10;
/**
* es中的唯一id
*/
private Long id;
/**
* 文档标题
*/
private String title;
/**
* 文档内容
*/
private String content;
/**
* 创建时间
*/
private Date createTime;
/**
* 当前时间
*/
private Long currentTime;
/**
* 开始时间
*/
private String startTime;
/**
* 结束时间
*/
private String endTime;
/**
* 最后一页的游标页码
*/
private Object[] lastPageSort;
}
返回对象,可根据需求自定义
import com.study.sample.entity.Document;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class DocumentVO {
private Integer pageNum;
private Integer pageSize;
private long total;
private List<Document> data;
}
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.study.sample.entity.Document;
import com.study.sample.entity.dto.DocumentDTO;
import com.study.sample.entity.vo.DocumentVO;
import com.study.sample.service.DocumentService;
import com.study.sample.utils.DateParseUtil;
import com.study.sample.utils.EsClientUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.*;
@Service
@Slf4j
public class DocumentServiceImpl implements DocumentService {
@Autowired
private RestHighLevelClient restHighLevelClient;
//存储游标的集合
private static final Map<String, Map<Integer, Object[]>> sortMap = new HashMap<>(256);
@Override
public DocumentVO deepSearchPage(DocumentDTO documentDTO, HttpServletRequest req) {
String id = req.getSession().getId();
//条件构造器
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//返回的数据
List<Document> documents = new ArrayList<>();
//当前页
int currentPageNum;
//总数
long total = 0;
DocumentVO documentVO = new DocumentVO();
if (StrUtil.isEmpty(id)) throw new RuntimeException("id不能为空");
//页码和游标对应的集合
Map<Integer, Object[]> pageMap = sortMap.get(id);
//---------------设置查询条件start--------------
//范围查询
if (documentDTO.getStartTime() != null) {
Date startDate = DateParseUtil.parseString2Date(documentDTO.getStartTime());
boolQueryBuilder.filter(QueryBuilders.rangeQuery("createTime").gte(startDate));
}
if (documentDTO.getEndTime() != null) {
Date endDate = DateParseUtil.parseString2Date(documentDTO.getEndTime());
boolQueryBuilder.filter(QueryBuilders.rangeQuery("createTime").lte(endDate));
}
// 模糊查询
if (documentDTO.getContent() != null) {
// 同一字段在多个field里查询
// boolQueryBuilder.filter(QueryBuilders.multiMatchQuery(documentDTO.getContent(), fields));
boolQueryBuilder.must((QueryBuilders.wildcardQuery("content", documentDTO.getContent())));
}
if (documentDTO.getTitle() != null) {
boolQueryBuilder.should((QueryBuilders.wildcardQuery("title", documentDTO.getTitle())));
}
//---------------设置查询条件end----------------
/*首先不能是第一页,其次页码集合不能是空的,对应的页码也得在这个集合里,最后是当前页要小于此集合。
* 我觉得重点在于最后一条,为什么一定要小于呢?因为当前页数=集合的容量,可以视为已经翻到了最后一页,那么我们要继续向后查询5页
* 索引所以我们把这个边界处理放到了最后一层,本层只处理缓存有的游标,存在就放search after里查*/
if (documentDTO.getPageNum() != 1 && MapUtil.isNotEmpty(pageMap)
&& pageMap.containsKey(documentDTO.getPageNum())
&& pageMap.size() > documentDTO.getPageNum()) {
try {
//构造查询条件
searchSourceBuilder.query(boolQueryBuilder)
.sort("_id", SortOrder.DESC) //拿什么排序,游标就是什么
.size(documentDTO.getPageSize());
//从缓存里拿到了当前页的游标---> 存放的时候就已经做了对应处理!!
searchSourceBuilder.searchAfter(pageMap.get(documentDTO.getPageNum()));
SearchRequest searchRequest2 = new SearchRequest("document")
.source(searchSourceBuilder);
SearchResponse searchResponse2 = restHighLevelClient.search(searchRequest2, RequestOptions.DEFAULT);
SearchHits searchHits = searchResponse2.getHits();
if (searchHits.getTotalHits().value > 0) {
SearchHit[] hits = searchHits.getHits();
EsClientUtil.convertResult(documents, Document.class, hits);
total = searchHits.getTotalHits().value;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/*当pageNum=1的时候我就默认他刚接在或者已经刷新当然也有可能是从第2页回去之类的情况,但这里均不予考虑,只要是1就
* 重新构造页标和游标的对应关系*/
else if (documentDTO.getPageNum() == 1) {
// 先移除
sortMap.remove(id);
// 上面被移除,pageMap更不可能获取到,这里必须自己初始化
pageMap = new HashMap<>();
//游标
Object[] sortValues;
//当前页
currentPageNum = 1;
//下一页
int nextPageNum = currentPageNum + 1;
try {
searchSourceBuilder.query(boolQueryBuilder)
.sort("_id", SortOrder.DESC)
.from(0) //必须是0,不熟悉的朋友可能会觉得这里就可以循环,from从1开始就可以拿第二页,其实不行
//这样拿到的数据会有一点点错位,而easy-es这个框架是直接不允许深分页查询from > 0的
.size(documentDTO.getPageSize());
SearchRequest searchRequest = new SearchRequest("document").source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
if (hits.length != 0) {
//查询最后一个数据
SearchHit result = hits[hits.length - 1];
sortValues = result.getSortValues();
pageMap.put(1, new Object[]{}); // 第一页没有游标
pageMap.put(2, sortValues); //第一页的游标是去拿第二页的数据的,所以是2
EsClientUtil.convertResult(documents, Document.class, hits);
total = searchResponse.getHits().getTotalHits().value;
}
//向后获取5页的游标数据 所以你要品nextPageNum和currentPageNum的作用,就是处理游标和页码的对应关系的
for (int i = nextPageNum; i < nextPageNum + 5; i++) {
//取出上一页的游标
searchSourceBuilder.searchAfter(pageMap.get(i));
SearchRequest searchRequest2 = new SearchRequest("document")
.source(searchSourceBuilder);
SearchResponse searchResponse2 = restHighLevelClient.search(searchRequest2, RequestOptions.DEFAULT);
SearchHits searchHits = searchResponse2.getHits();
if (searchHits.getTotalHits().value > 0) {
SearchHit[] nextHits = searchHits.getHits();
//当数据量不大的情况下且每页pageSize很大的话,他可能都没有5页,所以每次循环要判断,一旦
//不足就要终止,因为总数据已经不足分页了,在遍历就越界了
if (nextHits.length < documentDTO.getPageSize()) break;
SearchHit nextHit = nextHits[nextHits.length - 1];
sortValues = nextHit.getSortValues();
//从3开始 3/4/5/6/7
pageMap.put(i + 1, sortValues);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/*这里是边界,也就是当前端页面显示当前展示的最大页数到第7页了,而你的页码正好是7,那么就该继续向后拿后面的游标并且要和页码对应*/
else if (pageMap.containsKey(documentDTO.getPageNum()) && pageMap.size() == documentDTO.getPageNum()) {
searchSourceBuilder.query(boolQueryBuilder)
.sort("_id", SortOrder.DESC)
.size(documentDTO.getPageSize());
currentPageNum = documentDTO.getPageNum();
try {
for (int i = currentPageNum; i < currentPageNum + 5; i++) {
//这里要知道当前页的游标在上面的集合里已经有了
searchSourceBuilder.searchAfter(pageMap.get(i));
SearchRequest searchRequest2 = new SearchRequest("document")
.source(searchSourceBuilder);
SearchResponse searchResponse2 = restHighLevelClient.search(searchRequest2, RequestOptions.DEFAULT);
SearchHits searchHits = searchResponse2.getHits();
total = searchHits.getTotalHits().value;
if (searchHits.getTotalHits().value > 0) {
SearchHit[] hits = searchHits.getHits();
//这里是数据边界的终止,上面已说
if (hits.length < documentDTO.getPageSize()) {
EsClientUtil.convertResult(documents, Document.class, hits);
break;
}
SearchHit result = hits[hits.length - 1];
Object[] sortValues = result.getSortValues();
//存放游标
pageMap.put(i + 1, sortValues);
//这里是拿出当前页的数据
if (i == documentDTO.getPageNum()) {
EsClientUtil.convertResult(documents, Document.class, hits);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
documentVO.setPageNum(documentDTO.getPageNum());
documentVO.setPageSize(documentDTO.getPageSize());
documentVO.setTotal(total);
documentVO.setData(documents);
sortMap.put(id, pageMap);
return documentVO;
}
}
其实从上面的代码加注释,我觉得你应该就可以理解了,思路只有一个那就是缓存游标,这里我有个三个判断,第一个判断是判断当前页数是不是已经在缓存里了,进了第一个说明是有的,就直接拿出游标查询并返给前端。
第二个判断是判断是不是初次加载,如果是就清掉之前缓存的游标集合,因为你要考虑数据增量的情况,如果你没有数据增量的情况甚至都不用按标记分,直接建立个游标缓存,什么时候有增量数据(比如那种一天一增),就什么时候删缓存。然后还要获取后五页的游标数据。
第三个判断是边界判断,主要任务有三个,第一个任务是获取后5页的游标,第二个任务是判断总数据是不是没得分了,第三个任务是拿到当前边界的数据
首先是关于这个缓存的维护,比如session已经不再有效,怎么移除,其实我的项目里是还有个map的,他就是来实时更新这个session的最后查询时间的,可以通过定时任务,一旦超过一个时间点,就从sortMap移除。
其次关于时间的建议,我个人建议你在存时间的时候字段设置成Long型,就算不方便你也要一个Date类型一个Long型,Es读取出来的那个时间你不好转Date,所以建议用Long比较,建议你采纳我的建议!!
第三是谈浅分页,其实我们一开始也不是用深分这个方案的,而是通过限制数据的首次加载条数,我们后台逻辑处理好,尽量避免超出那个1w的限制。跟前端也说好,比如我每页50条数据,那么前端那边翻页的总页数就不能大于200,也不要展示总页数,也不要让前端弄那个尾页最大页的那个按钮,就让用户5页5页往后跳。如果你不能的客户不允许这样,那我这边建议你放弃es拥抱MySQL,两难自解!
第四条如果你可以随意选型的话,且你对传统的api不熟悉的话,建议你考虑easy-es,让你操作es如同操作关型数据库,且封装好了深分页查询。
第五是关于上面的代码逻辑,可能还会有漏洞,就是关于跳页的问题,我也许还没有处理的太成熟,但是前端那边老老实实jump的话应该不会出什么问题!
有问题可以联系707409741
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
查看Ruby的CSV库的文档,我非常确定这是可能且简单的。我只需要使用Ruby删除CSV文件的前三列,但我没有成功运行它。 最佳答案 csv_table=CSV.read(file_path_in,:headers=>true)csv_table.delete("header_name")csv_table.to_csv#=>ThenewCSVinstringformat检查CSV::Table文档:http://ruby-doc.org/stdlib-1.9.2/libdoc/csv/rdoc/CSV/Table.html
我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
我正在阅读SandiMetz的POODR,并且遇到了一个我不太了解的编码原则。这是代码:classBicycleattr_reader:size,:chain,:tire_sizedefinitialize(args={})@size=args[:size]||1@chain=args[:chain]||2@tire_size=args[:tire_size]||3post_initialize(args)endendclassMountainBike此代码将为其各自的属性输出1,2,3,4,5。我不明白的是查找方法。当一辆山地自行车被实例化时,因为它没有自己的initialize方法
我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI
我喜欢使用Textile或Markdown为我的项目编写自述文件,但是当我生成RDoc时,自述文件被解释为RDoc并且看起来非常糟糕。有没有办法让RDoc通过RedCloth或BlueCloth而不是它自己的格式化程序运行文件?它可以配置为自动检测文件后缀的格式吗?(例如README.textile通过RedCloth运行,但README.mdown通过BlueCloth运行) 最佳答案 使用YARD直接代替RDoc将允许您包含Textile或Markdown文件,只要它们的文件后缀是合理的。我经常使用类似于以下Rake任务的东西:
我想让一个yaml对象引用另一个,如下所示:intro:"Hello,dearuser."registration:$introThanksforregistering!new_message:$introYouhaveanewmessage!上面的语法只是它如何工作的一个例子(这也是它在thiscpanmodule中的工作方式。)我正在使用标准的rubyyaml解析器。这可能吗? 最佳答案 一些yaml对象确实引用了其他对象:irb>require'yaml'#=>trueirb>str="hello"#=>"hello"ir
当谈到运行时自省(introspection)和动态代码生成时,我认为ruby没有任何竞争对手,可能除了一些lisp方言。前几天,我正在做一些代码练习来探索ruby的动态功能,我开始想知道如何向现有对象添加方法。以下是我能想到的3种方法:obj=Object.new#addamethoddirectlydefobj.new_method...end#addamethodindirectlywiththesingletonclassclass这只是冰山一角,因为我还没有探索instance_eval、module_eval和define_method的各种组合。是否有在线/离线资