Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
Toward Reliable JPEG Stega qf100 WIFS2022
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Levecque Etienne
Toward Reliable JPEG Stega qf100 WIFS2022
Commits
88b3f96f
Commit
88b3f96f
authored
2 years ago
by
Levecque Etienne
Browse files
Options
Downloads
Patches
Plain Diff
feature: data pipeline
parent
802e1be0
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
data.py
+52
-13
52 additions, 13 deletions
data.py
embed_juni.py
+188
-0
188 additions, 0 deletions
embed_juni.py
main.py
+17
-9
17 additions, 9 deletions
main.py
utils.py
+2
-1
2 additions, 1 deletion
utils.py
with
259 additions
and
23 deletions
data.py
+
52
−
13
View file @
88b3f96f
import
os
import
jpegio
as
jio
import
numpy
as
np
import
multiprocessing
as
mp
from
skimage
import
view_as_block
from
utils
import
decompress_structure
from
embed_juni
import
embed_img
def
img_generator
(
dir_path
,
names
=
None
):
if
names
is
not
None
:
for
name
in
names
:
path
=
os
.
path
.
join
(
dir_path
,
name
)
tmp
=
jio
.
read
(
path
)
img
=
decompress_structure
(
tmp
)[:,
:,
0
].
astype
(
np
.
float32
)
yield
img
else
:
for
name
in
os
.
listdir
(
dir_path
):
path
=
os
.
path
.
join
(
dir_path
,
name
)
tmp
=
jio
.
read
(
path
)
img
=
decompress_structure
(
tmp
)[:,
:,
0
].
astype
(
np
.
float32
)
yield
img
def
get_train_test_generator
(
dir_path
,
train_size
,
stego_percentage
):
...
...
@@ -18,13 +37,6 @@ def get_train_test_generator(dir_path, train_size, stego_percentage):
# TODO: identify why some images raise a "Premature end of JPEG file" and correct it.
# TODO: Read .pgm and compress them in .jpeg.
def
img_generator
(
dir_path
,
names
):
for
name
in
names
:
path
=
os
.
path
.
join
(
dir_path
,
name
)
tmp
=
jio
.
read
(
path
)
img
=
decompress_structure
(
tmp
)[:,
:,
0
].
astype
(
np
.
float32
)
yield
img
names
=
os
.
listdir
(
dir_path
)
n
=
len
(
names
)
n_train
=
int
(
n
*
train_size
)
...
...
@@ -32,17 +44,24 @@ def get_train_test_generator(dir_path, train_size, stego_percentage):
return
img_generator
(
dir_path
,
names
[:
n_train
]),
\
img_generator
(
dir_path
,
names
[
n_train
:
n_train
+
n_normal
]),
\
img_generator
(
dir_path
,
names
[
n_train
+
n_normal
:])
(
os
.
path
.
join
(
dir_path
,
name
)
for
name
in
names
[
n_train
+
n_normal
:])
def
embed_
images
(
img_generator
,
payload
):
def
embed_
generator
(
path_generator
,
output_path
,
payload
):
"""
Embed random messages into images with J-UNIWARD
:param img_generator: image generator
:param path_generator: image path generator
:param output_path: where to store stego image after embedding
:param payload: payload of the message in bpnzac
:return: a generator of stego images
"""
pass
with
mp
.
Pool
()
as
p
:
try
:
for
res
in
p
.
imap_unordered
(
embed_img
,
((
path
,
output_path
,
payload
)
for
path
in
path_generator
)):
yield
res
except
StopIteration
as
ex
:
stats
=
ex
.
value
return
stats
def
variance_filter
(
img_generator
,
variance_threshold
,
block_per_threshold
):
...
...
@@ -54,7 +73,22 @@ def variance_filter(img_generator, variance_threshold, block_per_threshold):
:param block_per_threshold: image with a percentage of accepted blocks below this threshold are discarded
:return: a generator of blocks
"""
pass
ignored
=
0
try
:
for
img
in
img_generator
:
view
=
view_as_block
(
img
,
(
8
,
8
))
mask_var
=
np
.
var
(
view
,
axis
=
(
2
,
3
))
>=
variance_threshold
mask_saturated
=
np
.
any
(
view
==
255
,
axis
=
(
2
,
3
))
|
np
.
any
(
view
==
0
,
axis
=
(
2
,
3
))
remaining_blocks
=
view
.
reshape
((
-
1
,
8
,
8
))[
mask_var
.
flatten
()
&
~
mask_saturated
.
flatten
()]
if
remaining_blocks
.
shape
[
0
]
/
np
.
product
(
view
.
shape
[:
2
])
<
block_per_threshold
:
ignored
+=
1
continue
else
:
yield
remaining_blocks
except
StopIteration
as
ex
:
stats
=
ex
.
value
stats
[
'
Removed by the filter
'
]
=
ignored
return
stats
def
feature_extractor
(
block_generator
):
...
...
@@ -63,4 +97,9 @@ def feature_extractor(block_generator):
:param block_generator: a generator of list of blocks. One list for one image
:return: a generator of views
"""
pass
try
:
for
blocks
in
block_generator
:
yield
np
.
round
(
blocks
)
-
blocks
except
StopIteration
as
ex
:
stats
=
ex
.
value
return
stats
This diff is collapsed.
Click to expand it.
embed_juni.py
0 → 100644
+
188
−
0
View file @
88b3f96f
import
os
import
scipy.signal
import
scipy.fftpack
import
numpy
as
np
from
tqdm
import
tqdm
as
tqdm
import
multiprocessing
from
multiprocessing
import
Pool
import
jpegio
as
jio
import
cv2
import
pickle
from
utils
import
decompress_structure
os
.
environ
[
'
MKL_NUM_THREADS
'
]
=
'
1
'
os
.
environ
[
'
MKL_DOMAIN_BLAS
'
]
=
'
1
'
os
.
environ
[
'
OPENBLAS_NUM_THREADS
'
]
=
'
1
'
def
dct2
(
a
):
return
scipy
.
fftpack
.
dct
(
scipy
.
fftpack
.
dct
(
a
,
axis
=
0
,
norm
=
'
ortho
'
),
axis
=
1
,
norm
=
'
ortho
'
)
def
idct2
(
a
):
return
scipy
.
fftpack
.
idct
(
scipy
.
fftpack
.
idct
(
a
,
axis
=
0
,
norm
=
'
ortho
'
),
axis
=
1
,
norm
=
'
ortho
'
)
def
entropy_ternary
(
pP1
,
pM1
):
p0
=
1
-
pP1
-
pM1
p0
[
p0
<=
0
]
=
1
pP1
[
pP1
==
0
]
=
1
pM1
[
pM1
==
0
]
=
1
p
=
np
.
stack
([
p0
,
pP1
,
pM1
])
H
=
-
p
*
np
.
log2
(
p
)
return
np
.
nansum
(
H
)
def
calc_lambda
(
rho_p1
,
rho_m1
,
message_length
,
n
):
l3
=
1e+3
m3
=
float
(
message_length
+
1
)
iterations
=
0
while
m3
>
message_length
:
l3
*=
2
pP1
=
(
np
.
exp
(
-
l3
*
rho_p1
))
/
(
1
+
np
.
exp
(
-
l3
*
rho_p1
)
+
np
.
exp
(
-
l3
*
rho_m1
))
pM1
=
(
np
.
exp
(
-
l3
*
rho_m1
))
/
(
1
+
np
.
exp
(
-
l3
*
rho_p1
)
+
np
.
exp
(
-
l3
*
rho_m1
))
m3
=
entropy_ternary
(
pP1
,
pM1
)
iterations
+=
1
if
iterations
>
10
:
return
l3
l1
=
0
m1
=
float
(
n
)
lamb
=
0
iterations
=
0
alpha
=
float
(
message_length
)
/
n
# limit search to 30 iterations and require that relative payload embedded
# is roughly within 1/1000 of the required relative payload
while
float
(
m1
-
m3
)
/
n
>
alpha
/
1000.0
and
iterations
<
30
:
lamb
=
l1
+
(
l3
-
l1
)
/
2
pP1
=
(
np
.
exp
(
-
lamb
*
rho_p1
))
/
(
1
+
np
.
exp
(
-
lamb
*
rho_p1
)
+
np
.
exp
(
-
lamb
*
rho_m1
))
pM1
=
(
np
.
exp
(
-
lamb
*
rho_m1
))
/
(
1
+
np
.
exp
(
-
lamb
*
rho_p1
)
+
np
.
exp
(
-
lamb
*
rho_m1
))
m2
=
entropy_ternary
(
pP1
,
pM1
)
if
m2
<
message_length
:
l3
=
lamb
m3
=
m2
else
:
l1
=
lamb
m1
=
m2
iterations
+=
1
return
lamb
def
embedding_simulator
(
x
,
rho_p1
,
rho_m1
,
m
):
n
=
x
.
size
lamb
=
calc_lambda
(
rho_p1
,
rho_m1
,
m
,
n
)
pChangeP1
=
(
np
.
exp
(
-
lamb
*
rho_p1
))
/
(
1
+
np
.
exp
(
-
lamb
*
rho_p1
)
+
np
.
exp
(
-
lamb
*
rho_m1
))
pChangeM1
=
(
np
.
exp
(
-
lamb
*
rho_m1
))
/
(
1
+
np
.
exp
(
-
lamb
*
rho_p1
)
+
np
.
exp
(
-
lamb
*
rho_m1
))
y
=
x
.
copy
()
randChange
=
np
.
random
.
rand
(
y
.
shape
[
0
],
y
.
shape
[
1
])
y
[
randChange
<
pChangeP1
]
=
y
[
randChange
<
pChangeP1
]
+
1
y
[(
randChange
>=
pChangeP1
)
&
(
randChange
<
pChangeP1
+
pChangeM1
)]
=
y
[(
randChange
>=
pChangeP1
)
&
(
randChange
<
pChangeP1
+
pChangeM1
)]
-
1
return
y
def
embed_JUNI
(
coverPath
,
stegoPath
,
payload
):
if
os
.
path
.
exists
(
stegoPath
):
return
C_STRUCT
=
jio
.
read
(
coverPath
)
C_COEFFS
=
np
.
copy
(
C_STRUCT
.
coef_arrays
[
0
])
S_COEFFS
=
np
.
copy
(
C_COEFFS
)
S_STRUCT
=
C_STRUCT
# doesn't create a copy!
Q
=
C_STRUCT
.
quant_tables
[
0
]
cover_spatial
=
cv2
.
imread
(
coverPath
,
cv2
.
IMREAD_GRAYSCALE
).
astype
(
np
.
float32
)
if
cover_spatial
.
shape
[
-
1
]
==
1
:
cover_spatial
=
np
.
squeeze
(
cover_spatial
)
hpdf
=
np
.
array
([
-
0.0544158422
,
0.3128715909
,
-
0.6756307363
,
0.5853546837
,
0.0158291053
,
-
0.2840155430
,
-
0.0004724846
,
0.1287474266
,
0.0173693010
,
-
0.0440882539
,
-
0.0139810279
,
0.0087460940
,
0.0048703530
,
-
0.0003917404
,
-
0.0006754494
,
-
0.0001174768
])
sign
=
np
.
array
([
-
1
if
i
%
2
else
1
for
i
in
range
(
len
(
hpdf
))])
lpdf
=
hpdf
[::
-
1
]
*
sign
F
=
[]
F
.
append
(
np
.
outer
(
lpdf
.
T
,
hpdf
))
F
.
append
(
np
.
outer
(
hpdf
.
T
,
lpdf
))
F
.
append
(
np
.
outer
(
hpdf
.
T
,
hpdf
))
# Pre-compute impact in spatial domain when a jpeg coefficient is changed by 1
spatial_impact
=
{}
for
i
in
range
(
8
):
for
j
in
range
(
8
):
test_coeffs
=
np
.
zeros
((
8
,
8
))
test_coeffs
[
i
,
j
]
=
1
spatial_impact
[
i
,
j
]
=
idct2
(
test_coeffs
)
*
Q
[
i
,
j
]
# Pre-compute impact on wavelet coefficients when a jpeg coefficient is changed by 1
wavelet_impact
=
{}
for
f_index
in
range
(
len
(
F
)):
for
i
in
range
(
8
):
for
j
in
range
(
8
):
wavelet_impact
[
f_index
,
i
,
j
]
=
scipy
.
signal
.
correlate2d
(
spatial_impact
[
i
,
j
],
F
[
f_index
],
mode
=
'
full
'
,
boundary
=
'
fill
'
,
fillvalue
=
0.
)
# XXX
# Create reference cover wavelet coefficients (LH, HL, HH)
pad_size
=
16
# XXX
spatial_padded
=
np
.
pad
(
cover_spatial
,
(
pad_size
,
pad_size
),
'
symmetric
'
)
# print(spatial_padded.shape)
RC
=
[]
for
i
in
range
(
len
(
F
)):
f
=
scipy
.
signal
.
correlate2d
(
spatial_padded
,
F
[
i
],
mode
=
'
same
'
,
boundary
=
'
fill
'
)
RC
.
append
(
f
)
k
,
l
=
C_COEFFS
.
shape
nzAC
=
np
.
count_nonzero
(
S_COEFFS
)
-
np
.
count_nonzero
(
S_COEFFS
[::
8
,
::
8
])
rho
=
np
.
zeros
((
k
,
l
))
tempXi
=
[
0.
]
*
3
sgm
=
2
**
(
-
6
)
# Computation of costs
for
row
in
range
(
k
):
for
col
in
range
(
l
):
mod_row
=
row
%
8
mod_col
=
col
%
8
sub_rows
=
list
(
range
(
row
-
mod_row
-
6
+
pad_size
-
1
,
row
-
mod_row
+
16
+
pad_size
))
sub_cols
=
list
(
range
(
col
-
mod_col
-
6
+
pad_size
-
1
,
col
-
mod_col
+
16
+
pad_size
))
for
f_index
in
range
(
3
):
RC_sub
=
RC
[
f_index
][
sub_rows
][:,
sub_cols
]
wav_cover_stego_diff
=
wavelet_impact
[
f_index
,
mod_row
,
mod_col
]
tempXi
[
f_index
]
=
abs
(
wav_cover_stego_diff
)
/
(
abs
(
RC_sub
)
+
sgm
)
rho_temp
=
tempXi
[
0
]
+
tempXi
[
1
]
+
tempXi
[
2
]
rho
[
row
,
col
]
=
np
.
sum
(
rho_temp
)
wet_cost
=
10
**
13
rho_m1
=
rho
.
copy
()
rho_p1
=
rho
.
copy
()
rho_p1
[
rho_p1
>
wet_cost
]
=
wet_cost
rho_p1
[
np
.
isnan
(
rho_p1
)]
=
wet_cost
rho_p1
[
S_COEFFS
>
1023
]
=
wet_cost
rho_m1
[
rho_m1
>
wet_cost
]
=
wet_cost
rho_m1
[
np
.
isnan
(
rho_m1
)]
=
wet_cost
rho_m1
[
S_COEFFS
<
-
1023
]
=
wet_cost
S_COEFFS
=
embedding_simulator
(
S_COEFFS
,
rho_p1
,
rho_m1
,
round
(
payload
*
nzAC
))
# print(np.sum(np.abs(stego_coeffs.astype("int16")-coeffs.astype("int16"))))
# print(stego_coeffs)
S_STRUCT
.
coef_arrays
[
0
][:]
=
S_COEFFS
jio
.
write
(
S_STRUCT
,
stegoPath
)
def
embed_img
(
paths_payload_tuple
):
input_path
,
output_path
,
payload
=
paths_payload_tuple
os
.
makedirs
(
output_path
,
exist_ok
=
True
)
filename
=
input_path
.
split
(
"
/
"
)[
-
1
]
stego_path
=
os
.
path
.
join
(
output_path
,
filename
)
embed_JUNI
(
input_path
,
stego_path
,
payload
)
tmp
=
jio
.
read
(
stego_path
)
return
decompress_structure
(
tmp
)[:,
:,
0
].
astype
(
np
.
float32
)
This diff is collapsed.
Click to expand it.
main.py
+
17
−
9
View file @
88b3f96f
from
data
import
get_train_test_generator
,
embed_
images
,
variance_filter
,
feature_extractor
from
data
import
get_train_test_generator
,
embed_
generator
,
variance_filter
,
feature_extractor
,
img_generator
dir_path
=
""
train_size
=
0.0
cover_dir
=
"
/home/labaro/Documents/These/datasets/images/alaska/jpeg/qf100
"
stego_dir
=
"
/home/labaro/Documents/These/datasets/images/alaska/jpeg/embedded
"
compute_stego
=
False
train_size
=
0.1
payload
=
0.0
stego_percentage
=
0.
0
stego_percentage
=
0.
1
variance_threshold
=
0.0
block_per_threshold
=
0.0
if
__name__
==
"
__main__
"
:
train_gen
,
test_cover_gen
,
test_stego_gen
=
get_train_test_generator
(
dir_path
,
train_size
,
stego_percentage
)
if
compute_stego
:
train_gen
,
test_cover_gen
,
stego_names_gen
=
get_train_test_generator
(
cover_dir
,
train_size
,
stego_percentage
)
stego_gen
=
embed_generator
(
stego_names_gen
,
stego_dir
,
payload
)
else
:
train_gen
,
test_cover_gen
,
_
=
get_train_test_generator
(
cover_dir
,
train_size
,
0
)
stego_gen
=
img_generator
(
stego_dir
)
t
rain
_features
=
feature_extractor
(
variance_filter
(
train
_gen
,
t
est_stego
_features
=
feature_extractor
(
variance_filter
(
stego
_gen
,
variance_threshold
,
block_per_threshold
))
t
est_cover
_features
=
feature_extractor
(
variance_filter
(
t
est_cover
_gen
,
t
rain
_features
=
feature_extractor
(
variance_filter
(
t
rain
_gen
,
variance_threshold
,
block_per_threshold
))
test_stego_features
=
feature_extractor
(
variance_filter
(
embed_images
(
test_stego_gen
,
payload
),
test_cover_features
=
feature_extractor
(
variance_filter
(
test_cover_gen
,
variance_threshold
,
block_per_threshold
))
This diff is collapsed.
Click to expand it.
utils.py
+
2
−
1
View file @
88b3f96f
...
...
@@ -2,6 +2,7 @@ import numpy as np
from
numpy.lib.stride_tricks
import
as_strided
from
scipy
import
fftpack
def
block_view
(
A
,
block
=
(
8
,
8
)):
"""
Provide a 2D block view to 2D array. No error checking made.
Therefore, meaningful (as implemented) only for blocks strictly
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment