using System.Net; using System.Text; using Microsoft.Extensions.Options; using MQTTnet.Protocol; using MQTTnet.Server; namespace ProofOfConcept.Services; public class MQTTServer : IHostedService { private readonly ILogger logger; private readonly MQTTServerConfiguration configuration; private readonly MqttServer server; public MQTTServer(ILogger logger, IOptions options) { this.logger = logger; this.configuration = options.Value; MqttServerOptions mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().WithDefaultEndpointPort(this.configuration.Port).WithKeepAlive().Build(); this.server = new MqttServerFactory().CreateMqttServer(mqttServerOptions); this.server.ClientConnectedAsync += (e) => { logger.LogInformation("Client connected from: {IPAddress}. ClientID: {ClientID}", e.RemoteEndPoint.ToString(), e.ClientId); return Task.CompletedTask; }; this.server.ClientDisconnectedAsync += (e) => { logger.LogInformation("Client {ClientID} disconnected from: {IPAddress}. Reason: {ReasonString} ({ReasonCode})", e.ClientId, e.RemoteEndPoint.ToString(), e.ReasonString, e.ReasonCode); return Task.CompletedTask; }; this.server.ClientSubscribedTopicAsync += (e) => { logger.LogInformation("Client {ClientID} subscribed to topic: {Topic}", e.ClientId, e.TopicFilter.Topic); return Task.CompletedTask; }; this.server.ClientUnsubscribedTopicAsync += (e) => { logger.LogInformation("Client {ClientID} unsubscribed from topic: {Topic}", e.ClientId, e.TopicFilter); return Task.CompletedTask; }; this.server.InterceptingPublishAsync += (e) => { if (e.ApplicationMessage.Topic == "telemetry" && e.ClientId != this.configuration.TelemetryClient.ClientID) { logger.LogWarning("Client {ClientID} tried to publish to telemetry topic", e.ClientId); e.Response.ReasonCode = MqttPubAckReasonCode.NotAuthorized; return Task.CompletedTask; } logger.LogInformation("Client {ClientID} published message to topic: {Topic}: {Message}", e.ClientId, e.ApplicationMessage.Topic, Encoding.UTF8.GetString(e.ApplicationMessage.Payload)); return Task.CompletedTask; }; this.server.ValidatingConnectionAsync += (e) => { //Local client (by clientID) if (e.ClientId == this.configuration.LocalClient.ClientID) { //Not local connection (IP is not loopback) if (e.RemoteEndPoint is IPEndPoint ipEndPoint && !IPAddress.IsLoopback(ipEndPoint.Address)) { logger.LogWarning("Local client tried to connect from non-loopback address: {IPAddress}", ipEndPoint.ToString()); e.ReasonCode = MqttConnectReasonCode.NotAuthorized; } //Invalid username or password else if (e.UserName != configuration.LocalClient.Username || e.Password != configuration.LocalClient.Password) { logger.LogWarning("Local client tried to connect with invalid credentials"); e.ReasonCode = MqttConnectReasonCode.NotAuthorized; } } //Telemetry client else if (e.ClientId == this.configuration.TelemetryClient.ClientID) { if (e.UserName != configuration.TelemetryClient.Username || e.Password != configuration.TelemetryClient.Password) { logger.LogWarning("Telemetry client tried to connect with invalid credentials"); e.ReasonCode = MqttConnectReasonCode.NotAuthorized; } } //Observer else if ((this.configuration.Observer.ClientID is not null && e.ClientId == this.configuration.Observer.ClientID) || this.configuration.Observer.ClientID is null) { if (e.UserName != configuration.Observer.Username || e.Password != configuration.Observer.Password) { logger.LogWarning("Observer tried to connect with invalid credentials: {Username} / {Password} from {RemoteIP}", e.UserName, e.Password, e.RemoteEndPoint is IPEndPoint ipEndPoint ? ipEndPoint.ToString() : e.RemoteEndPoint.GetType().FullName); e.ReasonCode = MqttConnectReasonCode.NotAuthorized; } } else e.ReasonCode = MqttConnectReasonCode.Banned; return Task.CompletedTask; }; } public async Task StartAsync(CancellationToken cancellationToken) { this.logger.LogTrace("Starting MQTT Server..."); await this.server.StartAsync(); this.logger.LogInformation("MQTT Server started"); } public async Task StopAsync(CancellationToken cancellationToken) { this.logger.LogTrace("Stopping MQTT Server..."); await this.server.StopAsync(); this.logger.LogInformation("MQTT Server stopped"); } } public class MQTTServerConfiguration { public ushort Port { get; set; } = 1883; public MQTTAuthenticationData TelemetryClient { get; set; } = new MQTTAuthenticationData("telemetry", "fleet", "9R0FXNs5NEIEbI4"); //Must have a known client id public MQTTAuthenticationData LocalClient { get; set; } = new MQTTAuthenticationData("local", "client", "xuhvoc-parrox-xuzsE4"); //Must have a known client id public MQTTAuthenticationData Observer { get; set; } = new MQTTAuthenticationData("observer", "JC740d4c4E9dveh"); } public class MQTTAuthenticationData { public string? ClientID { get; set; } public string Username { get; set; } public string Password { get; set; } public MQTTAuthenticationData(string clientId, string username, string password) { ClientID = clientId; Username = username; Password = password; } public MQTTAuthenticationData(string username, string password) { ClientID = null; Username = username; Password = password; } }