theonlineoasis

Documentation coming soon ...

Code samples on this website come without any guarantees – use them at your own risk! I am grateful for comments or patches sent to my email address: first name.last name@cl.cam.ac.uk.

#!/usr/bin/python

import glob
import sys
import numpy
import math
import cPickle

# http://www.pybytes.com/pywavelets/ref/
import pywt

# http://www.pythonware.com/library/pil/handbook/index.htm
from PIL import Image, ImageOps

# Accelerator module for filtering.
from accel.filter_accel import wiener_filter

TILE_OVERLAP = 8
TILE_SIZE = 512
DENOISE_SIGMA = 5

def denoise_coefficient_list(coefficient_list, sigma):
  ll = coefficient_list[0]
  denoised_bands = [ll]
  for band, subband_coefficients in enumerate(coefficient_list[1 :]):
    denoised_bands.append([wiener_filter(s.astype(numpy.float), sigma) 
                           for s in subband_coefficients])
  return denoised_bands

def get_noise(greyscale_matrix):
  original_shape = greyscale_matrix.shape
  
  # The image will be transformed in TILE_SIZE * TILE_SIZE tiles with overlap
  # TILE_OVERLAP on each side.
  tiles_count = [(d - TILE_OVERLAP) / (TILE_SIZE - TILE_OVERLAP)
                 for d in original_shape]
  tiled_shape = [TILE_OVERLAP + c * (TILE_SIZE - TILE_OVERLAP)
                 for c in tiles_count]
  without_edges_shape = [c * (TILE_SIZE - TILE_OVERLAP) - TILE_OVERLAP 
                         for c in tiles_count]
  
  # The greyscale image is represented as a matrix of float values.
  greyscale_matrix = greyscale_matrix.astype(float)
  result_matrix = numpy.zeros(tiled_shape, dtype = numpy.float)
  
  # Work out how many levels of wavelet decomposition we will do.
  dyad_length = math.ceil(math.log(TILE_SIZE, 2))
  ll_levels = 5
  wavelet_levels = dyad_length - ll_levels
  ll_size = 2 ** ll_levels
  
  # Make a window for the tile edges.
  tile_window = numpy.zeros((TILE_SIZE, TILE_SIZE), dtype=numpy.float)
  tile_window[TILE_OVERLAP / 2 : 
                -(TILE_OVERLAP / 2), 
              TILE_OVERLAP / 2 : 
                -(TILE_OVERLAP / 2)] = 1.0
  
  # Transform and filter each non-overlapping TILE_SIZE * TILE_SIZE square of 
  # the image separately.
  for ty in range(0, tiles_count[1]):
    for tx in range(0, tiles_count[0]):
      print (tx, ty)
      transform_input = greyscale_matrix[
                          tx * (TILE_SIZE - TILE_OVERLAP) : 
                            tx * (TILE_SIZE - TILE_OVERLAP) + TILE_SIZE, 
                          ty * (TILE_SIZE - TILE_OVERLAP) : 
                            ty * (TILE_SIZE - TILE_OVERLAP) + TILE_SIZE]
      coefficient_list = pywt.wavedec2(transform_input, 
                                       'db8', 
                                       level = int(wavelet_levels),
                                       mode = 'per')
      coefficient_list = denoise_coefficient_list(coefficient_list,
                                                  DENOISE_SIGMA)
      denoised_tile = pywt.waverec2(coefficient_list, 
                                    'db8', 
                                    mode='per')
      denoised_tile[denoised_tile > 255.0] = 255.0
      denoised_tile[denoised_tile < 0.0] = 0.0
      result_matrix[tx * (TILE_SIZE - TILE_OVERLAP) : 
                      tx * (TILE_SIZE - TILE_OVERLAP) + TILE_SIZE, 
                    ty * (TILE_SIZE - TILE_OVERLAP) : 
                      ty * (TILE_SIZE - TILE_OVERLAP) + TILE_SIZE] += \
                      (denoised_tile * tile_window)
  
  # Remove the edges.
  result_matrix = result_matrix[TILE_OVERLAP : -TILE_OVERLAP, 
                                TILE_OVERLAP : -TILE_OVERLAP]
  
  Image.fromstring('L', 
                   (result_matrix.shape[1], result_matrix.shape[0]),
                   (result_matrix / 2.0)
                     .astype(numpy.uint8)
                     .tostring()).save('denoisedtest.png', 'PNG')
  # Subtract the denoised image from the original to get an estimate of the
  # noise.
  result_matrix = greyscale_matrix[
                     TILE_OVERLAP : tiled_shape[0] - TILE_OVERLAP,
                     TILE_OVERLAP : tiled_shape[1] - TILE_OVERLAP] \
                     - result_matrix
  
  return result_matrix

def get_noise_from_file(file_name):
  original = Image.open(file_name)
  greyscale = ImageOps.grayscale(original)
  greyscale_vector = numpy.fromstring(greyscale.tostring(), dtype=numpy.uint8)
  greyscale_matrix = numpy.reshape(greyscale_vector, 
                                   (original.size[1], original.size[0]))
  noise_matrix = get_noise(greyscale_matrix)
  return noise_matrix

# Command line utility for creating the characteristic.
if __name__ == '__main__':
  if len(sys.argv) != 2:
    print "Usage:\n\t%s path_with_png_files" % (sys.argv[0],)
    sys.exit(0)
  
  # Get a list of images to process.
  file_list = glob.glob(sys.argv[1] + '/*.png')
  print "Processing %d images" % (len(file_list),)
  
  # Denoise each image, and add the noise to the average_buffer.
  average_buffer = None
  for i, f in enumerate(file_list):
    print "Processing %03d %s" % (i, f,)
    noise_matrix = get_noise_from_file(f)
    if average_buffer == None:
      average_buffer = numpy.zeros_like(noise_matrix)
    average_buffer += noise_matrix
    
    # Dump the average buffer to a file.
    numpy.savetxt('noise_data.dat', average_buffer)
    Image.fromstring('L', 
                     (average_buffer.shape[1], average_buffer.shape[0]),
                     ((255.0 + (average_buffer / (i + 1))) / 2.0)
                       .astype(numpy.uint8)
                       .tostring()).save('noise%03d.png' % (i,), 'PNG')
#!/usr/bin/python
#
# test_characteristic.py

from PIL import Image, ImageOps

from make_characteristic import get_noise_from_file

import numpy
import cPickle
import glob
import sys

TILE_OVERLAP = 8

if len(sys.argv) != 3:
  print "Usage:\n\t%s noise_data.dat path_with_png_files" % (sys.argv[0],)
  sys.exit(0)

noise_file_name = sys.argv[1]
image_path_name = sys.argv[2]

# Load the camera noise.
camera_noise = numpy.loadtxt(noise_file_name, dtype=numpy.float)
camera_noise_average = numpy.average(camera_noise)
camera_noise -= camera_noise_average
camera_noise_norm = numpy.sqrt(numpy.sum(camera_noise * camera_noise))

file_list = glob.glob(image_path_name + '/*.png')
print "Processing %d images" % (len(file_list),)
for f in file_list:
  # Get this image's noise.
  image_noise = get_noise_from_file(f)
  image_noise_average = numpy.average(image_noise)
  image_noise -= image_noise_average
  image_noise_norm = numpy.sqrt(numpy.sum(image_noise * image_noise))

  # Calculate the correlation between the two signals.
  print "Dot product %s is: %s" % (f, 
                                   numpy.sum(camera_noise * image_noise) /
                                     (camera_noise_norm * image_noise_norm))
# accel/setup.py
#
# Compile the accelerator module with python setup.py build_ext

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy

ext_modules = [Extension("filter_accel", 
                         ["filter_accel.pyx"], 
                         extra_compile_args = ['-I%s' %
                           (numpy.get_include(),)])]

setup(name = 'Filtering accelerator',
      cmdclass = {'build_ext' : build_ext},
      ext_modules = ext_modules)
# accel/filter_accel.pyx

cimport numpy
import numpy
DTYPE = numpy.float
ctypedef numpy.float_t DTYPE_t
cdef inline int int_max(int a, int b): return a if a >= b else b
cdef inline int int_min(int a, int b): return a if a <= b else b
def wiener_filter(numpy.ndarray[DTYPE_t, ndim=2] 
                    subband_coefficients not None, 
                  int sigma):
  cdef int \
    y, x, w, l, k, xmin, xmax, ymin, ymax
  assert subband_coefficients.dtype == DTYPE
  cdef numpy.ndarray[DTYPE_t, ndim=2] \
    result_coefficients = numpy.zeros_like(subband_coefficients)
  (xmin, xmax, ymin, ymax) = (0,
                              0, 
                              subband_coefficients.shape[0] - 1,
                              subband_coefficients.shape[1] - 1)
  sigma_squared = float(sigma * sigma)
  for y in range(0, subband_coefficients.shape[1]):
    for x in range(0, subband_coefficients.shape[0]):
      variances = []
      for w in [3, 5, 7, 9]:
        accumulator = 0.0
        # The sub-band is padded by repetition.
        for l in range(-(w + 1) / 2, (w + 1) / 2 + 1):
          for k in range(-(w + 1) / 2, (w + 1) / 2 + 1):
            ki = int_min(int_max(x + k, xmin), xmax)
            li = int_min(int_max(y + l, ymin), ymax)
            accumulator += subband_coefficients[ki, li] * \
                             subband_coefficients[ki, li] \
                           - sigma_squared
        accumulator /= float(w * w)
        variances.append(max(0.0, accumulator))
      minimum_local_variance = min(variances)
      result_coefficients[x, y] = subband_coefficients[x, y] * \
                                  (minimum_local_variance /    \
                                    (minimum_local_variance + sigma_squared))
  return result_coefficients