Skip to content

Commit f45f750

Browse files
feat!(scalasdk): WIP
feat(scalasdk): scaffolding for AkkaServerless, just to compile. Some type scaffolding / glue Not extending from the public facing javasdk. Not ideal yet, but compiles. This might mean later that we have to duplicate some more code. Comment. Structure for ValueEntity. Tmp. AkkaServerlessRunner. Some more wip. More wip. Skip publish of scalasdk.
1 parent 5d4d391 commit f45f750

20 files changed

Lines changed: 1547 additions & 74 deletions

build.sbt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ lazy val sdkJava = project
7676

7777
lazy val sdkScala = project
7878
.in(file("sdk/scala-sdk"))
79-
.dependsOn(sdkCore)
79+
.dependsOn(sdkJava)
8080
.enablePlugins(AkkaGrpcPlugin, BuildInfoPlugin, PublishSonatype)
8181
.settings(
8282
name := "akkaserverless-scala-sdk",
@@ -88,6 +88,8 @@ lazy val sdkScala = project
8888
"protocolMinorVersion" -> AkkaServerless.ProtocolVersionMinor,
8989
"scalaVersion" -> scalaVersion.value),
9090
buildInfoPackage := "com.akkaserverless.scalasdk",
91+
//FIXME skip for now
92+
(publish / skip) := true,
9193
Compile / scalacOptions ++= Seq("-release", "8"),
9294
Compile / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Server),
9395
Compile / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala),

sdk/java-sdk/src/main/scala/com/akkaserverless/javasdk/AkkaServerlessRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ object AkkaServerlessRunner {
7676
final class AkkaServerlessRunner private[this] (
7777
_system: ActorSystem,
7878
serviceFactories: Map[String, java.util.function.Function[ActorSystem, Service]]) {
79-
private[javasdk] implicit val system: ActorSystem = _system
79+
private[akkaserverless] implicit val system: ActorSystem = _system
8080
private val log = LoggerFactory.getLogger(getClass)
8181

8282
private[this] final val configuration =

sdk/java-sdk/src/main/scala/com/akkaserverless/javasdk/impl/MetadataImpl.scala

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,85 +30,100 @@ import scala.jdk.CollectionConverters._
3030
import scala.collection.immutable
3131
import scala.compat.java8.OptionConverters._
3232

33-
private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent {
33+
private[akkaserverless] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent {
3434

3535
override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key))
3636

3737
override def get(key: String): Optional[String] =
38+
getScala(key).asJava
39+
40+
private[akkaserverless] def getScala(key: String): Option[String] =
3841
entries.collectFirst {
3942
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
4043
value
41-
}.asJava
44+
}
4245

4346
override def getAll(key: String): util.List[String] =
47+
getAllScala(key).asJava
48+
49+
private[akkaserverless] def getAllScala(key: String): Seq[String] =
4450
entries.collect {
4551
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
4652
value
47-
}.asJava
53+
}
4854

4955
override def getBinary(key: String): Optional[ByteBuffer] =
56+
getBinaryScala(key).asJava
57+
58+
private[akkaserverless] def getBinaryScala(key: String): Option[ByteBuffer] =
5059
entries.collectFirst {
5160
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
5261
value.asReadOnlyByteBuffer()
53-
}.asJava
62+
}
5463

5564
override def getBinaryAll(key: String): util.List[ByteBuffer] =
65+
getBinaryAllScala(key).asJava
66+
67+
private[akkaserverless] def getBinaryAllScala(key: String): Seq[ByteBuffer] =
5668
entries.collect {
5769
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
5870
value.asReadOnlyByteBuffer()
59-
}.asJava
71+
}
6072

61-
override def getAllKeys: util.List[String] = entries.map(_.key).asJava
73+
override def getAllKeys: util.List[String] = getAllKeysScala.asJava
74+
private[akkaserverless] def getAllKeysScala: Seq[String] = entries.map(_.key)
6275

6376
override def set(key: String, value: String): MetadataImpl = {
6477
Objects.requireNonNull(key, "Key must not be null")
6578
Objects.requireNonNull(value, "Value must not be null")
6679
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
6780
}
6881

69-
override def setBinary(key: String, value: ByteBuffer): Metadata = {
82+
override def setBinary(key: String, value: ByteBuffer): MetadataImpl = {
7083
Objects.requireNonNull(key, "Key must not be null")
7184
Objects.requireNonNull(value, "Value must not be null")
7285
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
7386
}
7487

75-
override def add(key: String, value: String): Metadata = {
88+
override def add(key: String, value: String): MetadataImpl = {
7689
Objects.requireNonNull(key, "Key must not be null")
7790
Objects.requireNonNull(value, "Value must not be null")
7891
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
7992
}
8093

81-
override def addBinary(key: String, value: ByteBuffer): Metadata = {
94+
override def addBinary(key: String, value: ByteBuffer): MetadataImpl = {
8295
Objects.requireNonNull(key, "Key must not be null")
8396
Objects.requireNonNull(value, "Value must not be null")
8497
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
8598
}
8699

87100
override def remove(key: String): MetadataImpl = new MetadataImpl(removeKey(key))
88101

89-
override def clear(): Metadata = MetadataImpl.Empty
102+
override def clear(): MetadataImpl = MetadataImpl.Empty
103+
104+
private[akkaserverless] def iteratorScala[R](f: MetadataEntry => R): Iterator[R] =
105+
entries.iterator.map(f)
90106

91107
override def iterator(): util.Iterator[Metadata.MetadataEntry] =
92-
entries.iterator.map { entry =>
108+
iteratorScala(entry =>
93109
new Metadata.MetadataEntry {
94110
override def getKey: String = entry.key
95111
override def getValue: String = entry.value.stringValue.orNull
96112
override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull
97113
override def isText: Boolean = entry.value.isStringValue
98114
override def isBinary: Boolean = entry.value.isBytesValue
99-
}
100-
}.asJava
115+
}).asJava
101116

102117
private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key))
103118

104119
def isCloudEvent: Boolean = MetadataImpl.CeRequired.forall(h => has(h))
105120

106-
override def asCloudEvent(): CloudEvent =
121+
override def asCloudEvent(): MetadataImpl =
107122
if (!isCloudEvent) {
108123
throw new IllegalStateException("Metadata is not a CloudEvent!")
109124
} else this
110125

111-
override def asCloudEvent(id: String, source: URI, `type`: String): CloudEvent =
126+
override def asCloudEvent(id: String, source: URI, `type`: String): MetadataImpl =
112127
new MetadataImpl(
113128
entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++
114129
Seq(
@@ -131,41 +146,46 @@ private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) exte
131146

132147
override def id(): String = getRequiredCloudEventField(MetadataImpl.CeId)
133148

134-
override def withId(id: String): CloudEvent = set(MetadataImpl.CeId, id)
149+
override def withId(id: String): MetadataImpl = set(MetadataImpl.CeId, id)
135150

136151
override def source(): URI = URI.create(getRequiredCloudEventField(MetadataImpl.CeSource))
137152

138-
override def withSource(source: URI): CloudEvent = set(MetadataImpl.CeSource, source.toString)
153+
override def withSource(source: URI): MetadataImpl = set(MetadataImpl.CeSource, source.toString)
139154

140155
override def `type`(): String = getRequiredCloudEventField(MetadataImpl.CeType)
141156

142-
override def withType(`type`: String): CloudEvent = set(MetadataImpl.CeType, `type`)
157+
override def withType(`type`: String): MetadataImpl = set(MetadataImpl.CeType, `type`)
143158

144-
override def datacontenttype(): Optional[String] = get(MetadataImpl.CeDatacontenttype)
159+
override def datacontenttype(): Optional[String] = getScala(MetadataImpl.CeDatacontenttype).asJava
160+
private[akkaserverless] def datacontenttypeScala(): Option[String] = getScala(MetadataImpl.CeDatacontenttype)
145161

146-
override def withDatacontenttype(datacontenttype: String): CloudEvent =
162+
override def withDatacontenttype(datacontenttype: String): MetadataImpl =
147163
set(MetadataImpl.CeDatacontenttype, datacontenttype)
148164

149-
override def clearDatacontenttype(): CloudEvent = remove(MetadataImpl.CeDatacontenttype)
165+
override def clearDatacontenttype(): MetadataImpl = remove(MetadataImpl.CeDatacontenttype)
150166

151-
override def dataschema(): Optional[URI] = get(MetadataImpl.CeDataschema).map(URI.create(_))
167+
override def dataschema(): Optional[URI] = dataschemaScala().asJava
168+
private[akkaserverless] def dataschemaScala(): Option[URI] = getScala(MetadataImpl.CeDataschema).map(URI.create(_))
152169

153-
override def withDataschema(dataschema: URI): CloudEvent = set(MetadataImpl.CeDataschema, dataschema.toString)
170+
override def withDataschema(dataschema: URI): MetadataImpl = set(MetadataImpl.CeDataschema, dataschema.toString)
154171

155-
override def clearDataschema(): CloudEvent = remove(MetadataImpl.CeDataschema)
172+
override def clearDataschema(): MetadataImpl = remove(MetadataImpl.CeDataschema)
156173

157-
override def subject(): Optional[String] = get(MetadataImpl.CeSubject)
174+
override def subject(): Optional[String] = subjectScala.asJava
175+
private[akkaserverless] def subjectScala: Option[String] = getScala(MetadataImpl.CeSubject)
158176

159-
override def withSubject(subject: String): CloudEvent = set(MetadataImpl.CeSubject, subject)
177+
override def withSubject(subject: String): MetadataImpl = set(MetadataImpl.CeSubject, subject)
160178

161-
override def clearSubject(): CloudEvent = remove(MetadataImpl.CeSubject)
179+
override def clearSubject(): MetadataImpl = remove(MetadataImpl.CeSubject)
162180

163-
override def time(): Optional[ZonedDateTime] = get(MetadataImpl.CeTime).map(ZonedDateTime.parse(_))
181+
override def time(): Optional[ZonedDateTime] = timeScala.asJava
182+
private[akkaserverless] def timeScala: Option[ZonedDateTime] =
183+
getScala(MetadataImpl.CeTime).map(ZonedDateTime.parse(_))
164184

165-
override def withTime(time: ZonedDateTime): CloudEvent =
185+
override def withTime(time: ZonedDateTime): MetadataImpl =
166186
set(MetadataImpl.CeTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time))
167187

168-
override def clearTime(): CloudEvent = remove(MetadataImpl.CeTime)
188+
override def clearTime(): MetadataImpl = remove(MetadataImpl.CeTime)
169189

170190
override def asMetadata(): Metadata = this
171191
}

sdk/java-sdk/src/main/scala/com/akkaserverless/javasdk/impl/valueentity/ValueEntitiesImpl.scala

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@ import akka.actor.ActorSystem
2424
import akka.event.Logging
2525
import akka.stream.scaladsl.Flow
2626
import akka.stream.scaladsl.Source
27+
2728
import com.akkaserverless.javasdk.AkkaServerlessRunner.Configuration
29+
30+
// FIXME these don't seem to be 'public API', more internals?
2831
import com.akkaserverless.javasdk.ComponentOptions
2932
import com.akkaserverless.javasdk.Context
3033
import com.akkaserverless.javasdk.Metadata
3134
import com.akkaserverless.javasdk.Service
3235
import com.akkaserverless.javasdk.ServiceCallFactory
36+
import com.akkaserverless.javasdk.valueentity._
37+
3338
import com.akkaserverless.javasdk.impl.ValueEntityFactory
3439
import com.akkaserverless.javasdk.impl._
3540
import com.akkaserverless.javasdk.impl.effect.EffectSupport
@@ -38,7 +43,6 @@ import com.akkaserverless.javasdk.impl.effect.MessageReplyImpl
3843
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityEffectImpl.DeleteState
3944
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityEffectImpl.UpdateState
4045
import com.akkaserverless.javasdk.impl.valueentity.ValueEntityHandler.CommandResult
41-
import com.akkaserverless.javasdk.valueentity._
4246
import com.akkaserverless.protocol.value_entity.ValueEntityAction.Action.Delete
4347
import com.akkaserverless.protocol.value_entity.ValueEntityAction.Action.Update
4448
import com.akkaserverless.protocol.value_entity.ValueEntityStreamIn.Message.{ Command => InCommand }
@@ -119,7 +123,7 @@ final class ValueEntitiesImpl(
119123
private def runEntity(init: ValueEntityInit): Flow[ValueEntityStreamIn, ValueEntityStreamOut, NotUsed] = {
120124
val service =
121125
services.getOrElse(init.serviceName, throw ProtocolException(init, s"Service not found: ${init.serviceName}"))
122-
val handler = service.factory.create(new ValueEntityContextImpl(init.entityId))
126+
val handler = service.factory.create(new ValueEntityContextImpl(rootContext, init.entityId))
123127
val thisEntityId = init.entityId
124128

125129
init.state match {
@@ -146,7 +150,7 @@ final class ValueEntitiesImpl(
146150
val cmd =
147151
service.anySupport.decode(
148152
ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload"))))
149-
val context = new CommandContextImpl(thisEntityId, command.name, command.id, metadata)
153+
val context = new CommandContextImpl(rootContext, thisEntityId, command.name, command.id, metadata)
150154

151155
val CommandResult(effect: ValueEntityEffectImpl[_]) =
152156
try {
@@ -205,21 +209,23 @@ final class ValueEntitiesImpl(
205209
}
206210
}
207211

208-
private trait AbstractContext extends ValueEntityContext {
209-
override def serviceCallFactory(): ServiceCallFactory = rootContext.serviceCallFactory()
210-
}
211-
212-
private final class CommandContextImpl(
213-
override val entityId: String,
214-
override val commandName: String,
215-
override val commandId: Long,
216-
override val metadata: Metadata)
217-
extends CommandContext
218-
with AbstractContext
219-
with ActivatableContext
220-
221-
private final class ValueEntityContextImpl(override val entityId: String)
222-
extends ValueEntityContext
223-
with AbstractContext
212+
}
224213

214+
private[akkaserverless] trait AbstractContext extends ValueEntityContext {
215+
private[akkaserverless] def rootContext: Context
216+
override def serviceCallFactory(): ServiceCallFactory = rootContext.serviceCallFactory()
225217
}
218+
219+
private[akkaserverless] final class CommandContextImpl(
220+
val rootContext: Context,
221+
override val entityId: String,
222+
override val commandName: String,
223+
override val commandId: Long,
224+
override val metadata: Metadata)
225+
extends CommandContext
226+
with AbstractContext
227+
with ActivatableContext
228+
229+
private[akkaserverless] final class ValueEntityContextImpl(val rootContext: Context, override val entityId: String)
230+
extends ValueEntityContext
231+
with AbstractContext

sdk/java-sdk/src/main/scala/com/akkaserverless/javasdk/impl/valueentity/ValueEntityEffectImpl.scala

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ import java.util
2020

2121
import scala.jdk.CollectionConverters._
2222

23-
import com.akkaserverless.javasdk.Metadata
24-
import com.akkaserverless.javasdk.ServiceCall
25-
import com.akkaserverless.javasdk.SideEffect
2623
import com.akkaserverless.javasdk.impl.effect.ErrorReplyImpl
2724
import com.akkaserverless.javasdk.impl.effect.ForwardReplyImpl
2825
import com.akkaserverless.javasdk.impl.effect.MessageReplyImpl
2926
import com.akkaserverless.javasdk.impl.effect.NoReply
3027
import com.akkaserverless.javasdk.impl.effect.NoSecondaryEffectImpl
3128
import com.akkaserverless.javasdk.impl.effect.SecondaryEffectImpl
29+
30+
import com.akkaserverless.javasdk.Metadata
31+
import com.akkaserverless.javasdk.ServiceCall
32+
import com.akkaserverless.javasdk.SideEffect
3233
import com.akkaserverless.javasdk.valueentity.ValueEntity.Effect
3334
import Effect.Builder
3435
import Effect.OnSuccessBuilder
@@ -50,66 +51,66 @@ class ValueEntityEffectImpl[S] extends Builder[S] with OnSuccessBuilder[S] with
5051

5152
def secondaryEffect: SecondaryEffectImpl = _secondaryEffect
5253

53-
override def updateState(newState: S): OnSuccessBuilder[S] = {
54+
override def updateState(newState: S): ValueEntityEffectImpl[S] = {
5455
_primaryEffect = UpdateState(newState)
5556
this
5657
}
5758

58-
override def deleteState(): OnSuccessBuilder[S] = {
59+
override def deleteState(): ValueEntityEffectImpl[S] = {
5960
_primaryEffect = DeleteState
6061
this
6162
}
6263

63-
override def reply[T](message: T): Effect[T] =
64+
override def reply[T](message: T): ValueEntityEffectImpl[T] =
6465
reply(message, Metadata.EMPTY)
6566

66-
override def reply[T](message: T, metadata: Metadata): Effect[T] = {
67+
override def reply[T](message: T, metadata: Metadata): ValueEntityEffectImpl[T] = {
6768
_secondaryEffect = MessageReplyImpl(message, metadata, _secondaryEffect.sideEffects)
68-
this.asInstanceOf[Effect[T]]
69+
this.asInstanceOf[ValueEntityEffectImpl[T]]
6970
}
7071

71-
override def forward[T](serviceCall: ServiceCall): Effect[T] = {
72+
override def forward[T](serviceCall: ServiceCall): ValueEntityEffectImpl[T] = {
7273
_secondaryEffect = ForwardReplyImpl(serviceCall, _secondaryEffect.sideEffects)
73-
this.asInstanceOf[Effect[T]]
74+
this.asInstanceOf[ValueEntityEffectImpl[T]]
7475
}
7576

76-
override def error[T](description: String): Effect[T] = {
77+
override def error[T](description: String): ValueEntityEffectImpl[T] = {
7778
_secondaryEffect = ErrorReplyImpl(description, _secondaryEffect.sideEffects)
78-
this.asInstanceOf[Effect[T]]
79+
this.asInstanceOf[ValueEntityEffectImpl[T]]
7980
}
8081

8182
def hasError(): Boolean =
8283
_secondaryEffect.isInstanceOf[ErrorReplyImpl[_]]
8384

84-
override def noReply[T](): Effect[T] = {
85+
override def noReply[T](): ValueEntityEffectImpl[T] = {
8586
_secondaryEffect = NoReply(_secondaryEffect.sideEffects)
86-
this.asInstanceOf[Effect[T]]
87+
this.asInstanceOf[ValueEntityEffectImpl[T]]
8788
}
8889

89-
override def thenReply[T](message: T): Effect[T] =
90+
override def thenReply[T](message: T): ValueEntityEffectImpl[T] =
9091
thenReply(message, Metadata.EMPTY)
9192

92-
override def thenReply[T](message: T, metadata: Metadata): Effect[T] = {
93+
override def thenReply[T](message: T, metadata: Metadata): ValueEntityEffectImpl[T] = {
9394
_secondaryEffect = MessageReplyImpl(message, metadata, _secondaryEffect.sideEffects)
94-
this.asInstanceOf[Effect[T]]
95+
this.asInstanceOf[ValueEntityEffectImpl[T]]
9596
}
9697

97-
override def thenForward[T](serviceCall: ServiceCall): Effect[T] = {
98+
override def thenForward[T](serviceCall: ServiceCall): ValueEntityEffectImpl[T] = {
9899
_secondaryEffect = ForwardReplyImpl(serviceCall, _secondaryEffect.sideEffects)
99-
this.asInstanceOf[Effect[T]]
100+
this.asInstanceOf[ValueEntityEffectImpl[T]]
100101
}
101102

102-
override def thenNoReply[T](): Effect[T] = {
103+
override def thenNoReply[T](): ValueEntityEffectImpl[T] = {
103104
_secondaryEffect = NoReply(_secondaryEffect.sideEffects)
104-
this.asInstanceOf[Effect[T]]
105+
this.asInstanceOf[ValueEntityEffectImpl[T]]
105106
}
106107

107-
override def addSideEffects(sideEffects: util.Collection[SideEffect]): Effect[S] = {
108+
override def addSideEffects(sideEffects: util.Collection[SideEffect]): ValueEntityEffectImpl[S] = {
108109
_secondaryEffect = _secondaryEffect.addSideEffects(sideEffects.asScala)
109110
this
110111
}
111112

112-
override def addSideEffects(sideEffects: SideEffect*): Effect[S] = {
113+
override def addSideEffects(sideEffects: SideEffect*): ValueEntityEffectImpl[S] = {
113114
_secondaryEffect = _secondaryEffect.addSideEffects(sideEffects)
114115
this
115116
}

0 commit comments

Comments
 (0)