[ENH] Feature as Predictor #6852

Draft · wants to merge 1 commit into base: master

Changes from all commits
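For orientation, a minimal usage sketch of the API this PR adds, based on the code in the diff below. The dataset name and column choice are assumptions for illustration; any binary-class dataset with a continuous feature would do.

from Orange.data import Table
from Orange.modelling import ColumnLearner

# Assumption: "heart_disease" ships with Orange, has a binary class
# variable and a continuous feature usable as a predictor.
data = Table("heart_disease")
column = data.domain["major vessels colored"]

# With fit_regression=True the learner fits logistic-regression
# coefficients so the raw column values can be mapped to probabilities.
learner = ColumnLearner(data.domain.class_var, column, fit_regression=True)
model = learner(data)
print(model)  # ColumnModel <column name> (<intercept>, <coefficient>)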
165 changes: 165 additions & 0 deletions Orange/classification/tests/test_column.py
@@ -0,0 +1,165 @@
import unittest
from unittest.mock import patch

import numpy as np

from Orange.classification import ColumnLearner, ColumnClassifier
from Orange.data import DiscreteVariable, ContinuousVariable, Domain, Table


class ColumnTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.domain = Domain([DiscreteVariable("d1", values=["a", "b"]),
DiscreteVariable("d2", values=["c", "d"]),
DiscreteVariable("d3", values=["d", "c"]),
ContinuousVariable("c1"),
ContinuousVariable("c2")
],
DiscreteVariable("cls", values=["c", "d"]),
[DiscreteVariable("m1", values=["a", "b"]),
DiscreteVariable("m2", values=["d"]),
ContinuousVariable("c3")]
)
cls.data = Table.from_numpy(
cls.domain,
np.array([[0, 0, 0, 1, 0.5],
[0, 1, 1, 0.25, -3],
[1, 0, np.nan, np.nan, np.nan]]),
np.array([0, 1, 1]),
np.array([[0, 0, 2],
[1, 0, 8],
[np.nan, np.nan, 5]])
)

@patch("Orange.classification.column.ColumnModel")
def test_fit_storage(self, clsfr):
learner = ColumnLearner(self.domain.class_var, self.domain["d2"])
self.assertEqual(learner.name, "column 'd2'")
learner.fit_storage(self.data)
clsfr.assert_called_with(self.domain.class_var, self.domain["d2"], None, None)

learner = ColumnLearner(self.domain.class_var, self.domain["c3"])
learner.fit_storage(self.data)
clsfr.assert_called_with(self.domain.class_var, self.domain["c3"], None, None)

learner = ColumnLearner(self.domain.class_var, self.domain["c3"], 42, 3.5)
self.assertEqual(learner.name, "column 'c3'")
learner.fit_storage(self.data)
clsfr.assert_called_with(self.domain.class_var, self.domain["c3"], 42, 3.5)

def test_classifier_init_checks(self):
cls = ColumnClassifier(self.domain.class_var, self.domain["d2"])
self.assertEqual(cls.name, "column 'd2'")

cls = ColumnClassifier(self.domain.class_var, self.domain["d3"])
self.assertEqual(cls.name, "column 'd3'")

cls = ColumnClassifier(self.domain.class_var, self.domain["c3"])
self.assertEqual(cls.name, "column 'c3'")

self.assertRaises(
ValueError,
ColumnClassifier,
self.domain.class_var, self.domain["d1"])

self.assertRaises(
ValueError,
ColumnClassifier,
DiscreteVariable("x", values=("a", "b", "c")), self.domain["c3"])

def test_check_prob_range(self):
self.assertTrue(
ColumnClassifier.valid_prob_range(np.array([0, 0.5, 1]))
)
self.assertTrue(
ColumnClassifier.valid_prob_range(np.array([0, 0.5, np.nan]))
)
self.assertFalse(
ColumnClassifier.valid_prob_range(np.array([0, 0.5, 1.5]))
)
self.assertFalse(
ColumnClassifier.valid_prob_range(np.array([0, 0.5, -1]))
)

def test_check_value_sets(self):
d1, d2, d3, *_ = self.domain.attributes
c = self.domain.class_var
m2: DiscreteVariable = self.domain["m2"]
self.assertFalse(ColumnClassifier.valid_value_sets(c, d1))
self.assertTrue(ColumnClassifier.valid_value_sets(c, d2))
self.assertTrue(ColumnClassifier.valid_value_sets(c, d3))
self.assertTrue(ColumnClassifier.valid_value_sets(c, m2))
self.assertFalse(ColumnClassifier.valid_value_sets(m2, c))

def test_predict_discrete(self):
# Just copy
model = ColumnClassifier(self.domain.class_var, self.domain["d2"])
self.assertEqual(model.name, "column 'd2'")
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [0, 1, 0])
np.testing.assert_equal(probs, [[1, 0], [0, 1], [1, 0]])

# Values are not in the same order -> map
model = ColumnClassifier(self.domain.class_var, self.domain["d3"])
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [1, 0, np.nan])
np.testing.assert_equal(probs, [[0, 1], [1, 0], [0.5, 0.5]])

# Not in the same order, and one is missing -> map
model = ColumnClassifier(self.domain.class_var, self.domain["m2"])
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [1, 1, np.nan])
np.testing.assert_equal(probs, [[0, 1], [0, 1], [0.5, 0.5]])

# Non-binary class
domain = Domain(
self.domain.attributes,
DiscreteVariable("cls", values=["a", "c", "b", "d", "e"]))
data = Table.from_numpy(domain, self.data.X, self.data.Y)
model = ColumnClassifier(domain.class_var, domain["d3"])
classes, probs = model(data, model.ValueProbs)
np.testing.assert_equal(classes, [3, 1, np.nan])
np.testing.assert_almost_equal(
probs,
np.array([[0, 0, 0, 1, 0],
[0, 1, 0, 0, 0],
[0.2, 0.2, 0.2, 0.2, 0.2]]))

def test_predict_as_direct_probs(self):
model = ColumnClassifier(self.domain.class_var, self.domain["c1"])
self.assertEqual(model.name, "column 'c1'")
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [1, 0, np.nan])
np.testing.assert_equal(probs, [[0, 1], [0.75, 0.25], [0.5, 0.5]])

model = ColumnClassifier(self.domain.class_var, self.domain["c2"])
self.assertRaises(ValueError, model, self.data)

model = ColumnClassifier(self.domain.class_var, self.domain["c3"])
self.assertRaises(ValueError, model, self.data)

def test_predict_with_logistic(self):
model = ColumnClassifier(
self.domain.class_var, self.domain["c1"], 0.5, 3)
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [1, 0, np.nan])
np.testing.assert_almost_equal(
probs[:, 1], [1 / (1 + np.exp(-3 * (1 - 0.5))),
1 / (1 + np.exp(-3 * (0.25 - 0.5))),
0.5])
np.testing.assert_equal(probs[:, 0], 1 - probs[:, 1])

model = ColumnClassifier(
self.domain.class_var, self.domain["c2"], 0.5, 3)
classes, probs = model(self.data, model.ValueProbs)
np.testing.assert_equal(classes, [0, 0, np.nan])
np.testing.assert_almost_equal(
probs[:, 1], [1 / (1 + np.exp(-3 * (0.5 - 0.5))),
1 / (1 + np.exp(-3 * (-3 - 0.5))),
0.5])
np.testing.assert_equal(probs[:, 0], 1 - probs[:, 1])


if __name__ == "__main__":
unittest.main()
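As a side note on the value-mapping behaviour exercised in test_predict_discrete: when a discrete column carries the same values as the class variable but in a different order, the model remaps column indices to class indices. A minimal sketch against the ColumnModel added in this diff (variable names here are hypothetical):

from Orange.data import DiscreteVariable
from Orange.modelling import ColumnModel

cls_var = DiscreteVariable("cls", values=["c", "d"])
col = DiscreteVariable("d3", values=["d", "c"])  # same values, reversed order

model = ColumnModel(cls_var, col)
print(model.value_mapping)  # [1 0]: column index 0 ("d") -> class index 1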
1 change: 1 addition & 0 deletions Orange/modelling/__init__.py
@@ -11,6 +11,7 @@
from .randomforest import *
from .svm import *
from .tree import *
from .column import *
try:
from .catgb import *
except ImportError:
155 changes: 155 additions & 0 deletions Orange/modelling/column.py
@@ -0,0 +1,155 @@
from typing import Optional

import numpy as np

from Orange.data import Variable, DiscreteVariable, Domain, Table
from Orange.classification import LogisticRegressionLearner
from Orange.regression import LinearRegressionLearner
from Orange.modelling import Model, Learner

__all__ = ["ColumnLearner", "ColumnModel"]


def _check_column_combinations(
class_var: Variable,
column: Variable,
fit_regression: bool):
if class_var.is_continuous:
if not column.is_continuous:
raise ValueError(
"Regression can only be used with numeric variables")
return

assert isinstance(class_var, DiscreteVariable) # remove type warnings
if column.is_continuous:
if len(class_var.values) != 2:
raise ValueError(
"Numeric columns can only be used with binary class variables")
else:
assert isinstance(column, DiscreteVariable)
if not valid_value_sets(class_var, column):
raise ValueError(
"Column contains values that are not in class variable")
if fit_regression and not column.is_continuous:
raise ValueError(
"Intercept and coefficient are only allowed for continuous "
"variables")


def valid_prob_range(values: np.ndarray):
return np.nanmin(values) >= 0 and np.nanmax(values) <= 1


def valid_value_sets(class_var: DiscreteVariable,
column_var: DiscreteVariable):
return set(column_var.values) <= set(class_var.values)


class ColumnLearner(Learner):
def __init__(self,
class_var: Variable,
column: Variable,
fit_regression: bool = False):
super().__init__()
_check_column_combinations(class_var, column, fit_regression)
self.class_var = class_var
self.column = column
self.fit_regression = fit_regression
self.name = f"column '{column.name}'"

def __fit_coefficients(self, data: Table):
# Use learners from Orange rather than directly calling
# scikit-learn, so that we make sure we use the same parameters
# and get the same result as we would if we used the widgets.
data1 = data.transform(Domain([self.column], self.class_var))
if self.class_var.is_discrete:
model = LogisticRegressionLearner()(data1)
return model.intercept[0], model.coefficients[0][0]
else:
model = LinearRegressionLearner()(data1)
return model.intercept, model.coefficients[0]

def fit_storage(self, data: Table):
if data.domain.class_var != self.class_var:
raise ValueError("Class variable does not match the data")
if not self.fit_regression:
return ColumnModel(self.class_var, self.column)

intercept, coefficient = self.__fit_coefficients(data)
return ColumnModel(self.class_var, self.column, intercept, coefficient)


class ColumnModel(Model):
def __init__(self,
class_var: Variable,
column: Variable,
intercept: Optional[float] = None,
coefficient: Optional[float] = None):
super().__init__(Domain([column], class_var))

_check_column_combinations(class_var, column, intercept is not None)
if (intercept is not None) is not (coefficient is not None):
raise ValueError(
"Intercept and coefficient must both be provided or absent")

self.class_var = class_var
self.column = column
self.intercept = intercept
self.coefficient = coefficient
if (column.is_discrete and
class_var.values[:len(column.values)] != column.values):
self.value_mapping = np.array([class_var.to_val(x)
for x in column.values])
else:
self.value_mapping = None

pars = f" ({intercept}, {coefficient})" if intercept is not None else ""
self.name = f"column '{column.name}'{pars}"

def predict_storage(self, data: Table):
vals = data.get_column(self.column)
if self.class_var.is_discrete:
return self._predict_discrete(vals)
else:
return self._predict_continuous(vals)

def _predict_discrete(self, vals):
assert isinstance(self.class_var, DiscreteVariable)
nclasses = len(self.class_var.values)
proba = np.full((len(vals), nclasses), np.nan)
rows = np.isfinite(vals)
if self.column.is_discrete:
mapped = vals[rows].astype(int)
if self.value_mapping is not None:
mapped = self.value_mapping[mapped]
vals = vals.copy()
vals[rows] = mapped
proba[rows] = 0
proba[rows, mapped] = 1
else:
if self.coefficient is None:
if not valid_prob_range(vals):
raise ValueError("Column values must be in [0, 1] range "
"unless logistic function is applied")
proba[rows, 1] = vals[rows]
else:
proba[rows, 1] = 1 / (
1 + np.exp(-self.intercept - self.coefficient * vals[rows]))

proba[rows, 0] = 1 - proba[rows, 1]
vals = (proba[:, 1] > 0.5).astype(float)
vals[~rows] = np.nan
return vals, proba

def _predict_continuous(self, vals):
if self.coefficient is None:
return vals
else:
return vals * self.coefficient + self.intercept

def __str__(self):
pars = f" ({self.intercept}, {self.coefficient})" \
if self.intercept is not None else ""
return f'ColumnModel {self.column.name}{pars}'
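To make the logistic branch of _predict_discrete concrete, this is the arithmetic it performs for a continuous column once an intercept and coefficient have been fitted (the parameter and column values below are hypothetical):

import numpy as np

intercept, coefficient = 0.5, 3.0   # hypothetical fitted parameters
x = np.array([1.0, 0.25])           # column values for two rows

# Mirrors proba[rows, 1] in _predict_discrete above.
p1 = 1 / (1 + np.exp(-intercept - coefficient * x))
p0 = 1 - p1
predictions = (p1 > 0.5).astype(float)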