Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ lazy val sdkJava = project

lazy val sdkScala = project
.in(file("sdk/scala-sdk"))
.dependsOn(sdkCore)
.dependsOn(sdkJava)
.enablePlugins(AkkaGrpcPlugin, BuildInfoPlugin, PublishSonatype)
.settings(
name := "akkaserverless-scala-sdk",
Expand All @@ -88,6 +88,8 @@ lazy val sdkScala = project
"protocolMinorVersion" -> AkkaServerless.ProtocolVersionMinor,
"scalaVersion" -> scalaVersion.value),
buildInfoPackage := "com.akkaserverless.scalasdk",
//FIXME skip for now
(publish / skip) := true,
Compile / scalacOptions ++= Seq("-release", "8"),
Compile / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Server),
Compile / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object AkkaServerlessRunner {
final class AkkaServerlessRunner private[this] (
_system: ActorSystem,
serviceFactories: Map[String, java.util.function.Function[ActorSystem, Service]]) {
private[javasdk] implicit val system: ActorSystem = _system
private[akkaserverless] implicit val system: ActorSystem = _system
private val log = LoggerFactory.getLogger(getClass)

private[this] final val configuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,85 +30,100 @@ import scala.jdk.CollectionConverters._
import scala.collection.immutable
import scala.compat.java8.OptionConverters._

private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent {
private[akkaserverless] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent {

override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key))

override def get(key: String): Optional[String] =
getScala(key).asJava

private[akkaserverless] def getScala(key: String): Option[String] =
entries.collectFirst {
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
value
}.asJava
}

override def getAll(key: String): util.List[String] =
getAllScala(key).asJava

private[akkaserverless] def getAllScala(key: String): Seq[String] =
entries.collect {
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
value
}.asJava
}

override def getBinary(key: String): Optional[ByteBuffer] =
getBinaryScala(key).asJava

private[akkaserverless] def getBinaryScala(key: String): Option[ByteBuffer] =
entries.collectFirst {
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
value.asReadOnlyByteBuffer()
}.asJava
}

override def getBinaryAll(key: String): util.List[ByteBuffer] =
getBinaryAllScala(key).asJava

private[akkaserverless] def getBinaryAllScala(key: String): Seq[ByteBuffer] =
entries.collect {
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
value.asReadOnlyByteBuffer()
}.asJava
}

override def getAllKeys: util.List[String] = entries.map(_.key).asJava
override def getAllKeys: util.List[String] = getAllKeysScala.asJava
private[akkaserverless] def getAllKeysScala: Seq[String] = entries.map(_.key)

override def set(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
}

override def setBinary(key: String, value: ByteBuffer): Metadata = {
override def setBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
}

override def add(key: String, value: String): Metadata = {
override def add(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
}

override def addBinary(key: String, value: ByteBuffer): Metadata = {
override def addBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
}

override def remove(key: String): MetadataImpl = new MetadataImpl(removeKey(key))

override def clear(): Metadata = MetadataImpl.Empty
override def clear(): MetadataImpl = MetadataImpl.Empty

private[akkaserverless] def iteratorScala[R](f: MetadataEntry => R): Iterator[R] =
entries.iterator.map(f)

override def iterator(): util.Iterator[Metadata.MetadataEntry] =
entries.iterator.map { entry =>
iteratorScala(entry =>
new Metadata.MetadataEntry {
override def getKey: String = entry.key
override def getValue: String = entry.value.stringValue.orNull
override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull
override def isText: Boolean = entry.value.isStringValue
override def isBinary: Boolean = entry.value.isBytesValue
}
}.asJava
}).asJava

private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key))

def isCloudEvent: Boolean = MetadataImpl.CeRequired.forall(h => has(h))

override def asCloudEvent(): CloudEvent =
override def asCloudEvent(): MetadataImpl =
if (!isCloudEvent) {
throw new IllegalStateException("Metadata is not a CloudEvent!")
} else this

override def asCloudEvent(id: String, source: URI, `type`: String): CloudEvent =
override def asCloudEvent(id: String, source: URI, `type`: String): MetadataImpl =
new MetadataImpl(
entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++
Seq(
Expand All @@ -131,41 +146,46 @@ private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) exte

override def id(): String = getRequiredCloudEventField(MetadataImpl.CeId)

override def withId(id: String): CloudEvent = set(MetadataImpl.CeId, id)
override def withId(id: String): MetadataImpl = set(MetadataImpl.CeId, id)

override def source(): URI = URI.create(getRequiredCloudEventField(MetadataImpl.CeSource))

override def withSource(source: URI): CloudEvent = set(MetadataImpl.CeSource, source.toString)
override def withSource(source: URI): MetadataImpl = set(MetadataImpl.CeSource, source.toString)

override def `type`(): String = getRequiredCloudEventField(MetadataImpl.CeType)

override def withType(`type`: String): CloudEvent = set(MetadataImpl.CeType, `type`)
override def withType(`type`: String): MetadataImpl = set(MetadataImpl.CeType, `type`)

override def datacontenttype(): Optional[String] = get(MetadataImpl.CeDatacontenttype)
override def datacontenttype(): Optional[String] = getScala(MetadataImpl.CeDatacontenttype).asJava
private[akkaserverless] def datacontenttypeScala(): Option[String] = getScala(MetadataImpl.CeDatacontenttype)

override def withDatacontenttype(datacontenttype: String): CloudEvent =
override def withDatacontenttype(datacontenttype: String): MetadataImpl =
set(MetadataImpl.CeDatacontenttype, datacontenttype)

override def clearDatacontenttype(): CloudEvent = remove(MetadataImpl.CeDatacontenttype)
override def clearDatacontenttype(): MetadataImpl = remove(MetadataImpl.CeDatacontenttype)

override def dataschema(): Optional[URI] = get(MetadataImpl.CeDataschema).map(URI.create(_))
override def dataschema(): Optional[URI] = dataschemaScala().asJava
private[akkaserverless] def dataschemaScala(): Option[URI] = getScala(MetadataImpl.CeDataschema).map(URI.create(_))

override def withDataschema(dataschema: URI): CloudEvent = set(MetadataImpl.CeDataschema, dataschema.toString)
override def withDataschema(dataschema: URI): MetadataImpl = set(MetadataImpl.CeDataschema, dataschema.toString)

override def clearDataschema(): CloudEvent = remove(MetadataImpl.CeDataschema)
override def clearDataschema(): MetadataImpl = remove(MetadataImpl.CeDataschema)

override def subject(): Optional[String] = get(MetadataImpl.CeSubject)
override def subject(): Optional[String] = subjectScala.asJava
private[akkaserverless] def subjectScala: Option[String] = getScala(MetadataImpl.CeSubject)

override def withSubject(subject: String): CloudEvent = set(MetadataImpl.CeSubject, subject)
override def withSubject(subject: String): MetadataImpl = set(MetadataImpl.CeSubject, subject)

override def clearSubject(): CloudEvent = remove(MetadataImpl.CeSubject)
override def clearSubject(): MetadataImpl = remove(MetadataImpl.CeSubject)

override def time(): Optional[ZonedDateTime] = get(MetadataImpl.CeTime).map(ZonedDateTime.parse(_))
override def time(): Optional[ZonedDateTime] = timeScala.asJava
private[akkaserverless] def timeScala: Option[ZonedDateTime] =
getScala(MetadataImpl.CeTime).map(ZonedDateTime.parse(_))

override def withTime(time: ZonedDateTime): CloudEvent =
override def withTime(time: ZonedDateTime): MetadataImpl =
set(MetadataImpl.CeTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time))

override def clearTime(): CloudEvent = remove(MetadataImpl.CeTime)
override def clearTime(): MetadataImpl = remove(MetadataImpl.CeTime)

override def asMetadata(): Metadata = this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Source

import com.akkaserverless.javasdk.AkkaServerlessRunner.Configuration

// FIXME these don't seem to be 'public API', more internals?
import com.akkaserverless.javasdk.ComponentOptions
import com.akkaserverless.javasdk.Context
import com.akkaserverless.javasdk.Metadata
import com.akkaserverless.javasdk.Service
import com.akkaserverless.javasdk.ServiceCallFactory
import com.akkaserverless.javasdk.valueentity._

import com.akkaserverless.javasdk.impl.ValueEntityFactory
import com.akkaserverless.javasdk.impl._
import com.akkaserverless.javasdk.impl.effect.EffectSupport
Expand All @@ -38,7 +43,6 @@ import com.akkaserverless.javasdk.impl.effect.MessageReplyImpl
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityEffectImpl.DeleteState
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityEffectImpl.UpdateState
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityHandler.CommandResult
import com.akkaserverless.javasdk.valueentity._
import com.akkaserverless.protocol.value_entity.ValueEntityAction.Action.Delete
import com.akkaserverless.protocol.value_entity.ValueEntityAction.Action.Update
import com.akkaserverless.protocol.value_entity.ValueEntityStreamIn.Message.{ Command => InCommand }
Expand Down Expand Up @@ -119,7 +123,7 @@ final class ValueEntitiesImpl(
private def runEntity(init: ValueEntityInit): Flow[ValueEntityStreamIn, ValueEntityStreamOut, NotUsed] = {
val service =
services.getOrElse(init.serviceName, throw ProtocolException(init, s"Service not found: ${init.serviceName}"))
val handler = service.factory.create(new ValueEntityContextImpl(init.entityId))
val handler = service.factory.create(new ValueEntityContextImpl(rootContext, init.entityId))
val thisEntityId = init.entityId

init.state match {
Expand All @@ -146,7 +150,7 @@ final class ValueEntitiesImpl(
val cmd =
service.anySupport.decode(
ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload"))))
val context = new CommandContextImpl(thisEntityId, command.name, command.id, metadata)
val context = new CommandContextImpl(rootContext, thisEntityId, command.name, command.id, metadata)

val CommandResult(effect: ValueEntityEffectImpl[_]) =
try {
Expand Down Expand Up @@ -205,21 +209,23 @@ final class ValueEntitiesImpl(
}
}

private trait AbstractContext extends ValueEntityContext {
override def serviceCallFactory(): ServiceCallFactory = rootContext.serviceCallFactory()
}

private final class CommandContextImpl(
override val entityId: String,
override val commandName: String,
override val commandId: Long,
override val metadata: Metadata)
extends CommandContext
with AbstractContext
with ActivatableContext

private final class ValueEntityContextImpl(override val entityId: String)
extends ValueEntityContext
with AbstractContext
}

private[akkaserverless] trait AbstractContext extends ValueEntityContext {
private[akkaserverless] def rootContext: Context
override def serviceCallFactory(): ServiceCallFactory = rootContext.serviceCallFactory()
}

private[akkaserverless] final class CommandContextImpl(
val rootContext: Context,
override val entityId: String,
override val commandName: String,
override val commandId: Long,
override val metadata: Metadata)
extends CommandContext
with AbstractContext
with ActivatableContext

private[akkaserverless] final class ValueEntityContextImpl(val rootContext: Context, override val entityId: String)
extends ValueEntityContext
with AbstractContext
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import java.util

import scala.jdk.CollectionConverters._

import com.akkaserverless.javasdk.Metadata
import com.akkaserverless.javasdk.ServiceCall
import com.akkaserverless.javasdk.SideEffect
import com.akkaserverless.javasdk.impl.effect.ErrorReplyImpl
import com.akkaserverless.javasdk.impl.effect.ForwardReplyImpl
import com.akkaserverless.javasdk.impl.effect.MessageReplyImpl
import com.akkaserverless.javasdk.impl.effect.NoReply
import com.akkaserverless.javasdk.impl.effect.NoSecondaryEffectImpl
import com.akkaserverless.javasdk.impl.effect.SecondaryEffectImpl

import com.akkaserverless.javasdk.Metadata
import com.akkaserverless.javasdk.ServiceCall
import com.akkaserverless.javasdk.SideEffect
import com.akkaserverless.javasdk.valueentity.ValueEntity.Effect
import Effect.Builder
import Effect.OnSuccessBuilder
Expand All @@ -50,66 +51,66 @@ class ValueEntityEffectImpl[S] extends Builder[S] with OnSuccessBuilder[S] with

def secondaryEffect: SecondaryEffectImpl = _secondaryEffect

override def updateState(newState: S): OnSuccessBuilder[S] = {
override def updateState(newState: S): ValueEntityEffectImpl[S] = {
_primaryEffect = UpdateState(newState)
this
}

override def deleteState(): OnSuccessBuilder[S] = {
override def deleteState(): ValueEntityEffectImpl[S] = {
_primaryEffect = DeleteState
this
}

override def reply[T](message: T): Effect[T] =
override def reply[T](message: T): ValueEntityEffectImpl[T] =
reply(message, Metadata.EMPTY)

override def reply[T](message: T, metadata: Metadata): Effect[T] = {
override def reply[T](message: T, metadata: Metadata): ValueEntityEffectImpl[T] = {
_secondaryEffect = MessageReplyImpl(message, metadata, _secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def forward[T](serviceCall: ServiceCall): Effect[T] = {
override def forward[T](serviceCall: ServiceCall): ValueEntityEffectImpl[T] = {
_secondaryEffect = ForwardReplyImpl(serviceCall, _secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def error[T](description: String): Effect[T] = {
override def error[T](description: String): ValueEntityEffectImpl[T] = {
_secondaryEffect = ErrorReplyImpl(description, _secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

def hasError(): Boolean =
_secondaryEffect.isInstanceOf[ErrorReplyImpl[_]]

override def noReply[T](): Effect[T] = {
override def noReply[T](): ValueEntityEffectImpl[T] = {
_secondaryEffect = NoReply(_secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def thenReply[T](message: T): Effect[T] =
override def thenReply[T](message: T): ValueEntityEffectImpl[T] =
thenReply(message, Metadata.EMPTY)

override def thenReply[T](message: T, metadata: Metadata): Effect[T] = {
override def thenReply[T](message: T, metadata: Metadata): ValueEntityEffectImpl[T] = {
_secondaryEffect = MessageReplyImpl(message, metadata, _secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def thenForward[T](serviceCall: ServiceCall): Effect[T] = {
override def thenForward[T](serviceCall: ServiceCall): ValueEntityEffectImpl[T] = {
_secondaryEffect = ForwardReplyImpl(serviceCall, _secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def thenNoReply[T](): Effect[T] = {
override def thenNoReply[T](): ValueEntityEffectImpl[T] = {
_secondaryEffect = NoReply(_secondaryEffect.sideEffects)
this.asInstanceOf[Effect[T]]
this.asInstanceOf[ValueEntityEffectImpl[T]]
}

override def addSideEffects(sideEffects: util.Collection[SideEffect]): Effect[S] = {
override def addSideEffects(sideEffects: util.Collection[SideEffect]): ValueEntityEffectImpl[S] = {
_secondaryEffect = _secondaryEffect.addSideEffects(sideEffects.asScala)
this
}

override def addSideEffects(sideEffects: SideEffect*): Effect[S] = {
override def addSideEffects(sideEffects: SideEffect*): ValueEntityEffectImpl[S] = {
_secondaryEffect = _secondaryEffect.addSideEffects(sideEffects)
this
}
Expand Down
Loading