129 lines
4.1 KiB
Scala
129 lines
4.1 KiB
Scala
/*
 * Copyright 2024 Dominic Grimm
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package net.dergrimm.keycloak.providers.events.mqtt
|
|
|
|
import java.util.logging
|
|
import org.keycloak.Config
|
|
import org.keycloak.events.EventListenerProvider
|
|
import org.keycloak.events.EventListenerProviderFactory
|
|
import org.keycloak.events.EventType
|
|
import org.keycloak.events.admin.OperationType
|
|
import org.keycloak.models.KeycloakSession
|
|
import org.keycloak.models.KeycloakSessionFactory
|
|
import scala.collection.immutable
|
|
import akka.stream.alpakka.mqtt.MqttConnectionSettings
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
|
|
import scala.concurrent.duration.FiniteDuration
|
|
import java.util.concurrent.TimeUnit
|
|
import akka.stream.alpakka.mqtt.scaladsl.MqttSink
|
|
import akka.actor.ActorSystem
|
|
import akka.stream.scaladsl.Sink
|
|
import akka.stream.alpakka.mqtt.MqttMessage
|
|
import akka.Done
|
|
import scala.concurrent.Future
|
|
|
|
/** Companion of the factory class: holds its constants and the shared Akka actor system. */
object MqttEventListenerProviderFactory:

  /** Actor system exposed as a given instance so it can be picked up as a context parameter. */
  given system: ActorSystem = ActorSystem()

  /** Provider id reported by the factory class through `getId()`. */
  private val PLUGIN_ID: String = "mqtt"

  // Not referenced anywhere in this file.
  private val PUBLISHER_ID: String = "keycloak"

end MqttEventListenerProviderFactory
|
|
|
|
/** Keycloak [[EventListenerProviderFactory]] that creates [[MqttEventListenerProvider]] instances.
  *
  * All state is populated once by [[init]] from the provider configuration and is then handed
  * unchanged to every provider built by [[create]].
  */
class MqttEventListenerProviderFactory extends EventListenerProviderFactory:
  private final val logger =
    logging.Logger.getLogger(
      classOf[MqttEventListenerProviderFactory].getName()
    )

  // Event types listed under "excludeEvents"; None when the key is not configured.
  private var excludedEvents: Option[Set[EventType]] = None
  // Admin operation types listed under "excludesOperations"; None when the key is not configured.
  private var excludedAdminEvents: Option[Set[OperationType]] = None
  // Both fields below stay null until init() has run.
  private var mqttOptions: MqttOptions = null
  private var mqttSink: Sink[MqttMessage, Future[Done]] = null

  /** Builds a provider for `session` from the settings captured by [[init]].
    *
    * @param session the Keycloak session the provider is created for
    * @return a new provider sharing this factory's exclusion sets, MQTT options and sink
    */
  override def create(session: KeycloakSession): MqttEventListenerProvider =
    MqttEventListenerProvider(
      session,
      excludedEvents,
      excludedAdminEvents,
      mqttOptions,
      mqttSink
    )
  end create

  /** Reads the provider configuration and prepares the MQTT sink.
    *
    * Keys read here: `excludeEvents`, `excludesOperations`, `serverUri` (mandatory), `username`,
    * `password`, `cleanSession` (default `true`) and `connectionTimeout` (in seconds, default `10`).
    * `MqttOptions.fromConfig` reads further settings from the same scope.
    *
    * @param config configuration scope of this provider
    * @throws IllegalArgumentException if `serverUri` is not configured; `valueOf` on the enum
    *   types is also expected to reject unknown event or operation names
    */
  override def init(config: Config.Scope): Unit =
    // getArray / get return null for missing keys; Option(...) maps that to None.
    excludedEvents = Option(config.getArray("excludeEvents"))
      .map(names => immutable.HashSet.from(names.map(EventType.valueOf)))

    excludedAdminEvents = Option(config.getArray("excludesOperations"))
      .map(names => immutable.HashSet.from(names.map(OperationType.valueOf)))

    val uri: String = Option(config.get("serverUri"))
      .getOrElse(throw new IllegalArgumentException("MQTT server URI is null"))

    val username = Option(config.get("username"))
    val password = Option(config.get("password"))
    // Authentication is only applied when both halves are present.
    val credentials: Option[(String, String)] =
      for
        user <- username
        pass <- password
      yield user -> pass

    // A half-configured pair used to be dropped silently; make the fallback visible.
    if credentials.isEmpty && (username.isDefined || password.isDefined) then
      logger.warning(
        "MQTT credentials ignored: both 'username' and 'password' must be configured; " +
          "connecting without authentication"
      )

    val cleanSession: Boolean = config.getBoolean("cleanSession", true)
    val connectionTimeout: FiniteDuration =
      FiniteDuration(config.getLong("connectionTimeout", 10), TimeUnit.SECONDS)

    mqttOptions = MqttOptions.fromConfig(config)

    val baseSettings = MqttConnectionSettings(
      uri,
      "net.dergrimm.keycloak.providers.events.mqtt",
      new MemoryPersistence()
    )
      .withCleanSession(cleanSession)
      .withConnectionTimeout(connectionTimeout)

    val connectionSettings = credentials.fold(baseSettings) { case (user, pass) =>
      baseSettings.withAuth(username = user, password = pass)
    }

    mqttSink = MqttSink(connectionSettings, mqttOptions.qos)
  end init

  /** No post-initialisation work is required. */
  override def postInit(factory: KeycloakSessionFactory): Unit = ()

  /** Releases the resources owned by this plugin.
    *
    * Terminates the actor system held by the companion object; nothing else in this file shuts
    * it down. Termination is asynchronous and is not awaited here.
    */
  override def close(): Unit =
    // NOTE(review): the system lives in the companion object, so it cannot be reused after close().
    MqttEventListenerProviderFactory.system.terminate()
    ()

  /** @return the provider id under which this factory is registered */
  override def getId(): String = MqttEventListenerProviderFactory.PLUGIN_ID
end MqttEventListenerProviderFactory
|