概述
这篇文章详细介绍了天真的实现流式从数据库更新到对该数据感兴趣的任何其他组件。 更准确地说,如何更改Spring Data R2DBC存储库以向相关订阅者发出事件。
A little bit of background knowledge of R2DBC and Spring will be helpful for this post. My previous writings, 一种synchronous RDBMS access with Spring Data R2DBC and Spring Data R2DBC for Microsoft SQL Server should help in that regard.
如前所述,这将是一个幼稚的实现。 因此,代码不会花哨。
为此,我劫持了一个简单的R2dbc存储库创建一个存储库实现,每次保存新记录时都会发出一个事件。 新事件已添加到直接处理器并发送给任何发行人s订阅了它。 看起来像:
class PersonRepository(
entity: RelationalEntityInformation<Person, Int>,
databaseClient: DatabaseClient,
converter: R2dbcConverter,
accessStrategy: ReactiveDataAccessStrategy
) : SimpleR2dbcRepository<Person, Int>(entity, databaseClient, converter, accessStrategy) {
private val source: DirectProcessor<Person> = DirectProcessor.create<Person>()
val events: Flux<Person> = source
override fun <S : Person> save(objectToSave: S): Mono<S> {
return super.save(objectToSave).doOnNext(source::onNext)
}
}
唯一的功能简单的R2dbc存储库需要覆盖的是保存(保存All代表参加保存)。doOnNext is added to the original 保存 call, which pushes a new event to the 资源(the 导演处理器)onNext。
的资源投放到助焊剂防止来自存储库外部的类添加新事件。 从技术上讲,他们仍然可以添加事件,但是他们需要自己进行转换。
您可能已经注意到,存储库正在加载大量参数并将其传递给简单的R2dbc存储库。 存储库的一个实例需要手动创建,因为它的某些依赖项无法自动注入:
@Configuration
class RepositoryConfiguration {
@Bean
fun personRepository(
databaseClient: DatabaseClient,
dataAccessStrategy: ReactiveDataAccessStrategy
): PersonRepository {
val entity: RelationalPersistentEntity<Person> = dataAccessStrategy
.converter
.mappingContext
.getRequiredPersistentEntity(Person::class.java) as RelationalPersistentEntity<Person>
val relationEntityInformation: MappingRelationalEntityInformation<Person, Int> =
MappingRelationalEntityInformation(entity, Int::class.java)
return PersonRepository(
relationEntityInformation,
databaseClient,
dataAccessStrategy.converter,
dataAccessStrategy
)
}
}
至此,所有内容都已设置好并可以使用。 以下是其工作的示例:
personRepository.events
.doOnComplete { log.info("Events flux has closed") }
.subscribe { log.info("From events stream - $it") }
// insert people records over time
MARVEL_CHARACTERS
.toFlux()
.delayElements(Duration.of(1, SECONDS))
.concatMap { personRepository.save(it) }
.subscribe()
哪个输出:
29-08-2019 09:08:27.674 [reactor-tcp-nio-1] From events stream - Person(id=481, name=Spiderman, age=18)
29-08-2019 09:08:28.550 [reactor-tcp-nio-2] From events stream - Person(id=482, name=Ironman, age=48)
29-08-2019 09:08:29.555 [reactor-tcp-nio-3] From events stream - Person(id=483, name=Thor, age=1000)
29-08-2019 09:08:30.561 [reactor-tcp-nio-4] From events stream - Person(id=484, name=Hulk, age=49)
29-08-2019 09:08:31.568 [reactor-tcp-nio-5] From events stream - Person(id=485, name=Antman, age=49)
29-08-2019 09:08:32.571 [reactor-tcp-nio-6] From events stream - Person(id=486, name=Blackwidow, age=34)
29-08-2019 09:08:33.576 [reactor-tcp-nio-7] From events stream - Person(id=487, name=Starlord, age=38)
29-08-2019 09:08:34.581 [reactor-tcp-nio-8] From events stream - Person(id=488, name=Captain America, age=100)
29-08-2019 09:08:35.585 [reactor-tcp-nio-9] From events stream - Person(id=489, name=Warmachine, age=50)
29-08-2019 09:08:36.589 [reactor-tcp-nio-10] From events stream - Person(id=490, name=Wasp, age=26)
29-08-2019 09:08:37.596 [reactor-tcp-nio-11] From events stream - Person(id=491, name=Winter Soldier, age=101)
29-08-2019 09:08:38.597 [reactor-tcp-nio-12] From events stream - Person(id=492, name=Black Panther, age=42)
29-08-2019 09:08:39.604 [reactor-tcp-nio-1] From events stream - Person(id=493, name=Doctor Strange, age=42)
29-08-2019 09:08:40.609 [reactor-tcp-nio-2] From events stream - Person(id=494, name=Gamora, age=29)
29-08-2019 09:08:41.611 [reactor-tcp-nio-3] From events stream - Person(id=495, name=Groot, age=4)
29-08-2019 09:08:42.618 [reactor-tcp-nio-4] From events stream - Person(id=496, name=Hawkeye, age=47)
29-08-2019 09:08:43.620 [reactor-tcp-nio-5] From events stream - Person(id=497, name=Pepper Potts, age=44)
29-08-2019 09:08:44.627 [reactor-tcp-nio-6] From events stream - Person(id=498, name=Captain Marvel, age=59)
29-08-2019 09:08:45.631 [reactor-tcp-nio-7] From events stream - Person(id=499, name=Rocket Raccoon, age=30)
29-08-2019 09:08:46.637 [reactor-tcp-nio-8] From events stream - Person(id=500, name=Drax, age=49)
29-08-2019 09:08:47.639 [reactor-tcp-nio-9] From events stream - Person(id=501, name=Nebula, age=30)
每秒保存一条记录,该记录与从存储库发出的事件相匹配。
至少在此基本实现中,这就是全部。 我敢肯定还有很多事情可以做,但是我首先需要弄清楚该怎么做……总而言之,加上一些补充,您可以将插入数据库的数据流式传输到对记录感兴趣的组件 被添加。
If you enjoyed this post or found it helpful (or both) then please feel free to follow me on Twitter at @LankyDanDev and remember to share with anyone else who might find this useful!
from: https://dev.to//lankydandev/streaming-live-updates-from-a-reactive-spring-data-repository-8f
最后
以上就是粗心煎蛋为你收集整理的从响应式Spring Data存储库流式传输实时更新的全部内容,希望文章能够帮你解决从响应式Spring Data存储库流式传输实时更新所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复