Merge pull request #1776 from maxwbuckley/paste-back-optimization

Paste-back: O(crop_area) compositing + uint8 cv2 SIMD blend
This commit is contained in:
Kenneth Estanislao
2026-04-22 22:14:45 +08:00
committed by GitHub
3 changed files with 99 additions and 122 deletions
+19 -18
View File
@@ -178,17 +178,17 @@ def _paste_back(
h, w = frame.shape[:2]
inv_matrix = cv2.invertAffineTransform(affine_matrix)
# Build or reuse cached feathered mask
# Build or reuse cached feathered mask (uint8 — blended via cv2 SIMD ops)
if _enhancer_cache['mask_size'] != output_size:
face_mask = np.ones((output_size, output_size), dtype=np.float32)
face_mask_f = np.ones((output_size, output_size), dtype=np.float32)
border = max(1, int(output_size * 0.05))
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
face_mask[:border, :] *= ramp_up[:, None]
face_mask[-border:, :] *= ramp_down[:, None]
face_mask[:, :border] *= ramp_up[None, :]
face_mask[:, -border:] *= ramp_down[None, :]
_enhancer_cache['mask'] = face_mask
face_mask_f[:border, :] *= ramp_up[:, None]
face_mask_f[-border:, :] *= ramp_down[:, None]
face_mask_f[:, :border] *= ramp_up[None, :]
face_mask_f[:, -border:] *= ramp_down[None, :]
_enhancer_cache['mask'] = (face_mask_f * 255.0).astype(np.uint8)
_enhancer_cache['mask_size'] = output_size
# Compute tight bbox from affine corners (avoids full-frame warpAffine scan)
@@ -220,25 +220,26 @@ def _paste_back(
)
inv_mask_crop = cv2.warpAffine(
_enhancer_cache['mask'], inv_crop, (crop_w, crop_h),
borderMode=cv2.BORDER_CONSTANT, borderValue=0.0,
borderMode=cv2.BORDER_CONSTANT, borderValue=0,
)
np.clip(inv_mask_crop, 0.0, 1.0, out=inv_mask_crop)
target_crop = frame[y1p:y2p, x1p:x2p]
if _HAS_TORCH_CUDA:
# GPU blend on crop only
mask_t = torch.from_numpy(inv_mask_crop).cuda().unsqueeze(2)
# Upload uint8 alpha — smaller transfer, scale on device.
mask_t = torch.from_numpy(inv_mask_crop).cuda().float().mul_(1.0 / 255.0).unsqueeze(2)
enhanced_t = torch.from_numpy(inv_restored_crop).float().cuda()
target_t = torch.from_numpy(frame[y1p:y2p, x1p:x2p]).float().cuda()
target_t = torch.from_numpy(target_crop).float().cuda()
blended = (mask_t * enhanced_t + (1.0 - mask_t) * target_t
).to(torch.uint8).cpu().numpy()
frame[y1p:y2p, x1p:x2p] = blended
else:
# CPU blend on crop only
mask_3d = inv_mask_crop[:, :, np.newaxis]
target_crop = frame[y1p:y2p, x1p:x2p].astype(np.float32)
blended = (mask_3d * inv_restored_crop.astype(np.float32)
+ (1.0 - mask_3d) * target_crop)
frame[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
# Fused uint8 blend via cv2 SIMD — ~7× faster than the float32 round-trip.
alpha_3c = cv2.merge([inv_mask_crop, inv_mask_crop, inv_mask_crop])
inv_alpha = 255 - alpha_3c
a_enh = cv2.multiply(inv_restored_crop, alpha_3c, scale=1.0 / 255.0)
a_tgt = cv2.multiply(target_crop, inv_alpha, scale=1.0 / 255.0)
frame[y1p:y2p, x1p:x2p] = cv2.add(a_enh, a_tgt)
return frame
+71 -95
View File
@@ -157,9 +157,31 @@ except ImportError:
# Cache for paste-back
_paste_cache = {
'mask_white': None, # pre-allocated white image
'soft_alpha': None, # feathered alpha mask in aligned-face space
'alpha_size': 0,
}
def _get_soft_alpha(size: int) -> np.ndarray:
"""Feathered alpha template in aligned-face space, cached.
The legacy paste-back eroded and Gaussian-blurred the warped mask in
output coordinates with kernels scaled to the output face size, which
made the per-frame cost quartic in face linear size. Doing the same
erode+blur once in aligned space and then warping the *soft* mask
per-frame gives a visually equivalent feather at O(crop_area) cost —
the feather radius scales naturally with the affine transform.
"""
if _paste_cache['alpha_size'] != size:
k_erode = max(size // 10, 3)
k_blur = max(size // 20, 3)
mask = np.full((size, size), 255, dtype=np.uint8)
mask = cv2.erode(mask, np.ones((k_erode, k_erode), np.uint8), iterations=1)
mask = cv2.GaussianBlur(mask, (2 * k_blur + 1, 2 * k_blur + 1), 0)
_paste_cache['soft_alpha'] = mask # uint8 [0, 255] — blended via cv2 SIMD ops
_paste_cache['alpha_size'] = size
return _paste_cache['soft_alpha']
# CUDA graph swap session cache
_cuda_graph_session = {
'session': None,
@@ -266,112 +288,66 @@ def _cuda_graph_swap_inference(blob: np.ndarray, latent: np.ndarray) -> np.ndarr
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
"""GPU-accelerated paste-back that restricts blending to the face bounding box.
"""Paste bgr_fake back onto target_img via the inverse affine of M.
Same visual output as insightface's built-in paste_back, but:
- Skips dead fake_diff code (computed but unused in insightface)
- Runs erosion, blur, and blend on the face bbox instead of the full frame
- Uses torch CUDA for warpAffine + blend when available
- Writes directly into target_img to avoid full-frame copy
Restricts work to the face bbox in output coordinates and warps a
precomputed feathered alpha template per-frame instead of running a
size-scaled erode+blur on the warped mask. Cost is O(crop_area) regardless
of how much of the frame the face occupies.
"""
h, w = target_img.shape[:2]
face_h, face_w = aimg.shape[:2]
# inswapper's aligned-face space is square (128x128). _get_soft_alpha
# caches a single NxN template keyed by N, so fail loudly if that ever
# stops being true rather than silently mis-warping the alpha mask.
assert face_h == face_w, f"Expected square aligned face, got {face_h}x{face_w}"
IM = cv2.invertAffineTransform(M)
# Reuse pre-allocated white mask
if _paste_cache['mask_white'] is None or _paste_cache['mask_white'].shape != (face_h, face_w):
_paste_cache['mask_white'] = np.full((face_h, face_w), 255, dtype=np.float32)
# Bbox in output coords from the affine corners of the aligned-face square.
corners = np.array(
[[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32
)
transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
x1 = int(np.floor(transformed[:, 0].min()))
x2 = int(np.ceil(transformed[:, 0].max()))
y1 = int(np.floor(transformed[:, 1].min()))
y2 = int(np.ceil(transformed[:, 1].max()))
if x1 >= x2 or y1 >= y2:
return target_img
# Small interpolation margin only — the feather is baked into the template.
pad = 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
IM_crop = IM.copy()
IM_crop[0, 2] -= x1p
IM_crop[1, 2] -= y1p
crop_w, crop_h = x2p - x1p, y2p - y1p
soft_alpha = _get_soft_alpha(face_h)
bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderValue=0.0)
alpha_crop = cv2.warpAffine(soft_alpha, IM_crop, (crop_w, crop_h), borderValue=0)
target_crop = target_img[y1p:y2p, x1p:x2p]
if _HAS_TORCH_CUDA:
# GPU path: compute bbox from affine matrix (avoids warpAffine + scan on white mask)
corners = np.array([[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32)
transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
x1 = int(np.floor(transformed[:, 0].min()))
x2 = int(np.ceil(transformed[:, 0].max()))
y1 = int(np.floor(transformed[:, 1].min()))
y2 = int(np.ceil(transformed[:, 1].max()))
if x1 >= x2 or y1 >= y2:
return target_img
mask_h = y2 - y1
mask_w = x2 - x1
mask_size = int(np.sqrt(mask_h * mask_w))
k_erode = max(mask_size // 10, 10)
k_blur = max(mask_size // 20, 5)
pad = k_erode + k_blur + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
# Warp face and mask into crop region only (CPU — fast on small image)
IM_crop = IM.copy()
IM_crop[0, 2] -= x1p
IM_crop[1, 2] -= y1p
crop_w, crop_h = x2p - x1p, y2p - y1p
bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderValue=0.0)
mask_crop = cv2.warpAffine(_paste_cache['mask_white'], IM_crop, (crop_w, crop_h), borderValue=0.0)
# All mask processing + blend on GPU (no CPU roundtrips)
mask_t = torch.from_numpy(mask_crop).cuda()
mask_t = torch.where(mask_t > 20, 255.0, 0.0)
orig_h, orig_w = mask_t.shape
# Erode via negative max_pool (equivalent to min_pool)
m4 = mask_t.unsqueeze(0).unsqueeze(0)
m4 = -torch.nn.functional.max_pool2d(-m4, kernel_size=k_erode, stride=1, padding=k_erode // 2)
# Gaussian blur approximation via avg_pool
bk = 2 * k_blur + 1
m4 = torch.nn.functional.avg_pool2d(m4, kernel_size=bk, stride=1, padding=bk // 2)
# Fix any padding-induced size mismatch
m4 = m4[:, :, :orig_h, :orig_w]
mask_3d = (m4.squeeze() * (1.0 / 255.0)).unsqueeze(2)
# Scale alpha to [0, 1] on device — cheaper to upload uint8 than float.
mask_t = torch.from_numpy(alpha_crop).cuda().float().mul_(1.0 / 255.0).unsqueeze(2)
fake_t = torch.from_numpy(bgr_fake_crop).float().cuda()
tgt_t = torch.from_numpy(target_img[y1p:y2p, x1p:x2p]).float().cuda()
blended = (mask_3d * fake_t + (1.0 - mask_3d) * tgt_t).to(torch.uint8).cpu().numpy()
tgt_t = torch.from_numpy(target_crop).float().cuda()
blended = (mask_t * fake_t + (1.0 - mask_t) * tgt_t).to(torch.uint8).cpu().numpy()
target_img[y1p:y2p, x1p:x2p] = blended
return target_img
else:
# CPU fallback
bgr_fake_full = cv2.warpAffine(bgr_fake, IM, (w, h), borderValue=0.0)
img_white_full = cv2.warpAffine(_paste_cache['mask_white'], IM, (w, h), borderValue=0.0)
# Fused uint8 blend via cv2 SIMD — no float32 round-trip.
# Measured ~7-8× faster than the old numpy float32 path on a 1000×1000 crop.
alpha_3c = cv2.merge([alpha_crop, alpha_crop, alpha_crop])
inv_alpha = 255 - alpha_3c
a_fake = cv2.multiply(bgr_fake_crop, alpha_3c, scale=1.0 / 255.0)
a_tgt = cv2.multiply(target_crop, inv_alpha, scale=1.0 / 255.0)
target_img[y1p:y2p, x1p:x2p] = cv2.add(a_fake, a_tgt)
rows = np.any(img_white_full > 20, axis=1)
cols = np.any(img_white_full > 20, axis=0)
row_idx = np.where(rows)[0]
col_idx = np.where(cols)[0]
if len(row_idx) == 0 or len(col_idx) == 0:
return target_img
y1, y2 = row_idx[0], row_idx[-1]
x1, x2 = col_idx[0], col_idx[-1]
mask_h = y2 - y1
mask_w = x2 - x1
mask_size = int(np.sqrt(mask_h * mask_w))
k_erode = max(mask_size // 10, 10)
k_blur = max(mask_size // 20, 5)
pad = k_erode + k_blur + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
mask_crop = img_white_full[y1p:y2p, x1p:x2p]
mask_crop[mask_crop > 20] = 255
mask_crop = cv2.erode(mask_crop, np.ones((k_erode, k_erode), np.uint8), iterations=1)
mask_crop = cv2.GaussianBlur(mask_crop, (2*k_blur+1, 2*k_blur+1), 0)
mask_crop *= (1.0 / 255.0)
mask_3d = mask_crop[:, :, np.newaxis]
fake_crop = bgr_fake_full[y1p:y2p, x1p:x2p].astype(np.float32)
target_crop = target_img[y1p:y2p, x1p:x2p].astype(np.float32)
blended = mask_3d * fake_crop + (1.0 - mask_3d) * target_crop
# Write in-place, consistent with the GPU path
target_img[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
return target_img
return target_img
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
+9 -9
View File
@@ -1220,11 +1220,9 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event,
2,
)
# BGR→RGB in the processing thread so the display thread gets
# a contiguous RGB array (faster PIL.fromarray).
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
# Put processed frame into output queue, dropping old frames if full
# Queue the processed frame as BGR; the display thread resizes to the
# preview window first and then runs cvtColor on the (much smaller)
# buffer — cheaper than converting the full 1080p frame here.
try:
processed_queue.put_nowait(temp_frame)
except queue.Full:
@@ -1294,15 +1292,17 @@ def create_webcam_preview(camera_index: int):
return
try:
rgb_frame = processed_queue.get_nowait()
bgr_frame = processed_queue.get_nowait()
except queue.Empty:
ROOT.after(poll_ms, _display_next_frame)
return
# Frame is already RGB from processing thread; resize to preview window
rgb_frame = fit_image_to_size(
rgb_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
# Resize the full-resolution BGR frame to the preview window first,
# then convert colour on the smaller buffer.
bgr_frame = fit_image_to_size(
bgr_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(rgb_frame)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)