请选择 进入手机版 | 继续访问电脑版

『瀚思彼岸』» 智能家居技术论坛

 找回密码
 立即注册
查看: 367|回复: 6

[经验分享] [新手慎入]分享一个360可视门铃接入HA的思路

[复制链接]

1

主题

4

帖子

82

积分

论坛技术达人

积分
82
金钱
78
HASS币
0
发表于 2021-2-19 18:09:17 | 显示全部楼层 |阅读模式
我用的是1C这个型号,其他型号应该也没啥问题.接入方法比较简单粗暴.

                               
登录/注册后可看大图

思路:
每几秒让HA ping一下门铃的IP,如果能PING通则说明门铃被触发或者是定期轮询服务器上的设置信息.这时候直接通过小程序版的360摄像机的协议来下载最新的视频和图片.由于我的HA是放在树莓派上的,空间并不多,所以下载之后的视频文件上传到七牛云进行备份.

由于360摄像机的视频流数据均通过加密传输,所以暂时没办法来实现视频直播了(手动遗憾),期待有朋友能解决下这个问题.
request_params 需要通过抓360摄像机的包来获取里面的数据

代码如下:
# coding: utf-8
import requests
import json
import datetime
import logging
import argparse
import os
import urllib.request
import pathlib
import shutil
from minio import Minio
import mimetypes
from minio.error import ResponseError
import time
from qiniu import Auth, put_file, etag
import qiniu.config

log_file = '360.log'
request_params = 'qid=*****&sid=*****&from=mpc_ipcam_wechatmp'
access_key = '七牛云AK'
secret_key = '七牛云SK'
bucket_name = 'BUCKET名称'

if os.path.exists(log_file):
    os.remove(log_file)

logging.basicConfig(filename=log_file, level=logging.DEBUG, format='%(asctime)s %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p')
logger = logging.getLogger("360可视门铃")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


class IPC360:
    headers = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.11(0x17000b21) NetType/WIFI Language/zh_CN',
        'Content-Type': 'application/x-www-form-urlencoded',
        "Referer": 'https://servicewechat.com/wx5f78be8ca091436c/11/page-frame.html'
    }

    def build_url(self, action):
        return "https://q3.jia.360.cn/" + action + "?" + request_params

    def post_request(self, url, data):
        logger.info('请求地址 %s ...', url)
        res = requests.post(url=url, data=data, headers=self.headers)
        res = json.loads(res.text)
        return res

    def ipc_list(self):
        data = {
            'version': 3
        }
        return self.post_request(self.build_url('app/getIpcList'), data)

    def index(self):
        data = {
            'days': 1
        }
        return self.post_request(self.build_url('event/getIndex'), data)

    def list_by_sn(self, sn, ipc_type=5):
        today = datetime.date.today()
        data = {
            'sn': sn,
            'ipcType': ipc_type,
            'humanCount': 0,
            'date': today.strftime("%Y%m%d"),
            'count': 100,
            'eventTypes': '[304,305,306,307]'
        }
        return self.post_request(self.build_url('event/getListBySn'), data)


parser = argparse.ArgumentParser(description='Start Args')
parser.add_argument('--download', type=bool, default=True)
args = parser.parse_args()
path = os.getcwd() + "/download/"
if not os.path.exists(path):
    os.makedirs(path)


def upload():
    logger.info('开始上传文件.')
    q = Auth(access_key, secret_key)
    for _, dirs, _ in os.walk(path):
        for dir in dirs:
            for parent, _, files in os.walk(path + dir):
                for file in files:
                    file_name = parent + '/' + file
                    suffix = file_name.split(".")
                    if not file.split('.')[0].isdigit():
                        continue
                    timestamp = int(file.split('.')[0][:-3])
                    time_array = time.localtime(timestamp)
                    save_name = dir + '/' + time.strftime("%Y-%m-%d/%H-%M-%S", time_array)
                    file_stat = os.stat(file_name)
                    if file_stat.st_size > 0:
                        save_name = save_name + '.' + suffix[1]
                        logger.info('正在上传: %s', file_name)
                        token = q.upload_token(bucket_name, save_name, 3600)
                        ret, info = put_file(token, save_name, file_name)
                        logger.info(info)
                        os.remove(file_name)
                    else:
                        if len(suffix) > 1:
                            os.remove(file_name)


if args.download:
    upload()
    logger.info('延时3秒')
    time.sleep(3)
    ipc = IPC360()
    ipc_list = ipc.ipc_list()
    errorCode = ipc_list["errorCode"]
    logger.info('获取摄像头信息,返回代码:%d', errorCode)
    if errorCode == 0:
        for device in ipc_list["devices"]:
            logger.info('正在检查设备 SN:%s(%s) ...', device['sn'], device['title'])
            sn_path = path + device["sn"] + "/"
            if not os.path.exists(sn_path):
                os.makedirs(sn_path)
            index = 0
            for event in device["eventList"]:
                video_file = sn_path + str(event["eventTime"])
                video_url = event["videoUrl"] + '&' + request_params
                thumb_url = event["thumbUrl"] + '&' + request_params
                thumb_file = video_file + ".jpg"
                tmp_file = video_file + '.mpeg'
                if not os.path.exists(sn_path + 'latest.mpeg'):
                    logger.info('最新视频不存在,下载中... %s', sn_path + 'latest.mpeg')
                    urllib.request.urlretrieve(video_url, sn_path + 'latest.mpeg')
                if os.path.exists(video_file):
                    logger.info('视频记录 %s 已上传,跳过...', tmp_file)
                    continue
                urllib.request.urlretrieve(video_url, tmp_file)
                if not os.path.exists(tmp_file):
                    logger.info("文件 %s 下载失败 %s", video_url, tmp_file)
                    continue
                logger.info('视频记录 %s 已保存到:%s .', str(event["eventTime"]), tmp_file)
                if os.path.getsize(tmp_file) == 0:
                    logger.info('视频文件尺寸不正确,暂时跳过该文件: %s', video_url)
                    continue
                urllib.request.urlretrieve(thumb_url, thumb_file)
                if index == 0:
                    shutil.copy(tmp_file, sn_path + 'latest.mpeg')
                    shutil.copy(thumb_file, sn_path + 'latest.jpg')
                pathlib.Path(video_file).touch()
                index += 1
    upload()
    logger.info('完成...')





评分

参与人数 2金钱 +30 收起 理由
chiunownow + 10 纳尼,还有这种操作?
+ 20 厉害了word楼主!

查看全部评分

回复

使用道具 举报

114

主题

4200

帖子

1万

积分

管理员

囧死

Rank: 9Rank: 9Rank: 9

积分
13161
金钱
8876
HASS币
45
发表于 2021-2-19 23:09:45 | 显示全部楼层
厉害厉害!
回复

使用道具 举报

4

主题

373

帖子

1584

积分

金牌会员

Rank: 6Rank: 6

积分
1584
金钱
1211
HASS币
0
QQ
发表于 2021-2-20 01:26:07 来自手机 | 显示全部楼层
前排支持
回复

使用道具 举报

88

主题

465

帖子

1857

积分

金牌会员

Rank: 6Rank: 6

积分
1857
金钱
1392
HASS币
20
发表于 2021-2-21 20:15:14 | 显示全部楼层
曲线救国,就是绕的有点远
回复

使用道具 举报

0

主题

24

帖子

60

积分

注册会员

Rank: 2

积分
60
金钱
36
HASS币
0
发表于 2021-2-22 01:28:54 | 显示全部楼层
厉害啊!!!
回复

使用道具 举报

0

主题

24

帖子

60

积分

注册会员

Rank: 2

积分
60
金钱
36
HASS币
0
发表于 2021-2-22 12:19:38 | 显示全部楼层
要是有详细操作就好了
回复

使用道具 举报

0

主题

25

帖子

120

积分

注册会员

Rank: 2

积分
120
金钱
95
HASS币
0
发表于 2021-2-24 18:21:14 | 显示全部楼层
我正是在用360門鈴, 請問有詳細教學嗎?  感謝~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|Hassbian

GMT+8, 2021-3-5 03:04 , Processed in 0.106490 second(s), 31 queries .

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表