123 lines
4 KiB
Scala
123 lines
4 KiB
Scala
/*
 * Copyright 2024 Dominic Grimm
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package net.dergrimm.keycloak.providers.events.mqtt
|
|
|
|
import java.util.logging
|
|
import org.keycloak.Config
|
|
import org.keycloak.events.EventListenerProvider
|
|
import org.keycloak.events.EventListenerProviderFactory
|
|
import org.keycloak.events.EventType
|
|
import org.keycloak.events.admin.OperationType
|
|
import org.keycloak.models.KeycloakSession
|
|
import org.keycloak.models.KeycloakSessionFactory
|
|
import scala.collection.immutable
|
|
import akka.stream.alpakka.mqtt.MqttConnectionSettings
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
|
|
import scala.concurrent.duration.FiniteDuration
|
|
import java.util.concurrent.TimeUnit
|
|
import akka.stream.alpakka.mqtt.scaladsl.MqttSink
|
|
import akka.actor.ActorSystem
|
|
|
|
/** Companion of the factory class: holds the shared actor system and the
  * identifier constants.
  */
object MqttEventListenerProviderFactory:
  /** Actor system exposed as an implicit value. It is created with the
    * no-argument `ActorSystem()` factory when this object is initialised.
    */
  implicit val system: ActorSystem = ActorSystem()

  /** Provider id; returned by the factory's `getId()`. */
  private val PLUGIN_ID = "mqtt"

  // NOTE(review): not referenced anywhere in the visible part of this file.
  private val PUBLISHER_ID = "keycloak"
end MqttEventListenerProviderFactory
|
|
|
|
/** Keycloak [[EventListenerProviderFactory]] that builds
  * [[MqttEventListenerProvider]] instances from settings read in [[init]].
  *
  * Configuration keys read from the provider's `Config.Scope`:
  *   - `excludeEvents` (array, optional): `EventType` names to exclude.
  *   - `excludesOperations` (array, optional): `OperationType` names to
  *     exclude.
  *   - `serverUri` (required): broker URI handed to `MqttConnectionSettings`.
  *   - `clientId` (optional): MQTT client id; defaults to
  *     `net.dergrimm.keycloak.providers.events.mqtt`.
  *   - `username` / `password` (optional): applied only when both are set.
  *   - `cleanSession` (boolean, default `true`).
  *   - `connectionTimeout` (long, seconds, default `10`).
  *
  * Further keys may be read by `MqttOptions.fromConfig`.
  *
  * @param data
  *   settings shared with every provider created by this factory; `null`
  *   until [[init]] has run when the no-argument constructor is used
  */
class MqttEventListenerProviderFactory(
    var data: MqttEventListenerProviderFactoryData
) extends EventListenerProviderFactory:
  private final val logger =
    logging.Logger.getLogger(
      classOf[MqttEventListenerProviderFactory].getName()
    )

  // Client id used when the configuration does not provide `clientId`.
  private final val defaultClientId =
    "net.dergrimm.keycloak.providers.events.mqtt"

  /** No-argument constructor; leaves `data` as `null` until [[init]] runs. */
  def this() = this(null)

  /** Creates a provider for `session` using the settings stored in `data`.
    *
    * @throws IllegalStateException
    *   if `data` has not been set, i.e. [[init]] has not run yet
    */
  override def create(session: KeycloakSession): MqttEventListenerProvider =
    val current = data
    // Fail with a descriptive error instead of a bare NullPointerException.
    if current == null then
      throw new IllegalStateException(
        "MQTT event listener provider factory has not been initialised"
      )

    MqttEventListenerProvider(
      session,
      excludedEvents = current.excludedEvents,
      excludedAdminEvents = current.excludedAdminOperations,
      mqttOptions = current.mqttOptions,
      mqttSink = current.mqttSink
    )

  /** Reads the configuration, builds the MQTT sink and stores the result in
    * `data`.
    *
    * @throws IllegalArgumentException
    *   if `serverUri` is missing, or if an excluded event / operation name is
    *   not a valid enum constant (raised by `valueOf`)
    */
  override def init(config: Config.Scope): Unit =
    val excludedEvents =
      Option(config.getArray("excludeEvents")).map(excludes =>
        immutable.HashSet.from(excludes.map(EventType.valueOf))
      )

    val excludedAdminOperations =
      Option(config.getArray("excludesOperations")).map(excludesOperations =>
        immutable.HashSet.from(excludesOperations.map(OperationType.valueOf))
      )

    val uri = Option(config.get("serverUri")).getOrElse(
      throw new IllegalArgumentException("MQTT server URI is null")
    )

    val credentials =
      (Option(config.get("username")), Option(config.get("password"))) match
        case (Some(username), Some(password)) => Some((username, password))
        case (None, None)                     => None
        case _ =>
          // Only one half of the pair was configured: keep the previous
          // behaviour (no authentication) but make the problem visible.
          logger.warning(
            "Only one of 'username' and 'password' is configured; " +
              "connecting to the MQTT broker without authentication"
          )
          None

    val cleanSession = config.getBoolean("cleanSession", true)
    val connectionTimeout =
      FiniteDuration(config.getLong("connectionTimeout", 10), TimeUnit.SECONDS)

    // A broker drops an existing connection when another client connects with
    // the same id, so the id must be overridable per Keycloak node.
    val clientId = config.get("clientId", defaultClientId)

    val mqttOptions = MqttOptions.fromConfig(config)

    val baseSettings = MqttConnectionSettings(
      uri,
      clientId,
      new MemoryPersistence
    ).withCleanSession(cleanSession)
      .withConnectionTimeout(connectionTimeout)

    val connectionSettings = credentials.fold(baseSettings) {
      case (username, password) =>
        baseSettings.withAuth(username = username, password = password)
    }

    val sink = MqttSink(connectionSettings, mqttOptions.qos)

    data = MqttEventListenerProviderFactoryData(
      excludedEvents = excludedEvents,
      excludedAdminOperations = excludedAdminOperations,
      mqttOptions,
      sink
    )

  /** No post-initialisation work is performed. */
  override def postInit(factory: KeycloakSessionFactory): Unit = {}

  /** Does nothing. Note that the companion object's `ActorSystem` is not
    * terminated here.
    */
  override def close(): Unit = {}

  /** Returns the provider id defined in the companion object. */
  override def getId(): String = MqttEventListenerProviderFactory.PLUGIN_ID
end MqttEventListenerProviderFactory
|