Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/vstarstack/library/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#

import math
import typing

import skimage.color
from skimage import exposure
Expand Down Expand Up @@ -126,7 +127,7 @@ class IImageSource(abc.ABC):
"""Abstract image source"""

@abc.abstractmethod
def items(self) -> vstarstack.library.data.DataFrame:
def items(self) -> typing.Generator[vstarstack.library.data.DataFrame, None, None]:
"""Take elements from source"""

@abc.abstractmethod
Expand All @@ -139,7 +140,7 @@ def __init__(self, images : list[vstarstack.library.data.DataFrame]):
self.images = images
self.index = 0

def items(self) -> vstarstack.library.data.DataFrame:
def items(self) -> typing.Generator[vstarstack.library.data.DataFrame, None, None]:
"""Take next element from source"""
for item in self.images:
yield item
Expand All @@ -153,7 +154,7 @@ class FilesImageSource(IImageSource):
def __init__(self, filenames : list[str]):
self.filenames = filenames

def items(self):
def items(self) -> typing.Generator[vstarstack.library.data.DataFrame, None, None]:
"""Take next element from source"""
for fname in self.filenames:
yield vstarstack.library.data.DataFrame.load(fname)
Expand Down
6 changes: 5 additions & 1 deletion src/vstarstack/library/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,21 @@ def add_channel(self, data : np.ndarray, name : str, **options):
options["brightness"] = False
if "signal" not in options:
options["signal"] = False
if "normed" not in options:
options["normed"] = False

self.channels[name] = {
"data": data,
"options": options,
}

def replace_channel(self, data : np.ndarray, name : str):
def replace_channel(self, data : np.ndarray, name : str, **options):
"""Replace channel image"""
if name not in self.channels:
return False
self.channels[name]["data"] = data
for key in options:
self.channels[name]["options"][key] = options[key]
return True

def add_channel_link(self, name : str, linked : str, link_type : str):
Expand Down
37 changes: 29 additions & 8 deletions src/vstarstack/library/image_process/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,38 @@ def normalize(dataframe : vstarstack.library.data.DataFrame, deepcopy=True):
else:
new_dataframe = dataframe
for channel in new_dataframe.get_channels():
image, opts = new_dataframe.get_channel(channel)
if "normalized" in opts and opts["normalized"]:
image, _ = new_dataframe.get_channel(channel)
if new_dataframe.get_channel_option(channel, "normed"):
continue
if not opts["brightness"]:
if not new_dataframe.get_channel_option(channel, "brightness"):
continue
if channel not in new_dataframe.links["weight"]:
weight, _, _ = new_dataframe.get_linked_channel(channel, "weight")
if weight is None:
continue
weight, _ = new_dataframe.get_channel(new_dataframe.links["weight"][channel])

image = image / weight
image[np.where(weight == 0)] = 0
opts["normalized"] = True
new_dataframe.replace_channel(image, channel)
image[np.where(weight < 1e-12)] = 0
new_dataframe.replace_channel(image, channel, normed=True)

return new_dataframe

def denormalize(dataframe : vstarstack.library.data.DataFrame, deepcopy=True):
"""De-normalize image layers"""
if deepcopy:
new_dataframe = dataframe.copy()
else:
new_dataframe = dataframe
for channel in new_dataframe.get_channels():
image, _ = new_dataframe.get_channel(channel)
if not new_dataframe.get_channel_option(channel, "normed"):
continue
if not new_dataframe.get_channel_option(channel, "brightness"):
continue
weight, _, _ = new_dataframe.get_linked_channel(channel, "weight")
if weight is None:
continue

image = image * weight
new_dataframe.replace_channel(image, channel, normed=False)

return new_dataframe
140 changes: 81 additions & 59 deletions src/vstarstack/library/merge/kappa_sigma.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2023 Vladislav Tsendrovskii
# Copyright (c) 2023-2024 Vladislav Tsendrovskii
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -14,91 +14,81 @@

import numpy as np

import vstarstack.library.data
from vstarstack.library.image_process.normalize import normalize, denormalize
from vstarstack.library.data import DataFrame
from vstarstack.library.common import IImageSource


def hard_clip(delta, sigma, kappa):
def _hard_clip(delta : np.ndarray, sigma : np.ndarray, kappa : float) -> np.ndarray:
"""Hard clip where |delta| > sigma * kappa"""
return (abs(delta) - sigma * kappa <= 0).astype("float")
return (abs(delta) - sigma * kappa <= 0).astype(np.int32)

def calculate_clip(image, mean, sigma, kappa):
def _calculate_clip(image : np.ndarray, mean : np.ndarray, sigma : np.ndarray, kappa : float) -> np.ndarray:
"""Calculate clip array"""
delta = image - mean
return hard_clip(delta, sigma, kappa)
return _hard_clip(delta, sigma, kappa)

def _read_and_prepare(dataframe, channel):
def _read_and_prepare(dataframe : DataFrame, channel : str):
"""Remove invalid points from image"""
image, opts = dataframe.get_channel(channel)
if not opts["signal"]:
return None, None, None

if channel in dataframe.links["weight"]:
weight_channel = dataframe.links["weight"][channel]
weight, _ = dataframe.get_channel(weight_channel)
else:
weight = np.ones(image.shape, dtype=np.float64)
weight, _, _ = dataframe.get_linked_channel(channel, "weight")
if weight is None:
if (weight_k := dataframe.get_parameter("weight")) is None:
weight_k = 1
weight = np.ones(image.shape, dtype=np.float64) * weight_k

image[np.where(weight == 0)] = 0
image[np.where(weight < 1e-12)] = 0
return image, weight, opts


def _calculate_mean(images, means: dict, sigmas: dict, kappa: float):
def _calculate_mean(images : IImageSource, means: dict, sigmas: dict, kappa: float):
"""Calculate mean value of images"""
mean_image = {}
total_weight = {}
channel_opts = {}
signal_sum = {}
weight_sum = {}

for img in images.items():
img = normalize(img, deepcopy=False)
for channel in img.get_channels():
image, weight, opts = _read_and_prepare(img, channel)
if image is None:
if not img.get_channel_option(channel, "signal"):
continue
if channel not in channel_opts:
channel_opts[channel] = opts

if channel in means:
clip = calculate_clip(image, means[channel], sigmas[channel], kappa)
else:
clip = np.ones(image.shape)
signal, weight, _ = _read_and_prepare(img, channel)

if channel not in mean_image:
mean_image[channel] = image * clip
total_weight[channel] = weight * clip
if channel in means and channel in sigmas:
clip = _calculate_clip(signal, means[channel], sigmas[channel], kappa)
else:
mean_image[channel] += image * clip
total_weight[channel] += weight * clip
clip = np.ones(signal.shape, dtype=np.int32)

for channel in mean_image:
mean_image[channel] = mean_image[channel] / total_weight[channel]
mean_image[channel][np.where(total_weight[channel] == 0)] = 0
if channel not in signal_sum:
signal_sum[channel] = signal * weight * clip
weight_sum[channel] = weight * clip
else:
signal_sum[channel] += signal * weight * clip
weight_sum[channel] += weight * clip

return mean_image, total_weight, channel_opts
new_means = {}
for channel in signal_sum:
new_means[channel] = signal_sum[channel] / weight_sum[channel]
new_means[channel][np.where(weight_sum[channel] < 1e-12)] = 0

return new_means

def _calculate_sigma(images, means, sigmas, kappa):
def _calculate_sigma(images : IImageSource, means : dict, sigmas : dict, kappa : float):
"""Calculate sigma in each pixel"""
sigma = {}
clips = {}

for img in images.items():
img = normalize(img, deepcopy=False)
for channel in img.get_channels():
image, _, _ = _read_and_prepare(img, channel)
if image is None:
if not img.get_channel_option(channel, "signal"):
continue

if channel in means:
if channel in sigmas:
clip = calculate_clip(image,
means[channel],
sigmas[channel],
kappa)
else:
clip = np.ones(image.shape)
signal, _, _ = _read_and_prepare(img, channel)
if channel in means and channel in sigmas:
clip = _calculate_clip(signal, means[channel], sigmas[channel], kappa)
else:
clip = np.ones(image.shape)
clip = np.ones(signal.shape, dtype=np.int32)

delta2 = (image - means[channel])**2
delta2 = ((signal - means[channel])**2) * clip
if channel not in sigma:
sigma[channel] = delta2
clips[channel] = clip
Expand All @@ -112,7 +102,42 @@ def _calculate_sigma(images, means, sigmas, kappa):

return sigma

def kappa_sigma(images: vstarstack.library.common.IImageSource,
def _calculate_sum(images : IImageSource, means: dict, sigmas: dict, kappa: float):
"""Calculate mean value of images"""
signal_sum = {}
weight_sum = {}
channel_opts = {}

for img in images.items():
img = normalize(img, deepcopy=False)
for channel in img.get_channels():
if not img.get_channel_option(channel, "signal"):
continue

signal, weight, opts = _read_and_prepare(img, channel)
if channel not in channel_opts:
channel_opts[channel] = opts

if channel in means and channel in sigmas:
clip = _calculate_clip(signal, means[channel], sigmas[channel], kappa)
else:
clip = np.ones(signal.shape, dtype=np.int32)

if channel not in signal_sum:
signal_sum[channel] = signal * weight * clip
weight_sum[channel] = weight * clip
else:
signal_sum[channel] += signal * weight * clip
weight_sum[channel] += weight * clip

for channel in signal_sum:
signal_sum[channel][np.where(weight_sum[channel] < 1e-12)] = 0
weight_sum[channel][np.where(weight_sum[channel] < 1e-12)] = 0

channel_opts["normed"] = False
return signal_sum, weight_sum, channel_opts

def kappa_sigma(images: IImageSource,
kappa1: float,
kappa2: float,
steps: int) -> DataFrame:
Expand All @@ -130,16 +155,13 @@ def kappa_sigma(images: vstarstack.library.common.IImageSource,
kappa = (kappa1 * (steps-1-step) + kappa2 * step) / (steps-1)
else:
kappa = (kappa1 + kappa2) / 2
means, _, _ = _calculate_mean(images, means, sigmas, kappa)
means = _calculate_mean(images, means, sigmas, kappa)
sigmas = _calculate_sigma(images, means, sigmas, kappa)

lights, weights, channel_opts = _calculate_mean(images,
means,
sigmas,
kappa2)
signals, weights, channel_opts = _calculate_sum(images, means, sigmas, kappa2)

result = DataFrame(params=params)
for channel_name, light in lights.items():
for channel_name, light in signals.items():
weight = weights[channel_name]
result.add_channel(light, channel_name, **channel_opts[channel_name])
result.add_channel(weight, "weight-"+channel_name, weight=True)
Expand Down
22 changes: 12 additions & 10 deletions src/vstarstack/library/merge/simple_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import numpy as np
import vstarstack.library.data
import vstarstack.library.common
import vstarstack.library.image_process.normalize
from vstarstack.library.data import DataFrame
from copy import deepcopy

Expand All @@ -30,24 +32,24 @@ def simple_add(images : vstarstack.library.common.IImageSource) -> DataFrame:
channel_opts = {}
for img in images.items():
params = img.params
img = vstarstack.library.image_process.normalize.denormalize(img)
for channel_name in img.get_channels():
channel, opts = img.get_channel(channel_name)
if not opts["brightness"]:
if not img.get_channel_option(channel_name, "brightness"):
continue
if channel_name not in channel_opts:
channel_opts[channel_name] = opts

if channel_name in img.links["weight"]:
weight_channel = img.links["weight"][channel_name]
weight, _ = img.get_channel(weight_channel)
else:
weight = np.ones(channel.shape, dtype=np.float64)
weight, _, _ = img.get_linked_channel(channel_name, "weight")
if weight is None:
if (weight_k := img.get_parameter("weight")) is None:
weight_k = 1
weight = np.ones(channel.shape, dtype=np.float64) * weight_k

if channel_name not in summary:
summary[channel_name] = deepcopy(channel.astype(np.float64))
summary_weight[channel_name] = deepcopy(weight)
else:

try:
summary[channel_name] += channel
summary_weight[channel_name] += weight
Expand All @@ -57,9 +59,9 @@ def simple_add(images : vstarstack.library.common.IImageSource) -> DataFrame:
result = vstarstack.library.data.DataFrame(params=params)
for channel_name, channel in summary.items():
print(channel_name)
weight_channel_name = "weight-"+channel_name
result.add_channel(channel, channel_name, **channel_opts[channel_name])
result.add_channel(summary_weight[channel_name],
"weight-"+channel_name, weight=True)
result.add_channel_link(channel_name, "weight-"+channel_name, "weight")
result.add_channel(summary_weight[channel_name], weight_channel_name, weight=True)
result.add_channel_link(channel_name, weight_channel_name, "weight")

return result
2 changes: 1 addition & 1 deletion src/vstarstack/library/merge/simple_mean.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

import numpy as np
import vstarstack.library.data
import vstarstack.library.common
import vstarstack.library.image_process.normalize
import vstarstack.library.merge.simple_add
from vstarstack.library.data import DataFrame
Expand Down
Loading