From 2beeb3932fd309595c9b360fc51704c9abdd8e34 Mon Sep 17 00:00:00 2001 From: Dominic Grimm Date: Mon, 19 Feb 2024 18:00:25 +0100 Subject: [PATCH] Switch JSON serialization framework --- build.sbt | 15 ++-- ...viderFactoryData.scala => FromEvent.scala} | 23 +++---- .../mqtt/MqttEventListenerProvider.scala | 21 ++++-- .../MqttEventListenerProviderFactory.scala | 68 ++++++++++--------- .../providers/events/mqtt/MqttOptions.scala | 10 +-- .../providers/events/mqtt/Payload.scala | 65 ++++++++++-------- .../events/mqtt/PayloadAuthDetails.scala | 22 ++++-- 7 files changed, 123 insertions(+), 101 deletions(-) rename src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/{MqttEventListenerProviderFactoryData.scala => FromEvent.scala} (59%) diff --git a/build.sbt b/build.sbt index 4ef8b5e..1910af7 100644 --- a/build.sbt +++ b/build.sbt @@ -16,6 +16,13 @@ val scala3Version = "3.3.1" +val deps = Seq( + "io.bullet" %% "borer-core" % "1.14.0", + "io.bullet" %% "borer-derivation" % "1.14.0", + "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "7.0.1", + "com.typesafe.akka" %% "akka-stream" % "2.9.0" +) + val keycloakVersion = "23.0.6" val keycloakDeps = Seq( "org.keycloak" % "keycloak-core" % keycloakVersion % "provided", @@ -23,17 +30,11 @@ val keycloakDeps = Seq( "org.keycloak" % "keycloak-server-spi-private" % keycloakVersion % "provided" ) -val deps = Seq( - "com.lihaoyi" %% "upickle" % "3.2.0", - "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "7.0.1", - "com.typesafe.akka" %% "akka-stream" % "2.9.0" -) - lazy val root = project .in(file(".")) .settings( name := "keycloak-event-listener-mqtt", - version := keycloakVersion, + version := "0.1.0", scalaVersion := scala3Version, resolvers += "Akka library repository".at("https://repo.akka.io/maven"), libraryDependencies ++= keycloakDeps, diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/FromEvent.scala similarity index 59% rename from src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala rename to src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/FromEvent.scala index 8fa5395..e90574f 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/FromEvent.scala @@ -1,3 +1,5 @@ +package net.dergrimm.keycloak.providers.events.mqtt + /* * Copyright 2024 Dominic Grimm * @@ -14,18 +16,11 @@ * limitations under the License. */ -package net.dergrimm.keycloak.providers.events.mqtt +import org.keycloak.events.Event +import org.keycloak.events.admin.AdminEvent +import org.keycloak.models.KeycloakSession -import org.keycloak.events.EventType -import org.keycloak.events.admin.OperationType -import akka.stream.scaladsl.Sink -import akka.stream.alpakka.mqtt.MqttMessage -import akka.Done -import scala.concurrent.Future - -final case class MqttEventListenerProviderFactoryData( - excludedEvents: Option[Set[EventType]], - excludedAdminOperations: Option[Set[OperationType]], - mqttOptions: MqttOptions, - mqttSink: Sink[MqttMessage, Future[Done]] -) +private trait FromEvent[+Self]: + def fromEvent(event: Event, session: KeycloakSession): Self + def fromEvent(event: AdminEvent, session: KeycloakSession): Self +end FromEvent diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala index 58d41ee..2e412d0 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala @@ -31,6 +31,7 @@ import akka.util.ByteString import akka.stream.scaladsl.Source import scala.util.Success import scala.util.Failure +import io.bullet.borer.Json class MqttEventListenerProvider( val session: KeycloakSession, @@ -46,8 +47,9 @@ class MqttEventListenerProvider( if excludedEvents.isDefined && excludedEvents.contains(event.getType()) then return - val payload = Payload.fromEvent(event, session) + val payload: Payload = Payload.fromEvent(event, session) sendMessage(payload) + end onEvent override def onEvent( event: AdminEvent, @@ -58,8 +60,9 @@ class MqttEventListenerProvider( ) then return - val payload = Payload.fromEvent(event, session) + val payload: Payload = Payload.fromEvent(event, session) sendMessage(payload) + end onEvent override def close(): Unit = {} @@ -67,11 +70,14 @@ class MqttEventListenerProvider( import concurrent.ExecutionContext.Implicits.global import MqttEventListenerProviderFactory.system - val topic = s"${mqttOptions.topic}/${payload.topic}" - val payloadStr = upickle.default.write(payload) - val msg = MqttMessage(topic, ByteString(payloadStr)) - .withRetained(mqttOptions.retained) - val future = Source.single(msg).runWith(mqttSink) + val topic: String = s"${mqttOptions.topic}/${payload.topic}" + val payloadBytes: Array[Byte] = Json.encode(payload).toByteArray + + val msg: MqttMessage = + MqttMessage(topic, ByteString.fromArray(payloadBytes)) + .withRetained(mqttOptions.retained) + val future: Future[Done] = Source.single(msg).runWith(mqttSink) + future.onComplete { case Failure(exception) => logger.log( @@ -80,4 +86,5 @@ class MqttEventListenerProvider( ) case Success(_) => {} } + end sendMessage end MqttEventListenerProvider diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala index 5f71d1d..69ab9a2 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala @@ -31,70 +31,82 @@ import scala.concurrent.duration.FiniteDuration import java.util.concurrent.TimeUnit import akka.stream.alpakka.mqtt.scaladsl.MqttSink import akka.actor.ActorSystem +import akka.stream.scaladsl.Sink +import akka.stream.alpakka.mqtt.MqttMessage +import akka.Done +import scala.concurrent.Future object MqttEventListenerProviderFactory: private val PLUGIN_ID = "mqtt" private val PUBLISHER_ID = "keycloak" - implicit val system: ActorSystem = ActorSystem() + given system: ActorSystem = ActorSystem() end MqttEventListenerProviderFactory -class MqttEventListenerProviderFactory( - var data: MqttEventListenerProviderFactoryData -) extends EventListenerProviderFactory: +class MqttEventListenerProviderFactory extends EventListenerProviderFactory: private final val logger = logging.Logger.getLogger( classOf[MqttEventListenerProviderFactory].getName() ) - def this() = this(null) + private var excludedEvents: Option[Set[EventType]] = None + private var excludedAdminEvents: Option[Set[OperationType]] = None + private var mqttOptions: MqttOptions = null + private var mqttSink: Sink[MqttMessage, Future[Done]] = null override def create(session: KeycloakSession): MqttEventListenerProvider = MqttEventListenerProvider( session, - excludedEvents = data.excludedEvents, - excludedAdminEvents = data.excludedAdminOperations, - mqttOptions = data.mqttOptions, - mqttSink = data.mqttSink + excludedEvents, + excludedAdminEvents, + mqttOptions, + mqttSink ) + end create override def init(config: Config.Scope): Unit = - val excludes = config.getArray("excludeEvents") - val excludedEvents = - if excludes != null then + val excludes: Array[String] = config.getArray("excludeEvents") + excludedEvents = + if excludes != null + then Some( immutable.HashSet.from(excludes.map(EventType.valueOf)) ) else None - val excludesOperations = config.getArray("excludesOperations") - val excludedAdminOperations = - if excludesOperations != null then + val excludesOperations: Array[String] = + config.getArray("excludesOperations") + excludedAdminEvents = + if excludesOperations != null + then Some( immutable.HashSet.from(excludesOperations.map(OperationType.valueOf)) ) else None - val uri = config.get("serverUri") + val uri: String = config.get("serverUri") if uri == null then throw new IllegalArgumentException("MQTT server URI is null") - val credentials = + val credentials: Option[(String, String)] = val username = config.get("username") val password = config.get("password") - if username != null && password != null then Some((username, password)) + + if username != null && password != null + then Some(username -> password) else None - val cleanSession = config.getBoolean("cleanSession", true) - val connectionTimeout = + val cleanSession: Boolean = config.getBoolean("cleanSession", true) + val connectionTimeout: FiniteDuration = FiniteDuration(config.getLong("connectionTimeout", 10), TimeUnit.SECONDS) - val mqttOptions = MqttOptions.fromConfig(config) + mqttOptions = MqttOptions.fromConfig(config) var connectionSettings = MqttConnectionSettings( uri, "net.dergrimm.keycloak.providers.events.mqtt", - new MemoryPersistence - ).withCleanSession(cleanSession) + new MemoryPersistence() + ) + .withCleanSession(cleanSession) .withConnectionTimeout(connectionTimeout) credentials match @@ -105,14 +117,8 @@ class MqttEventListenerProviderFactory( ) case None => {} - val sink = MqttSink(connectionSettings, mqttOptions.qos) - - data = MqttEventListenerProviderFactoryData( - excludedEvents = excludedEvents, - excludedAdminOperations = excludedAdminOperations, - mqttOptions, - sink - ) + mqttSink = MqttSink(connectionSettings, mqttOptions.qos) + end init override def postInit(factory: KeycloakSessionFactory): Unit = {} diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala index b94a725..cf7b6df 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala @@ -21,23 +21,23 @@ import akka.stream.alpakka.mqtt.MqttQoS object MqttOptions: def fromConfig(config: Config.Scope): MqttOptions = - - val topic = config.get("topic") + val topic: String = config.get("topic") if topic == null then throw new IllegalArgumentException("MQTT topic is null") - val retained = config.getBoolean("retained") + val retained: Boolean = config.getBoolean("retained") - val qos = config.getInt("qos", 0) match + val qos: MqttQoS = config.getInt("qos", 0) match case 0 => MqttQoS.atMostOnce case 1 => MqttQoS.atLeastOnce case 2 => MqttQoS.exactlyOnce MqttOptions( - topic = topic, + topic, retained, qos ) + end fromConfig end MqttOptions private final case class MqttOptions( diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala index 935c14b..a741c9e 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala @@ -19,47 +19,50 @@ package net.dergrimm.keycloak.providers.events.mqtt import org.keycloak.events.Event import org.keycloak.events.admin.AdminEvent import org.keycloak.models.KeycloakSession -import upickle.default.ReadWriter +import org.keycloak.events.admin.ResourceType +import io.bullet.borer.{Codec, Encoder} +import io.bullet.borer.derivation.MapBasedCodecs._ +import io.bullet.borer.NullOptions.given -object Payload: +object Payload extends FromEvent[Payload]: def fromEvent(event: Event, session: KeycloakSession): Payload = - val error = event.getError() - val realmId = event.getRealmId() + val error: Option[String] = Option(event.getError()) + val realmId: String = event.getRealmId() Payload( admin = false, time = event.getTime(), realm = session.realms().getRealm(realmId).getName(), realmId, - authDetails = PayloadAuthDetails.fromEvent(event), - resourceType = null, + authDetails = PayloadAuthDetails.fromEvent(event, session), + resourceType = None, operationType = event.getType().toString(), - resourcePath = null, - representation = null, - error, - resourceTypeAsString = null + resourcePath = None, + representation = None, + error ) + end fromEvent def fromEvent(event: AdminEvent, session: KeycloakSession): Payload = - val resourceType = event.getResourceType() - val representation = event.getRepresentation() - val error = event.getError() - val realmId = event.getRealmId() + val resourceType: Option[String] = + Option(event.getResourceType()).map(_.toString()) + val representation: Option[String] = Option(event.getRepresentation()) + val error: Option[String] = Option(event.getError()) + val realmId: String = event.getRealmId() Payload( admin = true, time = event.getTime(), realm = session.realms().getRealm(realmId).getName(), realmId, - authDetails = PayloadAuthDetails.fromEvent(event), - resourceType = - if resourceType == null then null else resourceType.toString(), + authDetails = PayloadAuthDetails.fromEvent(event, session), + resourceType, operationType = event.getOperationType().toString(), - resourcePath = event.getResourcePath(), + resourcePath = Some(event.getResourcePath()), representation, - error, - resourceTypeAsString = event.getResourceTypeAsString() + error ) + end fromEvent end Payload private final case class Payload( @@ -68,20 +71,22 @@ private final case class Payload( realm: String, realmId: String, authDetails: PayloadAuthDetails, - resourceType: String, + resourceType: Option[String], operationType: String, - resourcePath: String, - representation: String, - error: String, - resourceTypeAsString: String -) derives ReadWriter: - private def result: String = - if error != null then "error" else "success" + resourcePath: Option[String], + representation: Option[String], + error: Option[String] +) derives Codec: + private def result: String = if error.isDefined then "error" else "success" def topic: String = - println(resourceType.toString()) if admin then - s"admin/${realm}/${result}/${resourceType.toLowerCase()}/${operationType.toLowerCase()}" + resourceType match + case Some(rType) => + s"admin/${realm}/${result}/${rType.toLowerCase()}/${operationType.toLowerCase()}" + case None => throw new IllegalStateException else s"client/${realm}/${result}/${authDetails.clientId}/${operationType.toLowerCase()}" + end topic +end Payload diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala index 01a67ef..0fba816 100644 --- a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala @@ -17,20 +17,27 @@ package net.dergrimm.keycloak.providers.events.mqtt import org.keycloak.events.Event -import org.keycloak.events.admin.AdminEvent -import upickle.default.ReadWriter +import org.keycloak.events.admin.{AdminEvent, AuthDetails} +import org.keycloak.models.KeycloakSession +import io.bullet.borer.Codec +import io.bullet.borer.derivation.MapBasedCodecs._ +import io.bullet.borer.NullOptions.given -object PayloadAuthDetails: - def fromEvent(event: Event): PayloadAuthDetails = +object PayloadAuthDetails extends FromEvent[PayloadAuthDetails]: + def fromEvent(event: Event, session: KeycloakSession): PayloadAuthDetails = PayloadAuthDetails( realmId = event.getRealmId(), clientId = event.getClientId(), userId = event.getUserId(), ipAddress = event.getIpAddress() ) + end fromEvent - def fromEvent(event: AdminEvent): PayloadAuthDetails = - val auth = event.getAuthDetails() + def fromEvent( + event: AdminEvent, + session: KeycloakSession + ): PayloadAuthDetails = + val auth: AuthDetails = event.getAuthDetails() PayloadAuthDetails( realmId = auth.getRealmId(), @@ -38,6 +45,7 @@ object PayloadAuthDetails: userId = auth.getClientId(), ipAddress = auth.getIpAddress() ) + end fromEvent end PayloadAuthDetails private final case class PayloadAuthDetails( @@ -45,4 +53,4 @@ private final case class PayloadAuthDetails( clientId: String, userId: String, ipAddress: String -) derives ReadWriter +) derives Codec