using UnityEngine;
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;

/// <summary>
/// JSON payload received from the M5Stick device (heart rate + GSR).
/// Field names must match the JSON keys because the payload is parsed with JsonUtility.
/// </summary>
[Serializable]
public class M5StickData
{
    public float hr;
    public float gsr;
}

/// <summary>
/// JSON payload received from the SeeedStudio device (heart rate only).
/// Field names must match the JSON keys because the payload is parsed with JsonUtility.
/// </summary>
[Serializable]
public class SeeedData
{
    public float hr;
}

/// <summary>
/// Scene-persistent singleton that creates the per-subject folder layout, subscribes to the
/// two sensor topics over MQTT, writes incoming samples to per-phase CSV files and raises a
/// watchdog warning when a device stops sending data.
/// </summary>
public class ExperimentManager : MonoBehaviour
{
    public static ExperimentManager Instance;

    [Header("MQTT Settings")]
    public string brokerIp = "172.30.106.19";
    public int port = 1883;
    public string topicM5Stick = "m5stick/state";
    public string topicSeeed = "seeedstudio-mr60bha2/state";

    [Header("Watchdog Settings")]
    public float timeoutSeconds = 5f; // Seconds without data before the alarm is raised

    // Format of the timestamp column written to both CSV files.
    private const string CsvTimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";

    private IMqttClient _mqttClient;
    private string _subjectFolderPath;

    // CSV file paths for the current phase; null/empty means "not logging".
    private string _m5stickCsvFilePath;
    private string _seeedCsvFilePath;

    // One queue per device. Filled by the MQTT callback, drained in Update().
    // Each entry carries its arrival time so the CSV timestamp does not depend on frame timing.
    private readonly ConcurrentQueue<(DateTime ReceivedAt, M5StickData Data)> _m5stickQueue =
        new ConcurrentQueue<(DateTime ReceivedAt, M5StickData Data)>();
    private readonly ConcurrentQueue<(DateTime ReceivedAt, SeeedData Data)> _seeedQueue =
        new ConcurrentQueue<(DateTime ReceivedAt, SeeedData Data)>();

    // Watchdog state for both devices.
    private float _timeSinceLastM5Stick = 0f;
    private float _timeSinceLastSeeed = 0f;
    private bool _isM5StickTimeoutWarningShown = false;
    private bool _isSeeedTimeoutWarningShown = false;

    /// <summary>Keeps the first instance alive across scene loads and destroys any duplicate.</summary>
    void Awake()
    {
        if (Instance == null)
        {
            Instance = this;
            DontDestroyOnLoad(gameObject);
        }
        else
        {
            Destroy(gameObject);
        }
    }

    /// <summary>
    /// Creates "Data/&lt;subjectId&gt;/Neutral" and ".../Experimental" under
    /// <c>Application.dataPath</c>, then connects to the MQTT broker.
    /// </summary>
    /// <param name="subjectId">Folder name used for this subject's data.</param>
    public async void InitializeSession(string subjectId)
    {
        if (string.IsNullOrWhiteSpace(subjectId))
        {
            Debug.LogError("[ExperimentManager] Cannot initialize session: subject id is empty.");
            return;
        }

        try
        {
            string dataPath = Path.Combine(Application.dataPath, "Data");
            _subjectFolderPath = Path.Combine(dataPath, subjectId);

            Directory.CreateDirectory(_subjectFolderPath);
            Directory.CreateDirectory(Path.Combine(_subjectFolderPath, "Neutral"));
            Directory.CreateDirectory(Path.Combine(_subjectFolderPath, "Experimental"));
        }
        catch (Exception ex)
        {
            // async void: an escaping exception could not be observed by the caller, so report it here.
            Debug.LogError($"[ExperimentManager] Could not create folders for subject {subjectId}: {ex.Message}");
            return;
        }

        Debug.Log($"[ExperimentManager] Folders created for subject: {subjectId}");

        await ConnectToMqttAsync();
    }

    /// <summary>
    /// Starts logging into a new pair of timestamped CSV files inside the given phase folder.
    /// </summary>
    /// <param name="phaseName">Phase sub-folder name; also used as the file name prefix.</param>
    public void SetPhase(string phaseName)
    {
        if (string.IsNullOrEmpty(_subjectFolderPath))
        {
            Debug.LogError("[ExperimentManager] SetPhase called before InitializeSession; logging not started.");
            return;
        }

        if (phaseName == null)
        {
            Debug.LogError("[ExperimentManager] SetPhase called with a null phase name; logging not started.");
            return;
        }

        string timestamp = DateTime.Now.ToString("yyyyMMdd_HHmmss");

        // Only "Neutral" and "Experimental" are pre-created; make sure any other phase folder exists.
        string phaseFolder = Path.Combine(_subjectFolderPath, phaseName);
        Directory.CreateDirectory(phaseFolder);

        // File names for both devices.
        string m5stickFileName = $"{phaseName}_M5Stick_Data_{timestamp}.csv";
        string seeedFileName = $"{phaseName}_Seeed_Data_{timestamp}.csv";

        _m5stickCsvFilePath = Path.Combine(phaseFolder, m5stickFileName);
        _seeedCsvFilePath = Path.Combine(phaseFolder, seeedFileName);

        // Headers match the columns each device provides.
        if (!File.Exists(_m5stickCsvFilePath))
        {
            File.WriteAllText(_m5stickCsvFilePath, "Timestamp;HR;GSR\n");
        }

        if (!File.Exists(_seeedCsvFilePath))
        {
            File.WriteAllText(_seeedCsvFilePath, "Timestamp;HR\n");
        }

        Debug.Log($"[ExperimentManager] Logging M5Stick to: {_m5stickCsvFilePath}");
        Debug.Log($"[ExperimentManager] Logging Seeed to: {_seeedCsvFilePath}");
    }

    /// <summary>Returns the folder of the current phase's CSV files, or null when not logging.</summary>
    public string GetCurrentFolderPath()
    {
        if (string.IsNullOrEmpty(_m5stickCsvFilePath))
        {
            return null;
        }

        return Path.GetDirectoryName(_m5stickCsvFilePath);
    }

    /// <summary>Stops writing samples to disk. Incoming samples are still drained for the watchdog.</summary>
    public void StopLogging()
    {
        _m5stickCsvFilePath = null;
        _seeedCsvFilePath = null;
        Debug.Log("[ExperimentManager] Logging stopped. Data collection finished.");
    }

    /// <summary>
    /// Creates the MQTT client, connects to the configured broker and subscribes to both
    /// device topics. Connection errors are logged, not thrown.
    /// </summary>
    private async Task ConnectToMqttAsync()
    {
        // A repeated InitializeSession must not leak the previous client.
        await ShutdownClientAsync();

        var factory = new MqttFactory();
        _mqttClient = factory.CreateMqttClient();

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(brokerIp, port)
            .Build();

        // Invoked by MQTTnet outside of Update(); it only parses and enqueues.
        _mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            DateTime receivedAt = DateTime.Now;
            string topic = e.ApplicationMessage.Topic;
            string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

            EnqueueSample(topic, payload, receivedAt);
            return Task.CompletedTask;
        };

        try
        {
            await _mqttClient.ConnectAsync(options, CancellationToken.None);

            // Subscribe to both topics.
            var subOptions = new MqttClientSubscribeOptionsBuilder()
                .WithTopicFilter(f => f.WithTopic(topicM5Stick))
                .WithTopicFilter(f => f.WithTopic(topicSeeed))
                .Build();

            await _mqttClient.SubscribeAsync(subOptions, CancellationToken.None);
            Debug.Log("[MQTT] Connected successfully and subscribed to M5Stick & Seeed!");
        }
        catch (Exception ex)
        {
            Debug.LogError($"[MQTT] Connection Error: {ex.Message}");
        }
    }

    /// <summary>Parses one MQTT payload and puts it on the queue that belongs to its topic.</summary>
    private void EnqueueSample(string topic, string payload, DateTime receivedAt)
    {
        try
        {
            if (topic == topicM5Stick)
            {
                M5StickData data = JsonUtility.FromJson<M5StickData>(payload);
                if (data == null)
                {
                    // FromJson can yield null (e.g. empty payload); enqueueing it would crash Update().
                    Debug.LogWarning($"[MQTT] Empty payload on topic {topic}");
                    return;
                }

                _m5stickQueue.Enqueue((receivedAt, data));
            }
            else if (topic == topicSeeed)
            {
                SeeedData data = JsonUtility.FromJson<SeeedData>(payload);
                if (data == null)
                {
                    Debug.LogWarning($"[MQTT] Empty payload on topic {topic}");
                    return;
                }

                _seeedQueue.Enqueue((receivedAt, data));
            }
        }
        catch (Exception ex)
        {
            Debug.LogWarning($"[MQTT] Parse error on topic {topic}: {ex.Message}");
        }
    }

    /// <summary>Drains both queues to disk and advances the per-device watchdogs.</summary>
    void Update()
    {
        bool receivedM5StickThisFrame = DrainM5StickQueue();
        bool receivedSeeedThisFrame = DrainSeeedQueue();

        // --- SYSTEM WATCHDOG (transmission monitoring) ---
        // Timers only run while connected.
        if (_mqttClient == null || !_mqttClient.IsConnected)
        {
            return;
        }

        TickWatchdog("M5Stick", receivedM5StickThisFrame,
            ref _timeSinceLastM5Stick, ref _isM5StickTimeoutWarningShown);
        TickWatchdog("SeeedStudio", receivedSeeedThisFrame,
            ref _timeSinceLastSeeed, ref _isSeeedTimeoutWarningShown);
    }

    /// <summary>Empties the M5Stick queue; returns true when at least one sample was dequeued.</summary>
    private bool DrainM5StickQueue()
    {
        bool receivedAny = false;
        StringBuilder batch = null;

        while (_m5stickQueue.TryDequeue(out var sample))
        {
            receivedAny = true;
            if (string.IsNullOrEmpty(_m5stickCsvFilePath))
            {
                continue; // Not logging: the sample only feeds the watchdog.
            }

            batch = batch ?? new StringBuilder();
            // Numbers are formatted with the current culture, so the decimal separator follows the OS locale.
            batch.Append($"{sample.ReceivedAt.ToString(CsvTimestampFormat)};{sample.Data.hr};{sample.Data.gsr}\n");
        }

        AppendBatch(_m5stickCsvFilePath, batch);
        return receivedAny;
    }

    /// <summary>Empties the SeeedStudio queue; returns true when at least one sample was dequeued.</summary>
    private bool DrainSeeedQueue()
    {
        bool receivedAny = false;
        StringBuilder batch = null;

        while (_seeedQueue.TryDequeue(out var sample))
        {
            receivedAny = true;
            if (string.IsNullOrEmpty(_seeedCsvFilePath))
            {
                continue; // Not logging: the sample only feeds the watchdog.
            }

            batch = batch ?? new StringBuilder();
            batch.Append($"{sample.ReceivedAt.ToString(CsvTimestampFormat)};{sample.Data.hr}\n");
        }

        AppendBatch(_seeedCsvFilePath, batch);
        return receivedAny;
    }

    /// <summary>Appends the lines collected this frame with a single file write.</summary>
    private static void AppendBatch(string csvPath, StringBuilder batch)
    {
        if (batch == null || batch.Length == 0 || string.IsNullOrEmpty(csvPath))
        {
            return;
        }

        try
        {
            File.AppendAllText(csvPath, batch.ToString());
        }
        catch (Exception ex)
        {
            // E.g. the file is locked by another program; do not let this abort Update().
            Debug.LogError($"[ExperimentManager] Could not write to {csvPath}: {ex.Message}");
        }
    }

    /// <summary>
    /// Advances one device's silence timer. Logs an error once when the timer reaches
    /// <see cref="timeoutSeconds"/>, and logs a recovery message when data arrives again.
    /// </summary>
    private void TickWatchdog(string deviceName, bool receivedThisFrame,
        ref float silentSeconds, ref bool warningShown)
    {
        if (receivedThisFrame)
        {
            silentSeconds = 0f;
            if (warningShown)
            {
                Debug.Log($"[ExperimentManager] Transmisja odzyskana! {deviceName} znów nadaje.");
                warningShown = false;
            }

            return;
        }

        silentSeconds += Time.deltaTime;
        if (silentSeconds >= timeoutSeconds && !warningShown)
        {
            Debug.LogError($"[UWAGA MQTT!] Brak danych od {deviceName} przez {timeoutSeconds} sek!");
            warningShown = true;
        }
    }

    /// <summary>Disconnects (if connected) and disposes the current MQTT client, if any.</summary>
    private async Task ShutdownClientAsync()
    {
        IMqttClient client = _mqttClient;
        _mqttClient = null; // Update() treats null as "not connected".

        if (client == null)
        {
            return;
        }

        try
        {
            if (client.IsConnected)
            {
                await client.DisconnectAsync();
            }
        }
        catch (Exception ex)
        {
            Debug.LogWarning($"[MQTT] Disconnect error: {ex.Message}");
        }
        finally
        {
            client.Dispose();
        }
    }

    /// <summary>Closes the broker connection when the application exits.</summary>
    private async void OnApplicationQuit()
    {
        await ShutdownClientAsync();
    }
}