我是靠谱客的博主 任性蜜粉,最近开发中收集的这篇文章主要介绍Lagom Framework参考指南(五)1.Persistent Entity2. Storing Persistent Entities in Cassandra3. Storing Persistent Entities in a Relational Database4. Persistent Read-Side5. Cassandra Read-Side support6. Relational Database Read-Side support 关系数据库读取方面支持7. JDB,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
Writing persistent and clustered services(编写持久的和群集的服务)
1.Persistent Entity
事件源和CQRS是本节推荐的介绍。
PersistentEntity具有一个稳定的实体标识符,可以通过这个标识,从服务实现或其他地方访问它。使用事件源,实体的状态是可以持久化的(持久的)。我们将所有状态更改表示为事件,这些不可变的事实被附加到事件日志中。当启动时,我们可以通过回放事件,来重新创建实体的当前的状态。
一个持久化实体对应于域驱动设计术语中的聚合根。每个实例都有一个稳定的标识符,对于给定的id,只会有一个实体的实例。Lagom负责将这些实例分布到服务的集群中。如果知道标识符,可以向实体发送消息,即命令。
持久化实体也是一个事务边界。不变量可以在一个实体中保持,但不能在多个实体中保持。
如果您熟悉JPA,那么值得注意的是,一个PersistentEntity可以用于类似于JPA @Entity的类似东西,但是几个方面却截然不同。例如,JPA @Entity从数据库中从需要的地方加载,也就是说,可能有许多具有相同实体标识符的Java对象实例。相反,只有一个使用给定标识符的PersistentEntity实例。使用JPA,您通常只存储当前状态,但是如何达到这个当前的状态历史却是无法获取到的。
通过向它发送命令消息,您与一个PersistentEntity交互。
对于指定的实体实例,命令依次处理,一次一个。命令可能会导致状态更改,这些更改被持久化为表示命令的效果的事件。当前状态不是为每一个变更而存储,因为它可以从事件中派生出来。这些事件只会被附加到存储,没有会突变的,这就带来了非常高的事务率和有效率的复制。
实体会自动分布在服务集群中的节点上。每个实体只在一个地方运行,消息可以发送给实体,而不需要发送方知道实体的位置。如果一个节点停止了,当下一次向这个实体发送消息,那么运行在这个节点上的实体将会在其他的节点上重新启动。
如果一个节点停止了运行在该节点上的实体,那么下次将在另一个节点上启动消息。当将新节点添加到集群中时,一些现有的实体将重新平衡到新节点以分散负载。当将新节点添加到集群中时,一些现有的实体将重新平衡到新节点以分散负载。
一个实体被保存,在内存中保持它的当前状态,只要它被使用。当它不被使用一段时间后,它将自动被钝化以释放资源。
当一个实体启动时,它会重新播放存储的事件来恢复当前状态。这可以是更改的全部历史或者从一个我们保存的快照开始以这将减少恢复时间。
1.1 Choosing a database
>Cassandra
>PstgreSQL
>MySQL
>Oracle
>H2
我们建议是Cassandra,Cassandra是一个可伸缩的分布式数据库,它还可以灵活地支持响应式服务的典型用例。与大多数关系数据库相比,它天生支持切分和复制,并且正在成为行业标准的开源NoSQL数据库。
Lagom还提供了开箱即用的支持以在开发环境中中运行Cassandra,开发人员不需要安装、配置和管理自己当使用Lagom Cassandra,极大的提高了开发速度,这意味着开发人员花天设置开发环境,然后才能开始生产项目的日子已经一去不复返了。
有关配置您的项目使用Cassandra的说明,请参见使用 Using Cassandra for Persistent Entities。如果您希望使用上面列出的关系数据库之一,请参阅Using a Relational Database for Persistent Entities 如何配置项目。
1.2 PersistentEntity Stub
这是在填写实现细节之前,PersistentEntity类的样子:
import com.lightbend.lagom.javadsl.persistence.PersistentEntity;
public class Post1
extends PersistentEntity<BlogCommand, BlogEvent, BlogState> {
@Override
public Behavior initialBehavior(Optional<BlogState> snapshotState) {
BehaviorBuilder b = newBehaviorBuilder(
snapshotState.orElse(BlogState.EMPTY));
// TODO define command and event handlers
return b.build();
}
}
扩展的PersistentEntity类的三个类型参数定义:
>Command :命令的超类/接口
>Event :事件的超类/接口
>State
: 状态的类
initialBehavior是您的具体子类必须实现的抽象方法。它返回实体的Behavior。 使用newBehaviorBuilder 来为定义的行为而创建一个可变的构造器。这个行为是包含由当前状态和处理传入命令的函数,以及在下面几节中描述的持久化时间所组成。
1.3 Command Handlers
处理传入的命令函数,通过BehaviorBuilder的setCommandHandler 来注册到Behavior 上。
// Command handlers are invoked for incoming messages (commands).
// A command handler must "return" the events to be persisted (if any).
b.setCommandHandler(AddPost.class, (AddPost cmd, CommandContext<AddPostDone> ctx) -> {
final PostAdded postAdded = new PostAdded(entityId(), cmd.getContent());
return ctx.thenPersist(postAdded, (PostAdded evt) ->
// After persist is done additional side effects can be performed
ctx.reply(new AddPostDone(entityId())));
});
您应该为实体可以接收的每个命令类定义一个命令处理程序。
命令处理程序返回一个持久指令,该指令定义了什么事件或事件集合,如果怎么样的话就要持久化。使用传递给命令处理器函数的上下文中的thenPersist, thenPersistAll or done方法,来创建保存指令。
>thenPersist :持久化单个事件
>thenPersistAll 自动的持久化几个事件,事务存储,要么全部存储,要么全部失败
>done:没有事件需要持久化
在完成afterPersist功能后,可以执行外部副作用。在上面的例子中,一个答复是用ctx.reply方法发送的。
在持久化状态更改之前,可以验证该命令。使用ctx.invalidCommand或ctx.commandFailed ,以拒绝无效命令。
b.setCommandHandler(AddPost.class, (AddPost cmd, CommandContext<AddPostDone> ctx) -> {
if (cmd.getContent().getTitle() == null || cmd.getContent().getTitle().equals("")) {
ctx.invalidCommand("Title must be defined");
return ctx.done();
}
一个PersistentEntity 一样可以处理不改变应用状态的命令,例如查询命令或在实体的当前状态中无效的命令(如拍卖结束后的投标)。这样的命令处理程序注册使用BehaviorBuilder的setReadOnlyCommandHandler方法。响应被发送到传递给命令处理程序函数的上下文的reply方法。
setReadOnlyCommandHandler只是一个方便的函数,避免您必须返回任何事件,其次是副作用。
b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
ctx.reply(state().getContent().get()));
命令必须是不可变的,以避免可能发生的并发问题,这些问题可能发生在更改已发送的命令实例的情况下。
Immutable Objects 这一节讲了如何定义不可变的命令类。
1.4 Event Handlers
当事件被成功持久化时,当前状态将通过将事件应用到当前状态来更新。更新状态的函数是由BehaviorBuilder的setEventHandler方法注册的
// Event handlers are used both when persisting new events
// and when replaying events.
b.setEventHandler(PostAdded.class, evt ->
new BlogState(Optional.of(evt.getContent()), false));
您应该为每个事件类定义一个事件处理程序,事件实体是可以持久化的。
事件处理程序返回新状态。状态必须是不可变的,因此您返回一个新的状态实例。使用PersistentEntity的state方法可以从事件处理程序访问当前状态。当实体开始从存储事件中恢复状态时,也会使用相同的事件处理程序。
事件必须是不可变的,以避免可能发生的并发性问题,这些问题可能发生在将要持久化的事件实例中。
Immutable Objects 这一节讲了怎么定义不可变的事件类
1.5 Replies
每个命令必须通过实现 PersistentEntity.ReplyType 接口使,定义哪种类型的消息用作为对命令的回应。
final class AddPost implements BlogCommand, PersistentEntity.ReplyType<AddPostDone> {
private final PostContent content;
@JsonCreator
public AddPost(PostContent content) {
this.content = content;
}
public PostContent getContent() {
return content;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AddPost addPost = (AddPost) o;
return content.equals(addPost.content);
}
@Override
public int hashCode() {
return content.hashCode();
}
}
使用传递给命令处理器函数的上下文的reply方法来发送回复信息。
通常,应答将是确认实体成功地处理了该命令,即您在持久化之后发送它。
b.setCommandHandler(ChangeBody.class,
(cmd, ctx) -> ctx.thenPersist(new BodyChanged(entityId(), cmd.getBody()), evt ->
ctx.reply(Done.getInstance())));
为了方便起见,您可以使用akk.done作为确认消息。
它也可以是对只读查询命令的回复。
b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
ctx.reply(state().getContent().get()));
您可以使用ctx.invalidCommand 拒绝无效命令,该命令将在发送端,使CompletionStage 以PersistentEntity.InvalidCommandException失败。
你可以通过ctx.commandFailed发出否定的应答,这将在发送端以给定的异常,使CompletionStage 失败。
如果持久化事件失败,则会自动发送消极应答,它将在发送端,使CompletionStage以PersistentEntity.PersistException失败。
如果PersistentEntity接收到一个没有注册命令处理程序的命令,则自动发送一个否定应答。这在发送端会使CompletionStage 以PersistentEntity.UnhandledCommandException失败。
如果您不回复一个命令,发送方的CompletionStage将在超时后使用akka.pattern.AskTimeoutException完成。
1.6 Changing Behavior
事件处理程序通常只更新状态,但是它们也可能改变实体的行为,因为可以定义处理命令和事件的新函数。在实现类似实体的有限状态机(FSM)时,这一点非常有用。更改行为的事件处理程序通过BehaviorBuilder的setEventHandlerChangingBehavior 方法改变。这样的事件处理程序返回新的Behavior ,而不是返回新状态。
b.setEventHandlerChangingBehavior(PostAdded.class, evt ->
becomePostAdded(new BlogState(Optional.of(evt.getContent()), false)));
private Behavior becomePostAdded(BlogState newState) {
BehaviorBuilder b = newBehaviorBuilder(newState);
b.setReadOnlyCommandHandler(GetPost.class, (cmd, ctx) ->
ctx.reply(state().getContent().get()));
b.setCommandHandler(ChangeBody.class,
(cmd, ctx) -> ctx.thenPersist(new BodyChanged(entityId(), cmd.getBody()), evt ->
ctx.reply(Done.getInstance())));
b.setEventHandler(BodyChanged.class, evt -> state().withBody(evt.getBody()));
return b.build();
}
在上面的例子中,我们正在通过newBehaviorBuilder创建一个全新的Behavior。它也可以从当前的Behavior启动 ,并可以修改它。您可以使用PersistentEntity的behavior 方法访问当前行为,然后使用Behavior的builder 方法。
1.7 Snapshots
当实体已经启动了,重新播放存储事件来恢复状态。为了减少恢复时间,实体可以从状态快照开始恢复,然后只重新播放快照后存储的事件。
这样的快照在被配置的持久事件数量之后自动保存。如果任何的快照作为参数传递给initialBehavior方法,那么您应该使用该状态作为返回的Behavior的状态。
要记住的一件事是,如果您正在使用事件处理程序,改变行为(setEventHandlerChangingBehavior)你还必须从快照状态恢复相应的Behavior,这个快照状态是作为initialBehavior 方法的参数传递来的。
@Override
public Behavior initialBehavior(Optional<BlogState> snapshotState) {
if (snapshotState.isPresent() && !snapshotState.get().isEmpty()) {
// behavior after snapshot must be restored by initialBehavior
// if we have a non-empty BlogState we know that the initial
// AddPost has been performed
return becomePostAdded(snapshotState.get());
} else {
// behavior when no snapshot is used
BehaviorBuilder b = newBehaviorBuilder(BlogState.EMPTY);
// TODO define command and event handlers
return b.build();
}
}
状态必须是不可变的,以避免可能发生的并发性问题,这些问题可能发生在即将被保存为快照的状态实例中。
Immutable Objects 这一节讲了怎么定义不可变的状态类
1.8 Usage from Service Implementation
要从服务实现中访问实体,首先需要注入PersistentEntityRegistry,然后在启动(在构造函数中)注册实现了PersistentEntity的类。
在服务方法中,从注册表中检索给定实体标识符的PersistentEntityRef。然后,可以使用persistenttyref的ask方法将命令发送到实体。请求返回一个带有完整的CompletionStage 的回复消息。
import com.lightbend.lagom.javadsl.persistence.PersistentEntityRef;
import javax.inject.Inject;
import com.lightbend.lagom.javadsl.persistence.PersistentEntityRegistry;
import com.lightbend.lagom.javadsl.api.*;
public class BlogServiceImpl implements BlogService {
private final PersistentEntityRegistry persistentEntities;
@Inject
public BlogServiceImpl(PersistentEntityRegistry persistentEntities) {
this.persistentEntities = persistentEntities;
persistentEntities.register(Post.class);
}
@Override
public ServiceCall<BlogCommand.AddPost, String> addPost(String id) {
return request -> {
PersistentEntityRef<BlogCommand> ref =
persistentEntities.refFor(Post.class, id);
return ref.ask(request).thenApply(ack -> "OK");
};
}
}
在本例中,我们使用了命令AddPost作为服务方法的请求参数,但是您当然可以使用另一种类型的服务的外部API。
命令作为消息发送给可能在不同节点上运行的实体。如果该节点由于网络问题而无法使用,则JVM崩溃或类似的消息可能会丢失,直到问题被检测到,并且实体已经迁移到另一个节点。在这种情况下ask方法将会超时,CompletionStage 也将以akka.pattern.AskTimeoutException而完成结束。
请注意,AskTimeoutException并不能保证该命令未被处理。例如,可能已经处理过命令,但是应答消息丢失了。
1.9 Serialization
JSON是持久事件和状态的推荐格式。序列化部分描述如何将杰克逊序列化支持添加到此类类,以及如何改进类,这对于持久状态和事件尤其重要,因为您必须能够反序列化存储的旧对象。
1.10 Unit Testing
单元测试的实体可以使用PersistentEntityTestDriver,将不使用一个数据库的情况下运行PersistentEntity。您可以验证它会在响应传入命令时发出预期的事件和副作用。
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.Optional;
import com.lightbend.lagom.javadsl.persistence.PersistentEntity.InvalidCommandException;
import com.lightbend.lagom.javadsl.testkit.PersistentEntityTestDriver;
import com.lightbend.lagom.javadsl.testkit.PersistentEntityTestDriver.Outcome;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.Done;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
public class PostTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create();
}
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testAddPost() {
PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
new PersistentEntityTestDriver<>(system, new Post(), "post-1");
PostContent content = new PostContent("Title", "Body");
Outcome<BlogEvent, BlogState> outcome = driver.run(
new AddPost(content));
assertEquals(new PostAdded("post-1", content),
outcome.events().get(0));
assertEquals(1, outcome.events().size());
assertEquals(false, outcome.state().isPublished());
assertEquals(Optional.of(content), outcome.state().getContent());
assertEquals(new AddPostDone("post-1"), outcome.getReplies().get(0));
assertEquals(Collections.emptyList(), outcome.issues());
}
@Test
public void testInvalidTitle() {
PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
new PersistentEntityTestDriver<>(system, new Post(), "post-1");
Outcome<BlogEvent, BlogState> outcome = driver.run(
new AddPost(new PostContent("", "Body")));
assertEquals(InvalidCommandException.class,
outcome.getReplies().get(0).getClass());
assertEquals(0, outcome.events().size());
assertEquals(Collections.emptyList(), outcome.issues());
}
@Test
public void testChangeBody() {
PersistentEntityTestDriver<BlogCommand, BlogEvent, BlogState> driver =
new PersistentEntityTestDriver<>(system, new Post(), "post-1");
driver.run(new AddPost(new PostContent("Title", "Body")));
Outcome<BlogEvent, BlogState> outcome = driver.run(
new ChangeBody("New body 1"),
new ChangeBody("New body 2"));
assertEquals(new BodyChanged("post-1", "New body 1"), outcome.events().get(0));
assertEquals(new BodyChanged("post-1", "New body 2"), outcome.events().get(1));
assertEquals(2, outcome.events().size());
assertEquals(false, outcome.state().isPublished());
assertEquals("New body 2", outcome.state().getContent().get().getBody());
assertEquals(Done.getInstance(), outcome.getReplies().get(0));
assertEquals(Done.getInstance(), outcome.getReplies().get(1));
assertEquals(2, outcome.getReplies().size());
assertEquals(Collections.emptyList(), outcome.issues());
}
}
run可能被多次调用,以将命令序列分成可管理的步骤。结果包含上一次run的事件和副作用,但是状态不会在不同的运行之间重置。
请注意,它还验证了所有命令、事件、回复和状态都是可序列化的,并在结果的问题中报告任何此类问题。
要使用此特性,请在项目的构建中添加以下功能。
在maven中
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-testkit_2.11</artifactId>
<version>${lagom.version}</version>
<scope>test</scope>
</dependency>
1.11 完整样例
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-testkit_2.11</artifactId>
<version>${lagom.version}</version>
<scope>test</scope>
</dependency>
import com.fasterxml.jackson.annotation.JsonCreator;
import com.lightbend.lagom.javadsl.persistence.PersistentEntity;
import com.lightbend.lagom.serialization.Jsonable;
import akka.Done;
public interface BlogCommand extends Jsonable {
//#AddPost
final class AddPost implements BlogCommand, PersistentEntity.ReplyType<AddPostDone> {
private final PostContent content;
@JsonCreator
public AddPost(PostContent content) {
this.content = content;
}
public PostContent getContent() {
return content;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AddPost addPost = (AddPost) o;
return content.equals(addPost.content);
}
@Override
public int hashCode() {
return content.hashCode();
}
}
//#AddPost
final class AddPostDone implements Jsonable {
private final String postId;
@JsonCreator
public AddPostDone(String postId) {
this.postId = postId;
}
public String getPostId() {
return postId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AddPostDone that = (AddPostDone) o;
return postId.equals(that.postId);
}
@Override
public int hashCode() {
return postId.hashCode();
}
}
enum GetPost implements BlogCommand, PersistentEntity.ReplyType<PostContent> {
INSTANCE
}
final class ChangeBody implements BlogCommand, PersistentEntity.ReplyType<Done> {
private final String body;
@JsonCreator
public ChangeBody(String body) {
this.body = body;
}
public String getBody() {
return body;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChangeBody that = (ChangeBody) o;
return body.equals(that.body);
}
@Override
public int hashCode() {
return body.hashCode();
}
}
enum Publish implements BlogCommand, PersistentEntity.ReplyType<Done> {
INSTANCE
}
}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.lightbend.lagom.javadsl.persistence.AggregateEvent;
import com.lightbend.lagom.javadsl.persistence.AggregateEventShards;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.serialization.Jsonable;
import org.pcollections.PSequence;
//#sharded-tags
interface BlogEvent extends Jsonable, AggregateEvent<BlogEvent> {
int NUM_SHARDS = 20;
AggregateEventShards<BlogEvent> TAG =
AggregateEventTag.sharded(BlogEvent.class, NUM_SHARDS);
@Override
default AggregateEventShards<BlogEvent> aggregateTag() {
return TAG;
}
//#sharded-tags
final class PostAdded implements BlogEvent {
private final String postId;
private final PostContent content;
@JsonCreator
public PostAdded(String postId, PostContent content) {
this.postId = postId;
this.content = content;
}
public String getPostId() {
return postId;
}
public PostContent getContent() {
return content;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PostAdded postAdded = (PostAdded) o;
if (!postId.equals(postAdded.postId)) return false;
return content.equals(postAdded.content);
}
@Override
public int hashCode() {
int result = postId.hashCode();
result = 31 * result + content.hashCode();
return result;
}
}
final class BodyChanged implements BlogEvent {
private final String postId;
private final String body;
@JsonCreator
public BodyChanged(String postId, String body) {
this.postId = postId;
this.body = body;
}
public String getPostId() {
return postId;
}
public String getBody() {
return body;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BodyChanged that = (BodyChanged) o;
if (!postId.equals(that.postId)) return false;
return body.equals(that.body);
}
@Override
public int hashCode() {
int result = postId.hashCode();
result = 31 * result + body.hashCode();
return result;
}
}
final class PostPublished implements BlogEvent {
private final String postId;
public PostPublished(String postId) {
this.postId = postId;
}
public String getPostId() {
return postId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PostPublished that = (PostPublished) o;
return postId.equals(that.postId);
}
@Override
public int hashCode() {
return postId.hashCode();
}
}
}
public final class BlogState implements Jsonable {
public static final BlogState EMPTY = new BlogState(Optional.empty(), false);
private final Optional<PostContent> content;
private final boolean published;
@JsonCreator
public BlogState(Optional<PostContent> content, boolean published) {
this.content = content;
this.published = published;
}
public BlogState withBody(String body) {
if (isEmpty())
throw new IllegalStateException("Can't set body without content");
PostContent c = content.get();
return new BlogState(Optional.of(new PostContent(c.getTitle(), body)), published);
}
@JsonIgnore
public boolean isEmpty() {
return !content.isPresent();
}
public Optional<PostContent> getContent() {
return content;
}
public boolean isPublished() {
return published;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BlogState blogState = (BlogState) o;
if (published != blogState.published) return false;
return content.equals(blogState.content);
}
@Override
public int hashCode() {
int result = content.hashCode();
result = 31 * result + (published ? 1 : 0);
return result;
}
}
1.12 重构的考虑
如果您更改了PersistentEntity的类名,则必须重写entityTypeName并保留原来的名称,因为这个名称是存储数据的键的一部分(它是底层持久参与者的持久性的一部分)。默认情况下,entityTypeName使用的是具体的PersistentEntity类的短类名
1.13 Configuration
默认的配置应该是一个好的起点,接下来的设置可能会被修改,以定制需要的行为。以下是针对Lagom持久性的非数据库特定设置的列表:
lagom.persistence {
# As a rule of thumb, the number of shards should be a factor ten greater
# than the planned maximum number of cluster nodes. Less shards than number
# of nodes will result in that some nodes will not host any shards. Too many
# shards will result in less efficient management of the shards, e.g.
# rebalancing overhead, and increased latency because the coordinator is
# involved in the routing of the first message for each shard. The value
# must be the same on all nodes in a running cluster. It can be changed
# after stopping all nodes in the cluster.
max-number-of-shards = 100
# Persistent entities saves snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# It may be configured to "off" to disable snapshots.
snapshot-after = 100
# A persistent entity is passivated automatically if it does not receive
# any messages during this timeout. Passivation is performed to reduce
# memory consumption. Objects referenced by the entity can be garbage
# collected after passivation. Next message will activate the entity
# again, which will recover its state from persistent storage.
passivate-after-idle-timeout = 120s
# Specifies that entities run on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
# The entities can still be accessed from other nodes.
run-entities-on-role = ""
# Default timeout for PersistentEntityRef.ask replies.
ask-timeout = 5s
dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
}
1.14 底层实现
每个PersistentEntity实例由Akka集群共享管理的PersistentActor执行。
2. Storing Persistent Entities in Cassandra
这个页面描述了如何配置Cassandra用于使用Lagom的持久实体API。
2.1 Project dependencies
maven
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-persistence-cassandra_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
2.2 Configuration
Lagom的持久性需要一些表来存储数据。这些表存储在Cassandra keyspace中。Cassandra的一个keyspace是一个名称空间,它定义了Cassandra节点上的数据复制。每个服务应该使用一个惟一的keyspace名称,这样不同服务的表就不会互相冲突。您需要配置在每个服务实现项目中用于这些表的keyspace。
Cassandra keyspace名称必须从字母数字字符开始,只包含字母数字和下划线。它们不区分大小写,以小写形式存储。
Lagom 具有三个内部组件需要密钥空间配置:
> journal存储的是序列化的事件
> snapshot store存储的是状态快照,以更快的恢复进行优化
> offset store 用于Cassandra read端支持,以跟踪每个read端处理器处理的最新事件(详情参考Persistent Read-Side,在参考指南4.4)
您可以在每个服务实现项目的应用程序中配置这些keyspace名称。配置文件:
cassandra-journal.keyspace = my_service_journal
cassandra-snapshot-store.keyspace = my_service_snapshot
lagom.persistence.read-side.cassandra.keyspace = my_service_read_side
虽然不同的服务应该被使用不同的密钥空间来隔离,但是在一个服务中使用相同的keyspace来实现所有这些组件是非常好的。在这种情况下,可以方便地定义一个自定义的keyspace配置属性并使用属性替换来避免重复它。
my-service.cassandra.keyspace = my_service
cassandra-journal.keyspace = ${my-service.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${my-service.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${my-service.cassandra.keyspace}
当您的服务启动时,如果它们丢失,Lagom将默认创建这些keyspace,并自动创建其内部表。如果您倾向于显式地管理模式,则可以使用以下属性禁用自动创建:
cassandra-journal {
keyspace-autocreate = false
tables-autocreate = false
}
cassandra-snapshot-store {
keyspace-autocreate = false
tables-autocreate = false
}
lagom.persistence.read-side.cassandra {
keyspace-autocreate = false
}
与这些属性设置为 false,如果 keyspaces 或表丢失在启动时,您的服务将单出错,无法启动。
Lagom对于Cassandra 的支持是akka-persistence-cassandra方面提供的功能。一个完整的配置参考可以在reference.conf中看到。
2.3 Cassandra Location
在开发模式下,Lagom将启动一个嵌入式的Cassandra服务器。您可以查看配置选项,或者如何在开发中运行Lagom的Cassandra服务器部分禁用嵌入式服务器。
在生产过程中,您通常会更喜欢动态地可定位的Cassandra服务器来实现弹性。如果您需要使用静态的联络点列表来定位您的Cassandra服务器,查看参考指南七。
3. Storing Persistent Entities in a Relational Database
这个页面描述了如何配置关系数据库,以便使用Lagom的持久实体API。
3.1 Project dependencies
要使用关系数据库,请在项目的构建中添加以下内容:
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-persistence-jdbc_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
您还需要为JDBC数据库驱动程序添加jar。
3.2 Configuration
Lagom 使用akka-persistence-jdbc插件用于持久化到数据库的实体。这支持四个不同的关系型数据库:
>PostgreSQL
>MySql
>Oracle
>H2
我们建议不要在生产中使用H2,但它适用于开发和测试。
在Lagom的默认配置中,Lagom将使用Play的JDBC支持来配置和创建连接池。关于如何配置它的详细信息可以在这里(地址:https://www.playframework.com/documentation/2.5.x/JavaDatabase)找到。Play应该配置为为数据源提供JNDI绑定,默认的Lagom将它绑定到DefaultDS。
然后,Lagom将akka - persistence jdbc配置为使用DefaultDS JNDI绑定。akka - persistence JDBC使用浮油来映射表和管理JDBC调用的异步执行。这意味着我们需要配置它来为您的数据库使用合适的光滑配置文件,默认的Lagom将使用H2配置文件。
例如,为了配置PostgreSQL数据库,您可以在您的应用程序中添加以下内容。
db.default {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://database.example.com/playdb"
}
jdbc-defaults.slick.driver = "slick.driver.PostgresDriver$"
3.2 Table creation
默认情况下,如果不存在的话,Lagom将自动为您创建所需的表。这对于开发和测试非常有用,但是在某些情况下可能不适合生产。可以使用以下配置禁用表自动创建功能:
lagom.persistence.jdbc.create-tables.auto = false
表中所需的数据库模式可以在这里(地址:https://github.com/dnvriend/akka-persistence-jdbc/tree/v2.6.8/src/test/resources/schema)找到。
Lagom为管理表的创建提供的完整配置选项在这里:
# Defaults to use for each Akka persistence plugin
jdbc-defaults.slick {
# The driver to use
driver = "slick.driver.H2Driver$"
# The JNDI name
jndiName = "DefaultDS"
}
# Configure the default database to be bound to JNDI
db.default.jndiName=DefaultDS
lagom.persistence.jdbc {
# Configuration for creating tables
create-tables {
# Whether tables should be created automatically as needed
auto = true
# How long to wait for tables to be created, before failing
timeout = 20s
# The cluster role to create tables from
run-on-role = ""
# Exponential backoff for failures configuration for creating tables
failure-exponential-backoff {
# minimum (initial) duration until processor is started again
# after failure
min = 3s
# the exponential back-off is capped to this duration
max = 30s
# additional random delay is based on this factor
random-factor = 0.2
}
}
}
4. Persistent Read-Side
事件源和CQRS是本节推荐的介绍。
持久化实体用于保存单个实体的状态,但不能用于服务跨多个实体的查询。您需要知道实体的标识符能够与它交互。因此,您需要创建针对服务提供的查询定制的数据的另一个视图。Lagom支持对数据的读取端视图进行填充,也支持对读取端进行查询。
这一分离的写端和持久数据的读取端通常被称为CQRS(命令查询责任隔离)模式。CQRS Journey(这是本书,国内就翻译了1/3,c#语言的描述的额,喜欢的去看看)是学习更多关于CQRS的重要资源。
4.1 Read-side design
在Lagom中,可以使用任何数据存储来实现read端,使用JVM上运行的任何库来填充和查询它。Lagom确实提供了一些使用Cassandra和关系数据库的助手,但是它们是可选的,它们不需要一定要使用。
如果您熟悉使用带有行和列的表的更传统的持久性方法,那么实现read侧可能比实现持久实体要更熟悉一些。不过,有一个主要规则,只有在接收来自持久化实体的事件时,才应该更新read侧。
要处理这些事件,需要提供一个ReadSideProcessor。read端处理器不仅负责处理持久实体产生的事件,还负责跟踪它处理的事件。这是用偏移量完成的。
一个持久实体产生的每一个事件都有一个偏移量。当一个读取端处理器开始时,它需要加载它处理的最后一个事件的偏移量。处理完事件后,它应该存储刚刚处理的事件的偏移量。如果对偏移量的存储与事件所产生的任何更新进行了原子性的处理,事件处理将恰好为每个事件发生一次,因此它最少发生一次。
也就是说,如果您在Cassandra或关系数据库的read端支持中使用了Lagom的构建,那么偏移跟踪是为您自动完成的,您只需要担心处理事件本身。这一页的其余部分将是关于使用Lagom的通用读取端处理器支持来实现一个read端。如果您正在使用Cassandra或关系型数据库为您的读者,您应该阅读本页面的其余部分,以了解读边是如何工作的,但同时也要阅读关于Lagom的具体支持的下面的文档,以便利用Lagom的优势,这是我们之后几个小节要讲的。
4.2 Query the Read-Side Database
如何查询read端数据库取决于您的数据库,但是有两件事需要注意:
>确保任何连接池都启动一次,然后在Lagom关闭时关闭。Lagom建立在play上,并使用Play的生命周期支持来注册回调以执行关闭。有关如何连接这方面的信息,请参阅Play文档。
>确保在适当的执行上下文中执行任何阻塞操作。Lagom假设所有操作都是异步的,并且有针对异步任务的线程池。未管理的阻塞的使用会导致应用程序停止响应非常低的负载。有关如何正确管理阻塞数据库调用的线程池的详细信息,请参见Play的线程池的文档。
4.3 Update the Read-Side
我们需要将持久实体生成的事件转换为可查询的数据库表,如前一节所示。为此,我们将实现一个ReadSideProcessor。它将使用持久实体产生的事件并更新数据库表。
4.3.1 Event tags
为了从一个read-side(读取端)消费事件,事件需要被标记。带有特定标记的所有事件都可以作为连续的有序的事件流来使用。可以通过使事件实现AggregateEvent接口来标记事件。标记使用aggregateTag方法定义。
标记事件最简单的方法是为特定的实体提供相同的标记。为此,定义一个静态标记,并将其从事件的aggregateTag方法返回:
public interface BlogEvent extends AggregateEvent<BlogEvent>, Jsonable {
AggregateEventTag<BlogEvent> BLOG_EVENT_TAG = AggregateEventTag.of(BlogEvent.class);
@Override
default AggregateEventTag<BlogEvent> aggregateTag() {
return BLOG_EVENT_TAG;
}
}
虽然这是非常直接的,但它意味着您只能一次使用一个事件,这可能是规模化的瓶颈。如果您期望事件仅以每秒几次的顺序发生,那么这可能很好,但是如果您期望每秒发生数百或数千次事件,那么您可能需要修改您的read-side(读取端)事件处理负载。
切分可以通过两种方式,一种是手动的根据事件中的信息返回不同的标记,另一种是自动返回一个AggregateEventShards标签,这将告诉Lagom使用基于实体的持久性ID的标记。重要的是要确保同一实体的所有事件都以相同的标记(以及相同的shard)结束,否则,该实体的事件处理可能是不正常的,因为读取端节点将以不同的速度消耗事件流。
当您处理事件时,您需要预先决定您想要使用多少切片。shards分片越多,可以在多个节点水平上横向扩展服务,但是,shards付出了代价,每个额外的碎片增加了读取端处理器的数量,这些处理器查询数据库中的新事件。很难在一个实体中不影响事件的排序得前提下,改变分片的数量,因此,最好预先计算出在系统的生命周期中需要处理的事件的峰值率,然后计算出需要处理该负载的节点数,然后将其用作分片的数量。
Lagom提供一些工具来帮助创建sharded tags。要创建sharded标记,在定义静态变量中设置shards的分区数量,作为shards标记,并实现aggregateTag方法来返回shards标记:
interface BlogEvent extends Jsonable, AggregateEvent<BlogEvent> {
int NUM_SHARDS = 20;
AggregateEventShards<BlogEvent> TAG =
AggregateEventTag.sharded(BlogEvent.class, NUM_SHARDS);
@Override
default AggregateEventShards<BlogEvent> aggregateTag() {
return TAG;
}
现在,这里的Lagom将生成一个标记名称,它会附加实体ID模块的哈希代码,并将分片的数量分配给类名。
4.3.2 Defining a read side processor
下面是一个还没有填充实现体的ReadSideProcessor所长得模样。
public class BlogEventProcessor extends ReadSideProcessor<BlogEvent> {
@Override
public ReadSideProcessor.ReadSideHandler<BlogEvent> buildHandler() {
// TODO build read side handler
return null;
}
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
// TODO return the tag for the events
return null;
}
}
我们要实现的第一种方法是aggregateTags方法。此方法必须返回处理器将处理的所有标记的List列表——如果您返回多个标记,那么Lagom将在您的服务集群中分区这些tag标记。要实现此方法,只需返回类的所有事件列表:
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
return BlogEvent.TAG.allTags();
}
现在我们需要实现buildHandler方法。假设您已经创建了一个与首选数据库交互的组件,它提供了以下方法:
public interface MyDatabase {
/**
* Create the tables needed for this read side if not already created.
*/
CompletionStage<Done> createTables();
/**
* Load the offset of the last event processed.
*/
CompletionStage<Offset> loadOffset(AggregateEventTag<BlogEvent> tag);
/**
* Handle the post added event.
*/
CompletionStage<Done> handleEvent(BlogEvent event, Offset offset);
}
createTables方法将创建read-side读取端处理器使用的表,如果它们不存在的话——这是完全可选的,但在开发和测试环境中可能有用,因为它减轻了开发人员手动设置环境的需求。
loadOffset方法读取特定标签的read-side处理器处理的最后一个Offset(PS:这是一个对象,用于事件的排序,用于追踪事件是否被消费)。通常,这将存储在具有标记名作为主键的表中。Offsets有两个变种,一个是使用long的Sequence,他是Offset的子类,另一个是使用UUID表示的TimeBasedUUID,他也是Offset的子类。您的数据库需要能够保存这两种类型。如果一个特定的标签没有offset被存储,比如当处理器第一次运行时,你就可以返回Offset.NONE。
最后,handleEvent方法负责处理实际事件。它同时传递事件和offset(偏移量),并在事件处理成功后继续保留offset(偏移量)。
给定这个接口,我们现在可以实现buildHandler方法:
@Override
public ReadSideHandler<BlogEvent> buildHandler() {
return new ReadSideHandler<BlogEvent>() {
@Override
public CompletionStage<Done> globalPrepare() {
return myDatabase.createTables();
}
@Override
public CompletionStage<Offset> prepare(AggregateEventTag<BlogEvent> tag) {
return myDatabase.loadOffset(tag);
}
@Override
public Flow<Pair<BlogEvent, Offset>, Done, ?> handle() {
return Flow.<Pair<BlogEvent, Offset>>create()
.mapAsync(1, eventAndOffset ->
myDatabase.handleEvent(eventAndOffset.first(),
eventAndOffset.second())
);
}
};
}
globalPrepare方法用于只在全局范围内运行的任务。请记住,Lagom将创建许多read side(读端)处理器,每个shard分区都有一个,如果他们每个都写朝一个表写入,您只希望其中一个尝试创建该表,否则它可能在您的数据库中创建竞争条件。Lagom将确保globalPrepare方法至少在读取端处理开始之前执行一次。它可能会多次执行,特别是当您的集群重新启动时,但这些执行只会一次执行一次。如果globalPrepare失败,Lagom将重试,在随后的失败中以指数级的方式后退,直到成功。
prepare方法用于加载最后的Offset(偏移量),并且对于需要准备的其他任何东西都很有用,比如在开始处理之前优化update语句。它将被执行一次读取端处理器。
handle方法必须返回Akka流Flow来处理事件流。通常,它将使用mapAsync来处理事件,并行度为1。
4.3.3 Registering your read-side processor
一旦您创建了read side(读端)处理器,您需要将其注册到Lagom。这是使用ReadSide组件完成的:
@Inject
public BlogServiceImpl(
PersistentEntityRegistry persistentEntityRegistry,
ReadSide readSide) {
this.persistentEntityRegistry = persistentEntityRegistry;
readSide.register(BlogEventProcessor.class);
}
4.4 Raw Stream of Events
如果您想要更灵活的事件处理,还可以使用另一个工具。你可以在Lagom。通过PersistentEntityRegistry 的eventStream方法,直接得到一个持久事件流。
public ServiceCall<NotUsed, Source<PostSummary, ?>> newPosts() {
final PartialFunction<BlogEvent, PostSummary> collectFunction =
new PFBuilder<BlogEvent, PostSummary>()
.match(BlogEvent.PostAdded.class, evt ->
new PostSummary(evt.getPostId(), evt.getContent().getTitle()))
.build();
return request -> {
Source<PostSummary, ?> stream = persistentEntityRegistry
.eventStream(BlogEvent.TAG.forEntityId(""), Offset.NONE)
.map(pair -> pair.first()).collect(collectFunction);
return CompletableFuture.completedFuture(stream);
};
}
eventStream方法使用实现了AggregateEventType的一个可选偏移量的事件类,这是流的起点。它返回内容为Pair的Source对象,其中包含事件和相关的offset偏移量。
这条流永远不会完成,除非从数据库中检索事件失败。它将继续提供新的事件,因为它们仍然存在。
每一个这样的事件流都将不断地生成对持久实体实现的查询(如Cassandra)来获取新的事件,因此这个工具应该被小心地使用。不要运行太多这样的流。它通常不应该用于由未知数量的客户端调用的服务调用,但它对于有限数量的后台处理作业是有用的。
4.5 Refactoring Consideration
如果您使用事件类型的类名作为AggregateEventTag中的聚合标记,如果您更改事件类名称,则必须保留原始标记,因为该字符串是存储事件数据的一部分。AggregateEventTag有一个带字符串标记参数的工厂方法(和构造函数)。而不是使用类名作为标记标识符,可以考虑在前面使用字符串标记。在服务的事件类型中,标记应该是惟一的。
4.6 Configuration
默认配置应该是良好的起点,后面的设置可能稍后会被修改以自定义在需要时的行为。
lagom.persistence.read-side {
# Exponential backoff for failures in ReadSideProcessor
failure-exponential-backoff {
# minimum (initial) duration until processor is started again
# after failure
min = 3s
# the exponential back-off is capped to this duration
max = 30s
# additional random delay is based on this factor
random-factor = 0.2
}
# The amount of time that a node should wait for the global prepare callback to execute
global-prepare-timeout = 20s
# Specifies that the read side processors should run on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
run-on-role = ""
# The Akka dispatcher to use for read-side actors and tasks.
use-dispatcher = "lagom.persistence.dispatcher"
}
4.7 Underlying Implementation
PersistentEntityRegistry的eventStream也由eventsByTag查询实现。
5. Cassandra Read-Side support
PS:需要注意的是Cassandra 是一个AP数据库,无法实现事务
这一节特别提到了Lagom对Cassandra的支持。在阅读这篇文章之前,您应该熟悉Lagom的总体的read-side(读端)支持。
5.1 Query the Read-Side Database
让我们先看看服务实现如何从Cassandra检索数据。
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class BlogServiceImpl implements BlogService {
private final CassandraSession cassandraSession;
@Inject
public BlogServiceImpl(CassandraSession cassandraSession) {
this.cassandraSession = cassandraSession;
}
@Override
public ServiceCall<NotUsed, Source<PostSummary, ?>> getPostSummaries() {
return request -> {
Source<PostSummary, ?> summaries = cassandraSession.select(
"SELECT id, title FROM blogsummary;").map(row ->
new PostSummary(row.getString("id"), row.getString("title")));
return CompletableFuture.completedFuture(summaries);
};
}
}
注意,在构造函数中注入了CassandraSession。CassandraSession为执行查询提供了几种不同的方法。在上面的示例中使用的返回一个Source,即流响应。还有一些方法可以检索行列表,当您知道结果集很小的时候,这是很有用的,例如,当您包含了一个LIMIT子句时。
在CassandraSession中所有的方法都是非阻塞的,它们返回一个CompletionStage或一个Source。语句以Cassandra Query Language(CQL)语法表达。有关CQL查询的信息,请参见查询表(自己去官方文档找吧)。
5.2Update the Read-Side
我们需要将 Persistent Entities(参见5.1,讲的就是Persistent Entities)生成的事件转换为数据库表,如前一节所示。为此,我们将在CassandraReadSide支持组件的帮助下实现一个ReadSideProcessor。它将消费持久实体产生的事件,并在Cassandra中更新一个或多个表,以优化查询。
在填写实现细节之前,这是一个ReadSideProcessor类的样子:
import akka.Done;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraReadSide;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import org.pcollections.PSequence;
import javax.inject.Inject;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import static com.lightbend.lagom.javadsl.persistence.cassandra.CassandraReadSide.*;
public class BlogEventProcessor extends ReadSideProcessor<BlogEvent> {
private final CassandraSession session;
private final CassandraReadSide readSide;
@Inject
public BlogEventProcessor(CassandraSession session, CassandraReadSide readSide) {
this.session = session;
this.readSide = readSide;
}
@Override
public ReadSideProcessor.ReadSideHandler<BlogEvent> buildHandler() {
// TODO build read side handler
return null;
}
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
// TODO return the tag for the events
return null;
}
}
您可以看到我们已经注入了Cassandra会话和Cassandra read端支持,这些将在以后需要。
您应该已经为您的事件实现了标记,就像在Read-Side documentation(参考5.4小节中讲的)中描述的那样,所以首先我们将在我们的read-side处理器存根中实现aggregateTags方法,比如:
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
return BlogEvent.TAG.allTags();
}
5.2.1 Building the read-side handler
ReadSideProcessor的另一个方法是buildHandler。这负责创建处理事件的ReadSideHandler。它还提供运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
CassandraReadSide有一个builder 方法为这些处理程序创建一个构建器,这个构建器将创建一个处理程序,该处理程序将自动为您处理read-side(读端)偏移量。它可以这样创建:
CassandraReadSide.ReadSideHandlerBuilder<BlogEvent> builder =
readSide.builder("blogsummaryoffset");
传递给该方法的参数是Lagom在持久化偏移量到offset store(偏移量存储)里时候,将使用的事件处理器的ID.这个offset store(偏移量存储)是一个Cassandra表,如果它不存在的话,它将为您创建。您可以手动创建这个表,如果您愿意,它的创建的DDL如下:
CREATE TABLE IF NOT EXISTS offsetStore (
eventProcessorId text,
tag text,
timeUuidOffset timeuuid,
sequenceOffset bigint,
PRIMARY KEY (eventProcessorId, tag)
)
5.2.2 Global prepare
全局准备回调在整个集群中至少运行一次。它的目的是做一些事情,比如创建表和准备需要在读取端处理开始前可用的任何数据。读侧处理器可以在多个节点上共享,因此创建表这样的任务通常只能从一个节点完成。
全局准备回调是从一个Akka集群单例上运行的。它可能会运行多次-每当一个新的节点变成了一个新的单例,这个回调就会调用。因此,任务必须是幂等的。如果失败,它将再次使用指数回滚运行,并且整个集群的读取端的处理直到运行成功后才会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理Cassandra表,但是开发和测试环境非常方便地使用这个回调来为您创建它们。
下面是我们实现创建表的示例方法:
private CompletionStage<Done> createTable() {
return session.executeCreateTable("CREATE TABLE IF NOT EXISTS blogsummary ( " +
"id TEXT, title TEXT, PRIMARY KEY (id))");
}
它可以注册为buildHandler方法中的全局准备回调:
builder.setGlobalPrepare(this::createTable);
5.2.3 Prepare
除了全局准备回调之外,还有一个准备回调。当读端处理器启动时,每个分区将执行一次。它可以用于准备语句,以优化Cassandra对它们的处理。
这个回调是可选的,这里有一个例子,说明如何为更新表准备语句:
private PreparedStatement writeTitle = null; // initialized in prepare
private CompletionStage<Done> prepareWriteTitle() {
return session.prepare("INSERT INTO blogsummary (id, title) VALUES (?, ?)")
.thenApply(ps -> {
this.writeTitle = ps;
return Done.getInstance();
});
}
然后登记他们:
builder.setPrepare(tag -> prepareWriteTitle());
5.2.4 Event handlers
事件处理程序接受一个事件,并返回绑定语句的列表.与其在处理程序本身中执行更新,还建议您返回想要执行到Lagom的语句。这就允许Lagom去用偏移表更新语句批量处理这些语句,然后哪个Lagom将作为一个日志批执行,Cassandra会以原子方式执行。通过这样做,您可以确保对所有事件进行精确处理,否则处理可能至少一次。
以下是处理post添加事件的回调示例:
private CompletionStage<List<BoundStatement>> processPostAdded(BlogEvent.PostAdded event) {
BoundStatement bindWriteTitle = writeTitle.bind();
bindWriteTitle.setString("id", event.getPostId());
bindWriteTitle.setString("title", event.getContent().getTitle());
return completedStatements(Arrays.asList(bindWriteTitle));
}
然后使用setEventHandler注册建造者:
builder.setEventHandler(BlogEvent.PostAdded.class, this::processPostAdded);
完成注册所有事件处理程序后,就可以调用构建方法并返回构建的处理程序:
return builder.build();
5.3 Underlying implementation底层实现
CassandraSession正在使用Datastax Java驱动程序来支持Apache Cassandra。
每个ReadSideProcessor实例由一个由Akka cluster Sharding管理的actor执行。处理器使用由akka - Persistence - cassandra实现的eventsByTag持久性查询传递的持久事件流。这个标记对应于AggregateEventTag定义的标记。
6. Relational Database Read-Side support 关系数据库读取方面支持
这节特别介绍了Lagom对关系数据库的支持。在阅读这篇文章之前,您应该熟悉Lagom的总体的read-side(读端)支持,就是第四节所讲的知识。
Lagom支持两种选择,可以在读写方面访问关系数据库:直接使用JDBC API,或者使用Java Persistence API(JPA)自动映射Java对象和关系数据。
6.1 Choosing between JDBC and JPA
JDBC提供了一个非常低级的API,直接映射到大多数数据库驱动程序的功能。您提供的查询直接传递到后端数据库,因此它们可能需要使用SQL的特定于实现的方言。查询结果节将会以ResultSet对象的形式返回,它允许你遍历结果的行并检索列数据。
JPA在JDBC之上构建以提供对象关系映射功能。在许多其他特性中,JPA包括从Java类生成数据库模式的能力,通过将Java对象属性映射到数据库列来检索、插入和更新行,并在数据库独立的、类似sql的查询语言或类型安全的criteria API中编写查询。
使用带有Lagom服务的JPA需要向JPA 2.1提供者添加额外的依赖项,比如Hibernate ORM(推荐)或EclipseLink。
这两种选择都建立在相同的支持之上,用于在关系数据库中存储持久化实体,并共享相同的偏移存储库。在相同的服务中,可以将基于jdbc的和基于jpa的read- sides混合在一起。
6.2 Implementing a relational database read side
根据你所要使用的方式,参考下面的两节所讲的。
7. JDBC Read-Side support
这节特别介绍了Lagom对使用JDBC的关系数据库读取端的支持。在阅读这篇文章之前,您应该熟悉Lagom的总体的read-side(读端)支持,就是第四节所讲的知识和关系数据库read端支持概述。
7.1 Query the Read-Side Database
让我们先看看服务实现如何使用JDBC从关系数据库检索数据。
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.jdbc.JdbcSession;
import org.pcollections.PSequence;
import org.pcollections.TreePVector;
import javax.inject.Inject;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class BlogServiceImpl implements BlogService {
private final JdbcSession jdbcSession;
@Inject
public BlogServiceImpl(JdbcSession jdbcSession) {
this.jdbcSession = jdbcSession;
}
@Override
public ServiceCall<NotUsed, PSequence<PostSummary>> getPostSummaries() {
return request -> {
return jdbcSession.withConnection(connection -> {
try (PreparedStatement ps = connection.prepareStatement("SELECT id, title FROM blogsummary")) {
try (ResultSet rs = ps.executeQuery()) {
PSequence<PostSummary> summaries = TreePVector.empty();
while (rs.next()) {
summaries = summaries.plus(
new PostSummary(rs.getString("id"), rs.getString("title"))
);
}
return summaries;
}
}
});
};
}
}
请注意,JdbcSession是在构造函数中注入的。JdbcSession允许以withConnection 方法这样,通过连接池获得一个连接,并将使用withTransaction方法管理事务。重要的是,JdbcSession还管理用于处理它的线程池中的阻塞JDBC调用的执行,这就是为什么withConnection和withTransaction方法返回CompletionStage。
7.2 Update the Read-Side
我们需要将持久实体生成的事件转换为数据库表,如前一节所示。为此,我们将在JdbcReadSide支持组件的帮助下实现一个ReadSideProcessor。它将使用持久实体产生的事件,并更新针对查询优化的一个或多个数据库表。
下面是没有填充实现的ReadSideProcessor 的样子:
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.jdbc.JdbcReadSide;
import org.pcollections.PSequence;
import javax.inject.Inject;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class BlogEventProcessor extends ReadSideProcessor<BlogEvent> {
private final JdbcReadSide readSide;
@Inject
public BlogEventProcessor(JdbcReadSide readSide) {
this.readSide = readSide;
}
@Override
public ReadSideHandler<BlogEvent> buildHandler() {
// TODO build read side handler
return null;
}
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
// TODO return the tag for the events
return null;
}
}
您可以看到,我们已经注入了JDBC read-side support,这将在以后需要。
您应该已经为您的事件实现了标记,就像在read端文档中描述的那样,首先,我们将在read-side处理器存根中实现aggregateTags方法,比如:
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
return BlogEvent.TAG.allTags();
}
7.2.1 Building the read-side handler
ReadSideProcessor的另一个方法是buildHandler。这负责创建处理事件的ReadSideHandler。它还提供运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
JdbcReadSide有一个builder方法为这些处理程序创建一个构建器,这个构建器将创建一个处理程序,它将自动管理事务,并为您处理准备端偏移量。它可以这样创建:
JdbcReadSide.ReadSideHandlerBuilder<BlogEvent> builder =
readSide.builder("blogsummaryoffset");
传递给该方法的参数是一个标识符,该标识符用于Lagom在持久化偏移量时使用。Lagom将把偏移量存储在一个表中,如果它不存在,它将自动创建它自己。如果你宁愿Lagom没有自动为您创建这个表,你可以在application.conf通过设置lagom.persistence.jdbc.create-tables = false关闭此功能。此表的模式的DDL如下:
CREATE TABLE read_side_offsets (
read_side_id VARCHAR(255), tag VARCHAR(255),
sequence_offset bigint, time_uuid_offset char(36),
PRIMARY KEY (read_side_id, tag)
)
7.2.2 Global prepare
全局准备回调在整个集群中至少运行一次。它的目的是做一些事情,比如创建表和准备需要在读取端处理开始前可用的任何数据。read-side读端处理器可以在多个节点上共享,因此创建表这样的任务通常只能从一个节点完成。
全局准备回调是从一个Akka集群单例上运行的。它可能会运行多次-每当一个新的节点变成了一个新的单例,这个回调就会调用。因此,任务必须是幂等的。如果失败,它将再次使用指数回滚运行,并且整个集群的读取端的处理直到运行成功后才会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理Cassandra表,但是开发和测试环境非常方便地使用这个回调来为您创建它们。
下面是我们实现创建表的示例方法:
private void createTable(Connection connection) throws SQLException {
try (PreparedStatement ps = connection.prepareStatement("CREATE TABLE IF NOT EXISTS blogsummary ( " +
"id VARCHAR(64), title VARCHAR(256), PRIMARY KEY (id))")) {
ps.execute();
}
}
它可以注册为buildHandler方法中的全局准备回调:
builder.setGlobalPrepare(this::createTable);
7.2.3 Prepare
除了全局准备回调之外,还有一个准备回调,可以通过调用builder.setPrepare来指定。当读侧处理器启动时,每个分片将执行一次。
如果您阅读了Cassandra read-side support指南,您可能已经看到了用于编写数据库语句以备将来使用的方法。然而,JDBC PreparedStatement实例不能保证是线程安全的,所以准备回调不应该被用于关系数据库。
再次强调,这个回调是可选的,在我们的例子中,我们不需要准备回调,所以没有指定。
7.2.4 Event handlers
事件处理程序接受事件和连接,并相应地更新读取端。
以下是处理PostAdded事件的回调示例:
private void processPostAdded(Connection connection, BlogEvent.PostAdded event) throws SQLException {
PreparedStatement statement = connection.prepareStatement(
"INSERT INTO blogsummary (id, title) VALUES (?, ?)");
statement.setString(1, event.getPostId());
statement.setString(2, event.getContent().getTitle());
statement.execute();
}
然后使用setEventHandler注册建造者:
builder.setEventHandler(BlogEvent.PostAdded.class, this::processPostAdded);
完成注册所有事件处理程序后,就可以调用构建方法并返回构建的处理程序:
return builder.build();
8. JPA Read-Side support
这节特别介绍了Lagom对使用JPA的关系数据库读边的支持。在阅读这篇文章之前,您应该熟悉Lagom的总体的read-side(读端)支持(就是第四节所讲的知识)和关系数据库read端支持概述。
8.1 Project dependencies
要使用JPA支持,请在项目的构建中添加以下内容
在maven中
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-persistence-jpa_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
在sbt中
libraryDependencies += lagomJavadslPersistenceJpa
您还需要在JPA提供者(如Hibernate ORM或EclipseLink)和数据库驱动程序中添加依赖项。
8.2 Configuration
JPA支持建立在Lagom在关系数据库中存储持久实体的支持之上。请参阅关于配置Lagom使用正确的JDBC驱动程序和数据库URL的说明指南。
接下来,我们需要配置JPA来与我们的数据库进行通信,还需要配置Lagom来初始化一个JPA持久性单元。
JPA 被配置使用一个名为 persistence.xml 文件。在您的服务实现项目中,创建 src/main/resources/META-INF/persistence.xml文件,作为一个模板指南。
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"
version="2.1">
<persistence-unit name="default" transaction-type="RESOURCE_LOCAL">
<!-- Replace provider with the correct provider
class for your JPA implementation -->
<provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<non-jta-data-source>DefaultDS</non-jta-data-source>
<properties>
<!-- Configure the provider for the database you use -->
<property name="hibernate.dialect"
value="org.hibernate.dialect.H2Dialect"/>
<!-- Add any other standard or provider-specific properties -->
</properties>
</persistence-unit>
</persistence>
默认情况下,Lagom期望持久性单元被命名为“default”,就像在这个例子中一样,但是在您的application.conf中可以更改它.
初始化持久化单元需要与配置的数据库进行通信。如果在永久和退出之前失败最多的重试次数,那么Lagom将自动重试初始化。在application . conf中,可以配置最大的重试次数、初始重试间隔和可选的后退因子。
Lagom为初始化JPA提供的全套配置选项在这里
lagom.persistence.jpa {
# This must match the name in persistence.xml
persistence-unit = "default"
# Controls retry when initializing the EntityManagerFactory throws an exception
initialization-retry {
# The first retry will be delayed by the min interval
# Each subsequent delay will be multiplied by the factor
interval {
min = 5s
factor = 1.0
}
# After retrying this many times, the final exception will be thrown
max-retries = 10
}
}
8.3 Write a JPA entity class
JPA实体表示read侧-side端的数据库的表,下面是一个JPA实体的示例,它表示一个博客文章的摘要,可以用来查询所有博客条目的索引:
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.validation.constraints.NotNull;
@Entity
public class BlogSummaryJpaEntity {
@Id
private String id;
@NotNull
private String title;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
}
请注意,JPA实体需要遵循典型的JavaBeans风格的可变对象和getter和setter,而不是Lagom对不可变对象的使用。JPA实体不是线程安全的,重要的是确保它们只在Lagom-managed事务的范围内使用。我们将在后面看到如何实现它。
8.4 Query the Read-Side Database
接下来让我们看看服务实现如何使用JPA从关系数据库检索数据。
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.jpa.JpaSession;
import org.pcollections.PSequence;
import org.pcollections.TreePVector;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import java.util.List;
import java.util.concurrent.CompletionStage;
public class BlogServiceImpl implements BlogService {
private final JpaSession jpaSession;
@Inject
public BlogServiceImpl(JpaSession jpaSession) {
this.jpaSession = jpaSession;
}
@Override
public ServiceCall<NotUsed, PSequence<PostSummary>> getPostSummaries() {
return request -> jpaSession
.withTransaction(this::selectPostSummaries)
.thenApply(TreePVector::from);
}
private List<PostSummary> selectPostSummaries(EntityManager entityManager) {
return entityManager
.createQuery("SELECT" +
" NEW com.example.PostSummary(s.id, s.title)" +
" FROM BlogSummaryJpaEntity s",
PostSummary.class
)
.getResultList();
}
}
注意,在构造函数中注入了JpaSession.JpaSession允许获取JPA的entityManager,并将使用withTransaction方法管理事务。重要的是,JpaSession还管理用于处理它的线程池中的阻塞JPA调用的执行,这就是withTransaction方法返回CompletionStage的原因。
如上所述,防止可变JPA实体实例从用于执行阻塞JPA调用的线程中逃逸,是很重要的。为了实现这一点,在查询本身中,我们使用JPQL构造函数表达式来从查询中返回不可变的PostSummary实例,而不是以可变的BlogSummaryJpaEntity实例返回。JPA需要构造函数表达式来使用类的完全限定名来构造。您还可以通过其他方式转换为不可变数据,比如从查询中返回JPA实体,然后显式地转换它们,但是使用构造函数表达式是一种方便的方法,可以避免额外的代码和对象分配。
8.5 Update the Read-Side
我们需要将持久实体生成的事件转换为数据库表,如前一节所示。为此,我们将在JpaReadSide支持组件的帮助下实现一个ReadSideProcessor。它将使用持久实体产生的事件,并更新针对查询优化的一个或多个数据库表。
这是一个填写实现细节之前的ReadSideProcessor类的样子:
import com.google.common.collect.ImmutableMap;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.jpa.JpaReadSide;
import org.pcollections.PSequence;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.Persistence;
public class BlogEventProcessor extends ReadSideProcessor<BlogEvent> {
private final JpaReadSide readSide;
@Inject
public BlogEventProcessor(JpaReadSide readSide) {
this.readSide = readSide;
}
@Override
public ReadSideHandler<BlogEvent> buildHandler() {
// TODO build read side handler
return null;
}
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
// TODO return the tag for the events
return null;
}
}
您可以看到,我们已经注入了JPA read-side support,这将在以后需要。
您应该已经为您的事件实现了标记,就像在read-side文档(即我们在第四节中讲的内容)中描述的那样,所以首先我们将在我们的read-side处理器存根中实现aggregateTags方法,比如:
@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
return BlogEvent.TAG.allTags();
}
8.5.1 Building the read-side handler
ReadSideProcessor的另一个方法是buildHandler。这负责创建处理事件的ReadSideHandler。它还提供运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
JpaReadSide有一个为这些处理程序创建构建器的builder方法,这个构建器将创建一个处理程序,该处理程序将自动管理事务并为您处理read-side偏移量。它可以这样创建:
JpaReadSide.ReadSideHandlerBuilder<BlogEvent> builder =
readSide.builder("blogsummaryoffset");
传递给该方法的参数是一个标识符,该标识符用于Lagom在坚持偏移量时使用。Lagom将把偏移量存储在一个表中,如果它不存在,它将自动创建它自己。如果你宁愿Lagom没有自动为您创建这个表,你可以通过设置lagom.persistence.jdbc.create-tables关闭此功能。汽车在application.conf = false。此表的模式的DDL如下:
传递给该方法的参数是一个标识符,该标识符用于Lagom在持久化偏移量时使用。Lagom将把偏移量存储在一个表中,如果它不存在,它将自动创建它自己。如果你宁愿Lagom没有自动为您创建这个表,你可以在application.conf通过设置lagom.persistence.jdbc.create-tables = false关闭此功能。此表的模式的DDL如下:
CREATE TABLE read_side_offsets (
read_side_id VARCHAR(255), tag VARCHAR(255),
sequence_offset bigint, time_uuid_offset char(36),
PRIMARY KEY (read_side_id, tag)
)
8.5.2 Global prepare
全局准备回调在整个集群中至少运行一次。它的目的是做一些事情,比如创建表和准备需要在读取端处理开始前可用的任何数据。read-side读端处理器可以在多个节点上共享,因此创建表这样的任务通常只能从一个节点完成。
全局准备回调是从一个Akka集群单例上运行的。它可能会运行多次-每当一个新的节点变成了一个新的单例,这个回调就会调用。因此,任务必须是幂等的。如果失败,它将再次使用指数回滚运行,并且整个集群的读取端的处理直到运行成功后才会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理Cassandra表,但是开发和测试环境非常方便地使用这个回调来为您创建它们。
下面是我们实现的创建模式的示例方法:
private void createSchema(@SuppressWarnings("unused") EntityManager ignored) {
Persistence.generateSchema("default",
ImmutableMap.of("hibernate.hbm2ddl.auto", "update"));
}
在本例中,我们使用JPA generateSchema方法和hibernate特有的属性,可以在现有模式中添加丢失的表和列,以及从头创建模式,但不会删除任何现有数据。
它可以注册为buildHandler方法中的全局准备回调:
builder.setGlobalPrepare(this::createSchema);
8.5.3 Prepare
除了全局准备回调之外,还有一个准备回调,可以通过调用builder. setprepare来指定。当读取端处理器启动时,每个分片将执行一次。
如果您阅读了Cassandra read-side support指南,您可能已经看到这个用于准备数据库语句以供以后使用。但是,JPA查询和CriteriaQuery实例不能保证是线程安全的,因此准备回调不应该用于此目的与关系数据库。
再次强调,这个回调是可选的,在我们的例子中,我们不需要准备回调,所以没有指定。
8.5.4 Event handlers
事件处理程序接受一个事件和一个JPA EntityManger,并相应地更新读取端。
以下是处理postAdd事件的回调示例:
private void processPostAdded(EntityManager entityManager,
BlogEvent.PostAdded event) {
BlogSummaryJpaEntity summary = new BlogSummaryJpaEntity();
summary.setId(event.getPostId());
summary.setTitle(event.getContent().getTitle());
entityManager.persist(summary);
}
然后使用setEventHandler注册建造者:
builder.setEventHandler(BlogEvent.PostAdded.class, this::processPostAdded);
事件处理程序,以及回调,都将自动封装在一个事务中,当处理程序成功或回滚时,它会自动提交一个异常。在事件处理程序中使用JPA实体是安全的,但是如上所述,重要的是确保它们不会逃到其他线程中。您可以将它们分配给本地变量,例如在本例中,或者将它们作为参数传递给同步方法,而这些方法不保留对其他域中实体的引用。避免将JPA实体分配给实例或静态字段,将它们提供给在另一个线程中执行的代码,或者将它们传递给可能这样做的方法。
完成注册所有事件处理程序后,就可以调用build方法并返回构建的处理程序:
return builder.build();
9.Publish-Subscribe
发布-订阅是一个众所周知的消息传递模式。消息的发送者,称为发布者,不直接将消息指向特定的接收方,而是将消息发布到主题,而不了解接收方(如果有的话)可能存在的接收者。类似地,订阅者对一个主题表示兴趣,并接收发布到该主题的消息,而不了解哪个发布者,如果有的话。
9.1 Dependency
要使用此功能,请在您的项目的构建中添加以下内容:
maven中
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-pubsub_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
sbt中
libraryDependencies += lagomJavadslPubSub
9.2 Usage from Service Implementation
让我们看一个发布硬件设备温度测量服务的例子。一个设备可以提交它的当前温度,感兴趣的方可以得到一个温度样品的流。
服务API定义为:
public interface SensorService extends Service {
ServiceCall<Temperature, NotUsed> registerTemperature(String id);
ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id);
@Override
default Descriptor descriptor() {
return named("/sensorservice").withCalls(
pathCall("/device/:id/temperature", this::registerTemperature),
pathCall("/device/:id/temperature/stream", this::temperatureStream)
);
}
}
此接口的实现看起来像:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class SensorServiceImpl implements SensorService {
private final PubSubRegistry pubSub;
@Inject
public SensorServiceImpl(PubSubRegistry pubSub) {
this.pubSub = pubSub;
}
@Override
public ServiceCall<Temperature, NotUsed> registerTemperature(String id) {
return temperature -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
topic.publish(temperature);
return CompletableFuture.completedFuture(NotUsed.getInstance());
};
}
@Override
public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
return CompletableFuture.completedFuture(topic.subscriber());
};
}
}
当一个设备提交其当前温度时,它将被发布到一个与该设备唯一的主题上。注意,消息被发布到的主题是由消息类,即这里的Temperature类和可选的分类器定义的,即这里是设备id。这个主题的消息将是消息类或其子类的实例。限定符可以用来区分使用相同消息类的主题。如果消息类足够定义主题标识,则可以将空字符串用作限定符。
使用PubSubRef的方法发布一个给定的主题来发布一条消息,查看上面代码中的registerTemperature 方法。
使用PubSubRef的方法订阅者获取发布到给定主题的消息流源,请参阅上面代码中的temperatureStream。
也可以将消息流发布到主题中,就像这个SensorService的变体所示:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
public class SensorServiceImpl2 implements SensorService2 {
private final PubSubRegistry pubSub;
private final Materializer materializer;
@Inject
public SensorServiceImpl2(PubSubRegistry pubSub, Materializer mat) {
this.pubSub = pubSub;
this.materializer = mat;
}
@Override
public ServiceCall<Source<Temperature, ?>, NotUsed> registerTemperatures(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
request.runWith(topic.publisher(), materializer);
return CompletableFuture.completedFuture(NotUsed.getInstance());
};
}
@Override
public ServiceCall<NotUsed, Source<Temperature, ?>> temperatureStream(String id) {
return request -> {
final PubSubRef<Temperature> topic =
pubSub.refFor(TopicId.of(Temperature.class, id));
return CompletableFuture.completedFuture(topic.subscriber());
};
}
}
请注意,registerTemperature 中的传入的Source如何使用在构造函数中注入的Materializer与runWith方法连接到该主题的发布者接收器。当然,您可以在连接到发布者程序之前应用传入流的普通流转换。
9.3 Usage from Persistent Entity
您可以从一个持久化实体里发布消息。首先,您必须注入PubSubRegistry以获取给定主题的PubSubRef。
private final PubSubRef<PostPublished> publishedTopic;
@Inject
public Post4(PubSubRegistry pubSub) {
publishedTopic = pubSub.refFor(TopicId.of(PostPublished.class, ""));
}
发布消息的命令处理程序(在本例中是PostPublished 的事件)可能是这样的:
b.setCommandHandler(Publish.class,
(cmd, ctx) -> ctx.thenPersist(new PostPublished(entityId()), evt -> {
ctx.reply(Done.getInstance());
publishedTopic.publish(evt);
}));
为了完成图片,一个服务方法将这些post发布的事件作为流传递:
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.pubsub.PubSubRef;
import com.lightbend.lagom.javadsl.pubsub.PubSubRegistry;
import com.lightbend.lagom.javadsl.pubsub.TopicId;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class BlogServiceImpl4 implements BlogService4 {
private final PubSubRef<BlogEvent.PostPublished> publishedTopic;
@Inject
public BlogServiceImpl4(PubSubRegistry pubSub) {
publishedTopic = pubSub.refFor(TopicId.of(BlogEvent.PostPublished.class, ""));
}
@Override
public ServiceCall<NotUsed, Source<BlogEvent.PostPublished, ?>> getNewPosts() {
return request ->
CompletableFuture.completedFuture(publishedTopic.subscriber());
}
}
9.4 Limitations
该特性专门用于在单个服务集群中提供发布和订阅功能。要在服务之间发布和订阅,您应该使用Lagom的message broker support(消息代理支持)。
发布的消息可能会丢失。例如,在网络问题上,消息可能不会发送给所有订阅者。未来版本的Lagom可能包括在服务中使用至少一次交付,同时您可以通过使用Lagom的message broker support来实现至少一次交付。
请注意,当您返回到message broker support时,您将通过公共主题公开您的消息,使它们成为您的公共API的一部分。
注册用户的注册表最终是一致的,即在其他节点上不能立即看到新的订阅者,但通常情况下,信息将在几秒钟后完全复制到所有其他节点.
9.5 Serialization
发布的消息必须是可序列化的,因为它们将被发送到服务集群中的节点。JSON是这些消息的推荐序列化格式。序列化部分描述了如何向此类消息类添加Jackson序列化支持。
9.6 Underlying Implementation底层实现
它采用分布式的akka发布订阅。更多资料请查看:http://doc.akka.io/docs/akka/2.4/java/distributed-pub-sub.html
10. Serialization序列化
Lagom将为服务的外部API使用JSON请求和响应消息格式,使用Jackson对消息进行序列化和反序列化。在服务集群中发送的消息也必须是可序列化的,因此必须由持久化实体存储的事件。我们也推荐JSON作为这些类的JSON,而Lagom则可以很容易地将Jackson序列化支持添加到这些类里。
不要依赖于用于生产部署的Java序列化。它在序列化的大小和速度上都是低效的。当使用Java序列化时,很难去发展类,这对于持久化状态和事件尤其重要,因为您必须能够对存储的旧对象进行反序列化。
10.1 Enabling JSON Serialization
要为一个类启用JSON序列化,您需要实现Jsonable标记接口。
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.lightbend.lagom.javadsl.immutable.ImmutableStyle;
import com.lightbend.lagom.serialization.Jsonable;
import org.immutables.value.Value;
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = User.class)
public interface AbstractUser extends Jsonable {
String getName();
String getEmail();
}
注意,这里使用的是Immutables库,因此它将生成一个不可变的User类。这就是添加@ jsondeserialize注释的原因。
10.1.1Jackson Modules
默认情况下启用下列Jackson模块:
# The Jackson JSON serializer will register these modules.
# It is also possible to use jackson-modules = ["*"] to dynamically
# find and register all modules in the classpath.
jackson-modules += "com.fasterxml.jackson.module.paramnames.ParameterNamesModule"
jackson-modules += "com.fasterxml.jackson.datatype.jdk8.Jdk8Module"
jackson-modules += "com.fasterxml.jackson.datatype.jsr310.JavaTimeModule"
jackson-modules += "com.fasterxml.jackson.datatype.pcollections.PCollectionsModule"
jackson-modules += "com.fasterxml.jackson.datatype.guava.GuavaModule"
你可以修改配置lagom.serialization.json.jackson-modules可以启用其他模块。
ParameterNamesModule要求启用-parameters Java编译器选项。
section不可变对象包含更多的类的例子,这些类是Jsonable。
您可以使用PersistentEntityTestDriver持久化实体单元测试部分描述,验证所有命令,事件,回复和状态是可序列化的。
10.1.2 Compression
这里描述的压缩只用于持久事件、持久快照和与服务集群的远程消息。它不用于在服务的外部API中序列化的消息。
JSON可能相当冗长,对于大型消息来说,它可能有助于压缩。这是通过使用CompressedJsonable 而不是Jsonable标记接口来完成的。
import com.lightbend.lagom.serialization.CompressedJsonable;
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = Author.class)
public interface AbstractAuthor extends CompressedJsonable {
String getName();
String biography();
}
序列化器默认只压缩大于1024字节的消息。这可以改变配置属性lagom.serialization.json.compress-larger-than阈值。
10.2 Schema Evolution
当在长期运行的项目中工作使用我们在第一节中讲的持久化enityt的时候,或者任何形式的事件源,Schema Evolution就变为了一种重要的开发你的应用的方面。随着时间的推移,我们对业务领域的需求也会随着时间的推移而改变。
在反序列化过程中,Lagom提供了一种执行JSON树模型转换的方法。
我们将讨论一些关于类如何演化的场景。
10.2.1 Remove Field
删除一个字段可以在没有任何迁移代码的情况下完成。Jackson JSON序列化器将忽略类中不存在的属性。
10.2.2 Add Field
添加可选字段可以在没有任何迁移代码的情况下完成。默认值为Option.empty。
Old class:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = ItemAdded.class)
public interface AbstractItemAdded extends Jsonable {
String getShoppingCartId();
String getProductId();
int getQuantity();
}
具有新可选折扣属性的新类和带有默认值的新注释字段:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = ItemAdded.class)
public interface AbstractItemAdded extends Jsonable {
String getShoppingCartId();
String getProductId();
int getQuantity();
Optional<Double> getDiscount();
@Value.Default
default String getNote() {
return "";
}
}
让我们说我们要有一个强制性discount属性没有默认值相反:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = ItemAdded.class)
public interface AbstractItemAdded extends Jsonable {
String getShoppingCartId();
String getProductId();
int getQuantity();
double getDiscount();
}
要添加一个新的强制字段,我们必须使用一个JSON迁移类,并在迁移代码中设置默认值,从而扩展JacksonJsonMigration。
这就是迁移类在添加折扣字段时的样子:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.lightbend.lagom.serialization.JacksonJsonMigration;
public class ItemAddedMigration extends JacksonJsonMigration {
@Override
public int currentVersion() {
return 2;
}
@Override
public JsonNode transform(int fromVersion, JsonNode json) {
ObjectNode root = (ObjectNode) json;
if (fromVersion <= 1) {
root.set("discount", DoubleNode.valueOf(0.0));
}
return root;
}
}
覆盖当前版本的方法,以定义当前(最新)版本的版本号。第一个版本,当没有使用迁移时,总是1。当您执行一个不向后兼容而没有迁移代码的更改时,增加这个版本号。
实现将旧JSON结构转换为新的JSON结构的transform方法。JsonNode是可变的,因此您可以添加和删除字段,或者更改值。请注意,您必须转换到特定的子类,比如ObjectNode和ArrayNode,以获得调整器。
迁移类必须在配置文件中定义:
lagom.serialization.json.migrations {
"com.myservice.event.ItemAdded" = "com.myservice.event.ItemAddedMigration"
}
10.2.3 Rename Field
假设我们要将前面的示例中的productId字段重命名为itemId。
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = ItemAdded.class)
public interface AbstractItemAdded extends Jsonable {
String getShoppingCartId();
String getItemId();
int getQuantity();
}
迁移代码将是:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.lightbend.lagom.serialization.JacksonJsonMigration;
public class ItemAddedMigration extends JacksonJsonMigration {
@Override
public int currentVersion() {
return 2;
}
@Override
public JsonNode transform(int fromVersion, JsonNode json) {
ObjectNode root = (ObjectNode) json;
if (fromVersion <= 1) {
root.set("itemId", root.get("productId"));
root.remove("productId");
}
return root;
}
}
10.2.4 Structural Changes
按照同样的方式,我们可以改变任意的结构。
Old class:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = Customer.class)
public interface AbstractCustomer extends Jsonable {
String getName();
String getStreet();
String getCity();
String getZipCode();
String getCountry();
}
New class:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = Customer.class)
public interface AbstractCustomer extends Jsonable {
String getName();
Address getShippingAddress();
Optional<Address> getBillingAddress();
}
Address类
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = Address.class)
public interface AbstractAddress extends Jsonable {
String getStreet();
String getCity();
String getZipCode();
String getCountry();
}
迁移代码将会是这样:
public class CustomerMigration extends JacksonJsonMigration {
@Override
public int currentVersion() {
return 2;
}
@Override
public JsonNode transform(int fromVersion, JsonNode json) {
ObjectNode root = (ObjectNode) json;
if (fromVersion <= 1) {
ObjectNode shippingAddress = root.with("shippingAddress");
shippingAddress.set("street", root.get("street"));
shippingAddress.set("city", root.get("city"));
shippingAddress.set("zipCode", root.get("zipCode"));
shippingAddress.set("country", root.get("country"));
root.remove("street");
root.remove("city");
root.remove("zipCode");
root.remove("country");
}
return root;
}
}
10.2.5 Rename Class
还可以重命名类。例如,让我们将OrderAdded 变为OrderPlaced.
Old class:
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = OrderAdded.class)
public interface AbstractOrderAdded extends Jsonable {
String getShoppingCartId();
}
new class
@Value.Immutable
@ImmutableStyle
@JsonDeserialize(as = OrderPlaced.class)
public interface AbstractOrderPlaced extends Jsonable {
String getShoppingCartId();
}
迁移代码将会是这样:
public class OrderPlacedMigration extends JacksonJsonMigration {
@Override
public int currentVersion() {
return 2;
}
@Override
public String transformClassName(int fromVersion, String className) {
return OrderPlaced.class.getName();
}
@Override
public JsonNode transform(int fromVersion, JsonNode json) {
return json;
}
}
注意,重写transformClassName方法来定义新的类名。
这种类型的迁移必须以旧类名作为键进行配置。实际的类可以被删除。
lagom.serialization.json.migrations {
"com.myservice.event.OrderAdded" = "com.myservice.event.OrderPlacedMigration"
}
11. Cluster
相同服务的实例可以在多个节点上运行,以获得可伸缩性和冗余性。节点可以是物理或虚拟机,分组在集群中。
底层集群技术是Akka集群。
如果服务的实例需要了解彼此,它们必须联接相同的集群。在集群中,服务可以使用Lagom的持久性和发布-订阅模块。
11.1Dependency
如果您使用的是持久性或pubsub模块,则已经包含了集群特性。
如果您想在没有这些模块的情况下启用它,请添加项目构建的以下依赖项。
In Maven:
<dependency>
<groupId>com.lightbend.lagom</groupId>
<artifactId>lagom-javadsl-cluster_2.11</artifactId>
<version>${lagom.version}</version>
</dependency>
In sbt
libraryDependencies += lagomJavadslCluster
11.2 Cluster composition
集群应该只跨运行相同服务的节点.
您可以想象在不同的服务中使用集群特性,但我们建议不要这样做,因为它对服务太过严格。不同的服务应该只通过每个服务的API进行交互。
11.3 Joining
服务实例在服务启动时加入集群。
11.3.1 Joining during development
在开发中,您通常只在一个集群节点上运行服务。没有必要的显式连接;Lagom开发环境会自动处理它。
11.3.2 Joining via ConductR
在生产中, Lightbend ConductR 自动负责连接节点。
11.3.3 Joining without ConductR
如果您没有使用指挥系统,您需要实现如下的连接。
首先,定义集群的一些初始接触点,即所谓的种子节点。您可以在application . conf中定义种子节点.
akka.cluster.seed-nodes = [
"akka.tcp://MyService@host1:2552",
"akka.tcp://MyService@host2:2552"]
或者,可以将此定义为启动JVM时的Java系统属性:
-Dakka.cluster.seed-nodes.0=akka.tcp://MyService@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://MyService@host2:2552
在seed-nodes列表中首先配置的节点是特殊的。只有那个节点才会加入。它用于引导集群。
特殊的第一个种子节点的原因是当从一个空的集群开始时,避免形成分离的岛屿。如果第一个种子节点重新启动,并且有一个现有集群,它将尝试加入其他的种子节点,也就是说它将加入现有的集群。
您可以阅读更多关于集群加入Akka文档的内容。
11.4 Downing
在运行Lagom服务集群时,您必须考虑如何处理网络分区(a.k.a
split brain scenarios大脑分割场景)和机器崩溃(包括JVM和硬件故障)。这对于使用持久化实体时的正确行为至关重要。持久性实体必须是single-writers,即必须只有一个具有给定实体身份的活动实体。如果集群分为两部分,使用错误的策略,则将在两个集群中使用相同标识符的活动实体,写入相同的数据库。这会导致腐败的数据。
天真的方法是在超时之后从集群成员中删除不可到达的节点。这对崩溃和短暂的临时网络分区很有用,但对长网络分区却不适用。网络分区的两边都将看到另一端是不可访问的,并在一段时间后将其从集群成员中移除。由于这种情况发生在两边,结果就是创建了两个独立的断开的集群。这种方法是由在OSS版本的Akka集群中的默认关闭的。因为这种自动关闭不应该用于生产系统。
我们强烈建议不要使用Akka集群的自动关闭特性。
一个预先打包的解决方案是由akka的Split Brain Resolver提供的,它是Lightbend企业套件的一部分。如果使用带有企业套件的Lagom,则配置“keep-majority”策略以启用默认策略。
有关如何在项目的构建中启用它,请参阅Split Brain Resolver文档和反应性平台说明。
即使您不使用商业企业套件,您仍然应该阅读和理解Split Brain Resolver背后的概念,以确保您的解决方案处理所描述的关注点。
11.5 Leaving
当使用持久化实体时候,你可以使用PersistentEntityRegistry.gracefulShutdown 优雅的关闭停止了持久化实体,并以一种优雅的方式离开集群。这不是强制性的,但当您在执行一个服务节点的受控关闭时,它可能是好的。在故障转移到另一个节点期间,它将减少丢失的发送中的消息数量。
最后
以上就是任性蜜粉为你收集整理的Lagom Framework参考指南(五)1.Persistent Entity2. Storing Persistent Entities in Cassandra3. Storing Persistent Entities in a Relational Database4. Persistent Read-Side5. Cassandra Read-Side support6. Relational Database Read-Side support 关系数据库读取方面支持7. JDB的全部内容,希望文章能够帮你解决Lagom Framework参考指南(五)1.Persistent Entity2. Storing Persistent Entities in Cassandra3. Storing Persistent Entities in a Relational Database4. Persistent Read-Side5. Cassandra Read-Side support6. Relational Database Read-Side support 关系数据库读取方面支持7. JDB所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复