Maison > développement back-end > Tutoriel Python > Comment utiliser Python pour implémenter un gadget de déduplication vidéo

Comment utiliser Python pour implémenter un gadget de déduplication vidéo

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
Libérer: 2023-04-19 14:37:14
avant
1825 Les gens l'ont consulté

Créez un nouveau supplément de méthode dup_video

import json
import os
import shutil

import cv2
import imagehash
from PIL import Image
from loguru import logger
from PySimpleGUI import popup_get_folder


class VideoDuplicate(object):
    '''
    返回整个视频的图片指纹列表
    从1秒开始,每3秒抽帧,计算一张图像指纹
    '''

    def __init__(self):
        self._over_length_video: list = []
        self._no_video: list = []

    def _video_hash(self, video_path) -> list:
        '''
        @param video_path -> 视频绝对路径;
        '''
        hash_arr = []
        cap = cv2.VideoCapture(video_path)  ##打开视频文件
        logger.info(f'开始抽帧【{video_path}】')

        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # 视频的帧数
        logger.warning(f'视频帧数:{n_frames}')

        fps = cap.get(cv2.CAP_PROP_FPS)  # 视频的帧率
        logger.warning(f'视频帧率:{fps}')

        dur = n_frames / fps * 1000  # 视频大致总长度
        cap_set = 1000
        logger.warning(f'视频大约总长:{dur / 1000}')
        if dur // 1000 > 11:
            logger.error(f'视频时长超出规定范围【6~10】;当前时长:【{dur // 1000}】;跳过该视频;')
            self._over_length_video.append(video_path)
            return []

        while cap_set < dur:  # 从3秒开始,每60秒抽帧,计算图像指纹。总长度-3s,是因为有的时候计算出来的长度不准。
            cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)
            logger.debug(f&#39;开始提取:【{cap_set // 1000}】/s的图片;&#39;)
            # 返回该时间点的,图像(numpy数组),及读取是否成功
            success, image_np = cap.read()
            if success:
                img = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))  # 转成cv图像格式
                h = str(imagehash.dhash(img))
                logger.success(f&#39;【{cap_set}/s图像指纹:【{h}】&#39;)
                hash_arr.append(h)  # 图像指纹
            else:
                logger.error(str(cap_set / 1000))
            cap_set += 1000 * 2
        cap.release()  # 释放视频
        return hash_arr

    def start(self, base_dir):
        &#39;&#39;&#39;
        @param base_dir -> 主文件路径;
        &#39;&#39;&#39;
        data: list = []
        for video in os.listdir(base_dir):
            logger.debug(f&#39;-&#39; * 80)
            name, ext = os.path.splitext(video)
            if ext not in (&#39;.mp4&#39;, &#39;.MP4&#39;):
                logger.error(f&#39;视频文件格式不符;【{video}】;执行跳过;&#39;)
                continue

            abs_video_path = os.path.join(base_dir, video)
            video_hash_list = self._video_hash(abs_video_path)
            if video_hash_list:
                data.append({&#39;video_abs_path&#39;: abs_video_path, &#39;hash&#39;: video_hash_list})

        self._write_log(data)
        return data

    @staticmethod
    def _write_log(data: list) -> None:
        &#39;&#39;&#39;视频哈希后的值写入日志文件&#39;&#39;&#39;
        with open(f&#39;log.txt&#39;, &#39;w+&#39;, encoding=&#39;utf-8&#39;) as f:
            f.write(json.dumps(data))

    def __call__(self, base_dir, *args, **kwargs):
        self.start(base_dir)
        logger.debug(f&#39;-----------------------------------开始比对关键帧差值感知余弦算法-----------------------------&#39;)

        with open(&#39;log.txt&#39;) as f:
            data = json.loads(f.read())
            for i in range(0, len(data) - 1):
                for j in range(i + 1, len(data)):
                    if data[i][&#39;hash&#39;] == data[j][&#39;hash&#39;]:
                        _, filename = os.path.split(data[i][&#39;video_abs_path&#39;])
                        logger.error(f&#39;移动文件:【{filename}】&#39;)
                        shutil.move(
                            os.path.join(base_dir, filename),
                            os.path.join(os.path.join(os.getcwd(), &#39;dup_video&#39;), filename)
                        )
        logger.warning(&#39;---------------------超长视频----------------------&#39;)
        for i in self._over_length_video:
            _, name = os.path.split(i)
            logger.error(name)


def main():
    path = popup_get_folder(&#39;请选择[视频去重]文件夹&#39;)
    v = VideoDuplicate()
    v(path)


if __name__ == &#39;__main__&#39;:
    main()
Copier après la connexion

dans le répertoire de même niveau

python+opencv pour extraire des images vidéo et dédupliquer

import os 
import sys
import cv2
import glob
import json
import numpy as np
import skimage
from skimage import metrics
import hashlib
print(skimage.__version__)

def load_json(json_file):
    with open(json_file) as fp:
        data = json.load(fp)
    return data[&#39;outputs&#39;]


def ssim_dis(im1, im2):
    ssim = metrics.structural_similarity(im1, im2, data_range=255, multichannel=True)
    return ssim

# cv2
def isdarkOrBright(grayImg, thre_dark=10, thre_bright=230):
    mean = np.mean(grayImg)
    if mean < thre_dark or mean > thre_bright:
        return True 
    else:
        return False

def get_file_md5(file_name):
    """
    caculate md5
    : param file_name
    : return md5
    """
    m = hashlib.md5()
    with open(file_name, &#39;rb&#39;) as fobj:
        while True:
            data = fobj.read(4096)
            if not data:
                break
            m.update(data)
    return m.hexdigest()

def extract_frame(video_path, save_dir, prefix, ssim_thre=0.90):
    count = 0
    md5set = {}
    last_frame = None
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    index = 0
    tmp_frames = []
    while cap.isOpened():
        frameState, frame = cap.read()
        if not frameState or frame is None:
            break
        grayImg = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # if isdarkOrBright(grayImg):
        #     index += 1
        #     continue
        assert cv2.imwrite(&#39;tmp.jpg&#39;, frame, [cv2.IMWRITE_JPEG_QUALITY, 100])
        md5 = get_file_md5(&#39;tmp.jpg&#39;)
        if md5 in md5set:
            md5set[md5] += 1
            index += 1
            continue
        md5set[md5] = 1
        
        save_path = os.path.join(save_dir, prefix+&#39;_&#39;+str(index).rjust(4, &#39;0&#39;)+&#39;.jpg&#39;)
        if last_frame is None:
            if cv2.imwrite(save_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                count += 1
                last_frame = frame
                tmp_frames.append(frame)
        else:
            dis = ssim_dis(last_frame, frame)
            if dis <= ssim_thre:
                save_frame = tmp_frames[len(tmp_frames)//2]
                if cv2.imwrite(save_path, save_frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                    count += 1
                    last_frame = frame
                    tmp_frames = [frame]
            else:
                tmp_frames.append(frame)
        index += 1

    cap.release()
    return count
        
        

if __name__ == &#39;__main__&#39;:
    import sys
    video_path = "videos/***.mp4"
    video_name = video_path.split("/")[-1]
    prefix = video_name[:-4]
    save_imgs_dir = prefix
    if not os.path.exists(save_imgs_dir):
        os.mkdir(save_imgs_dir)
    N = extract_frame(video_path, save_imgs_dir, prefix)
    print(video_name, N)
Copier après la connexion

Dédupliquer des images, des vidéos, des fichiers

import os
from tkinter import *
from tkinter import messagebox
import tkinter.filedialog
root=Tk()
root.title("筛选重复的视频和照片")
root.geometry("500x500+500+200")
def wbb():
      a=[]
      c={}
      filename=tkinter.filedialog.askopenfilenames()
            
      for i in filename:
            with open(i,&#39;rb&#39;) as f:
                  a.append(f.read())
      for j in range(len(a)):
            c[a[j]]=filename[j]
      filename1=tkinter.filedialog.askdirectory()
     
      if filename1!="":
            p=1
            lb1.config(text=filename1+"下的文件为:")
            for h in c:
                k=c[h].split(".")[-1]
                with open(filename1+"/"+str(p)+"."+k,&#39;wb&#39;) as f:
                      f.write(h)
                p=p+1      
            for g in os.listdir(filename1):
                  txt.insert(END,g+&#39;\n&#39;)
                  
      else:
            messagebox.showinfo("提示",message =&#39;请选择路径&#39;)
frame1=Frame(root,relief=RAISED)
frame1.place(relx=0.0)

frame2=Frame(root,relief=GROOVE)
frame2.place(relx=0.5)

lb1=Label(frame1,text="等等下面会有变化?",font=(&#39;华文新魏&#39;,13))
lb1.pack(fill=X)    

txt=Text(frame1,width=30,height=50,font=(&#39;华文新魏&#39;,10))
txt.pack(fill=X)        

lb=Label(frame2,text="点我选择要进行筛选的文件:",font=(&#39;华文新魏&#39;,10))
lb.pack(fill=X)            
            
                  
btn=Button(frame2,text="请选择要进行筛选的文件",fg=&#39;black&#39;,relief="raised",bd="9",command=wbb)
btn.pack(fill=X)
root.mainloop()
Copier après la connexion

Renderings

Comment utiliser Python pour implémenter un gadget de déduplication vidéo

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:yisu.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal