None: # Use the newer AnimDecoder API to parse the (possibly) animated file, # and access muxed chunks like ICC/EXIF/XMP. assert self.fp is not None self._decoder = _webp.WebPAnimDecoder(self.fp.read()) # Get info from decoder self._size, self.info["loop"], bgcolor, self.n_frames, self.rawmode = ( self._decoder.get_info() ) self.info["background"] = ( (bgcolor >> 16) & 0xFF, # R (bgcolor >> 8) & 0xFF, # G bgcolor & 0xFF, # B (bgcolor >> 24) & 0xFF, # A ) self.is_animated = self.n_frames > 1 self._mode = "RGB" if self.rawmode == "RGBX" else self.rawmode # Attempt to read ICC / EXIF / XMP chunks from file for key, chunk_name in { "icc_profile": "ICCP", "exif": "EXIF", "xmp": "XMP ", }.items(): if value := self._decoder.get_chunk(chunk_name): self.info[key] = value # Initialize seek state self._reset(reset=False) def _getexif(self) -> dict[int, Any] | None: if "exif" not in self.info: return None return self.getexif()._get_merged_dict()">

Pillow (PIL Fork) 12.2.0.dev0 documentation (original) (raw)

from future import annotations

from io import BytesIO

from . import Image, ImageFile

try: from . import _webp

SUPPORTED = True

except ImportError: SUPPORTED = False

TYPE_CHECKING = False if TYPE_CHECKING: from typing import IO, Any

_VP8_MODES_BY_IDENTIFIER = { b"VP8 ": "RGB", b"VP8X": "RGBA", b"VP8L": "RGBA", # lossless }

def _accept(prefix: bytes) -> bool | str: is_riff_file_format = prefix.startswith(b"RIFF") is_webp_file = prefix[8:12] == b"WEBP" is_valid_vp8_mode = prefix[12:16] in _VP8_MODES_BY_IDENTIFIER

if is_riff_file_format and is_webp_file and is_valid_vp8_mode:
    if not SUPPORTED:
        return (
            "image file could not be identified because WEBP support not installed"
        )
    return True
return False

[docs] class WebPImageFile(ImageFile.ImageFile): format = "WEBP" format_description = "WebP image" __loaded = 0 __logical_frame = 0

def _open(self) -> None:
    # Use the newer AnimDecoder API to parse the (possibly) animated file,
    # and access muxed chunks like ICC/EXIF/XMP.
    assert self.fp is not None
    self._decoder = _webp.WebPAnimDecoder(self.fp.read())

    # Get info from decoder
    self._size, self.info["loop"], bgcolor, self.n_frames, self.rawmode = (
        self._decoder.get_info()
    )
    self.info["background"] = (
        (bgcolor >> 16) & 0xFF,  # R
        (bgcolor >> 8) & 0xFF,  # G
        bgcolor & 0xFF,  # B
        (bgcolor >> 24) & 0xFF,  # A
    )
    self.is_animated = self.n_frames > 1
    self._mode = "RGB" if self.rawmode == "RGBX" else self.rawmode

    # Attempt to read ICC / EXIF / XMP chunks from file
    for key, chunk_name in {
        "icc_profile": "ICCP",
        "exif": "EXIF",
        "xmp": "XMP ",
    }.items():
        if value := self._decoder.get_chunk(chunk_name):
            self.info[key] = value

    # Initialize seek state
    self._reset(reset=False)

def _getexif(self) -> dict[int, Any] | None:
    if "exif" not in self.info:
        return None
    return self.getexif()._get_merged_dict()

[docs] def seek(self, frame: int) -> None: if not self._seek_check(frame): return

    # Set logical frame to requested position
    self.__logical_frame = frame


def _reset(self, reset: bool = True) -> None:
    if reset:
        self._decoder.reset()
    self.__physical_frame = 0
    self.__loaded = -1
    self.__timestamp = 0

def _get_next(self) -> tuple[bytes, int, int]:
    # Get next frame
    ret = self._decoder.get_next()
    self.__physical_frame += 1

    # Check if an error occurred
    if ret is None:
        self._reset()  # Reset just to be safe
        self.seek(0)
        msg = "failed to decode next frame in WebP file"
        raise EOFError(msg)

    # Compute duration
    data, timestamp = ret
    duration = timestamp - self.__timestamp
    self.__timestamp = timestamp

    # libwebp gives frame end, adjust to start of frame
    timestamp -= duration
    return data, timestamp, duration

def _seek(self, frame: int) -> None:
    if self.__physical_frame == frame:
        return  # Nothing to do
    if frame < self.__physical_frame:
        self._reset()  # Rewind to beginning
    while self.__physical_frame < frame:
        self._get_next()  # Advance to the requested frame

[docs] def load(self) -> Image.core.PixelAccess | None: if self.__loaded != self.__logical_frame: self._seek(self.__logical_frame)

        # We need to load the image data for this frame
        data, self.info["timestamp"], self.info["duration"] = self._get_next()
        self.__loaded = self.__logical_frame

        # Set tile
        if self.fp and self._exclusive_fp:
            self.fp.close()
        self.fp = BytesIO(data)
        self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.rawmode)]

    return super().load()

[docs] def load_seek(self, pos: int) -> None: pass

[docs] def tell(self) -> int: return self.__logical_frame

def _convert_frame(im: Image.Image) -> Image.Image: # Make sure image mode is supported if im.mode not in ("RGBX", "RGBA", "RGB"): im = im.convert("RGBA" if im.has_transparency_data else "RGB") return im

def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: encoderinfo = im.encoderinfo.copy() append_images = list(encoderinfo.get("append_images", []))

# If total frame count is 1, then save using the legacy API, which
# will preserve non-alpha modes
total = 0
for ims in [im] + append_images:
    total += getattr(ims, "n_frames", 1)
if total == 1:
    _save(im, fp, filename)
    return

background: int | tuple[int, ...] = (0, 0, 0, 0)
if "background" in encoderinfo:
    background = encoderinfo["background"]
elif "background" in im.info:
    background = im.info["background"]
    if isinstance(background, int):
        # GifImagePlugin stores a global color table index in
        # info["background"]. So it must be converted to an RGBA value
        palette = im.getpalette()
        if palette:
            r, g, b = palette[background * 3 : (background + 1) * 3]
            background = (r, g, b, 255)
        else:
            background = (background, background, background, 255)

duration = im.encoderinfo.get("duration", im.info.get("duration", 0))
loop = im.encoderinfo.get("loop", 0)
minimize_size = im.encoderinfo.get("minimize_size", False)
kmin = im.encoderinfo.get("kmin", None)
kmax = im.encoderinfo.get("kmax", None)
allow_mixed = im.encoderinfo.get("allow_mixed", False)
verbose = False
lossless = im.encoderinfo.get("lossless", False)
quality = im.encoderinfo.get("quality", 80)
alpha_quality = im.encoderinfo.get("alpha_quality", 100)
method = im.encoderinfo.get("method", 0)
icc_profile = im.encoderinfo.get("icc_profile") or ""
exif = im.encoderinfo.get("exif", "")
if isinstance(exif, Image.Exif):
    exif = exif.tobytes()
xmp = im.encoderinfo.get("xmp", "")
if allow_mixed:
    lossless = False

# Sensible keyframe defaults are from gif2webp.c script
if kmin is None:
    kmin = 9 if lossless else 3
if kmax is None:
    kmax = 17 if lossless else 5

# Validate background color
if (
    not isinstance(background, (list, tuple))
    or len(background) != 4
    or not all(0 <= v < 256 for v in background)
):
    msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}"
    raise OSError(msg)

# Convert to packed uint
bg_r, bg_g, bg_b, bg_a = background
background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0)

# Setup the WebP animation encoder
enc = _webp.WebPAnimEncoder(
    im.size,
    background,
    loop,
    minimize_size,
    kmin,
    kmax,
    allow_mixed,
    verbose,
)

# Add each frame
frame_idx = 0
timestamp = 0
cur_idx = im.tell()
try:
    for ims in [im] + append_images:
        # Get number of frames in this image
        nfr = getattr(ims, "n_frames", 1)

        for idx in range(nfr):
            ims.seek(idx)

            frame = _convert_frame(ims)

            # Append the frame to the animation encoder
            enc.add(
                frame.getim(),
                round(timestamp),
                lossless,
                quality,
                alpha_quality,
                method,
            )

            # Update timestamp and frame index
            if isinstance(duration, (list, tuple)):
                timestamp += duration[frame_idx]
            else:
                timestamp += duration
            frame_idx += 1

finally:
    im.seek(cur_idx)

# Force encoder to flush frames
enc.add(None, round(timestamp), lossless, quality, alpha_quality, 0)

# Get the final output from the encoder
data = enc.assemble(icc_profile, exif, xmp)
if data is None:
    msg = "cannot write file as WebP (encoder returned None)"
    raise OSError(msg)

fp.write(data)

def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: lossless = im.encoderinfo.get("lossless", False) quality = im.encoderinfo.get("quality", 80) alpha_quality = im.encoderinfo.get("alpha_quality", 100) icc_profile = im.encoderinfo.get("icc_profile") or "" exif = im.encoderinfo.get("exif", b"") if isinstance(exif, Image.Exif): exif = exif.tobytes() if exif.startswith(b"Exif\x00\x00"): exif = exif[6:] xmp = im.encoderinfo.get("xmp", "") method = im.encoderinfo.get("method", 4) exact = 1 if im.encoderinfo.get("exact") else 0

im = _convert_frame(im)

data = _webp.WebPEncode(
    im.getim(),
    lossless,
    float(quality),
    float(alpha_quality),
    icc_profile,
    method,
    exact,
    exif,
    xmp,
)
if data is None:
    msg = "cannot write file as WebP (encoder returned None)"
    raise OSError(msg)

fp.write(data)

Image.register_open(WebPImageFile.format, WebPImageFile, _accept) if SUPPORTED: Image.register_save(WebPImageFile.format, _save) Image.register_save_all(WebPImageFile.format, _save_all) Image.register_extension(WebPImageFile.format, ".webp") Image.register_mime(WebPImageFile.format, "image/webp")