commit 7b8b705f52483b4b5b38a9274ce10061655e5150 Author: Dominic Grimm Date: Sun Feb 18 17:11:38 2024 +0100 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9e79245 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# macOS +.DS_Store + +# sbt specific +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ +project/local-plugins.sbt +.history +.ensime +.ensime_cache/ +.sbt-scripted/ +local.sbt + +# Bloop +.bsp + +# VS Code +.vscode/ + +# Metals +.bloop/ +.metals/ +metals.sbt + +# IDEA +.idea +.idea_modules +/.worksheet/ diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..9dfb755 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,2 @@ +version = "3.7.15" +runner.dialect = scala3 diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..85bdec0 --- /dev/null +++ b/build.sbt @@ -0,0 +1,25 @@ +val scala3Version = "3.3.1" + +val keycloakVersion = "23.0.6" +val keycloakDeps = Seq( + "org.keycloak" % "keycloak-core" % keycloakVersion % "provided", + "org.keycloak" % "keycloak-server-spi" % keycloakVersion % "provided", + "org.keycloak" % "keycloak-server-spi-private" % keycloakVersion % "provided" +) + +val deps = Seq( + "com.lihaoyi" %% "upickle" % "3.2.0", + "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "7.0.1", + "com.typesafe.akka" %% "akka-stream" % "2.9.0" +) + +lazy val root = project + .in(file(".")) + .settings( + name := "keycloak-event-listener-mqtt", + version := keycloakVersion, + scalaVersion := scala3Version, + resolvers += "Akka library repository".at("https://repo.akka.io/maven"), + libraryDependencies ++= keycloakDeps, + libraryDependencies ++= deps + ) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..abbbce5 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.8 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c46ce74 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") diff --git a/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory new file mode 100644 index 0000000..01c125b --- /dev/null +++ b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory @@ -0,0 +1 @@ +net.dergrimm.keycloak.providers.events.mqtt.MqttEventListenerProviderFactory diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Credentials.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Credentials.scala new file mode 100644 index 0000000..8b3e532 --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Credentials.scala @@ -0,0 +1,6 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +private final case class Credentials( + username: String, + password: String +) diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala new file mode 100644 index 0000000..f6015ba --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala @@ -0,0 +1,67 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import java.util.logging +import org.keycloak.events.EventListenerProvider +import org.keycloak.events.Event +import org.keycloak.events.EventType +import org.keycloak.events.admin.AdminEvent +import org.keycloak.events.admin.OperationType +import org.keycloak.models.KeycloakSession +import akka.stream.scaladsl.Sink +import akka.stream.alpakka.mqtt.MqttMessage +import akka.Done +import scala.concurrent.Future +import akka.util.ByteString +import akka.stream.scaladsl.Source +import scala.util.Success +import scala.util.Failure +import concurrent.ExecutionContext.Implicits.global + +class MqttEventListenerProvider( + val session: KeycloakSession, + val excludedEvents: Option[Set[EventType]], + val excludedAdminEvents: Option[Set[OperationType]], + val mqttOptions: MqttOptions, + val mqttSink: Sink[MqttMessage, Future[Done]] +) extends EventListenerProvider: + private final val logger: logging.Logger = + logging.Logger.getLogger(classOf[MqttEventListenerProvider].getName()) + + override def onEvent(event: Event): Unit = + if excludedEvents.isDefined && excludedEvents.contains(event.getType()) + then return + + val payload = Payload.fromEvent(event, session) + sendMessage(payload) + + override def onEvent( + event: AdminEvent, + includeRepresentation: Boolean + ): Unit = + if excludedAdminEvents.isDefined && excludedAdminEvents.contains( + event.getOperationType() + ) + then return + + val payload = Payload.fromEvent(event, session) + sendMessage(payload) + + override def close(): Unit = {} + + private def sendMessage(payload: Payload): Unit = + import MqttEventListenerProviderFactory.system + + val topic = s"${mqttOptions.topic}/${payload.topic}" + val payloadStr = upickle.default.write(payload) + val msg = MqttMessage(topic, ByteString(payloadStr)) + .withRetained(mqttOptions.retained) + val future = Source.single(msg).runWith(mqttSink) + future.onComplete { + case Success(value) => + logger.log(logging.Level.INFO, value.toString()) + case Failure(exception) => + logger.log( + logging.Level.SEVERE, + s"Failed to publish message: ${exception.getMessage()}" + ) + } diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala new file mode 100644 index 0000000..aa5d42d --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactory.scala @@ -0,0 +1,105 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import java.util.logging +import org.keycloak.Config +import org.keycloak.events.EventListenerProvider +import org.keycloak.events.EventListenerProviderFactory +import org.keycloak.events.EventType +import org.keycloak.events.admin.OperationType +import org.keycloak.models.KeycloakSession +import org.keycloak.models.KeycloakSessionFactory +import scala.collection.immutable +import akka.stream.alpakka.mqtt.MqttConnectionSettings +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import scala.concurrent.duration.FiniteDuration +import java.util.concurrent.TimeUnit +import akka.stream.alpakka.mqtt.scaladsl.MqttSink +import akka.actor.ActorSystem + +object MqttEventListenerProviderFactory: + private val PLUGIN_ID = "mqtt" + private val PUBLISHER_ID = "keycloak" + + implicit val system: ActorSystem = ActorSystem() + +class MqttEventListenerProviderFactory( + var data: MqttEventListenerProviderFactoryData +) extends EventListenerProviderFactory: + private final val logger = + logging.Logger.getLogger( + classOf[MqttEventListenerProviderFactory].getName() + ) + + def this() = this(null) + + override def create(session: KeycloakSession): MqttEventListenerProvider = + MqttEventListenerProvider( + session, + excludedEvents = data.excludedEvents, + excludedAdminEvents = data.excludedAdminOperations, + mqttOptions = data.mqttOptions, + mqttSink = data.mqttSink + ) + + override def init(config: Config.Scope): Unit = + val excludes = config.getArray("excludeEvents") + val excludedEvents = + if excludes != null then + Some( + immutable.HashSet.from(excludes.map(EventType.valueOf)) + ) + else None + + val excludesOperations = config.getArray("excludesOperations") + val excludedAdminOperations = + if excludesOperations != null then + Some( + immutable.HashSet.from(excludesOperations.map(OperationType.valueOf)) + ) + else None + + val uri = config.get("serverUri") + if uri == null then + throw new IllegalArgumentException("MQTT server URI is null") + + val credentials = + val username = config.get("username") + val password = config.get("password") + if username != null && password != null then + Some(Credentials(username, password)) + else None + + val cleanSession = config.getBoolean("cleanSession", true) + val connectionTimeout = + FiniteDuration(config.getLong("connectionTimeout", 10), TimeUnit.SECONDS) + + val mqttOptions = MqttOptions.fromConfig(config) + var connectionSettings = MqttConnectionSettings( + uri, + "net.dergrimm.keycloak.providers.events.mqtt", + new MemoryPersistence + ).withCleanSession(cleanSession) + .withConnectionTimeout(connectionTimeout) + + credentials match + case Some(creds) => + connectionSettings = connectionSettings.withAuth( + username = creds.username, + password = creds.password + ) + case None => {} + + val sink = MqttSink(connectionSettings, mqttOptions.qos) + + data = MqttEventListenerProviderFactoryData( + excludedEvents = excludedEvents, + excludedAdminOperations = excludedAdminOperations, + mqttOptions, + sink + ) + + override def postInit(factory: KeycloakSessionFactory): Unit = {} + + override def close(): Unit = {} + + override def getId(): String = MqttEventListenerProviderFactory.PLUGIN_ID diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala new file mode 100644 index 0000000..54c2f10 --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProviderFactoryData.scala @@ -0,0 +1,15 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import org.keycloak.events.EventType +import org.keycloak.events.admin.OperationType +import akka.stream.scaladsl.Sink +import akka.stream.alpakka.mqtt.MqttMessage +import akka.Done +import scala.concurrent.Future + +final case class MqttEventListenerProviderFactoryData( + excludedEvents: Option[Set[EventType]], + excludedAdminOperations: Option[Set[OperationType]], + mqttOptions: MqttOptions, + mqttSink: Sink[MqttMessage, Future[Done]] +) diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala new file mode 100644 index 0000000..840754f --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttOptions.scala @@ -0,0 +1,30 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import org.keycloak.Config +import akka.stream.alpakka.mqtt.MqttQoS + +object MqttOptions: + def fromConfig(config: Config.Scope): MqttOptions = + + val topic = config.get("topic") + if topic == null then + throw new IllegalArgumentException("MQTT topic is null") + + val retained = config.getBoolean("retained") + + val qos = config.getInt("qos", 0) match + case 0 => MqttQoS.atMostOnce + case 1 => MqttQoS.atLeastOnce + case 2 => MqttQoS.exactlyOnce + + MqttOptions( + topic = topic, + retained, + qos + ) + +private final case class MqttOptions( + topic: String, + retained: Boolean, + qos: MqttQoS +) diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala new file mode 100644 index 0000000..2de7863 --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/Payload.scala @@ -0,0 +1,70 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import org.keycloak.events.Event +import org.keycloak.events.admin.AdminEvent +import org.keycloak.models.KeycloakSession +import upickle.default.ReadWriter + +object Payload: + def fromEvent(event: Event, session: KeycloakSession): Payload = + val error = event.getError() + val realmId = event.getRealmId() + + Payload( + admin = false, + time = event.getTime(), + realm = session.realms().getRealm(realmId).getName(), + realmId, + authDetails = PayloadAuthDetails.fromEvent(event), + resourceType = null, + operationType = event.getType().toString(), + resourcePath = null, + representation = null, + error, + resourceTypeAsString = null + ) + + def fromEvent(event: AdminEvent, session: KeycloakSession): Payload = + val resourceType = event.getResourceType() + val representation = event.getRepresentation() + val error = event.getError() + val realmId = event.getRealmId() + + Payload( + admin = true, + time = event.getTime(), + realm = session.realms().getRealm(realmId).getName(), + realmId, + authDetails = PayloadAuthDetails.fromEvent(event), + resourceType = + if resourceType == null then null else resourceType.toString(), + operationType = event.getOperationType().toString(), + resourcePath = event.getResourcePath(), + representation, + error, + resourceTypeAsString = event.getResourceTypeAsString() + ) + +private final case class Payload( + admin: Boolean, + time: Long, + realm: String, + realmId: String, + authDetails: PayloadAuthDetails, + resourceType: String, + operationType: String, + resourcePath: String, + representation: String, + error: String, + resourceTypeAsString: String +) derives ReadWriter: + private def result: String = + if error != null then "error" else "success" + + def topic: String = + println(resourceType.toString()) + if admin + then + s"admin/${realm}/${result}/${resourceType.toLowerCase()}/${operationType.toLowerCase()}" + else + s"client/${realm}/${result}/${authDetails.clientId}/${operationType.toLowerCase()}" diff --git a/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala new file mode 100644 index 0000000..62cc226 --- /dev/null +++ b/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/PayloadAuthDetails.scala @@ -0,0 +1,31 @@ +package net.dergrimm.keycloak.providers.events.mqtt + +import org.keycloak.events.Event +import org.keycloak.events.admin.AdminEvent +import upickle.default.ReadWriter + +object PayloadAuthDetails: + def fromEvent(event: Event): PayloadAuthDetails = + PayloadAuthDetails( + realmId = event.getRealmId(), + clientId = event.getClientId(), + userId = event.getUserId(), + ipAddress = event.getIpAddress() + ) + + def fromEvent(event: AdminEvent): PayloadAuthDetails = + val auth = event.getAuthDetails() + + PayloadAuthDetails( + realmId = auth.getRealmId(), + clientId = auth.getClientId(), + userId = auth.getClientId(), + ipAddress = auth.getIpAddress() + ) + +private final case class PayloadAuthDetails( + realmId: String, + clientId: String, + userId: String, + ipAddress: String +) derives ReadWriter