『瀚思彼岸』» 智能家居技术论坛

标题: 分享一个不用node-red,靠sh脚本以及pyscript来做企业微信推送.... [打印本页]

作者: animals    时间: 2023-6-25 17:25
标题: 分享一个不用node-red,靠sh脚本以及pyscript来做企业微信推送....
本帖最后由 animals 于 2023-11-2 15:59 编辑

补充一下前提, 需要本地图像识别系统才能使用, 具体可以参照
i. 免费的本地图像识别系统(人体、物体)并接入HA - 『HomeAssistant』综合讨论区 - 『瀚思彼岸』» 智能家居技术论坛 - Powered by Discuz! (hassbian.com)
ii. 搬运2个摄像头人体插件和一个人脸识别插件(都是本地) - 『HomeAssistant』综合讨论区 - 『瀚思彼岸』» 智能家居技术论坛 - Powered by Discuz! (hassbian.com) 的第二款
iii. 或者其他关于deepstack的教程

pyscript的教程 我彻底弃用HA模版Jinja了,因为Python更好更容易 [Pyscript插件] - 『HomeAssistant』综合讨论区 - 『瀚思彼岸』» 智能家居技术论坛 - Powered by Discuz! (hassbian.com)

1. 编写sh脚本,用于调用ffmpeg命令生成gif
  1. #! /bin/bash

  2. today=$(date -d $4 +"%Y-%m-%d")
  3. time=$(date -d $5 +"%H-%M-%S")
  4. root_path="/config/www/video"
  5. user=$1
  6. password=$2
  7. ip=$3

  8. # echo $4
  9. # echo ${today}
  10. # echo ${time}

  11. mkdir -p ${root_path}/${today}

  12. /usr/bin/ffmpeg -rtsp_transport tcp -i rtsp://$1:$2@$3:554/h265/ch1/main/av_stream -max_muxing_queue_size 9999 -t 10 -vf scale=1024:768 ${root_path}/${today}/${time}.avi > /dev/null
  13. /usr/bin/ffmpeg -i ${root_path}/${today}/${time}.avi ${root_path}/${today}/${time}.gif > /dev/null
  14. rm -f ${root_path}/${today}/${time}.avi
复制代码
补充一个指定分辨率来生成avi文件,适合硬盘不够大的同学使用, 分辨率根据实际情况来 参数是"-vf scale=1024:768" 上面的代码中已经补上了

2. 编写shell_command, 用来调用sh脚本
  1. shell_command:
  2.   #服务名称根据自己喜好来
  3.   save_video: sh /config/ffmpeg_ten_second_video.sh {{user}} {{pwd}} {{ip}} {{date}} {{time}}
  4.   #{{}}中的参数按顺序对应sh脚本中的$1-$5
复制代码
  1. <span style="background-color: rgb(255, 255, 255);">3. 编写pyscript脚本,用来监听事件</span>
复制代码
  1. import os
  2. from datetime import datetime

  3. def findAllVideo(base):
  4.     files = []
  5.     for root, ds, fs in os.walk(base):
  6.         for f in fs:
  7.             if f.endswith('.avi'):
  8.                 videos.append({'name': f, 'time': os.path.getmtime(f)})

  9. #监听之前定义好的人物识别事件, 并当对象是人时
  10. @event_trigger('deepstack.object_detected', "name == 'person'")
  11. def push_video(**kwargs):
  12.     # log.warning(f'WARNING: print kwargs: {kwargs}')
  13.     # print(kwargs)
  14.    
  15.     #根据事件获取实体
  16.     entity = kwargs["entity_id"]
  17.     #这是我这里的相机实体
  18.     cam_entity = f'camera.{entity}'
  19.    
  20.     # log.warning(f'WARNING:  cam_entity: {cam_entity}')
  21.     ip = None #摄像头ip
  22.     pwd = '' #摄像头rtsp密码
  23.     place = None #摄像头的名字 例如 大门
  24.     user = '' #摄像头的rtsp用户名
  25.    
  26.     if entity == 'camera_1':
  27.         ip = 'ip1'
  28.         place = "大门入口"
  29.         user = 'admin'
  30.         pwd = '123456'
  31.     elif entity == 'camera_1':
  32.         ip = 'ip2'
  33.         place = "花园"
  34.         user = 'haha'
  35.         pwd = '567890'
  36.     #...可以继续
  37.    
  38.     if ip:
  39.         now = datetime.now()
  40.         png_name = now.strftime('%Y-%m-%d-%H:%M')
  41.         today = str(now.date())
  42.         now_time = now.strftime("%H:%M:%S")#camera.snapshot
  43.         #调用截图事件,生成封面图
  44.         service.call("camera", "snapshot", entity_id=cam_entity, filename=f"截图存放路径")
  45.         # print(pwd,ip,today,now_time)
  46.         # log.info(f'INFO: pwd:{pwd}, ip:{ip}, today:{today}, time: {now_time}')
  47.         # log.warning(f'WARNING:  pwd:{pwd}, ip:{ip}, today:{today}, time: {now_time}')
  48.         
  49.         #调用shell_command命令,使用ffmepg生成gif文件
  50.         #问题: 产生文件效率较慢,有其他更好的截取视频的方案吗?
  51.         service.call("shell_command", "save_video", user=user,pwd=pwd,ip=ip,date=today, time=now_time)
  52.         
  53.         path = f'视频及gif的存放路径'
  54.         if os.path.exists(path):
  55.             _try_times = 0
  56.             
  57.             #重试20次
  58.             gif_path = f"{path}/{now_time.replace(':','-')}.gif" #gif文件路径
  59.             avi_path = f"{path}/{now_time.replace(':','-')}.avi" #avi文件路径
  60.             for i in range(20):
  61.                 # log.warning(f'WARNING: gif path check{str(i+1)}')
  62.                 # if os.path.exists(avi_path):
  63.                 #     log.warning(f'WARNING: avi:{avi_path} size: {os.path.getsize(avi_path)}')
  64.                 # if os.path.exists(gif_path):
  65.                 #     log.warning(f'WARNING: gif:{gif_path} size: {os.path.getsize(gif_path)}')
  66.                 #如果avi删除并且存在gif文件,表示gif已经成功生成了
  67.                 if os.path.exists(gif_path) and not os.path.exists(avi_path):
  68.                     # log.warning(f'WARNING: gif exists')
  69.                     service.call("notify", "wework", data={"type":"news","picurl":f"{hasUrl}/local/{封面路径}", "url":f"{hasUrl}/local/{gif路径}", title=f"{place}发现有人活动", message=f"##可信度:{kwargs['confidence']},位置为:{kwargs['centroid']['x']},{kwargs['centroid']['y']} 点击查看动图")
  70.                     return
  71.                 task.sleep(3)
  72.                 #使用task.sleep, 如果使用time包的sleep会造成线程阻塞
  73.                 # else:
  74.                 #     _time.sleep(3) #5秒重试一次
复制代码

4. 大概效果
[attach]48998[/attach]










作者: ucomeito    时间: 2023-6-25 18:16
不错,有空试一下。
作者: sorrypqa    时间: 2023-6-25 21:15
太牛了,高手就是可以为所欲为
作者: hyq    时间: 2023-6-25 21:24
高手可以为所欲为,小白就只能眼馋!
作者: ilongjiang    时间: 2023-6-25 22:37
没看明白怎么用
作者: ahmengkai    时间: 2023-6-25 23:12
虽然没看懂 还是先收藏起来




欢迎光临 『瀚思彼岸』» 智能家居技术论坛 (https://bbs.hassbian.com/) Powered by Discuz! X3.5