diff --git a/.circleci/config.yml b/.circleci/config.yml index eef42044a7..bd588bd323 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -505,6 +505,26 @@ jobs: rm -rf src/main/scala src/test/scala src/it/scala sbt -Dakkaserverless-sdk.version=$SDK_VERSION test:compile + sample-scala-eventsourced-customer-registry: + machine: true + steps: + - checkout-and-merge-to-main + - setup_sbt + - restore_deps_cache + # note: depends on publish local as separate job before this + - copy-from-workspace + - set-sdk-version + - run: + name: Scala Event Sourced Customer Registry + command: | + cd samples/scala-eventsourced-customer-registry + echo "Running sbt with SDK version: '$SDK_VERSION'" + sbt -Dakkaserverless-sdk.version=$SDK_VERSION test + # FIXME integration tests + echo "==== Verifying that generated unmanaged sources compile ====" + rm -rf src/main/scala src/test/scala src/it/scala + sbt -Dakkaserverless-sdk.version=$SDK_VERSION test:compile + sample-scala-valueentity-counter: machine: true steps: @@ -708,6 +728,10 @@ workflows: requires: - checks - publish-local + - sample-scala-eventsourced-customer-registry: + requires: + - checks + - publish-local - sample-scala-valueentity-counter: requires: - checks diff --git a/docs/src/modules/java/pages/views.adoc b/docs/src/modules/java/pages/views.adoc index dee6cffe4a..682a351770 100644 --- a/docs/src/modules/java/pages/views.adoc +++ b/docs/src/modules/java/pages/views.adoc @@ -147,7 +147,7 @@ Scala:: [source,proto,indent=0] .src/main/proto/customer/domain/customer_domain.proto ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/proto/customer/domain/customer_domain.proto[tags=declarations;domain;events] ---- It also assumes a `customer_api.proto` that defines the state stored in the view and returned by queries: @@ -166,7 +166,7 @@ Scala:: [source,proto,indent=0] .src/main/proto/customer/api/customer_api.proto ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/proto/customer/api/customer_api.proto[tags=declarations;view] ---- @@ -194,7 +194,7 @@ Scala:: [source,proto,indent=0] .src/main/proto/customer/customer_view.proto ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto[tags=declarations;service-event-sourced] ---- See <<#query>> for more examples of valid query syntax. @@ -227,8 +227,11 @@ Scala:: [source,scala,indent=0] .src/main/scala/customer/view/CustomerByNameView.scala ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/scala/customer/view/CustomerByNameView.scala[tag=process-events] ---- +<1> Extends the generated `AbstractCustomerByNameView`, which extends link:{attachmentsdir}/scala-api/com/akkaserverless/scalasdk/view/View.html[`View` {tab-icon}, window="new"]. +<2> Defines the initial, empty, state that is used before any updates. +<3> One method for each event. NOTE: This type of update transformation is a natural fit for Events emitted by an Event Sourced Entity, but it can also be used for Value Entities. For example, if the View representation is different from the Entity state you might want to transform it before presenting the View to the client. @@ -251,7 +254,7 @@ Scala:: [source,scala,indent=0] .src/main/scala/customer/Main.scala ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/scala/customer/Main.scala[tag=register] ---- [#topic-view] @@ -274,8 +277,9 @@ Scala:: [source,proto,indent=0] .src/main/proto/customer/view/customer_view.proto ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto[tags=declarations;service-topic] ---- +<1> This is the only difference from <>. [#transform-results] == How to transform results @@ -379,7 +383,7 @@ Scala:: [source,scala,indent=0] .src/main/scala/customer/Main.scala ---- -TODO +include::example$scala-eventsourced-customer-registry/src/main/scala/customer/MainWithCustomViewId.scala[tag=register] ---- The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible. diff --git a/samples/scala-eventsourced-customer-registry/.scalafmt.conf b/samples/scala-eventsourced-customer-registry/.scalafmt.conf new file mode 100644 index 0000000000..d7ea3a6d5a --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/.scalafmt.conf @@ -0,0 +1,47 @@ +version = 3.0.3 + +style = defaultWithAlign + +docstrings.style = Asterisk +indentOperator.preset = spray +maxColumn = 120 +rewrite.rules = [RedundantParens, SortImports, AvoidInfix] +unindentTopLevelOperators = true +align.tokens = [{code = "=>", owner = "Case"}] +align.openParenDefnSite = false +align.openParenCallSite = false +optIn.configStyleArguments = false +danglingParentheses.preset = false +spaces.inImportCurlyBraces = true +newlines.afterCurlyLambda = preserve +rewrite.neverInfix.excludeFilters = [ + and + min + max + until + to + by + eq + ne + "should.*" + "contain.*" + "must.*" + in + ignore + be + taggedAs + thrownBy + synchronized + have + when + size + only + noneOf + oneElementOf + noElementsOf + atLeastOneElementOf + atMostOneElementOf + allElementsOf + inOrderElementsOf + theSameElementsAs +] diff --git a/samples/scala-eventsourced-customer-registry/README.md b/samples/scala-eventsourced-customer-registry/README.md new file mode 100644 index 0000000000..f1c3759a0a --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/README.md @@ -0,0 +1,73 @@ +# Implementing a Counter as an Event Sourced Entity + + +## Building and running unit tests + +To compile and test the code from the command line, use + +```shell +sbt test +``` + + + +## Running Locally + +In order to run your application locally, you must run the Akka Serverless proxy. The included `docker compose` file contains the configuration required to run the proxy for a locally running application. +It also contains the configuration to start a local Google Pub/Sub emulator that the Akka Serverless proxy will connect to. +To start the proxy, run the following command from this directory: + +``` +docker-compose up +``` + +To start the application locally, use the following command: + +``` +sbt run +``` + +For further details see [Running a service locally](https://developer.lightbend.com/docs/akka-serverless/developing/running-service-locally.html) in the documentation. + +## Exercise the service + +With both the proxy and your application running, any defined endpoints should be available at `http://localhost:9000`. In addition to the defined gRPC interface, each method has a corresponding HTTP endpoint. Unless configured otherwise (see [Transcoding HTTP](https://docs.lbcs.dev/js-services/proto.html#_transcoding_http)), this endpoint accepts POST requests at the path `/[package].[entity name]/[method]`. + +* Create a customer with: + ``` + grpcurl --plaintext -d '{"customer_id": "wip", "email": "wip@example.com", "name": "Very Important", "address": {"street": "Road 1", "city": "The Capital"}}' localhost:9000 customer.api.CustomerService/Create + ``` +* Retrieve the customer: + ``` + grpcurl --plaintext -d '{"customer_id": "wip"}' localhost:9000 customer.api.CustomerService/GetCustomer + ``` +* Query by name: + ``` + grpcurl --plaintext -d '{"customer_name": "Very Important"}' localhost:9000 customer.view.CustomerByName/GetCustomers + ``` +* Change name: + ``` + grpcurl --plaintext -d '{"customer_id": "wip", "new_name": "Most Important"}' localhost:9000 customer.api.CustomerService/ChangeName + ``` +* Change address: + ``` + grpcurl --plaintext -d '{"customer_id": "wip", "new_address": {"street": "Street 1", "city": "The City"}}' localhost:9000 customer.api.CustomerService/ChangeAddress + ``` + +## Deploying + +To deploy your service, install the `akkasls` CLI as documented in +[Setting up a local development environment](https://developer.lightbend.com/docs/akka-serverless/setting-up/) +and configure a Docker Registry to upload your docker image to. + +You will need to set the `docker.username` system property when starting sbt to be able to publish the image, for example `sbt -Ddocker.username=myuser docker:publish`. + +If you are publishing to a different registry than docker hub, you will also need to specify what registry using the system property `docker.registry`. + +Refer to +[Configuring registries](https://developer.lightbend.com/docs/akka-serverless/projects/container-registries.html) +for more information on how to make your docker image available to Akka Serverless. + +Finally, you can use the [Akka Serverless Console](https://console.akkaserverless.com) +to create an Akka Serverless project and then deploy your service into it +through the `akkasls` CLI or via the web interface. diff --git a/samples/scala-eventsourced-customer-registry/build.sbt b/samples/scala-eventsourced-customer-registry/build.sbt new file mode 100644 index 0000000000..539e26b05a --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/build.sbt @@ -0,0 +1,38 @@ +name := "eventsourced-customer-registry" + +organization := "com.akkaseverless.samples" +organizationHomepage := Some(url("https://akkaserverless.com")) +licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0"))) + +scalaVersion := "2.13.6" + +enablePlugins(AkkaserverlessPlugin, JavaAppPackaging, DockerPlugin) +dockerBaseImage := "docker.io/library/adoptopenjdk:11-jre-hotspot" +dockerUsername := sys.props.get("docker.username") +dockerRepository := sys.props.get("docker.registry") +ThisBuild / dynverSeparator := "-" + +Compile / scalacOptions ++= Seq( + "-target:11", + "-deprecation", + "-feature", + "-unchecked", + "-Xlog-reflective-calls", + "-Xlint") +Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation", "-parameters" // for Jackson +) + +Test / parallelExecution := false +Test / testOptions += Tests.Argument("-oDF") +Test / logBuffered := false + +Compile / run := { + // needed for the proxy to access the user function on all platforms + sys.props += "akkaserverless.user-function-interface" -> "0.0.0.0" + (Compile / run).evaluated +} +run / fork := false +Global / cancelable := false // ctrl-c + +libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.7" % Test) + diff --git a/samples/scala-eventsourced-customer-registry/docker-compose.yml b/samples/scala-eventsourced-customer-registry/docker-compose.yml new file mode 100644 index 0000000000..7c7571ee82 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/docker-compose.yml @@ -0,0 +1,18 @@ +version: "3" +services: + akka-serverless-proxy: + image: gcr.io/akkaserverless-public/akkaserverless-proxy:0.7.1 + command: -Dconfig.resource=dev-mode.conf -Dakkaserverless.proxy.eventing.support=google-pubsub-emulator + ports: + - "9000:9000" + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + USER_FUNCTION_HOST: ${USER_FUNCTION_HOST:-host.docker.internal} + USER_FUNCTION_PORT: ${USER_FUNCTION_PORT:-8080} + PUBSUB_EMULATOR_HOST: gcloud-pubsub-emulator + gcloud-pubsub-emulator: + image: gcr.io/google.com/cloudsdktool/cloud-sdk:341.0.0 + command: gcloud beta emulators pubsub start --project=test --host-port=0.0.0.0:8085 + ports: + - 8085:8085 diff --git a/samples/scala-eventsourced-customer-registry/project/build.properties b/samples/scala-eventsourced-customer-registry/project/build.properties new file mode 100644 index 0000000000..dbae93bcfd --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.4.9 diff --git a/samples/scala-eventsourced-customer-registry/project/plugins.sbt b/samples/scala-eventsourced-customer-registry/project/plugins.sbt new file mode 100644 index 0000000000..31fe79f34e --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/project/plugins.sbt @@ -0,0 +1,4 @@ +addSbtPlugin("com.akkaserverless" % "sbt-akkaserverless" % System.getProperty("akkaserverless-sdk.version", "0.8.0")) +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.8.1") +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") diff --git a/samples/scala-eventsourced-customer-registry/src/main/proto/customer/api/customer_api.proto b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/api/customer_api.proto new file mode 100644 index 0000000000..1e0f11c6e8 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/api/customer_api.proto @@ -0,0 +1,67 @@ +// Copyright 2021 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package customer.api; + +// tag::declarations[] + +import "google/protobuf/empty.proto"; +import "akkaserverless/annotations.proto"; + +// end::declarations[] + +// tag::view[] +message Customer { + string customer_id = 1 [(akkaserverless.field).entity_key = true]; + string email = 2; + string name = 3; + Address address = 4; +} + +message Address { + string street = 1; + string city = 2; +} +// end::view[] + +message GetCustomerRequest { + string customer_id = 1 [(akkaserverless.field).entity_key = true]; +} + +message ChangeNameRequest { + string customer_id = 1 [(akkaserverless.field).entity_key = true]; + string new_name = 2; +} + +message ChangeAddressRequest { + string customer_id = 1 [(akkaserverless.field).entity_key = true]; + Address new_address = 2; +} + +service CustomerService { + option (akkaserverless.service) = { + type: SERVICE_TYPE_ENTITY + component: "customer.domain.CustomerEntity" + }; + + rpc Create(Customer) returns (google.protobuf.Empty) {} + + rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {} + + rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {} + + rpc GetCustomer(GetCustomerRequest) returns (Customer) {} +} diff --git a/samples/scala-eventsourced-customer-registry/src/main/proto/customer/domain/customer_domain.proto b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/domain/customer_domain.proto new file mode 100644 index 0000000000..31d3d8f206 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/domain/customer_domain.proto @@ -0,0 +1,57 @@ +// Copyright 2021 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// tag::declarations[] +syntax = "proto3"; + +package customer.domain; + +//end::declarations[] + +// tag::domain[] +import "akkaserverless/annotations.proto"; + +option (akkaserverless.file).event_sourced_entity = { // <1> + name: "CustomerEntity" + entity_type: "customers" + state: "CustomerState" + events: ["CustomerCreated", "CustomerNameChanged", "CustomerAddressChanged"] +}; + +message CustomerState { + string customer_id = 1; + string email = 2; + string name = 3; + Address address = 4; +} + +message Address { + string street = 1; + string city = 2; +} +// end::domain[] + +// tag::events[] +message CustomerCreated { + CustomerState customer = 1; +} + +message CustomerNameChanged { + string new_name = 1; +} + +message CustomerAddressChanged { + Address new_address = 1; +} +// end::events[] diff --git a/samples/scala-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto new file mode 100644 index 0000000000..59407a47d0 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto @@ -0,0 +1,133 @@ +// Copyright 2021 Lightbend Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// tag::declarations[] +syntax = "proto3"; + +package customer.view; + +import "customer/domain/customer_domain.proto"; +import "customer/api/customer_api.proto"; +import "akkaserverless/annotations.proto"; +import "google/protobuf/any.proto"; + +// end::declarations[] + +// tag::service-event-sourced[] +message ByNameRequest { + string customer_name = 1; +} + +service CustomerByName { + option (akkaserverless.service) = { // <1> + type: SERVICE_TYPE_VIEW + }; + + rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { // <2> + option (akkaserverless.method).eventing.in = { + event_sourced_entity: "customers" // <3> + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true // <4> + }; + } + + rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { // <2> + option (akkaserverless.method).eventing.in = { + event_sourced_entity: "customers" // <5> + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true // <6> + }; + } + + rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + event_sourced_entity: "customers" + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true + }; + } + + rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + event_sourced_entity: "customers" // <5> + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true // <6> + }; + }; + + rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { + option (akkaserverless.method).view.query = { + query: "SELECT * FROM customers WHERE name = :customer_name" + }; + } +} +// end::service-event-sourced[] + +// tag::service-topic[] +service CustomerByNameFromTopic { + rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + topic: "customers" // <1> + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true + }; + } + + rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + topic: "customers" + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true + }; + } + + rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + topic: "customers" + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true + }; + } + + rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { + option (akkaserverless.method).eventing.in = { + topic: "customers" + }; + option (akkaserverless.method).view.update = { + table: "customers" + transform_updates: true + }; + }; + + rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { + option (akkaserverless.method).view.query = { + query: "SELECT * FROM customers WHERE name = :customer_name" + }; + } +} +// end::service-topic[] diff --git a/samples/scala-eventsourced-customer-registry/src/main/scala/customer/Main.scala b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/Main.scala new file mode 100644 index 0000000000..a422cbb479 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/Main.scala @@ -0,0 +1,24 @@ +package customer + +import com.akkaserverless.scalasdk.AkkaServerless +import customer.domain.CustomerEntity +import customer.view.CustomerByNameView +import org.slf4j.LoggerFactory + +object Main { + + private val log = LoggerFactory.getLogger("customer.Main") + + // tag::register[] + def createAkkaServerless(): AkkaServerless = { + AkkaServerlessFactory.withComponents( + new CustomerEntity(_), + new CustomerByNameView(_)) + } + // end::register[] + + def main(args: Array[String]): Unit = { + log.info("starting the Akka Serverless service") + createAkkaServerless().start() + } +} diff --git a/samples/scala-eventsourced-customer-registry/src/main/scala/customer/MainWithCustomViewId.scala b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/MainWithCustomViewId.scala new file mode 100644 index 0000000000..d891d6d49b --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/MainWithCustomViewId.scala @@ -0,0 +1,29 @@ +package customer + +import com.akkaserverless.scalasdk.AkkaServerless +import customer.domain.CustomerEntity +import customer.domain.CustomerEntityProvider +import customer.view.CustomerByNameView +import customer.view.CustomerByNameViewProvider +import org.slf4j.LoggerFactory + +object MainWithCustomViewId { + + private val log = LoggerFactory.getLogger("customer.Main") + + // tag::register[] + def createAkkaServerless(): AkkaServerless = + AkkaServerless() + .register( + CustomerByNameViewProvider(new CustomerByNameView(_)) + .withViewId("CustomerByNameV2")) + .register( + CustomerEntityProvider(new CustomerEntity(_)) + ) + // end::register[] + + def main(args: Array[String]): Unit = { + log.info("starting the Akka Serverless service") + createAkkaServerless().start() + } +} diff --git a/samples/scala-eventsourced-customer-registry/src/main/scala/customer/domain/CustomerEntity.scala b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/domain/CustomerEntity.scala new file mode 100644 index 0000000000..4aeab0426b --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/domain/CustomerEntity.scala @@ -0,0 +1,70 @@ + +package customer.domain + +import com.akkaserverless.scalasdk.eventsourcedentity.EventSourcedEntity +import com.akkaserverless.scalasdk.eventsourcedentity.EventSourcedEntityContext +import com.google.protobuf.empty.Empty +import customer.api + +/** An event sourced entity. */ +class CustomerEntity(context: EventSourcedEntityContext) extends AbstractCustomerEntity { + + private val entityId = context.entityId + + override def emptyState: CustomerState = CustomerState(customerId = entityId) + + override def getCustomer(currentState: CustomerState, getCustomerRequest: api.GetCustomerRequest): EventSourcedEntity.Effect[api.Customer] = + effects.reply(convertToApi(currentState)) + + override def create(currentState: CustomerState, customer: api.Customer): EventSourcedEntity.Effect[Empty] = { + val event = CustomerCreated(Some(convertToDomain(customer))) + effects.emitEvent(event).thenReply(_ => Empty.defaultInstance) + } + + override def changeName(currentState: CustomerState, changeNameRequest: api.ChangeNameRequest): EventSourcedEntity.Effect[Empty] = { + val event = CustomerNameChanged(newName = changeNameRequest.newName) + effects.emitEvent(event).thenReply(_ => Empty.defaultInstance) + } + + override def changeAddress(currentState: CustomerState, changeAddressRequest: api.ChangeAddressRequest): EventSourcedEntity.Effect[Empty] = { + val event = CustomerAddressChanged(newAddress = changeAddressRequest.newAddress.map(convertToDomain)) + effects.emitEvent(event).thenReply(_ => Empty.defaultInstance) + } + + override def customerCreated(currentState: CustomerState, customerCreated: CustomerCreated): CustomerState = + customerCreated.customer.get + + override def customerNameChanged(currentState: CustomerState, customerNameChanged: CustomerNameChanged): CustomerState = + currentState.copy(name = customerNameChanged.newName) + + override def customerAddressChanged(currentState: CustomerState, customerAddressChanged: CustomerAddressChanged): CustomerState = + currentState.copy(address = customerAddressChanged.newAddress) + + private def convertToApi(customer: CustomerState): api.Customer = + api.Customer( + customerId = customer.customerId, + name = customer.name, + email = customer.email, + address = customer.address.map(convertToApi), + ) + + private def convertToApi(address: Address): api.Address = + api.Address( + street = address.street, + city = address.city + ) + + private def convertToDomain(customer: api.Customer): CustomerState = + CustomerState( + customerId = customer.customerId, + email = customer.email, + name = customer.name, + address = customer.address.map(convertToDomain) + ) + + private def convertToDomain(address: api.Address): Address = + Address( + street = address.street, + city = address.city + ) +} diff --git a/samples/scala-eventsourced-customer-registry/src/main/scala/customer/view/CustomerByNameView.scala b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/view/CustomerByNameView.scala new file mode 100644 index 0000000000..2cea7948c3 --- /dev/null +++ b/samples/scala-eventsourced-customer-registry/src/main/scala/customer/view/CustomerByNameView.scala @@ -0,0 +1,50 @@ +package customer.view + +// tag::process-events[] +import com.akkaserverless.scalasdk.view.View.UpdateEffect +import com.akkaserverless.scalasdk.view.ViewContext +import com.google.protobuf.any.{Any => ScalaPbAny} +import customer.api +import customer.api.Customer +import customer.domain +import customer.domain.CustomerAddressChanged +import customer.domain.CustomerCreated +import customer.domain.CustomerNameChanged +import customer.domain.CustomerState + +class CustomerByNameView(context: ViewContext) extends AbstractCustomerByNameView { // <1> + + override def emptyState: Customer = Customer.defaultInstance // <2> + + override def processCustomerCreated( + state: Customer, customerCreated: CustomerCreated): UpdateEffect[Customer] = // <3> + if (state == emptyState) effects.ignore() // already created + else effects.updateState(convertToApi(customerCreated.customer.get)) + + override def processCustomerNameChanged( + state: Customer, customerNameChanged: CustomerNameChanged): UpdateEffect[Customer] = // <3> + effects.updateState(state.copy(name = customerNameChanged.newName)) + + override def processCustomerAddressChanged( + state: Customer, customerAddressChanged: CustomerAddressChanged): UpdateEffect[Customer] = // <3> + effects.updateState(state.copy(address = customerAddressChanged.newAddress.map(convertToApi))) + + override def ignoreOtherEvents( + state: Customer, any: ScalaPbAny): UpdateEffect[Customer] = + effects.ignore() + + private def convertToApi(customer: CustomerState): Customer = + Customer( + customerId = customer.customerId, + name = customer.name, + email = customer.email, + address = customer.address.map(convertToApi), + ) + + private def convertToApi(address: domain.Address): api.Address = + api.Address( + street = address.street, + city = address.city + ) +} +// end::process-events[] \ No newline at end of file