initial commit and version 1.0

This commit is contained in:
2025-04-21 15:14:03 +02:00
commit ae6b2bbf44
82 changed files with 10782 additions and 0 deletions

441
moviepy/audio/AudioClip.py Normal file
View File

@@ -0,0 +1,441 @@
"""Implements AudioClip (base class for audio clips) and its main subclasses:
- Audio clips: AudioClip, AudioFileClip, AudioArrayClip
- Composition: CompositeAudioClip
"""
import numbers
import os
import numpy as np
import proglog
from moviepy.audio.io.ffmpeg_audiowriter import ffmpeg_audiowrite
from moviepy.audio.io.ffplay_audiopreviewer import ffplay_audiopreview
from moviepy.Clip import Clip
from moviepy.decorators import convert_path_to_string, requires_duration
from moviepy.tools import extensions_dict
class AudioClip(Clip):
    """Base class for audio clips.

    See ``AudioFileClip`` and ``CompositeAudioClip`` for usable classes.

    An AudioClip is a Clip with a ``frame_function`` attribute of
    the form ``t -> [f_t]`` for mono sound and
    ``t -> [f1_t, f2_t]`` for stereo sound (the arrays are Numpy arrays).
    The `f_t` are floats between -1 and 1. These bounds can be
    trespassed without problems (the program will put the
    sound back into the bounds at conversion time, without much impact).

    Parameters
    ----------

    frame_function
      A function `t-> frame at time t`. The frame does not mean much
      for a sound, it is just a float. What 'makes' the sound are
      the variations of that float in the time.

    duration
      Duration of the clip (in seconds). Some clips are infinite, in
      this case their duration will be ``None``.

    nchannels
      Number of channels (one or two for mono or stereo).

    Examples
    --------

    .. code:: python

        # Plays the note A in mono (a sine wave of frequency 440 Hz)
        import numpy as np
        frame_function = lambda t: np.sin(440 * 2 * np.pi * t)
        clip = AudioClip(frame_function, duration=5, fps=44100)
        clip.preview()

        # Plays the note A in stereo (two sine waves of frequencies 440 and 880 Hz)
        frame_function = lambda t: np.array([
            np.sin(440 * 2 * np.pi * t),
            np.sin(880 * 2 * np.pi * t)
        ]).T.copy(order="C")
        clip = AudioClip(frame_function, duration=3, fps=44100)
        clip.preview()
    """

    def __init__(self, frame_function=None, duration=None, fps=None):
        super().__init__()

        if fps is not None:
            self.fps = fps

        if frame_function is not None:
            self.frame_function = frame_function
            # Probe the frame at t=0 to infer the channel count: an
            # iterable frame means multichannel, a bare float means mono.
            frame0 = self.get_frame(0)
            if hasattr(frame0, "__iter__"):
                self.nchannels = len(list(frame0))
            else:
                self.nchannels = 1

        if duration is not None:
            self.duration = duration
            self.end = duration

    @requires_duration
    def iter_chunks(
        self,
        chunksize=None,
        chunk_duration=None,
        fps=None,
        quantize=False,
        nbytes=2,
        logger=None,
    ):
        """Iterator that returns the whole sound array of the clip by chunks.

        Exactly one of ``chunksize`` (in frames) or ``chunk_duration``
        (in seconds) must be provided.

        Raises
        ------
        ValueError
            If neither ``chunksize`` nor ``chunk_duration`` is given.
        """
        if fps is None:
            fps = self.fps
        logger = proglog.default_bar_logger(logger)
        if chunk_duration is not None:
            chunksize = int(chunk_duration * fps)
        if chunksize is None:
            # Previously this fell through to a confusing TypeError below.
            raise ValueError(
                "Provide either 'chunksize' or 'chunk_duration' to iter_chunks"
            )

        total_size = int(fps * self.duration)

        nchunks = total_size // chunksize + 1

        # Chunk boundaries; linspace spreads the remainder so each chunk
        # is at most ``chunksize`` frames long.
        positions = np.linspace(0, total_size, nchunks + 1, endpoint=True, dtype=int)

        for i in logger.iter_bar(chunk=list(range(nchunks))):
            size = positions[i + 1] - positions[i]
            assert size <= chunksize
            timings = (1.0 / fps) * np.arange(positions[i], positions[i + 1])
            yield self.to_soundarray(
                timings, nbytes=nbytes, quantize=quantize, fps=fps, buffersize=chunksize
            )

    @requires_duration
    def to_soundarray(
        self, tt=None, fps=None, quantize=False, nbytes=2, buffersize=50000
    ):
        """Transforms the sound into an array that can be played by pygame
        or written in a wav file. See ``AudioClip.preview``.

        Parameters
        ----------

        tt
          Times at which to sample the sound. If ``None``, the whole clip
          is sampled at the rate ``fps``.

        fps
          Frame rate of the sound for the conversion.
          44100 for top quality.

        quantize
          If True, clip samples into [-0.99, 0.99] and convert them to
          integers of width ``nbytes``.

        nbytes
          Number of bytes to encode the sound: 1 for 8bit sound,
          2 for 16bit, 4 for 32bit sound.

        buffersize
          Maximum number of frames rendered in one shot; longer clips are
          rendered chunk by chunk.
        """
        if tt is None:
            if fps is None:
                fps = self.fps

            max_duration = 1 * buffersize / fps
            if self.duration > max_duration:
                # Too long to render in one shot: render chunk by chunk and
                # stack (rows for stereo, flat concatenation for mono).
                stacker = np.vstack if self.nchannels == 2 else np.hstack
                return stacker(
                    tuple(
                        self.iter_chunks(
                            # Bug fix: forward the caller's ``nbytes`` —
                            # this was hard-coded to 2, so quantized 32-bit
                            # exports of long clips silently produced int16.
                            fps=fps,
                            quantize=quantize,
                            nbytes=nbytes,
                            chunksize=buffersize,
                        )
                    )
                )
            else:
                tt = np.arange(0, self.duration, 1.0 / fps)

        snd_array = self.get_frame(tt)

        if quantize:
            snd_array = np.maximum(-0.99, np.minimum(0.99, snd_array))
            inttype = {1: "int8", 2: "int16", 4: "int32"}[nbytes]
            snd_array = (2 ** (8 * nbytes - 1) * snd_array).astype(inttype)

        return snd_array

    def max_volume(self, stereo=False, chunksize=50000, logger=None):
        """Returns the maximum volume level of the clip.

        If ``stereo`` is True and the clip has more than one channel,
        returns an array with one max per channel; otherwise a float.
        """
        # max volume separated by channels if ``stereo`` and not mono
        stereo = stereo and self.nchannels > 1

        # zero for each channel
        maxi = np.zeros(self.nchannels)
        for chunk in self.iter_chunks(chunksize=chunksize, logger=logger):
            maxi = np.maximum(maxi, abs(chunk).max(axis=0))

        # if mono returns float, otherwise array of volumes by channel
        return maxi if stereo else maxi[0]

    @requires_duration
    @convert_path_to_string("filename")
    def write_audiofile(
        self,
        filename,
        fps=None,
        nbytes=2,
        buffersize=2000,
        codec=None,
        bitrate=None,
        ffmpeg_params=None,
        write_logfile=False,
        logger="bar",
    ):
        """Writes an audio file from the AudioClip.

        Parameters
        ----------

        filename
          Name of the output file, as a string or a path-like object.

        fps
          Frames per second. If not set, it will try default to self.fps if
          already set, otherwise it will default to 44100.

        nbytes
          Sample width (set to 2 for 16-bit sound, 4 for 32-bit sound)

        buffersize
          The sound is not generated all at once, but rather made by bunches
          of frames (chunks). ``buffersize`` is the size of such a chunk.
          Try varying it if you meet audio problems (but you shouldn't
          have to). Default to 2000

        codec
          Which audio codec should be used. If None provided, the codec is
          determined based on the extension of the filename. Choose
          'pcm_s16le' for 16-bit wav and 'pcm_s32le' for 32-bit wav.

        bitrate
          Audio bitrate, given as a string like '50k', '500k', '3000k'.
          Will determine the size and quality of the output file.
          Note that it is mainly an indicative goal; the bitrate won't
          necessarily be reached in the output file.

        ffmpeg_params
          Any additional parameters you would like to pass, as a list
          of terms, like ['-option1', 'value1', '-option2', 'value2']

        write_logfile
          If true, produces a detailed logfile named filename + '.log'
          when writing the file

        logger
          Either ``"bar"`` for progress bar or ``None`` or any Proglog logger.
        """
        if not fps:
            # Fall back to the clip's own fps when present, else CD quality.
            # (Previously raised AttributeError when self.fps was unset.)
            fps = getattr(self, "fps", None) or 44100

        if codec is None:
            name, ext = os.path.splitext(os.path.basename(filename))
            try:
                codec = extensions_dict[ext[1:]]["codec"][0]
            except KeyError:
                raise ValueError(
                    "MoviePy couldn't find the codec associated "
                    "with the filename. Provide the 'codec' "
                    "parameter in write_audiofile."
                )

        return ffmpeg_audiowrite(
            self,
            filename,
            fps,
            nbytes,
            buffersize,
            codec=codec,
            bitrate=bitrate,
            write_logfile=write_logfile,
            ffmpeg_params=ffmpeg_params,
            logger=logger,
        )

    @requires_duration
    def audiopreview(
        self, fps=None, buffersize=2000, nbytes=2, audio_flag=None, video_flag=None
    ):
        """Preview an AudioClip using ffplay.

        Parameters
        ----------

        fps
          Frame rate of the sound. 44100 gives top quality, but may cause
          problems if your computer is not fast enough and your clip is
          complicated. If the sound jumps during the preview, lower it
          (11025 is still fine, 5000 is tolerable).

        buffersize
          The sound is not generated all at once, but rather made by bunches
          of frames (chunks). ``buffersize`` is the size of such a chunk.
          Try varying it if you meet audio problems (but you shouldn't
          have to).

        nbytes:
          Number of bytes to encode the sound: 1 for 8bit sound, 2 for
          16bit, 4 for 32bit sound. 2 bytes is fine.

        audio_flag, video_flag:
          Instances of class threading events that are used to synchronize
          video and audio during ``VideoClip.preview()``.
        """
        ffplay_audiopreview(
            clip=self,
            fps=fps,
            buffersize=buffersize,
            nbytes=nbytes,
            audio_flag=audio_flag,
            video_flag=video_flag,
        )

    def __add__(self, other):
        # ``audio_a + audio_b`` concatenates the two clips in time.
        if isinstance(other, AudioClip):
            return concatenate_audioclips([self, other])
        return super(AudioClip, self).__add__(other)
class AudioArrayClip(AudioClip):
    """An audio clip made from a sound array.

    Parameters
    ----------

    array
      A Numpy array representing the sound, of size Nx1 for mono,
      Nx2 for stereo.

    fps
      Frames per second : speed at which the sound is supposed to be
      played.
    """

    def __init__(self, array, fps):
        Clip.__init__(self)

        self.array = array
        self.fps = fps
        self.duration = 1.0 * len(array) / fps

        def frame_function(t):
            """Complicated, but must be able to handle the case where t
            is a list of the form sin(t).
            """
            if isinstance(t, np.ndarray):
                array_inds = np.round(self.fps * t).astype(int)
                in_array = (array_inds >= 0) & (array_inds < len(self.array))
                # Bug fix: match the sample shape of the source array
                # instead of hard-coding stereo — a mono (N,) or (N, 1)
                # array previously hit a broadcasting error against the
                # hard-coded np.zeros((len(t), 2)).
                result = np.zeros((len(t), *self.array.shape[1:]))
                result[in_array] = self.array[array_inds[in_array]]
                return result
            else:
                i = int(self.fps * t)
                # Out-of-range times yield silence of the right shape.
                if i < 0 or i >= len(self.array):
                    return 0 * self.array[0]
                else:
                    return self.array[i]

        self.frame_function = frame_function
        # Derive the channel count from the array shape; a 1-D array is
        # mono (the previous len(list(frame)) probe crashed on it because
        # a single mono sample is a bare float, not an iterable).
        self.nchannels = self.array.shape[1] if self.array.ndim > 1 else 1
class CompositeAudioClip(AudioClip):
    """Clip made by composing several AudioClips.

    An audio clip made by putting together several audio clips.

    Parameters
    ----------

    clips
      List of audio clips, which may start playing at different times or
      together, depends on their ``start`` attributes. If all have their
      ``duration`` attribute set, the duration of the composite clip is
      computed automatically.
    """

    def __init__(self, clips):
        self.clips = clips
        # The composite has as many channels as its widest clip.
        self.nchannels = max(clip.nchannels for clip in self.clips)

        # self.duration is set at AudioClip
        # Accumulate the max of the clip end times; the loop stops at the
        # first clip whose end is None, so ``duration`` then reflects only
        # the clips seen before it.
        # NOTE(review): arguably the duration should be None as soon as any
        # clip is open-ended — confirm intent before changing.
        duration = None
        for end in self.ends:
            if end is None:
                break
            duration = max(end, duration or 0)

        # self.fps is set at AudioClip
        # Use the highest numeric fps among the clips (clips without a
        # numeric fps are ignored).
        fps = None
        for clip in self.clips:
            if hasattr(clip, "fps") and isinstance(clip.fps, numbers.Number):
                fps = max(clip.fps, fps or 0)

        super().__init__(duration=duration, fps=fps)

    @property
    def starts(self):
        """Returns starting times for all clips in the composition."""
        return (clip.start for clip in self.clips)

    @property
    def ends(self):
        """Returns ending times for all clips in the composition."""
        return (clip.end for clip in self.clips)

    def frame_function(self, t):
        """Renders a frame for the composition for the time ``t``."""
        # For each clip, whether it is playing at time(s) t (False, or a
        # mask/indicator as returned by Clip.is_playing).
        played_parts = [clip.is_playing(t) for clip in self.clips]

        # Sum only the clips currently playing, each sampled in its own
        # local time frame (t - clip.start) and masked by ``part``.
        sounds = [
            clip.get_frame(t - clip.start) * np.array([part]).T
            for clip, part in zip(self.clips, played_parts)
            if (part is not False)
        ]

        # Zero baseline with the composite's channel count, so the sum
        # broadcasts to the right shape even when no clip is playing.
        if isinstance(t, np.ndarray):
            zero = np.zeros((len(t), self.nchannels))
        else:
            zero = np.zeros(self.nchannels)

        return zero + sum(sounds)
def concatenate_audioclips(clips):
    """Concatenates one AudioClip after another, in the order that are passed
    to ``clips`` parameter.

    Parameters
    ----------

    clips
      List of audio clips, which will be played one after other.
    """
    # Cumulative boundaries: each clip's start offset, then the total end.
    boundaries = np.cumsum([0, *(clip.duration for clip in clips)])

    shifted_clips = []
    for offset, clip in zip(boundaries[:-1], clips):
        shifted_clips.append(clip.with_start(offset))

    composition = CompositeAudioClip(shifted_clips)
    return composition.with_duration(boundaries[-1])

View File

@@ -0,0 +1 @@
"""Everything about audio manipulation."""

View File

@@ -0,0 +1,70 @@
from dataclasses import dataclass
import numpy as np
from moviepy.audio.AudioClip import CompositeAudioClip
from moviepy.audio.fx.MultiplyVolume import MultiplyVolume
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
@dataclass
class AudioDelay(Effect):
    """Repeats audio certain number of times at constant intervals multiplying
    their volume levels using a linear space in the range 1 to ``decay`` argument
    value.

    Parameters
    ----------

    offset : float, optional
      Gap between repetitions start times, in seconds.

    n_repeats : int, optional
      Number of repetitions (without including the clip itself).

    decay : float, optional
      Multiplication factor for the volume level of the last repetition. Each
      repetition will have a value in the linear function between 1 and this value,
      increasing or decreasing constantly. Keep in mind that the last repetition
      will be muted if this is 0, and if is greater than 1, the volume will increase
      for each repetition.

    Examples
    --------

    .. code:: python

        from moviepy import *
        videoclip = AudioFileClip('myaudio.wav').with_effects([
            afx.AudioDelay(offset=.2, n_repeats=10, decay=.2)
        ])

        # stereo A note
        frame_function = lambda t: np.array(
            [np.sin(440 * 2 * np.pi * t), np.sin(880 * 2 * np.pi * t)]
        ).T
        clip = AudioClip(frame_function=frame_function, duration=0.1, fps=44100)
        clip = clip.with_effects([afx.AudioDelay(offset=.2, n_repeats=11, decay=0)])
    """

    # Defaults: 8 echoes, 0.2 s apart, no volume decay (decay=1).
    offset: float = 0.2
    n_repeats: int = 8
    decay: float = 1

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip."""
        # One volume factor per play-through (original + each repetition);
        # negative decay values are clamped to 0 (full mute).
        decayments = np.linspace(1, max(0, self.decay), self.n_repeats + 1)
        # Compose the untouched original with each delayed, attenuated copy.
        return CompositeAudioClip(
            [
                clip.copy(),
                *[
                    clip.with_start((rep + 1) * self.offset).with_effects(
                        [MultiplyVolume(decayments[rep + 1])]
                    )
                    for rep in range(self.n_repeats)
                ],
            ]
        )

View File

@@ -0,0 +1,60 @@
from dataclasses import dataclass
import numpy as np
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
from moviepy.tools import convert_to_seconds
@dataclass
class AudioFadeIn(Effect):
    """Return an audio (or video) clip that is first mute, then the
    sound arrives progressively over ``duration`` seconds.

    Parameters
    ----------

    duration : float
      How long does it take for the sound to return to its normal level.

    Examples
    --------

    .. code:: python

        clip = VideoFileClip("media/chaplin.mp4")
        clip.with_effects([afx.AudioFadeIn("00:00:06")])
    """

    duration: float

    def __post_init__(self):
        # Normalize "HH:MM:SS" strings / tuples to plain seconds.
        self.duration = convert_to_seconds(self.duration)

    def _mono_factor_getter(self):
        # Gain ramps linearly from 0 at t=0 up to 1 at t=duration.
        return lambda t, duration: np.minimum(t / duration, 1)

    def _stereo_factor_getter(self, nchannels):
        # Same ramp as mono, replicated once per channel.
        def getter(t, duration):
            factor = np.minimum(t / duration, 1)
            return np.array([factor for _ in range(nchannels)]).T

        return getter

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip.

        Raises
        ------
        ValueError
            If the clip has no ``duration`` set.
        """
        if clip.duration is None:
            raise ValueError("Attribute 'duration' not set")

        get_factor = (
            self._mono_factor_getter()
            if clip.nchannels == 1
            else self._stereo_factor_getter(clip.nchannels)
        )

        return clip.transform(
            lambda get_frame, t: get_factor(t, self.duration) * get_frame(t),
            # Consistency fix: the sibling AudioFadeOut passes
            # keep_duration=True; without it the transformed clip's
            # duration is not preserved by Clip.transform.
            keep_duration=True,
        )

View File

@@ -0,0 +1,62 @@
from dataclasses import dataclass
import numpy as np
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
from moviepy.tools import convert_to_seconds
@dataclass
class AudioFadeOut(Effect):
    """Return a sound clip where the sound fades out progressively
    over ``duration`` seconds at the end of the clip.

    Parameters
    ----------

    duration : float
      How long does it take for the sound to reach the zero level at the end
      of the clip.

    Examples
    --------

    .. code:: python

        clip = VideoFileClip("media/chaplin.mp4")
        clip.with_effects([afx.AudioFadeOut("00:00:06")])
    """

    duration: float

    def __post_init__(self):
        # Accept "HH:MM:SS" strings / tuples as well as plain seconds.
        self.duration = convert_to_seconds(self.duration)

    def _mono_factor_getter(self, clip_duration):
        # Gain ramps linearly from 1 down to 0 over the final
        # ``duration`` seconds of the clip.
        def getter(t, duration):
            return np.minimum(1.0 * (clip_duration - t) / duration, 1)

        return getter

    def _stereo_factor_getter(self, clip_duration, nchannels):
        # Same ramp as mono, replicated once per channel.
        def getter(t, duration):
            ramp = np.minimum(1.0 * (clip_duration - t) / duration, 1)
            return np.array([ramp for _ in range(nchannels)]).T

        return getter

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip."""
        if clip.duration is None:
            raise ValueError("Attribute 'duration' not set")

        if clip.nchannels == 1:
            get_factor = self._mono_factor_getter(clip.duration)
        else:
            get_factor = self._stereo_factor_getter(clip.duration, clip.nchannels)

        return clip.transform(
            lambda get_frame, t: get_factor(t, self.duration) * get_frame(t),
            keep_duration=True,
        )

View File

@@ -0,0 +1,41 @@
from dataclasses import dataclass
from moviepy.audio.AudioClip import concatenate_audioclips
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
@dataclass
class AudioLoop(Effect):
    """Loops over an audio clip.

    Returns an audio clip that plays the given clip either
    `n_loops` times, or during `duration` seconds.

    Examples
    --------

    .. code:: python

        from moviepy import *
        videoclip = VideoFileClip('myvideo.mp4')
        music = AudioFileClip('music.ogg')
        audio = music.with_effects([afx.AudioLoop(duration=videoclip.duration)])
        videoclip.with_audio(audio)
    """

    n_loops: int = None
    duration: float = None

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip.

        Raises
        ------
        ValueError
            If neither ``n_loops`` nor ``duration`` is set.
        """
        if self.duration is not None:
            # Enough whole repetitions to cover ``duration``, then trim.
            self.n_loops = int(self.duration / clip.duration) + 1
            return concatenate_audioclips(self.n_loops * [clip]).with_duration(
                self.duration
            )

        if self.n_loops is None:
            # Previously this fell through to an opaque TypeError when
            # multiplying a list by None.
            raise ValueError("AudioLoop requires either 'n_loops' or 'duration'")

        return concatenate_audioclips(self.n_loops * [clip])

View File

@@ -0,0 +1,31 @@
from dataclasses import dataclass
from moviepy.audio.fx.MultiplyVolume import MultiplyVolume
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
@dataclass
class AudioNormalize(Effect):
    """Return a clip whose volume is normalized to 0db.

    Return an audio (or video) clip whose audio volume is normalized
    so that the maximum volume is at 0db, the maximum achievable volume.

    Examples
    --------

    >>> from moviepy import *
    >>> videoclip = VideoFileClip('myvideo.mp4').with_effects([afx.AudioNormalize()])
    """

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip."""
        peak = clip.max_volume()
        # A perfectly silent clip cannot be normalized; leave it untouched.
        if peak == 0:
            return clip
        return clip.with_effects([MultiplyVolume(1 / peak)])

View File

@@ -0,0 +1,44 @@
from dataclasses import dataclass
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
@dataclass
class MultiplyStereoVolume(Effect):
    """For a stereo audioclip, this function enables to change the volume
    of the left and right channel separately (with the factors `left`
    and `right`). Makes a stereo audio clip in which the volume of left
    and right is controllable.

    Examples
    --------

    .. code:: python

        from moviepy import AudioFileClip
        music = AudioFileClip('music.ogg')
        # mutes left channel
        audio_r = music.with_effects([afx.MultiplyStereoVolume(left=0, right=1)])
        # halves audio volume
        audio_h = music.with_effects([afx.MultiplyStereoVolume(left=0.5, right=0.5)])
    """

    # Gain factors applied to left (even-indexed) and right (odd-indexed)
    # channels respectively.
    left: float = 1
    right: float = 1

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip."""
        def stereo_volume(get_frame, t):
            # NOTE(review): the frame is scaled in place, mutating the
            # array returned by get_frame — confirm upstream frames are
            # safe to mutate.
            frame = get_frame(t)
            # NOTE(review): this tests the first axis length (number of
            # samples), which equals 1 only for a single-sample frame —
            # presumably intended as a mono check; verify against callers.
            if len(frame) == 1:  # mono
                frame *= self.left if self.left is not None else self.right
            else:  # stereo, stereo surround...
                for i in range(len(frame[0])):  # odd channels are left
                    frame[:, i] *= self.left if i % 2 == 0 else self.right
            return frame
        return clip.transform(stereo_volume)

View File

@@ -0,0 +1,90 @@
from dataclasses import dataclass
import numpy as np
from moviepy.Clip import Clip
from moviepy.decorators import audio_video_effect
from moviepy.Effect import Effect
from moviepy.tools import convert_to_seconds
@dataclass
class MultiplyVolume(Effect):
    """Returns a clip with audio volume multiplied by the
    value `factor`. Can be applied to both audio and video clips.

    Parameters
    ----------

    factor : float
      Volume multiplication factor.

    start_time : float, optional
      Time from the beginning of the clip until the volume transformation
      begins to take effect, in seconds. By default at the beginning.

    end_time : float, optional
      Time from the beginning of the clip until the volume transformation
      ends to take effect, in seconds. By default at the end.

    Examples
    --------

    .. code:: python

        from moviepy import AudioFileClip

        music = AudioFileClip("music.ogg")
        # doubles audio volume
        doubled_audio_clip = music.with_effects([afx.MultiplyVolume(2)])
        # halves audio volume
        half_audio_clip = music.with_effects([afx.MultiplyVolume(0.5)])
        # silences clip during one second at third
        effect = afx.MultiplyVolume(0, start_time=2, end_time=3)
        silenced_clip = clip.with_effects([effect])
    """

    factor: float
    start_time: float = None
    end_time: float = None

    def __post_init__(self):
        # Normalize "HH:MM:SS" strings / tuples to plain seconds.
        if self.start_time is not None:
            self.start_time = convert_to_seconds(self.start_time)

        if self.end_time is not None:
            self.end_time = convert_to_seconds(self.end_time)

    def _multiply_volume_in_range(self, factor, start_time, end_time, nchannels):
        # Build a frame transformer applying ``factor`` only within
        # [start_time, end_time]; samples outside keep factor 1.
        def factors_filter(factor, t):
            # Per-sample gain vector. NOTE(review): iterates ``t`` — assumes
            # transform passes arrays of times for audio; a scalar t would
            # fail here. Verify against Clip.transform.
            return np.array([factor if start_time <= t_ <= end_time else 1 for t_ in t])

        def multiply_stereo_volume(get_frame, t):
            # Replicate the gain vector across all channels, then scale.
            return np.multiply(
                get_frame(t),
                np.array([factors_filter(factor, t) for _ in range(nchannels)]).T,
            )

        def multiply_mono_volume(get_frame, t):
            return np.multiply(get_frame(t), factors_filter(factor, t))

        return multiply_mono_volume if nchannels == 1 else multiply_stereo_volume

    @audio_video_effect
    def apply(self, clip: Clip) -> Clip:
        """Apply the effect to the clip."""
        if self.start_time is None and self.end_time is None:
            # No time range: uniform scaling of every frame.
            return clip.transform(
                lambda get_frame, t: self.factor * get_frame(t),
                keep_duration=True,
            )

        # Range given (possibly one-sided): default the missing bound to
        # the clip's own start/end.
        return clip.transform(
            self._multiply_volume_in_range(
                self.factor,
                clip.start if self.start_time is None else self.start_time,
                clip.end if self.end_time is None else self.end_time,
                clip.nchannels,
            ),
            keep_duration=True,
        )

View File

@@ -0,0 +1,22 @@
"""All the audio effects that can be applied to AudioClip and VideoClip."""
# import every video fx function
from moviepy.audio.fx.AudioDelay import AudioDelay
from moviepy.audio.fx.AudioFadeIn import AudioFadeIn
from moviepy.audio.fx.AudioFadeOut import AudioFadeOut
from moviepy.audio.fx.AudioLoop import AudioLoop
from moviepy.audio.fx.AudioNormalize import AudioNormalize
from moviepy.audio.fx.MultiplyStereoVolume import MultiplyStereoVolume
from moviepy.audio.fx.MultiplyVolume import MultiplyVolume
__all__ = (
"AudioDelay",
"AudioFadeIn",
"AudioFadeOut",
"AudioLoop",
"AudioNormalize",
"MultiplyStereoVolume",
"MultiplyVolume",
)

View File

@@ -0,0 +1,85 @@
"""Implements AudioFileClip, a class for audio clips creation using audio files."""
from moviepy.audio.AudioClip import AudioClip
from moviepy.audio.io.readers import FFMPEG_AudioReader
from moviepy.decorators import convert_path_to_string
class AudioFileClip(AudioClip):
    """An audio clip read from a sound file, or an array.

    The whole file is not loaded in memory. Instead, only a portion is
    read and stored in memory. This portion includes frames before
    and after the last frames read, so that it is fast to read the sound
    backward and forward.

    Parameters
    ----------

    filename
      Either a soundfile name (of any extension supported by ffmpeg)
      as a string or a path-like object,
      or an array representing a sound. If the soundfile is not a .wav,
      it will be converted to .wav first, using the ``fps`` and
      ``bitrate`` arguments.

    decode_file
      Passed through to ``FFMPEG_AudioReader`` — presumably whether to
      pre-decode the whole file; TODO confirm against the reader.

    buffersize:
      Size to load in memory (in number of frames)

    Attributes
    ----------

    nbytes
      Number of bits per frame of the original audio file.

    fps
      Number of frames per second in the audio file

    buffersize
      See Parameters.

    Lifetime
    --------

    Note that this creates subprocesses and locks files. If you construct one
    of these instances, you must call close() afterwards, or the subresources
    will not be cleaned up until the process ends.

    Examples
    --------

    .. code:: python

        snd = AudioFileClip("song.wav")
        snd.close()
    """

    @convert_path_to_string("filename")
    def __init__(
        self, filename, decode_file=False, buffersize=200000, nbytes=2, fps=44100
    ):
        AudioClip.__init__(self)
        # Fix: ``self.filename`` was assigned twice (before and after the
        # reader was built); one assignment suffices.
        self.filename = filename
        # The reader spawns an ffmpeg subprocess and maintains a rolling
        # in-memory buffer of decoded frames.
        self.reader = FFMPEG_AudioReader(
            filename,
            decode_file=decode_file,
            fps=fps,
            nbytes=nbytes,
            buffersize=buffersize,
        )
        self.fps = fps
        self.duration = self.reader.duration
        self.end = self.reader.duration
        self.buffersize = self.reader.buffersize
        # Late-bind through ``self.reader`` so the current reader is used
        # at frame-fetch time.
        self.frame_function = lambda t: self.reader.get_frame(t)
        self.nchannels = self.reader.nchannels

    def close(self):
        """Close the internal reader, releasing its subprocess and file."""
        if self.reader:
            self.reader.close()
            self.reader = None

View File

@@ -0,0 +1 @@
"""Class and methods to read, write, preview audiofiles."""

View File

@@ -0,0 +1,240 @@
"""MoviePy audio writing with ffmpeg."""
import subprocess as sp
from log import log_step
import proglog
from moviepy.config import FFMPEG_BINARY
from moviepy.decorators import requires_duration
from moviepy.tools import cross_platform_popen_params, ffmpeg_escape_filename
class FFMPEG_AudioWriter:
    """A class to write an AudioClip into an audio file.

    Raw PCM frames are piped into an ffmpeg subprocess via stdin; ffmpeg
    encodes them with ``codec`` and writes ``filename``.

    Parameters
    ----------

    filename
      Name of any video or audio file, like ``video.mp4`` or ``sound.wav`` etc.

    size
      Size (width,height) in pixels of the output video.

    fps_input
      Frames per second of the input audio (given by the AudioClip being
      written down).

    nbytes : int, optional
      Number of bytes per sample. Default is 2 (16-bit audio).

    nchannels : int, optional
      Number of audio channels. Default is 2 (stereo).

    codec : str, optional
      The codec to use for the output. Default is ``libfdk_aac``.

    bitrate:
      A string indicating the bitrate of the final video. Only
      relevant for codecs which accept a bitrate.

    input_video : str, optional
      Path to an input video file. If provided, the audio will be muxed with this video.
      If not provided, the output will be audio-only.

    logfile : file-like object or None, optional
      A file object where FFMPEG logs will be written. If None, logs are suppressed.

    ffmpeg_params : list of str, optional
      Additional FFMPEG command-line parameters to customize the output.
    """

    def __init__(
        self,
        filename,
        fps_input,
        nbytes=2,
        nchannels=2,
        codec="libfdk_aac",
        bitrate=None,
        input_video=None,
        logfile=None,
        ffmpeg_params=None,
    ):
        # No logfile: capture stderr in a pipe (read on error in write_frames).
        if logfile is None:
            logfile = sp.PIPE
        self.logfile = logfile
        self.filename = filename
        self.codec = codec
        # File extension, used only to enrich error messages.
        self.ext = self.filename.split(".")[-1]

        # order is important
        # Input side: raw signed little-endian PCM from stdin ("-i -"),
        # described by sample width, rate and channel count.
        cmd = [
            FFMPEG_BINARY,
            "-y",
            "-loglevel",
            "error" if logfile == sp.PIPE else "info",
            "-f",
            "s%dle" % (8 * nbytes),
            "-acodec",
            "pcm_s%dle" % (8 * nbytes),
            "-ar",
            "%d" % fps_input,
            "-ac",
            "%d" % nchannels,
            "-i",
            "-",
        ]
        # Output side: audio-only ("-vn"), or mux with the given video
        # stream copied as-is.
        if input_video is None:
            cmd.extend(["-vn"])
        else:
            cmd.extend(["-i", ffmpeg_escape_filename(input_video), "-vcodec", "copy"])

        cmd.extend(["-acodec", codec] + ["-ar", "%d" % fps_input])
        cmd.extend(["-strict", "-2"])  # needed to support codec 'aac'
        if bitrate is not None:
            cmd.extend(["-ab", bitrate])
        if ffmpeg_params is not None:
            cmd.extend(ffmpeg_params)
        cmd.extend([ffmpeg_escape_filename(filename)])

        popen_params = cross_platform_popen_params(
            {"stdout": sp.DEVNULL, "stderr": logfile, "stdin": sp.PIPE}
        )

        self.proc = sp.Popen(cmd, **popen_params)

    def write_frames(self, frames_array):
        """Send an audio frame chunk (from ``AudioClip``) to ffmpeg for writing.

        Raises
        ------
        IOError
            If ffmpeg's stdin pipe breaks; the message embeds ffmpeg's
            own error output plus hints for common causes.
        """
        try:
            self.proc.stdin.write(frames_array.tobytes())
        except IOError as err:
            # The pipe broke: collect ffmpeg's stderr to build a
            # diagnostic message.
            _, ffmpeg_error = self.proc.communicate()
            if ffmpeg_error is not None:
                ffmpeg_error = ffmpeg_error.decode()
            else:
                # The error was redirected to a logfile with `write_logfile=True`,
                # so read the error from that file instead
                self.logfile.seek(0)
                ffmpeg_error = self.logfile.read()

            error = (
                f"{err}\n\nMoviePy error: FFMPEG encountered the following error while "
                f"writing file {self.filename}:\n\n {ffmpeg_error}"
            )

            # Append targeted hints for the most common ffmpeg failures.
            if "Unknown encoder" in ffmpeg_error:
                error += (
                    "\n\nThe audio export failed because FFMPEG didn't find the "
                    f"specified codec for audio encoding {self.codec}. "
                    "Please install this codec or change the codec when calling "
                    "write_videofile or write_audiofile.\nFor instance for mp3:\n"
                    "   >>> write_videofile('myvid.mp4', audio_codec='libmp3lame')"
                )
            elif "incorrect codec parameters ?" in ffmpeg_error:
                error += (
                    "\n\nThe audio export failed, possibly because the "
                    f"codec specified for the video {self.codec} is not compatible"
                    f" with the given extension {self.ext}. Please specify a "
                    "valid 'codec' argument in write_audiofile or 'audio_codoc'"
                    "argument in write_videofile. This would be "
                    "'libmp3lame' for mp3, 'libvorbis' for ogg..."
                )
            elif "bitrate not specified" in ffmpeg_error:
                error += (
                    "\n\nThe audio export failed, possibly because the "
                    "bitrate you specified was too high or too low for "
                    "the audio codec."
                )
            elif "Invalid encoder type" in ffmpeg_error:
                error += (
                    "\n\nThe audio export failed because the codec "
                    "or file extension you provided is not suitable for audio"
                )

            raise IOError(error)

    def close(self):
        """Closes the writer, terminating the subprocess if is still alive."""
        if hasattr(self, "proc") and self.proc:
            # Closing stdin signals EOF, letting ffmpeg finish the file.
            self.proc.stdin.close()
            self.proc.stdin = None
            if self.proc.stderr is not None:
                self.proc.stderr.close()
                self.proc.stderr = None
            # If this causes deadlocks, consider terminating instead.
            self.proc.wait()
            self.proc = None

    def __del__(self):
        # If the garbage collector comes, make sure the subprocess is terminated.
        self.close()

    # Support the Context Manager protocol, to ensure that resources are cleaned up.

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
@requires_duration
def ffmpeg_audiowrite(
    clip,
    filename,
    fps,
    nbytes,
    buffersize,
    codec="libvorbis",
    bitrate=None,
    write_logfile=False,
    ffmpeg_params=None,
    logger=None,
):
    """A function that wraps the FFMPEG_AudioWriter to write an AudioClip
    to a file.

    Parameters
    ----------

    clip
      The AudioClip to write (must have a ``duration``).

    filename
      Path of the output audio file.

    fps
      Sample rate (frames per second) of the written audio.

    nbytes
      Sample width in bytes (2 for 16-bit, 4 for 32-bit sound).

    buffersize
      Number of frames rendered and piped to ffmpeg per chunk.

    codec, bitrate, ffmpeg_params
      Forwarded to ``FFMPEG_AudioWriter``.

    write_logfile
      If True, ffmpeg logs are written to ``filename + ".log"``.

    logger
      Accepted for interface compatibility; progress is currently
      reported through ``log_step`` instead.
    """
    if write_logfile:
        logfile = open(filename + ".log", "w+")
    else:
        logfile = None

    writer = FFMPEG_AudioWriter(
        filename,
        fps,
        nbytes,
        clip.nchannels,
        codec=codec,
        bitrate=bitrate,
        logfile=logfile,
        ffmpeg_params=ffmpeg_params,
    )

    # Perf fix: the chunk count is computed arithmetically (mirroring the
    # formula in AudioClip.iter_chunks) instead of materializing
    # ``list(clip.iter_chunks(...))`` first — the original rendered the
    # ENTIRE clip twice just to count its chunks.
    total_chunks = int(fps * clip.duration) // buffersize + 1

    old_progress = -1
    try:
        for i, chunk in enumerate(
            clip.iter_chunks(
                chunksize=buffersize, quantize=True, nbytes=nbytes, fps=fps
            )
        ):
            # Report progress only when the integer percentage changes.
            int_progress = int((i + 1) / total_chunks * 100)
            if int_progress != old_progress:
                old_progress = int_progress
                log_step(
                    "audio_extraction",
                    old_progress,
                    "extracting the audio from the video",
                )
            # Pipe the rendered audio chunk to ffmpeg.
            writer.write_frames(chunk)
    finally:
        # Always release the subprocess and the logfile, even on error.
        writer.close()
        if write_logfile:
            logfile.close()

View File

@@ -0,0 +1,154 @@
"""MoviePy audio writing with ffmpeg."""
import subprocess as sp
from moviepy.config import FFPLAY_BINARY
from moviepy.decorators import requires_duration
from moviepy.tools import cross_platform_popen_params
class FFPLAY_AudioPreviewer:
    """A class to preview an AudioClip through an ``ffplay`` subprocess.

    Raw PCM frames are piped into ffplay's stdin and played immediately.

    Parameters
    ----------

    fps_input
      Frames per second of the input audio (given by the AudioClip being
      written down).

    nbytes:
      Number of bytes to encode the sound: 1 for 8bit sound, 2 for
      16bit, 4 for 32bit sound. Default is 2 bytes, it's fine.

    nchannels:
      Number of audio channels in the clip. Default to 2 channels.
    """

    def __init__(
        self,
        fps_input,
        nbytes=2,
        nchannels=2,
    ):
        # order is important
        cmd = [
            FFPLAY_BINARY,
            "-autoexit",  # If you don't precise, ffplay won't stop at end
            "-nodisp",  # If you don't precise a window is
            "-f",
            "s%dle" % (8 * nbytes),
            "-ar",
            "%d" % fps_input,
            "-ac",
            "%d" % nchannels,
            "-i",
            "-",
        ]

        popen_params = cross_platform_popen_params(
            {"stdout": sp.DEVNULL, "stderr": sp.STDOUT, "stdin": sp.PIPE}
        )

        self.proc = sp.Popen(cmd, **popen_params)

    def write_frames(self, frames_array):
        """Send a raw audio frame (a chunk of audio) to ffplay to be played.

        Raises
        ------
        IOError
            If ffplay's stdin pipe breaks.
        """
        try:
            self.proc.stdin.write(frames_array.tobytes())
        except IOError as err:
            _, ffplay_error = self.proc.communicate()
            if ffplay_error is not None:
                ffplay_error = ffplay_error.decode()
            else:
                # Bug fix: stderr is merged into stdout (sent to DEVNULL),
                # so nothing is captured here. The previous code read
                # ``self.logfile`` — an attribute this class never sets —
                # which raised AttributeError and masked the real failure.
                ffplay_error = "ffplay produced no captured error output"
            error = (
                f"{err}\n\nMoviePy error: FFPLAY encountered the following error while "
                f":\n\n {ffplay_error}"
            )

            raise IOError(error)

    def close(self):
        """Closes the writer, terminating the subprocess if is still alive."""
        if hasattr(self, "proc") and self.proc:
            # Closing stdin signals EOF so ffplay exits (with -autoexit).
            self.proc.stdin.close()
            self.proc.stdin = None
            if self.proc.stderr is not None:
                self.proc.stderr.close()
                self.proc.stderr = None
            # If this causes deadlocks, consider terminating instead.
            self.proc.wait()
            self.proc = None

    def __del__(self):
        # If the garbage collector comes, make sure the subprocess is terminated.
        self.close()

    # Support the Context Manager protocol, to ensure that resources are cleaned up.

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
@requires_duration
def ffplay_audiopreview(
    clip, fps=None, buffersize=2000, nbytes=2, audio_flag=None, video_flag=None
):
    """
    A function that wraps the FFPLAY_AudioPreviewer to preview an AudioClip

    Parameters
    ----------

    fps
      Frame rate of the sound. 44100 gives top quality, but may cause
      problems if your computer is not fast enough and your clip is
      complicated. If the sound jumps during the preview, lower it
      (11025 is still fine, 5000 is tolerable).

    buffersize
      The sound is not generated all at once, but rather made by bunches
      of frames (chunks). ``buffersize`` is the size of such a chunk.
      Try varying it if you meet audio problems (but you shouldn't
      have to).

    nbytes:
      Number of bytes to encode the sound: 1 for 8bit sound, 2 for
      16bit, 4 for 32bit sound. 2 bytes is fine.

    audio_flag, video_flag:
      Instances of class threading events that are used to synchronize
      video and audio during ``VideoClip.preview()``.
    """
    # Fall back on the clip's own fps, then on CD quality.
    fps = fps or clip.fps or 44100

    with FFPLAY_AudioPreviewer(fps, nbytes, clip.nchannels) as previewer:
        chunk_iterator = clip.iter_chunks(
            chunksize=buffersize, quantize=True, nbytes=nbytes, fps=fps
        )
        for chunk_index, chunk in enumerate(chunk_iterator):
            if chunk_index == 0:
                # Before the first chunk plays, tell the video thread the
                # audio is ready, then wait for the video to be ready too.
                if audio_flag is not None:
                    audio_flag.set()
                if video_flag is not None:
                    video_flag.wait()

            previewer.write_frames(chunk)

304
moviepy/audio/io/readers.py Normal file
View File

@@ -0,0 +1,304 @@
"""MoviePy audio reading with ffmpeg."""
import subprocess as sp
import warnings
import numpy as np
from moviepy.config import FFMPEG_BINARY
from moviepy.tools import cross_platform_popen_params, ffmpeg_escape_filename
from moviepy.video.io.ffmpeg_reader import ffmpeg_parse_infos
class FFMPEG_AudioReader:
    """A class to read the audio in either video files or audio files
    using ffmpeg. ffmpeg will read any audio and transform them into
    raw data.

    Parameters
    ----------

    filename
      Name of any video or audio file, like ``video.mp4`` or
      ``sound.wav`` etc.

    buffersize
      The size of the buffer to use. Should be bigger than the buffer
      used by ``write_audiofile``

    print_infos
      Print the ffmpeg infos on the file being read (for debugging)

    fps
      Desired frames per second in the decoded signal that will be
      received from ffmpeg

    nbytes
      Desired number of bytes (1,2,4) in the signal that will be
      received from ffmpeg
    """

    def __init__(
        self,
        filename,
        buffersize,
        decode_file=False,
        print_infos=False,
        fps=44100,
        nbytes=2,
        nchannels=2,
    ):
        # TODO bring FFMPEG_AudioReader more in line with FFMPEG_VideoReader
        # E.g. here self.pos is still 1-indexed.
        # (or have them inherit from a shared parent class)
        self.filename = filename
        self.nbytes = nbytes
        self.fps = fps
        # Raw PCM sample format / codec requested from ffmpeg (e.g. s16le).
        self.format = "s%dle" % (8 * nbytes)
        self.codec = "pcm_s%dle" % (8 * nbytes)
        self.nchannels = nchannels
        infos = ffmpeg_parse_infos(filename, decode_file=decode_file)
        self.duration = infos["duration"]
        self.bitrate = infos["audio_bitrate"]
        self.infos = infos
        self.proc = None

        self.n_frames = int(self.fps * self.duration)
        # Never allocate a buffer larger than the whole file.
        self.buffersize = min(self.n_frames + 1, buffersize)
        self.buffer = None
        self.buffer_startframe = 1
        self.initialize()
        self.buffer_around(1)

    def initialize(self, start_time=0):
        """Opens the file, creates the pipe."""
        self.close()  # if any

        if start_time != 0:
            # Seek slightly before the target with the fast input seek
            # (-ss before -i), then finish with the accurate output seek.
            offset = min(1, start_time)
            i_arg = [
                "-ss",
                "%.05f" % (start_time - offset),
                "-i",
                ffmpeg_escape_filename(self.filename),
                "-vn",
                "-ss",
                "%.05f" % offset,
            ]
        else:
            i_arg = ["-i", ffmpeg_escape_filename(self.filename), "-vn"]

        cmd = (
            [FFMPEG_BINARY]
            + i_arg
            + [
                "-loglevel",
                "error",
                "-f",
                self.format,
                "-acodec",
                self.codec,
                "-ar",
                "%d" % self.fps,
                "-ac",
                "%d" % self.nchannels,
                "-",
            ]
        )

        popen_params = cross_platform_popen_params(
            {
                "bufsize": self.buffersize,
                "stdout": sp.PIPE,
                "stderr": sp.PIPE,
                "stdin": sp.DEVNULL,
            }
        )

        self.proc = sp.Popen(cmd, **popen_params)

        # Keep the position as a plain int: np.round returns a float64,
        # which would later make skip_chunk pass a float byte count to
        # ``stdout.read`` and raise a TypeError.
        self.pos = int(np.round(self.fps * start_time))

    def skip_chunk(self, chunksize):
        """Skip a chunk of audio data by reading and discarding the specified number of
        frames from the audio stream. The audio stream is read from the `proc` stdout.
        After skipping the chunk, the `pos` attribute is updated accordingly.

        Parameters
        ----------

        chunksize (int):
          The number of audio frames to skip.
        """
        # Coerce defensively so the byte count passed to read() is an int.
        chunksize = int(chunksize)
        _ = self.proc.stdout.read(self.nchannels * chunksize * self.nbytes)
        self.proc.stdout.flush()
        self.pos = self.pos + chunksize

    def read_chunk(self, chunksize):
        """Read a chunk of audio data from the audio stream.

        This method reads a chunk of audio data from the audio stream. The
        specified number of frames, given by `chunksize`, is read from the
        `proc` stdout. The audio data is returned as a NumPy array, where
        each row corresponds to a frame and each column corresponds to a
        channel. If there is not enough audio left to read, the remaining
        portion is padded with zeros, ensuring that the returned array has
        the desired length. The `pos` attribute is updated accordingly.

        Parameters
        ----------

        chunksize (float):
          The desired number of audio frames to read.
        """
        # chunksize is not being autoconverted from float to int
        chunksize = int(round(chunksize))
        s = self.proc.stdout.read(self.nchannels * chunksize * self.nbytes)
        data_type = {1: "int8", 2: "int16", 4: "int32"}[self.nbytes]
        # np.fromstring was removed in NumPy 2.0; frombuffer has been
        # available for every NumPy version moviepy supports.
        result = np.frombuffer(s, dtype=data_type)
        # Scale integer samples to floats in [-1, 1] and reshape to
        # (n_frames, nchannels).
        result = (1.0 * result / 2 ** (8 * self.nbytes - 1)).reshape(
            (int(len(result) / self.nchannels), self.nchannels)
        )

        # Pad the read chunk with zeros when there isn't enough audio
        # left to read, so the buffer is always at full length.
        pad = np.zeros((chunksize - len(result), self.nchannels), dtype=result.dtype)
        result = np.concatenate([result, pad])
        self.pos = self.pos + chunksize
        return result

    def seek(self, pos):
        """Read a frame at time t. Note for coders: getting an arbitrary
        frame in the video with ffmpeg can be painfully slow if some
        decoding has to be done. This function tries to avoid fetching
        arbitrary frames whenever possible, by moving between adjacent
        frames.
        """
        if (pos < self.pos) or (pos > (self.pos + 1000000)):
            # Going backwards, or jumping very far ahead: reopen the pipe.
            t = 1.0 * pos / self.fps
            self.initialize(t)
        elif pos > self.pos:
            # Moving slightly forward: just discard the in-between frames.
            self.skip_chunk(pos - self.pos)
        # last case standing: pos = current pos
        self.pos = pos

    def get_frame(self, tt):
        """Retrieve the audio frame(s) corresponding to the given timestamp(s).

        Parameters
        ----------

        tt (float or numpy.ndarray):
          The timestamp(s) at which to retrieve the audio frame(s).
          If `tt` is a single float value, the frame corresponding to that
          timestamp is returned. If `tt` is a NumPy array of timestamps, an
          array of frames corresponding to each timestamp is returned.
        """
        if isinstance(tt, np.ndarray):
            # lazy implementation, but should not cause problems in
            # 99.99 % of the cases

            # elements of t that are actually in the range of the
            # audio file.
            in_time = (tt >= 0) & (tt < self.duration)

            # Check that the requested time is in the valid range
            if not in_time.any():
                raise IOError(
                    "Error in file %s, " % (self.filename)
                    + "Accessing time t=%.02f-%.02f seconds, " % (tt[0], tt[-1])
                    + "with clip duration=%f seconds, " % self.duration
                )

            # The np.round in the next line is super-important.
            # Removing it results in artifacts in the noise.
            frames = np.round((self.fps * tt)).astype(int)[in_time]
            fr_min, fr_max = frames.min(), frames.max()

            # If min and max frames don't fit the buffer it results in an
            # IndexError; avoid that by recursively splitting the request
            # in two and concatenating the results.
            max_frame_threshold = fr_min + self.buffersize // 2
            threshold_idx = np.searchsorted(frames, max_frame_threshold, side="right")
            if threshold_idx != len(frames):
                # ``threshold_idx`` indexes the filtered ``frames`` array;
                # map it back to a position in ``tt`` before slicing.
                # (The previous code recursed on slices of the boolean
                # ``in_time`` mask, i.e. on 0/1 values instead of times.)
                split_idx = int(np.nonzero(in_time)[0][threshold_idx])
                return np.concatenate(
                    [self.get_frame(tt[:split_idx]), self.get_frame(tt[split_idx:])]
                )

            if not (0 <= (fr_min - self.buffer_startframe) < len(self.buffer)):
                self.buffer_around(fr_min)
            elif not (0 <= (fr_max - self.buffer_startframe) < len(self.buffer)):
                self.buffer_around(fr_max)

            try:
                result = np.zeros((len(tt), self.nchannels))
                indices = frames - self.buffer_startframe
                result[in_time] = self.buffer[indices]
                return result
            except IndexError as error:
                warnings.warn(
                    "Error in file %s, " % (self.filename)
                    + "At time t=%.02f-%.02f seconds, " % (tt[0], tt[-1])
                    + "indices wanted: %d-%d, " % (indices.min(), indices.max())
                    + "but len(buffer)=%d\n" % (len(self.buffer))
                    + str(error),
                    UserWarning,
                )

                # repeat the last frame instead
                indices[indices >= len(self.buffer)] = len(self.buffer) - 1
                result[in_time] = self.buffer[indices]
                return result
        else:
            ind = int(self.fps * tt)
            # NOTE(review): boundary uses > not >=, so ind == n_frames is
            # served from the zero-padded buffer tail — confirm intended.
            if ind < 0 or ind > self.n_frames:  # out of time: return 0
                return np.zeros(self.nchannels)

            if not (0 <= (ind - self.buffer_startframe) < len(self.buffer)):
                # out of the buffer: recenter the buffer
                self.buffer_around(ind)

            # read the frame in the buffer
            return self.buffer[ind - self.buffer_startframe]

    def buffer_around(self, frame_number):
        """Fill the buffer with frames, centered on ``frame_number`` if possible."""
        # start-frame for the buffer
        new_bufferstart = max(0, frame_number - self.buffersize // 2)

        if self.buffer is not None:
            current_f_end = self.buffer_startframe + self.buffersize
            if new_bufferstart < current_f_end < new_bufferstart + self.buffersize:
                # We already have part of what must be read: keep the
                # overlapping tail and read only the missing frames.
                conserved = current_f_end - new_bufferstart
                chunksize = self.buffersize - conserved
                array = self.read_chunk(chunksize)
                self.buffer = np.vstack([self.buffer[-conserved:], array])
            else:
                self.seek(new_bufferstart)
                self.buffer = self.read_chunk(self.buffersize)
        else:
            self.seek(new_bufferstart)
            self.buffer = self.read_chunk(self.buffersize)

        self.buffer_startframe = new_bufferstart

    def close(self):
        """Closes the reader, terminating the subprocess if is still alive."""
        if self.proc:
            if self.proc.poll() is None:
                self.proc.terminate()
                self.proc.stdout.close()
                self.proc.stderr.close()
                self.proc.wait()
            self.proc = None

    def __del__(self):
        # If the garbage collector comes, make sure the subprocess is terminated.
        self.close()

View File

@@ -0,0 +1 @@
"""Tools to better processing and edition of audio."""

View File

@@ -0,0 +1,29 @@
"""Cutting utilities working with audio."""
import numpy as np
def find_audio_period(clip, min_time=0.1, max_time=2, time_resolution=0.01):
    """Finds the period, in seconds of an audioclip.

    Parameters
    ----------

    min_time : float, optional
      Minimum bound for the returned value.

    max_time : float, optional
      Maximum bound for the returned value.

    time_resolution : float, optional
      Numerical precision.
    """
    frames_per_chunk = int(time_resolution * clip.fps)
    seconds_per_chunk = frames_per_chunk / clip.fps

    # Energy ("volume") of each successive chunk, centered around zero.
    energies = np.array(
        [np.sum(chunk * chunk) for chunk in clip.iter_chunks(frames_per_chunk)]
    )
    energies = energies - energies.mean()

    # Autocorrelation of the volume envelope, keeping non-negative lags only.
    autocorr = np.correlate(energies, energies, mode="full")[-len(energies) :]

    # Zero out lags outside the [min_time, max_time] search window.
    autocorr[: int(min_time / seconds_per_chunk)] = 0
    autocorr[int(max_time / seconds_per_chunk) :] = 0

    # The strongest remaining correlation lag is the period.
    return seconds_per_chunk * np.argmax(autocorr)