Select Git revision
-
Levecque Etienne authoredLevecque Etienne authored
embed_juni.py 6.21 KiB
import os
import scipy.signal
from scipy.fftpack import dct, idct
import numpy as np
import jpegio as jio
import cv2
from utils import decompress_structure
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['MKL_DOMAIN_BLAS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
def dct2(a):
return dct(dct(a, axis=0, norm='ortho'), axis=1, norm='ortho')
def idct2(a):
return idct(idct(a, axis=0, norm='ortho'), axis=1, norm='ortho')
def entropy_ternary(pP1, pM1):
p0 = 1 - pP1 - pM1
p0[p0 <= 0] = 1
pP1[pP1 == 0] = 1
pM1[pM1 == 0] = 1
p = np.stack([p0, pP1, pM1])
H = -p * np.log2(p)
return np.nansum(H)
def calc_lambda(rho_p1, rho_m1, message_length, n):
l3 = 1e+3
m3 = float(message_length + 1)
iterations = 0
while m3 > message_length:
l3 *= 2
pP1 = (np.exp(-l3 * rho_p1)) / (1 + np.exp(-l3 * rho_p1) + np.exp(-l3 * rho_m1))
pM1 = (np.exp(-l3 * rho_m1)) / (1 + np.exp(-l3 * rho_p1) + np.exp(-l3 * rho_m1))
m3 = entropy_ternary(pP1, pM1)
iterations += 1
if iterations > 10:
return l3
l1 = 0
m1 = float(n)
lamb = 0
iterations = 0
alpha = float(message_length) / n
# limit search to 30 iterations and require that relative payload embedded
# is roughly within 1/1000 of the required relative payload
while float(m1 - m3) / n > alpha / 1000.0 and iterations < 30:
lamb = l1 + (l3 - l1) / 2
pP1 = (np.exp(-lamb * rho_p1)) / (1 + np.exp(-lamb * rho_p1) + np.exp(-lamb * rho_m1))
pM1 = (np.exp(-lamb * rho_m1)) / (1 + np.exp(-lamb * rho_p1) + np.exp(-lamb * rho_m1))
m2 = entropy_ternary(pP1, pM1)
if m2 < message_length:
l3 = lamb
m3 = m2
else:
l1 = lamb
m1 = m2
iterations += 1
return lamb
def embedding_simulator(x, rho_p1, rho_m1, m):
n = x.size
lamb = calc_lambda(rho_p1, rho_m1, m, n)
pChangeP1 = (np.exp(-lamb * rho_p1)) / (1 + np.exp(-lamb * rho_p1) + np.exp(-lamb * rho_m1))
pChangeM1 = (np.exp(-lamb * rho_m1)) / (1 + np.exp(-lamb * rho_p1) + np.exp(-lamb * rho_m1))
y = x.copy()
randChange = np.random.rand(y.shape[0], y.shape[1])
y[randChange < pChangeP1] = y[randChange < pChangeP1] + 1
y[(randChange >= pChangeP1) & (randChange < pChangeP1 + pChangeM1)] = y[(randChange >= pChangeP1) & (
randChange < pChangeP1 + pChangeM1)] - 1
return y
def embed_JUNI(coverPath, stegoPath, payload):
if os.path.exists(stegoPath):
return
C_STRUCT = jio.read(coverPath)
C_COEFFS = np.copy(C_STRUCT.coef_arrays[0])
S_COEFFS = np.copy(C_COEFFS)
S_STRUCT = C_STRUCT # doesn't create a copy!
Q = C_STRUCT.quant_tables[0]
cover_spatial = cv2.imread(coverPath, cv2.IMREAD_GRAYSCALE).astype(np.float32)
if cover_spatial.shape[-1] == 1:
cover_spatial = np.squeeze(cover_spatial)
hpdf = np.array([
-0.0544158422, 0.3128715909, -0.6756307363, 0.5853546837,
0.0158291053, -0.2840155430, -0.0004724846, 0.1287474266,
0.0173693010, -0.0440882539, -0.0139810279, 0.0087460940,
0.0048703530, -0.0003917404, -0.0006754494, -0.0001174768
])
sign = np.array([-1 if i % 2 else 1 for i in range(len(hpdf))])
lpdf = hpdf[::-1] * sign
F = []
F.append(np.outer(lpdf.T, hpdf))
F.append(np.outer(hpdf.T, lpdf))
F.append(np.outer(hpdf.T, hpdf))
# Pre-compute impact in spatial domain when a jpeg coefficient is changed by 1
spatial_impact = {}
for i in range(8):
for j in range(8):
test_coeffs = np.zeros((8, 8))
test_coeffs[i, j] = 1
spatial_impact[i, j] = idct2(test_coeffs) * Q[i, j]
# Pre-compute impact on wavelet coefficients when a jpeg coefficient is changed by 1
wavelet_impact = {}
for f_index in range(len(F)):
for i in range(8):
for j in range(8):
wavelet_impact[f_index, i, j] = scipy.signal.correlate2d(spatial_impact[i, j], F[f_index], mode='full',
boundary='fill', fillvalue=0.) # XXX
# Create reference cover wavelet coefficients (LH, HL, HH)
pad_size = 16 # XXX
spatial_padded = np.pad(cover_spatial, (pad_size, pad_size), 'symmetric')
# print(spatial_padded.shape)
RC = []
for i in range(len(F)):
f = scipy.signal.correlate2d(spatial_padded, F[i], mode='same', boundary='fill')
RC.append(f)
k, l = C_COEFFS.shape
nzAC = np.count_nonzero(S_COEFFS) - np.count_nonzero(S_COEFFS[::8, ::8])
rho = np.zeros((k, l))
tempXi = [0.] * 3
sgm = 2 ** (-6)
# Computation of costs
for row in range(k):
for col in range(l):
mod_row = row % 8
mod_col = col % 8
sub_rows = list(range(row - mod_row - 6 + pad_size - 1, row - mod_row + 16 + pad_size))
sub_cols = list(range(col - mod_col - 6 + pad_size - 1, col - mod_col + 16 + pad_size))
for f_index in range(3):
RC_sub = RC[f_index][sub_rows][:, sub_cols]
wav_cover_stego_diff = wavelet_impact[f_index, mod_row, mod_col]
tempXi[f_index] = abs(wav_cover_stego_diff) / (abs(RC_sub) + sgm)
rho_temp = tempXi[0] + tempXi[1] + tempXi[2]
rho[row, col] = np.sum(rho_temp)
wet_cost = 10 ** 13
rho_m1 = rho.copy()
rho_p1 = rho.copy()
rho_p1[rho_p1 > wet_cost] = wet_cost
rho_p1[np.isnan(rho_p1)] = wet_cost
rho_p1[S_COEFFS > 1023] = wet_cost
rho_m1[rho_m1 > wet_cost] = wet_cost
rho_m1[np.isnan(rho_m1)] = wet_cost
rho_m1[S_COEFFS < -1023] = wet_cost
S_COEFFS = embedding_simulator(S_COEFFS, rho_p1, rho_m1, round(payload * nzAC))
# print(np.sum(np.abs(stego_coeffs.astype("int16")-coeffs.astype("int16"))))
# print(stego_coeffs)
S_STRUCT.coef_arrays[0][:] = S_COEFFS
jio.write(S_STRUCT, stegoPath)
def embed_img(paths_payload_tuple):
input_path, output_path, payload = paths_payload_tuple
os.makedirs(output_path, exist_ok=True)
filename = input_path.split("/")[-1]
stego_path = os.path.join(output_path, filename)
embed_JUNI(input_path, stego_path, payload)
tmp = jio.read(stego_path)
return decompress_structure(tmp)[:, :, 0].astype(np.float32)