『瀚思彼岸』» 智能家居技术论坛

 找回密码
 立即注册
查看: 580|回复: 0

[技术探讨] 宿主机cpu、内存、硬盘占用、温度接入docker下的ha

[复制链接]

1

主题

16

帖子

110

积分

注册会员

Rank: 2

积分
110
金钱
94
HASS币
0
发表于 2024-10-17 15:57:43 | 显示全部楼层 |阅读模式
我的硬件是华为悦盒刷的ubuntu,获取cpu、内存、硬盘、cpu温度指令与别的硬件或许不通用,需自行测试修改

一、宿主机安装mqtt服务,开机自启(这部分自行百度安装)

二、采用python脚本运行指令获取宿主机的cpu、内存、硬盘占用信息采用mqtt上传到homeassistant(代码需填入的内容需与宿主机设置的内容一直)
import json
import paho.mqtt.client as mqtt
import time
import subprocess

# MQTT 配置
MQTT_SERVER = "xxx" #mqtt服务器地址
MQTT_PORT = xxx #端口
MQTT_USERNAME = "xxx" #账号
MQTT_PASSWORD = "xxx" #密码

# MQTT 发现主题和状态主题
DISCOVERY_TOPIC_CPU_USAGE = "homeassistant/sensor/cpu_usage/config"
STATE_TOPIC_CPU_USAGE = "cpu_usage/sensor/state"
DISCOVERY_TOPIC_DISK_USAGE = "homeassistant/sensor/disk_usage/config"
STATE_TOPIC_DISK_USAGE = "disk_usage/sensor/state"
DISCOVERY_TOPIC_MEMORY_USAGE = "homeassistant/sensor/memory_usage/config"
STATE_TOPIC_MEMORY_USAGE = "memory_usage/sensor/state"

# 创建MQTT客户端实例
client = mqtt.Client()


# 当连接到MQTT Broker时调用
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # 发送发现消息
        send_discovery_message()
    else:
        print(f"Failed to connect, return code {rc}")


# 发送MQTT发现消息
def send_discovery_message():
    config_payload_cpu_usage = {
        "name": "CPU Usage",
        "state_topic": STATE_TOPIC_CPU_USAGE,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.cpu_usage }}",
        "unique_id": "cpu_usage_sensor",
        "device": {
            "identifiers": ["cpu_usage_device"],
            "name": "CPU Usage Device"
        }
    }

    config_payload_disk_usage = {
        "name": "Disk Usage",
        "state_topic": STATE_TOPIC_DISK_USAGE,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.disk_usage }}",
        "unique_id": "disk_usage_sensor",
        "device": {
            "identifiers": ["disk_usage_device"],
            "name": "Disk Usage Device"
        }
    }

    config_payload_memory_usage = {
        "name": "Memory Usage",
        "state_topic": STATE_TOPIC_MEMORY_USAGE,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.memory_usage }}",
        "unique_id": "memory_usage_sensor",
        "device": {
            "identifiers": ["memory_usage_device"],
            "name": "Memory Usage Device"
        }
    }

    client.publish(DISCOVERY_TOPIC_CPU_USAGE, payload=json.dumps(config_payload_cpu_usage), retain=True)
    client.publish(DISCOVERY_TOPIC_DISK_USAGE, payload=json.dumps(config_payload_disk_usage), retain=True)
    client.publish(DISCOVERY_TOPIC_MEMORY_USAGE, payload=json.dumps(config_payload_memory_usage), retain=True)
    print("Discovery messages sent.")


# 获取并发送系统资源使用率
def get_and_send_system_usage():
    command = r'''
    df -h / | awk '$NF=="/"{printf "%.2f\n", $5}' | tr -d '%' && free -m | awk 'NR==2{printf "%.2f\n", $3/$2*100}' && top -bn1 | grep 'Cpu(s)' | awk '{print $2+$4}'
    '''
    result = subprocess.run(
        command,
        shell=True,  # 使用 shell=True 来执行包含管道的命令
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

    # 分割输出以获取各个值
    outputs = result.stdout.strip().split('\n')

    if len(outputs) == 3:
        try:
            disk_usage = float(outputs[0])
            memory_usage = float(outputs[1])
            cpu_usage = float(outputs[2])

            # 发布磁盘使用率
            state_payload_disk_usage = {"disk_usage": disk_usage}
            client.publish(STATE_TOPIC_DISK_USAGE, payload=json.dumps(state_payload_disk_usage))
            print(f"Published: Disk Usage={disk_usage}%")

            # 发布内存使用率
            state_payload_memory_usage = {"memory_usage": memory_usage}
            client.publish(STATE_TOPIC_MEMORY_USAGE, payload=json.dumps(state_payload_memory_usage))
            print(f"Published: Memory Usage={memory_usage}%")

            # 发布CPU使用率
            state_payload_cpu_usage = {"cpu_usage": cpu_usage}
            client.publish(STATE_TOPIC_CPU_USAGE, payload=json.dumps(state_payload_cpu_usage))
            print(f"Published: CPU Usage={cpu_usage}%")
        except ValueError as e:
            print("Failed to parse system usage: ", e, result.stderr)
    else:
        print("Unexpected number of output lines: ", len(outputs), result.stderr)


# 设置回调函数
client.on_connect = on_connect

# 连接到MQTT Broker
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.connect(MQTT_SERVER, MQTT_PORT, 60)

# 开始循环处理网络流量,阻塞调用直到断开连接
client.loop_start()

try:
    while True:
        # 获取并发送系统资源使用率
        get_and_send_system_usage()
        time.sleep(10)  # 每10秒更新一次
except KeyboardInterrupt:
    print("Exiting...")

# 停止循环
client.loop_stop()
三、py脚本设置权限并设置开机自启动
#py脚本权限,脚本在根目录
chmod +x /system_status_mqtt.py

#创建或修改systemd服务文件:
sudo nano /etc/systemd/system/system_status_mqtt.service
#创建内容:
[Unit]
Description=System Status MQTT Service
After=network.target

[Service]
Type=oneshot
ExecStartPre=-sleep 120
ExecStart=/usr/bin/python3 /system_status_mqtt.py
RemainAfterExit=yes
Restart=on-failure

[Install]
WantedBy=multi-user.target

#启用并启动服务
sudo systemctl enable system_status_mqtt.service
sudo systemctl start system_status_mqtt.service

#查看服务状态
sudo systemctl status system_status_mqtt.service
#查看服务日志
sudo journalctl -u system_status_mqtt.service
四、获取温度直接在yaml文件中配置
command_line:
  - sensor:
      name: CPU Temperature
      command: >
        grep Tsensor /proc/msp/pm_cpu | awk '{print $4}'
      unit_of_measurement: "°C"
      value_template: "{{ value }}"
      scan_interval: 5


接入到ha

接入到ha

cpu

cpu

cpu

cpu

内存

内存

评分

参与人数 1金钱 +12 收起 理由
隔壁的王叔叔 + 12 不错哦,这是高手

查看全部评分

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|Hassbian

GMT+8, 2025-1-10 01:29 , Processed in 0.048017 second(s), 26 queries .

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表