博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringData ES中一些底层原理的分析
阅读量:6462 次
发布时间:2019-06-23

本文共 18665 字,大约阅读时间需要 62 分钟。

之前写过一篇,顺便深入学习下Spring Data Elasticsearch。

 
是针对Elasticsearch的实现。
它跟Spring Data一样,提供了Repository接口,我们只需要定义一个新的接口并继承这个Repository接口,然后就可以注入这个新的接口使用了。
 
定义接口:
 

@Repositorypublic interface TaskRepository extends ElasticsearchRepository
{ }

注入接口进行使用:
 

@Autowiredprivate TaskRepository taskRepository;....taskRepository.save(task);

Repository接口的代理生成
 
上面的例子中TaskRepository是个接口,而我们却直接注入了这个接口并调用方法;很明显,这是错误的。
其实SpringData ES内部基于这个TaskRepository接口构造一个SimpleElasticsearchRepository,真正被注入的是这个SimpleElasticsearchRepository。
这个过程是如何实现的呢?  来分析一下。
ElasticsearchRepositoriesAutoConfiguration自动化配置类会导入ElasticsearchRepositoriesRegistrar这个ImportBeanDefinitionRegistrar。
ElasticsearchRepositoriesRegistrar继承自AbstractRepositoryConfigurationSourceSupport,是个ImportBeanDefinitionRegistrar接口的实现类,会被Spring容器调用registerBeanDefinitions进行自定义bean的注册。
ElasticsearchRepositoriesRegistrar委托给RepositoryConfigurationDelegate完成bean的解析。
整个解析过程可以分3个步骤:
 

  1. 找出模块中的org.springframework.data.repository.Repository接口的实现类或者org.springframework.data.repository.RepositoryDefinition注解的修饰类,并会过滤掉org.springframework.data.repository.NoRepositoryBean注解的修饰类。找出后封装到RepositoryConfiguration中
  2. 遍历这些RepositoryConfiguration,然后构造成BeanDefinition并注册到Spring容器中。需要注意的是这些RepositoryConfiguration会以beanClass为ElasticsearchRepositoryFactoryBean这个类的方式被注册,并把对应的Repository接口当做构造参数传递给ElasticsearchRepositoryFactoryBean,还会设置相应的属性比如elasticsearchOperations、evaluationContextProvider、namedQueries、repositoryBaseClass、lazyInitqueryLookupStrategyKey
  3. ElasticsearchRepositoryFactoryBean被实例化的时候设置对应的构造参数和属性。设置完毕以后调用afterPropertiesSet方法(实现了InitializingBean接口)。在afterPropertiesSet方法内部会去创建RepositoryFactorySupport类,并进行一些初始化,比如namedQueries、repositoryBaseClass等。然后通过这个RepositoryFactorySupport的getRepository方法基于Repository接口创建出代理类,并使用AOP添加了几个MethodInterceptor

 

// 遍历基于第1步条件得到的RepositoryConfiguration集合for (RepositoryConfiguration
configuration : extension .getRepositoryConfigurations(configurationSource, resourceLoader, inMultiStoreMode)) { // 构造出BeanDefinitionBuilder BeanDefinitionBuilder definitionBuilder = builder.build(configuration); extension.postProcess(definitionBuilder, configurationSource); if (isXml) { // 设置elasticsearchOperations属性 extension.postProcess(definitionBuilder, (XmlRepositoryConfigurationSource) configurationSource); } else { // 设置elasticsearchOperations属性 extension.postProcess(definitionBuilder, (AnnotationRepositoryConfigurationSource) configurationSource); } // 使用命名策略生成bean的名字 AbstractBeanDefinition beanDefinition = definitionBuilder.getBeanDefinition(); String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry); if (LOGGER.isDebugEnabled()) { LOGGER.debug(REPOSITORY_REGISTRATION, extension.getModuleName(), beanName, configuration.getRepositoryInterface(), extension.getRepositoryFactoryClassName()); } beanDefinition.setAttribute(FACTORY_BEAN_OBJECT_TYPE, configuration.getRepositoryInterface()); // 注册到Spring容器中 registry.registerBeanDefinition(beanName, beanDefinition); definitions.add(new BeanComponentDefinition(beanDefinition, beanName)); } // build方法 public BeanDefinitionBuilder build(RepositoryConfiguration
configuration) { Assert.notNull(registry, "BeanDefinitionRegistry must not be null!"); Assert.notNull(resourceLoader, "ResourceLoader must not be null!"); // 得到factoryBeanName,这里会使用extension.getRepositoryFactoryClassName()去获得 // extension.getRepositoryFactoryClassName()返回的正是ElasticsearchRepositoryFactoryBean String factoryBeanName = configuration.getRepositoryFactoryBeanName(); factoryBeanName = StringUtils.hasText(factoryBeanName) ? factoryBeanName : extension.getRepositoryFactoryClassName(); // 基于factoryBeanName构造BeanDefinitionBuilder BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(factoryBeanName); builder.getRawBeanDefinition().setSource(configuration.getSource()); // 设置ElasticsearchRepositoryFactoryBean的构造参数,这里是对应的Repository接口 // 设置一些的属性值 builder.addConstructorArgValue(configuration.getRepositoryInterface()); builder.addPropertyValue("queryLookupStrategyKey", configuration.getQueryLookupStrategyKey()); builder.addPropertyValue("lazyInit", configuration.isLazyInit()); builder.addPropertyValue("repositoryBaseClass", configuration.getRepositoryBaseClassName()); NamedQueriesBeanDefinitionBuilder definitionBuilder = new NamedQueriesBeanDefinitionBuilder( extension.getDefaultNamedQueryLocation()); if (StringUtils.hasText(configuration.getNamedQueriesLocation())) { definitionBuilder.setLocations(configuration.getNamedQueriesLocation()); } builder.addPropertyValue("namedQueries", definitionBuilder.build(configuration.getSource())); // 查找是否有对应Repository接口的自定义实现类 String customImplementationBeanName = registerCustomImplementation(configuration); // 存在自定义实现类的话,设置到属性中 if (customImplementationBeanName != null) { builder.addPropertyReference("customImplementation", customImplementationBeanName); builder.addDependsOn(customImplementationBeanName); } RootBeanDefinition evaluationContextProviderDefinition = new RootBeanDefinition( ExtensionAwareEvaluationContextProvider.class); evaluationContextProviderDefinition.setSource(configuration.getSource()); // 设置一些的属性值 builder.addPropertyValue("evaluationContextProvider", evaluationContextProviderDefinition); return builder; } // RepositoryFactorySupport的getRepository方法,获得Repository接口的代理类 public
T getRepository(Class
repositoryInterface, Object customImplementation) { // 获取Repository的元数据 RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface); // 获取Repository的自定义实现类 Class
customImplementationClass = null == customImplementation ? null : customImplementation.getClass(); // 根据元数据和自定义实现类得到Repository的RepositoryInformation信息类 // 获取信息类的时候如果发现repositoryBaseClass是空的话会根据meta中的信息去自动匹配 // 具体匹配过程在下面的getRepositoryBaseClass方法中说明 RepositoryInformation information = getRepositoryInformation(metadata, customImplementationClass); // 验证 validate(information, customImplementation); // 得到最终的目标类实例,会通过repositoryBaseClass去查找 Object target = getTargetRepository(information); // 创建代理工厂 ProxyFactory result = new ProxyFactory(); result.setTarget(target); result.setInterfaces(new Class[] { repositoryInterface, Repository.class }); // 进行aop相关的设置 result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE); result.addAdvisor(ExposeInvocationInterceptor.ADVISOR); if (TRANSACTION_PROXY_TYPE != null) { result.addInterface(TRANSACTION_PROXY_TYPE); } // 使用RepositoryProxyPostProcessor处理 for (RepositoryProxyPostProcessor processor : postProcessors) { processor.postProcess(result, information); } if (IS_JAVA_8) { // 如果是JDK8的话,添加DefaultMethodInvokingMethodInterceptor result.addAdvice(new DefaultMethodInvokingMethodInterceptor()); } // 添加QueryExecutorMethodInterceptor result.addAdvice(new QueryExecutorMethodInterceptor(information, customImplementation, target)); // 使用代理工厂创建出代理类,这里是使用jdk内置的代理模式 return (T) result.getProxy(classLoader); } // 目标类的获取 protected Class
getRepositoryBaseClass(RepositoryMetadata metadata) { // 如果Repository接口属于QueryDsl,抛出异常。目前还不支持 if (isQueryDslRepository(metadata.getRepositoryInterface())) { throw new IllegalArgumentException("QueryDsl Support has not been implemented yet."); } // 如果主键是数值类型的话,repositoryBaseClass为NumberKeyedRepository if (Integer.class.isAssignableFrom(metadata.getIdType()) || Long.class.isAssignableFrom(metadata.getIdType()) || Double.class.isAssignableFrom(metadata.getIdType())) { return NumberKeyedRepository.class; } else if (metadata.getIdType() == String.class) { // 如果主键是String类型的话,repositoryBaseClass为SimpleElasticsearchRepository return SimpleElasticsearchRepository.class; } else if (metadata.getIdType() == UUID.class) { // 如果主键是UUID类型的话,repositoryBaseClass为UUIDElasticsearchRepository return UUIDElasticsearchRepository.class; } else { // 否则报错 throw new IllegalArgumentException("Unsupported ID type " + metadata.getIdType()); } }

ElasticsearchRepositoryFactoryBean是一个FactoryBean接口的实现类,getObject方法返回的上面提到的getRepository方法返回的代理对象;getObjectType方法返回的是对应Repository接口类型。
我们文章一开始提到的注入TaskRepository的时候,实际上这个对象是ElasticsearchRepositoryFactoryBean类型的实例,只不过ElasticsearchRepositoryFactoryBean实现了FactoryBean接口,所以注入的时候会得到一个代理对象,这个代理对象是由jdk内置的代理生成的,并且它的target对象是SimpleElasticsearchRepository(主键是String类型)。
 
 
SpringData ES中ElasticsearchOperations的介绍
 
ElasticsearchTemplate实现了ElasticsearchOperations接口。
ElasticsearchOperations接口是SpringData对Elasticsearch操作的一层封装,比如有创建索引createIndex方法、获取索引的设置信息getSetting方法、查询对象queryForObject方法、分页查询方法queryForPage、删除文档delete方法、更新文档update方法等等。
ElasticsearchTemplate是具体的实现类,它有这些属性:
 

// elasticsearch提供的基于java的客户端连接接口。java对es集群的操作使用这个接口完成private Client client;// 一个转换器接口,定义了2个方法,分别可以获得MappingContext和ConversionService// MappingContext接口用于获取所有的持久化实体和这些实体的属性// ConversionService目前在SpringData ES中没有被使用 private ElasticsearchConverter elasticsearchConverter; // 内部使用EntityMapper完成对象到json字符串和json字符串到对象的映射。默认使用jackson完成映射,可自定义 private ResultsMapper resultsMapper; // 查询超时时间 private String searchTimeout;

Client接口在ElasticsearchAutoConfiguration自动化配置类里被构造:
 

@Bean@ConditionalOnMissingBeanpublic Client elasticsearchClient() { try { return createClient(); } catch (Exception ex) { throw new IllegalStateException(ex); } }

ElasticsearchTemplate、ElasticsearchConverter以及SimpleElasticsearchMappingContext在ElasticsearchDataAutoConfiguration自动化配置类里被构造:
 

@Bean@ConditionalOnMissingBeanpublic ElasticsearchTemplate elasticsearchTemplate(Client client, ElasticsearchConverter converter) { try { return new ElasticsearchTemplate(client, converter); } catch (Exception ex) { throw new IllegalStateException(ex); } } @Bean @ConditionalOnMissingBean public ElasticsearchConverter elasticsearchConverter( SimpleElasticsearchMappingContext mappingContext) { return new MappingElasticsearchConverter(mappingContext); } @Bean @ConditionalOnMissingBean public SimpleElasticsearchMappingContext mappingContext() { return new SimpleElasticsearchMappingContext(); }

 需要注意的是这个bean被自动化配置类构造的前提是它们在Spring容器中并不存在。
 
Repository的调用过程
 
以自定义的TaskRepository的save方法为例,大致的执行流程如下所示:
SimpleElasticsearchRepository的save方法具体的分析在中分析过。
像自定义的Repository查询方法,或者Repository接口的自定义实现类的操作这些底层,可以去QueryExecutorMethodInterceptor中查看,大家有兴趣的可以自行查看源码。

http://spring4all.com/article/17

最近工作中使用了。发生它存在一个问题:

Document对应的POJO的属性跟es里面文档的字段名字不一样,这样Repository里面编写自定义的查询方法就会查询不出结果。

比如有个Person类,它有2个属性goodFace和goodAt。这2个属性在es的索引里对应的字段表为good_face和good_at:

1
2
3
4
5
6
7
8
9
10
11
@Document(replicas = 1, shards = 1, type = "person", indexName = "person")
@Getter
@Setter
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class Person {
@Id
private String id;
private String name;
private boolean goodFace;
private String goodAt;
}

Repository中的自定义查询:

1
2
3
4
5
@Repository
public interface PersonRepository extends ElasticsearchRepository<Person, String> {
List<Person> findByGoodFace(boolean isGoodFace);
List<Person> findByName(String name);
}

方法findByGoodFace是查询不出结果的,而findByName是ok的。

为什么findByGoodFace不行而findByName可以呢,来探究一下。

Person类的name属性跟ES中的字段名是一模一样的,而goodFace字段在ES中的字段是good_face(因为我们使用了SnakeCaseStrategy策略)。

所以产生这个问题的原因在于ES中文档的字段名跟POJO中的字段名不统一造成的。

但是我们使用PersonRepository的save方法保存文档的时候属性和字段是可以对上的。

那为什么使用repository的save方法能对应上文档和字段,而自定义的find方法却不行呢?

ES是使用来完成POJO到json的映射关系的。

在Person类上使用@JsonNaming注解完成POJO和json的映射,我们使用了SnakeCaseStrategy策略,这个策略会把属性从驼峰方式改成小写带下划线的方式。

比如goodAt属性映射的时候就会变成good_at,good_face变成good_face,name变成name。

Spring Data Elasticsearch把对ES的操作封装成了一个ElasticsearchOperations接口。比如queryForObject、queryForPage、count、queryForList方法。

ElasticsearchOperations接口目前有一个实现类ElasticsearchTemplate。

ElasticsearchTemplate内部有个ResultsMapper属性,这个ResultsMapper目前只有一个实现类DefaultResultMapper,DefaultResultMapper内部使用DefaultEntityMapper完成映射。DefaultEntityMapper是个EntityMapper接口的实现类,它的定义如下:

1
2
3
4
public interface EntityMapper {
public String mapToString(Object object) throws IOException;
public <T> T mapToObject(String source, Class<T> clazz) throws IOException;
}

方法很明白:对象到json字符串的转换和json字符串倒对象的转换。

DefaultEntityMapper内部使用jackson的ObjectMapper完成。

自定义的Repository继承自ElasticsearchRepository,最后会使用代理映射成SimpleElasticsearchRepository。

SimpleElasticsearchRepository内部有个属性ElasticsearchOperations用于完成与ES的交互。

我们看下SimpleElasticsearchRepository的save方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public <S extends T> S save(S entity) {
Assert.notNull(entity,
"Cannot save 'null' entity.");
// createIndexQuery方法会构造一个IndexQuery,然后调用ElasticsearchOperations的index方法
elasticsearchOperations.index(createIndexQuery(entity));
elasticsearchOperations.refresh(entityInformation.getIndexName());
return entity;
}
 
// ElasticsearchTemplate的index方法
@Override
public String index(IndexQuery query) {
// 调用prepareIndex方法构造一个IndexRequestBuilder
String documentId = prepareIndex(query).execute().actionGet().getId();
// 设置保存文档的id
if (query.getObject() != null) {
setPersistentEntityId(query.getObject(), documentId);
}
return documentId;
}
 
private IndexRequestBuilder prepareIndex(IndexQuery query) {
try {
// 从@Document注解中得到索引的名字
String indexName = isBlank(query.getIndexName()) ? retrieveIndexNameFromPersistentEntity(query.getObject()
.getClass())[
0] : query.getIndexName();
// 从@Document注解中得到索引的类型
String type = isBlank(query.getType()) ? retrieveTypeFromPersistentEntity(query.getObject().getClass())[
0]
: query.getType();
 
IndexRequestBuilder indexRequestBuilder =
null;
 
if (query.getObject() != null) { // save方法这里保存的object就是POJO
// 得到id字段
String id = isBlank(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
if (id != null) { // 如果设置了id字段
indexRequestBuilder = client.prepareIndex(indexName, type, id);
}
else { // 如果没有设置id字段
indexRequestBuilder = client.prepareIndex(indexName, type);
}
// 使用ResultsMapper映射POJO到json字符串
indexRequestBuilder.setSource(resultsMapper.getEntityMapper().mapToString(query.getObject()));
}
else if (query.getSource() != null) { // 如果自定义了source属性,直接赋值
indexRequestBuilder = client.prepareIndex(indexName, type, query.getId()).setSource(query.getSource());
}
else { // 没有设置object属性或者source属性,抛出ElasticsearchException异常
throw new ElasticsearchException("object or source is null, failed to index the document [id: " + query.getId() + "]");
}
if (query.getVersion() != null) { // 设置版本
indexRequestBuilder.setVersion(query.getVersion());
indexRequestBuilder.setVersionType(EXTERNAL);
}
 
if (query.getParentId() != null) { // 设置parentId
indexRequestBuilder.setParent(query.getParentId());
}
 
return indexRequestBuilder;
}
catch (IOException e) {
throw new ElasticsearchException("failed to index the document [id: " + query.getId() + "]", e);
}
}

save方法使用ResultsMapper完成了POJO到json的转换,所以save方法保存成功对应的文档数据:

1
indexRequestBuilder.setSource(resultsMapper.getEntityMapper().mapToString(query.getObject()));

自定义的findByGoodFace方法:

由于是Repository中的自定义方法,会被Spring Data通过代理进行构造,内部还是用了AOP,最终在QueryExecutorMethodInterceptor中并解析成ElasticsearchPartQuery这个RepositoryQuery接口的实现类,然后调用execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Object execute(Object[] parameters) {
ParametersParameterAccessor accessor =
new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
CriteriaQuery query = createQuery(accessor);
if(tree.isDelete()) { // 如果是删除方法
Object result = countOrGetDocumentsForDelete(query, accessor);
elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
return result;
}
else if (queryMethod.isPageQuery()) { // 如果是分页查询
query.setPageable(accessor.getPageable());
return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
}
else if (queryMethod.isStreamQuery()) { // 如果是流式查询
Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
if (query.getPageable() == null) {
query.setPageable(
new PageRequest(0, 20));
}
 
return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
 
}
else if (queryMethod.isCollectionQuery()) { // 如果是集合查询
if (accessor.getPageable() == null) {
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
query.setPageable(
new PageRequest(0, Math.max(1, itemCount)));
}
else {
query.setPageable(accessor.getPageable());
}
return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
}
else if (tree.isCountProjection()) { // 如果是count查询
return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
}
// 单个查询
return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
}

findByGoodFace方法是个集合查询,最终会调用ElasticsearchOperations的queryForList方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public <T> List<T> queryForList(CriteriaQuery query, Class<T> clazz) {
// 调用queryForPage方法
return queryForPage(query, clazz).getContent();
}
 
@Override
public <T> Page<T> queryForPage(CriteriaQuery criteriaQuery, Class<T> clazz) {
// 查询解析器进行语法的解析
QueryBuilder elasticsearchQuery =
new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());
QueryBuilder elasticsearchFilter =
new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria());
SearchRequestBuilder searchRequestBuilder = prepareSearch(criteriaQuery, clazz);
 
if (elasticsearchQuery != null) {
searchRequestBuilder.setQuery(elasticsearchQuery);
}
else {
searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
}
 
if (criteriaQuery.getMinScore() > 0) {
searchRequestBuilder.setMinScore(criteriaQuery.getMinScore());
}
 
if (elasticsearchFilter != null)
searchRequestBuilder.setPostFilter(elasticsearchFilter);
if (logger.isDebugEnabled()) {
logger.debug(
"doSearch query:\n" + searchRequestBuilder.toString());
}
 
SearchResponse response = getSearchResponse(searchRequestBuilder
.execute());
// 最终的结果是用ResultsMapper进行映射
return resultsMapper.mapResults(response, clazz, criteriaQuery.getPageable());
}

自定义的方法使用ElasticsearchQueryCreator去创建CriteriaQuery,内部做一些词法的分析,有了CriteriaQuery之后,使用CriteriaQueryProcessor基于Criteria构造了QueryBuilder,最后使用QueryBuilder去做rest请求得到es的查询结果。这些过程中是没有用到ResultsMapper,而只是用反射得到POJO的属性,只有在得到查询结果后才会用ResultsMapper去做映射。

如果出现了这种情况,解决方案目前有两种:

1.使用repository的search方法,参数可以是QueryBuilder或者SearchQuery

1
2
3
4
personRepository.search(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(
"good_face", true))
)

2.使用@Query注解

1
2
@Query("{\"bool\" : {\"must\" : {\"term\" : {\"good_face\" : \"?0\"}}}}")
List<Person> findByGoodFace(boolean isGoodFace);

暂时发现这两种解决方法,不知还有否更好的解决方案。http://fangjian0423.github.io/2017/05/24/spring-data-es-query-problem/

 

转载地址:http://wshzo.baihongyu.com/

你可能感兴趣的文章
[Processing]点到线段的最小距离
查看>>
考研随笔2
查看>>
乱码的情况
查看>>
虚拟机centos 同一个tomcat、不同端口访问不同的项目
查看>>
在不花一分钱的情况下,如何验证你的创业想法是否可行?《转》
查看>>
Linux/Android 性能优化工具 perf
查看>>
GitHub使用教程、注册与安装
查看>>
CODE[VS] 1294 全排列
查看>>
<<The C Programming Language>>讀書筆記
查看>>
JS详细入门教程(上)
查看>>
Android学习笔记21-ImageView获取网络图片
查看>>
线段树分治
查看>>
git代码冲突
查看>>
poll
查看>>
解析查询 queryString 请求参数的函数
查看>>
学生选课系统数据存文件
查看>>
我的毕设总结所用的技术和只是要点 基于stm32F4的AGV嵌入式控制系统的设计
查看>>
JMeter—断言
查看>>
C++的新类创建:继承与组合
查看>>
asp操作access提示“无法从指定的数据表中删除”
查看>>