keycloak-event-listener-mqtt/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala
2024-02-18 17:11:38 +01:00

68 lines
2.2 KiB
Scala

package net.dergrimm.keycloak.providers.events.mqtt
import java.util.logging
import org.keycloak.events.EventListenerProvider
import org.keycloak.events.Event
import org.keycloak.events.EventType
import org.keycloak.events.admin.AdminEvent
import org.keycloak.events.admin.OperationType
import org.keycloak.models.KeycloakSession
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.mqtt.MqttMessage
import akka.Done
import scala.concurrent.Future
import akka.util.ByteString
import akka.stream.scaladsl.Source
import scala.util.Success
import scala.util.Failure
import concurrent.ExecutionContext.Implicits.global
/** Keycloak event listener that publishes user and admin events over MQTT.
  *
  * Every event that is not excluded is converted to a `Payload`, serialised
  * with upickle and run through `mqttSink` as a single-element stream.
  *
  * @param session
  *   Keycloak session handed to `Payload.fromEvent`
  * @param excludedEvents
  *   user event types that must not be published; `None` excludes nothing
  * @param excludedAdminEvents
  *   admin operation types that must not be published; `None` excludes nothing
  * @param mqttOptions
  *   supplies the topic prefix and the retained flag of outgoing messages
  * @param mqttSink
  *   sink each outgoing message is run into
  */
class MqttEventListenerProvider(
    val session: KeycloakSession,
    val excludedEvents: Option[Set[EventType]],
    val excludedAdminEvents: Option[Set[OperationType]],
    val mqttOptions: MqttOptions,
    val mqttSink: Sink[MqttMessage, Future[Done]]
) extends EventListenerProvider:
  private final val logger: logging.Logger =
    logging.Logger.getLogger(classOf[MqttEventListenerProvider].getName())

  /** Publishes a user event unless its type is listed in `excludedEvents`. */
  override def onEvent(event: Event): Unit =
    // `Option.contains` would compare the whole Set against a single
    // EventType (always false); the membership test has to happen on the
    // Set inside the Option.
    val excluded = excludedEvents.exists(_.contains(event.getType()))
    if !excluded then sendMessage(Payload.fromEvent(event, session))

  /** Publishes an admin event unless its operation type is listed in
    * `excludedAdminEvents`.
    *
    * @param includeRepresentation
    *   not used by this implementation
    */
  override def onEvent(
      event: AdminEvent,
      includeRepresentation: Boolean
  ): Unit =
    // Same reasoning as above: test membership in the wrapped Set.
    val excluded =
      excludedAdminEvents.exists(_.contains(event.getOperationType()))
    if !excluded then sendMessage(Payload.fromEvent(event, session))

  /** Nothing to release: this provider holds no resources of its own. */
  override def close(): Unit = ()

  /** Serialises `payload` to JSON and publishes it under
    * `<mqttOptions.topic>/<payload.topic>`.
    *
    * The publish is asynchronous; its outcome is only logged, never
    * propagated to the caller.
    */
  private def sendMessage(payload: Payload): Unit =
    // NOTE(review): presumably brings the implicit actor system required by
    // `runWith` into scope - confirm against the factory.
    import MqttEventListenerProviderFactory.system

    val topic = s"${mqttOptions.topic}/${payload.topic}"
    val payloadStr = upickle.default.write(payload)
    val msg = MqttMessage(topic, ByteString(payloadStr))
      .withRetained(mqttOptions.retained)

    Source.single(msg).runWith(mqttSink).onComplete {
      case Success(value) =>
        logger.log(logging.Level.INFO, value.toString())
      case Failure(exception) =>
        // Pass the throwable as well so the stack trace and cause are kept.
        logger.log(
          logging.Level.SEVERE,
          s"Failed to publish message: ${exception.getMessage()}",
          exception
        )
    }