keycloak-event-listener-mqtt/src/main/scala/net/dergrimm/keycloak/providers/events/mqtt/MqttEventListenerProvider.scala
2024-02-27 17:50:25 +01:00

90 lines
2.9 KiB
Scala

/*
* Copyright 2024 Dominic Grimm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.dergrimm.keycloak.providers.events.mqtt
import java.util.logging
import org.keycloak.events.EventListenerProvider
import org.keycloak.events.Event
import org.keycloak.events.EventType
import org.keycloak.events.admin.AdminEvent
import org.keycloak.events.admin.OperationType
import org.keycloak.models.KeycloakSession
import akka.stream.scaladsl.Sink
import akka.stream.alpakka.mqtt.MqttMessage
import akka.Done
import scala.concurrent.Future
import akka.util.ByteString
import akka.stream.scaladsl.Source
import scala.util.Success
import scala.util.Failure
import io.bullet.borer.Json
/** Keycloak event listener that publishes events to an MQTT topic.
  *
  * Each user or admin event that is not excluded is converted to a
  * [[Payload]], encoded as JSON and run through `mqttSink` as a single
  * [[akka.stream.alpakka.mqtt.MqttMessage]].
  *
  * @param session
  *   Keycloak session handed to `Payload.fromEvent` when building payloads
  * @param excludedEvents
  *   user event types that must not be published; `None` excludes nothing
  * @param excludedAdminEvents
  *   admin operation types that must not be published; `None` excludes
  *   nothing
  * @param mqttOptions
  *   supplies the topic and the retained flag of every published message
  * @param mqttSink
  *   sink each message is run into; its materialized future signals the
  *   outcome of the publish
  */
class MqttEventListenerProvider(
    val session: KeycloakSession,
    val excludedEvents: Option[Set[EventType]],
    val excludedAdminEvents: Option[Set[OperationType]],
    val mqttOptions: MqttOptions,
    val mqttSink: Sink[MqttMessage, Future[Done]]
) extends EventListenerProvider:

  private final val logger: logging.Logger =
    logging.Logger.getLogger(classOf[MqttEventListenerProvider].getName())

  /** Publishes a user event unless its type is listed in `excludedEvents`.
    *
    * @param event
    *   the user event reported by Keycloak
    */
  override def onEvent(event: Event): Unit =
    // `Option.contains` would compare the wrapped Set itself with the event
    // type (never equal); membership has to be tested inside the Option.
    val excluded: Boolean = excludedEvents.exists(_.contains(event.getType()))
    if !excluded then sendMessage(Payload.fromEvent(event, session))
  end onEvent

  /** Publishes an admin event unless its operation type is listed in
    * `excludedAdminEvents`.
    *
    * @param event
    *   the admin event reported by Keycloak
    * @param includeRepresentation
    *   not consulted by this listener
    */
  override def onEvent(
      event: AdminEvent,
      includeRepresentation: Boolean
  ): Unit =
    // Same membership test as for user events: look inside the Option.
    val excluded: Boolean =
      excludedAdminEvents.exists(_.contains(event.getOperationType()))
    if !excluded then sendMessage(Payload.fromEvent(event, session))
  end onEvent

  /** No-op: this provider performs no cleanup of its own. */
  override def close(): Unit = {}

  /** Encodes `payload` as JSON and runs it through `mqttSink`.
    *
    * The call does not wait for the publish to finish; a failed publish is
    * logged at SEVERE level together with the causing exception.
    *
    * @param payload
    *   the event payload to publish
    */
  private def sendMessage(payload: Payload): Unit =
    import concurrent.ExecutionContext.Implicits.global
    import MqttEventListenerProviderFactory.system

    val payloadBytes: Array[Byte] = Json.encode(payload).toByteArray
    val msg: MqttMessage =
      MqttMessage(mqttOptions.topic, ByteString.fromArray(payloadBytes))
        .withRetained(mqttOptions.retained)

    val future: Future[Done] = Source.single(msg).runWith(mqttSink)
    future.onComplete {
      case Failure(exception) =>
        // Hand the throwable to the logger so the exception type and stack
        // trace are kept, not just the (possibly null) message.
        logger.log(
          logging.Level.SEVERE,
          s"Failed to publish message: ${exception.getMessage()}",
          exception
        )
      case Success(_) => ()
    }
  end sendMessage
end MqttEventListenerProvider