Source code for abess.graph

import numpy as np
import numbers
import warnings

from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_array
from scipy.sparse import coo_matrix

from .bess_base import bess_base
from .linear import LogisticRegression


# ---------------------------------------------------------------------------
# Helpers for make_ising_data
# ---------------------------------------------------------------------------

def _sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def _gen_4nn_cyclic(p, degree, beta, alpha, graph_type):
    """
    4-nearest-neighbor cyclic grid topology.
    p must be a perfect square.  Returns p×p theta matrix.
    graph_type 3 = ferromagnetic (+beta), 4 = spin-glass (±beta).
    """
    sq = int(round(p ** 0.5))
    if sq * sq != p:
        raise ValueError("p must be a perfect square for graph type 3/4.")
    theta = np.zeros((p, p))
    for node in range(p):
        row, col = divmod(node, sq)
        neighbors = [
            row * sq + (col + 1) % sq,           # right
            row * sq + (col - 1) % sq,           # left
            ((row + 1) % sq) * sq + col,         # down
            ((row - 1) % sq) * sq + col,         # up
        ]
        for nb in neighbors:
            if graph_type == 4:
                # spin-glass: alternate sign based on (row+col) parity
                sign = 1 if (row + col) % 2 == 0 else -1
            else:
                sign = 1
            if theta[node, nb] == 0:
                theta[node, nb] = sign * beta
                theta[nb, node] = sign * beta
    return theta


def _gen_ld_adjmat(p, degree, alpha):
    """Local-degree adjacency matrix (type 6)."""
    rng = np.random.default_rng(None)
    theta = np.zeros((p, p))
    for i in range(p):
        neighbors = list(range(max(0, i - degree), i)) + \
                    list(range(i + 1, min(p, i + degree + 1)))
        for nb in neighbors:
            if theta[i, nb] == 0:
                val = alpha * (rng.random() - 0.5) * 2  # uniform in [-alpha, alpha]
                theta[i, nb] = val
                theta[nb, i] = val
    return theta


def _gen_rr_adjmat(p, degree, beta, alpha, graph_type):
    """Random k-regular graph (types 1, 2, 7) using networkx."""
    try:
        import networkx as nx
    except ImportError:
        raise ImportError(
            "networkx is required for graph types 1, 2, 7. "
            "Install it with: pip install networkx"
        )
    G = nx.random_regular_graph(degree, p)
    theta = np.zeros((p, p))
    for u, v in G.edges():
        if graph_type == 2:
            # spin-glass
            sign = 1 if np.random.rand() > 0.5 else -1
            val = sign * beta
        elif graph_type == 7:
            # random weight in [-alpha, alpha]
            val = alpha * (np.random.rand() - 0.5) * 2
        else:
            # type 1: ferromagnetic
            val = beta
        theta[u, v] = val
        theta[v, u] = val
    return theta


def _sim_theta(p, graph_type, graph_seed, beta, degree, alpha):
    """Dispatch to the correct topology builder."""
    if graph_seed is not None:
        np.random.seed(graph_seed)
    if graph_type in (3, 4):
        return _gen_4nn_cyclic(p, degree, beta, alpha, graph_type)
    elif graph_type == 6:
        return _gen_ld_adjmat(p, degree, alpha)
    elif graph_type in (1, 2, 7):
        return _gen_rr_adjmat(p, degree, beta, alpha, graph_type)
    else:
        raise ValueError(f"Unsupported graph type: {graph_type}")


def _gibbs_sample(theta, n_sample, burn=100000, skip=50, seed=1):
    """
    Pure-numpy Gibbs sampler for Ising model.
    theta: p×p interaction matrix (including diagonal for external field).
    Returns: (n_sample × p) array of {-1, 1} samples.
    """
    rng = np.random.default_rng(seed)
    p = theta.shape[0]
    x = rng.choice([-1, 1], size=p).astype(float)

    # burn-in
    for _ in range(burn):
        j = rng.integers(p)
        log_odd = 2.0 * (theta[j, :] @ x - theta[j, j] * x[j])
        prob = _sigmoid(log_odd)
        x[j] = 1.0 if rng.random() < prob else -1.0

    samples = np.empty((n_sample, p), dtype=float)
    for i in range(n_sample):
        for _ in range(skip):
            j = rng.integers(p)
            log_odd = 2.0 * (theta[j, :] @ x - theta[j, j] * x[j])
            prob = _sigmoid(log_odd)
            x[j] = 1.0 if rng.random() < prob else -1.0
        samples[i] = x.copy()

    return samples


def _exact_sample(n, theta, seed):
    """
    Exact sampler: enumerate all 2^p configurations.
    Only feasible for p <= 20.
    Returns (unique_data, frequencies) where frequencies sum to n.
    """
    p = theta.shape[0]
    if p > 20:
        warnings.warn(
            f"p={p} > 20: exact sampling may be very slow. "
            "Consider using method='gibbs' instead."
        )
    rng = np.random.default_rng(seed)
    configs = np.array(
        [[(1 if (k >> j) & 1 else -1) for j in range(p)] for k in range(2 ** p)],
        dtype=float
    )  # shape (2^p, p)
    # theta_off = theta with diagonal zeroed
    theta_off = theta.copy()
    np.fill_diagonal(theta_off, 0.0)
    # log-weight: 0.5 * c @ theta_off @ c
    log_w = 0.5 * np.array([c @ theta_off @ c for c in configs])
    log_w -= log_w.max()
    w = np.exp(log_w)
    w /= w.sum()

    indices = rng.choice(len(configs), size=n, p=w, replace=True)
    data = configs[indices]
    unique_data, counts = np.unique(data, axis=0, return_counts=True)
    return data, np.ones(n)


def make_ising_data(n, p, type=3, seed=None, graph_seed=None, theta=None,
                    beta=0.7, degree=3, alpha=0.4, method="gibbs"):
    """
    Generate data from an Ising model.

    Parameters
    ----------
    n : int
        Number of samples to generate.
    p : int
        Number of nodes (variables).
    type : int, optional
        Graph topology type:
        1 = random regular (ferro), 2 = random regular (spin-glass),
        3 = 4-nearest-neighbor cyclic (ferro, requires p perfect square),
        4 = 4-nearest-neighbor cyclic (spin-glass),
        6 = local-degree, 7 = random regular (random weight).
        Default: 3.
    seed : int or None, optional
        Random seed for sampling. Default: None.
    graph_seed : int or None, optional
        Random seed for graph generation. Default: None.
    theta : ndarray of shape (p, p), optional
        Custom interaction matrix. If provided, ignores type/beta/degree/alpha.
    beta : float, optional
        Coupling strength. Default: 0.7.
    degree : int, optional
        Node degree for graph construction. Default: 3.
    alpha : float, optional
        Weight scaling for random-weight topologies. Default: 0.4.
    method : str, optional
        Sampling method: "gibbs" (default) or "freq" (exact enumeration,
        feasible only for small p).

    Returns
    -------
    dict with keys:
        "data" : ndarray of shape (n, p), values in {-1, 1}
        "weight" : ndarray of shape (n,), sample weights (all 1 for gibbs)
        "theta" : ndarray of shape (p, p), the interaction matrix
    """
    if theta is None:
        theta = _sim_theta(p, type, graph_seed, beta, degree, alpha)

    if method == "auto":
        method = "freq" if p <= 16 else "gibbs"

    if method == "freq":
        data, weight = _exact_sample(n, theta, seed)
    else:
        data = _gibbs_sample(theta, n, seed=seed if seed is not None else 1)
        weight = np.ones(n)

    return {"data": data, "weight": weight, "theta": theta}


# ---------------------------------------------------------------------------
# IsingModel: SLIDE algorithm (node-wise logistic regression)
# ---------------------------------------------------------------------------

[docs]class IsingModel(BaseEstimator): """ Sparse Ising model estimation via SLIDE (Sparse pairwise Logistic regression for Ising model via best-subset sElection). Fits one LogisticRegression per node (node-wise logistic regression), then symmetrizes the resulting coefficient matrix. Parameters ---------- max_support_size : int, array-like of shape (p,), or None Maximum support size for each node's logistic regression. If None, defaults to min(p-2, 100) for each node. tune_type : str, optional Model selection criterion: "gic", "aic", "bic", "ebic", or "cv". Default: "gic". ic_coef : float, optional Coefficient for information criterion. Default: 1.0. cv : int, optional Number of cross-validation folds (used when tune_type="cv"). Default: 5. graph_threshold : float, optional Entries with |theta| <= graph_threshold are zeroed out. Default: 0.0. thread : int, optional Number of threads for LogisticRegression. Default: 1. primary_model_fit_max_iter : int, optional Max iterations for inner logistic solver. Default: 100. primary_model_fit_epsilon : float, optional Convergence tolerance for inner logistic solver. Default: 1e-6. """
[docs] def __init__(self, max_support_size=None, tune_type="gic", ic_coef=1.0, cv=5, graph_threshold=0.0, thread=1, primary_model_fit_max_iter=100, primary_model_fit_epsilon=1e-6): self.max_support_size = max_support_size self.tune_type = tune_type self.ic_coef = ic_coef self.cv = cv self.graph_threshold = graph_threshold self.thread = thread self.primary_model_fit_max_iter = primary_model_fit_max_iter self.primary_model_fit_epsilon = primary_model_fit_epsilon
[docs] def fit(self, X, y=None, sample_weight=None): """ Fit the sparse Ising model via SLIDE. Parameters ---------- X : array-like of shape (n_samples, p_features) Binary data with values in {-1, 1}. y : ignored sample_weight : array-like of shape (n_samples,), optional Sample weights. Returns ------- self """ X = check_array(X, dtype=np.float64, ensure_2d=True) n, p = X.shape self.n_features_in_ = p # Resolve per-node max support size if self.max_support_size is None: ms_arr = np.full(p, min(p - 2, 100), dtype=int) elif isinstance(self.max_support_size, (numbers.Integral, numbers.Real)): ms_arr = np.full(p, int(self.max_support_size), dtype=int) else: ms_arr = np.array(self.max_support_size, dtype=int) if ms_arr.shape != (p,): raise ValueError( "max_support_size array must have length p." ) # Build tune kwargs if self.tune_type == "cv": tune_kwargs = dict(cv=self.cv, ic_type="ebic") else: tune_kwargs = dict(cv=1, ic_type=self.tune_type) theta = np.zeros((p, p)) for j in range(p): ms = int(ms_arr[j]) X_minus = np.delete(X, j, axis=1) # n × (p-1) y_node = X[:, j] # {-1, 1} model = LogisticRegression( support_size=list(range(0, ms + 1)), ic_coef=self.ic_coef, fit_intercept=False, thread=self.thread, primary_model_fit_max_iter=self.primary_model_fit_max_iter, primary_model_fit_epsilon=self.primary_model_fit_epsilon, **tune_kwargs ) # override normalize_type to 0 (no normalization) as in R's SLIDE model.normalize_type = 0 model.fit(X_minus, y_node, sample_weight=sample_weight) coef = model.coef_.flatten() # length p-1 # Insert 0 at position j theta[j, :j] = coef[:j] theta[j, j + 1:] = coef[j:] # Symmetrize and scale theta = (theta + theta.T) / 2.0 theta /= 2.0 # Threshold theta[np.abs(theta) <= self.graph_threshold] = 0.0 self.theta_ = theta return self
[docs] def score(self, X, y=None, sample_weight=None): """ Compute the pseudo-log-likelihood of X under the fitted Ising model. Parameters ---------- X : array-like of shape (n_samples, p_features) y : ignored sample_weight : ignored Returns ------- float Mean pseudo-log-likelihood (higher is better). """ X = check_array(X, dtype=np.float64, ensure_2d=True) n, p = X.shape theta = self.theta_ pll = 0.0 for j in range(p): # theta[j, -j] @ x_i,-j for each i mask = np.ones(p, dtype=bool) mask[j] = False eta = X[:, mask] @ theta[j, mask] # shape (n,) log_prob = np.log(_sigmoid(2.0 * X[:, j] * eta)) pll += log_prob.sum() return float(pll / n)
# --------------------------------------------------------------------------- # abessGraph (kept for backward compatibility) # --------------------------------------------------------------------------- def fix_docs(cls): # inherit the document from base class index = cls.__doc__.find("Examples\n --------\n") if (index != -1): cls.__doc__ = cls.__doc__[:index] + \ cls.__bases__[0].__doc__ + cls.__doc__[index:] return cls @fix_docs class abessGraph(bess_base): """ Adaptive Best-Subset Selection(ABESS) algorithm for gaussian graph model. Parameters ---------- splicing_type: {0, 1}, optional The type of splicing in `fit()` (in Algorithm.h). "0" for decreasing by half, "1" for decresing by one. Default: splicing_type = 0. Examples -------- >>> ### Sparsity known >>> >>> from abess.graph import abessGraph >>> import numpy as np >>> np.random.seed(12345) >>> model = abessGraph(support_size = 10) >>> >>> ### X known >>> >>> model.fit(X) >>> print(model.coef_) """ def __init__(self, max_iter=200, exchange_num=5, path_type="seq", is_warm_start=True, support_size=None, s_min=None, s_max=None, ic_type="aic", ic_coef=1.0, primary_model_fit_max_iter=20, primary_model_fit_epsilon=1e-3, always_select=[], alpha=None, cv=1, thread=1, sparse_matrix=False, splicing_type=1 ): super(abessGraph, self).__init__( algorithm_type="abess", model_type="Graph", data_type=1, path_type=path_type, max_iter=max_iter, exchange_num=exchange_num, is_warm_start=is_warm_start, support_size=support_size, s_min=s_min, s_max=s_max, ic_type=ic_type, ic_coef=ic_coef, primary_model_fit_max_iter=primary_model_fit_max_iter, primary_model_fit_epsilon=primary_model_fit_epsilon, always_select=always_select, alpha=alpha, cv=cv, thread=thread, sparse_matrix=sparse_matrix, splicing_type=splicing_type ) def fit(self, X=None, cv_fold_id=None): """ The fit function is used to transfer the information of data and return the fit result. Parameters ---------- X : array-like of shape (n_samples, p_features) Training data. cv_fold_id: array_like of shape (n_samples,) , optional An array indicates different folds in CV. Samples in the same fold should be given the same number. Default: cv_fold_id=None """ try: from abess.pybind_cabess import pywrap_GLM as pywrap_abess except ImportError: raise ImportError( "abessGraph requires the compiled C++ extension (pybind_cabess). " "Please build the package with: pip install -e python/" ) # Input check if isinstance(X, (list, np.ndarray, np.matrix, coo_matrix)): if isinstance(X, coo_matrix): self.sparse_matrix = True X = check_array(X, accept_sparse=True) n = X.shape[0] p = X.shape[1] y = np.zeros((n, 1)) self.n_features_in_ = p else: raise ValueError("X should be given in Ising model.") # Algorithm_type if self.algorithm_type == "abess": algorithm_type_int = 6 else: raise ValueError("algorithm_type should not be " + str(self.algorithm_type)) model_type_int = 9 # Path_type: seq, pgs if self.path_type == "seq": path_type_int = 1 elif self.path_type == "pgs": path_type_int = 2 else: raise ValueError("path_type should be \'seq\' or \'pgs\'") # Ic_type if self.ic_type == "aic": ic_type_int = 1 elif self.ic_type == "bic": ic_type_int = 2 elif self.ic_type == "gic": ic_type_int = 3 elif self.ic_type == "ebic": ic_type_int = 4 else: raise ValueError( "ic_type should be \"aic\", \"bic\", \"ebic\" or \"gic\"") # cv if (not isinstance(self.cv, int) or self.cv <= 0): raise ValueError("cv should be an positive integer.") elif (self.cv > 1): self.is_cv = True # cv_fold_id if cv_fold_id is None: cv_fold_id = np.array([], dtype="int32") else: cv_fold_id = np.array(cv_fold_id, dtype="int32") if cv_fold_id.ndim > 1: raise ValueError("group should be an 1D array of integers.") elif cv_fold_id.size != n: raise ValueError( "The length of group should be equal to X.shape[0].") elif len(set(cv_fold_id)) != self.cv: raise ValueError( "The number of different masks should be equal to `cv`.") # path parameter if self.support_size is None: support_sizes = list(range(0, int(p * (p - 1) / 2))) else: if isinstance(self.support_size, (numbers.Real, numbers.Integral)): support_sizes = np.empty(1, dtype=int) support_sizes[0] = self.support_size elif (np.any(np.array(self.support_size) > p * (p - 1) / 2) or np.any(np.array(self.support_size) < 0)): raise ValueError( "All support_size should be between 0 and X.shape[1]") else: support_sizes = self.support_size support_sizes = np.array(support_sizes).astype('int32') # alpha if self.alpha is None: alphas = [0] else: if isinstance(self.alpha, (numbers.Real, numbers.Integral)): alphas = np.empty(1, dtype=float) alphas[0] = self.alpha else: alphas = self.alpha # Exchange_num if (not isinstance(self.exchange_num, int) or self.exchange_num <= 0): raise ValueError("exchange_num should be an positive integer.") # Thread if (not isinstance(self.thread, int) or self.thread < 0): raise ValueError( "thread should be positive number or 0 (maximum supported by your device).") # Splicing type if (self.splicing_type != 0 and self.splicing_type != 1): raise ValueError("splicing type should be 0 or 1.") # Important_search if (not isinstance(self.important_search, int) or self.important_search < 0): raise ValueError( "important_search should be a non-negative number.") # Sparse X if self.sparse_matrix: if type(X) != type(coo_matrix((1, 1))): nonzero = 0 tmp = np.zeros([X.shape[0] * X.shape[1], 3]) for j in range(X.shape[1]): for i in range(X.shape[0]): if X[i, j] != 0.: tmp[nonzero, :] = np.array([X[i, j], i, j]) nonzero += 1 X = tmp[:nonzero, :] else: tmp = np.zeros([len(X.data), 3]) tmp[:, 1] = X.row tmp[:, 2] = X.col tmp[:, 0] = X.data ind = np.lexsort((tmp[:, 2], tmp[:, 1])) X = tmp[ind, :] # always select (diag) temp = -1 for i in range(p): temp += (i + 1) if temp not in self.always_select: self.always_select += [temp] support_sizes += 1 # unused new_s_min = 0 new_s_max = 0 new_K_max = 0 new_lambda_min = 0 new_lambda_max = 0 new_screening_size = -1 g_index = list(range(p)) state = [0] Sigma = np.array([[-1.0]]) is_normal = False weight = np.ones(n) result = pywrap_abess(X, y, n, p, self.data_type, weight, Sigma, is_normal, algorithm_type_int, model_type_int, self.max_iter, self.exchange_num, path_type_int, self.is_warm_start, ic_type_int, self.ic_coef, self.is_cv, self.cv, g_index, state, support_sizes, alphas, cv_fold_id, new_s_min, new_s_max, new_K_max, self.epsilon, new_lambda_min, new_lambda_max, self.n_lambda, self.is_screening, new_screening_size, self.powell_path, self.always_select, self.tau, self.primary_model_fit_max_iter, self.primary_model_fit_epsilon, self.early_stop, self.approximate_Newton, self.thread, self.covariance_update, self.sparse_matrix, self.splicing_type, self.important_search, int(p * (p + 1) / 2), 1 * 1, 1, 1, 1, 1, 1, int(p * (p + 1) / 2) ) self.coef_ = result[0].reshape(int(p * (p + 1) / 2), 1) self.theta_ = np.zeros((p, p)) i = 0 j = 0 for k in range(0, int(p * (p + 1) / 2)): self.theta_[i, j] = self.coef_[k, 0] self.theta_[j, i] = self.coef_[k, 0] i += 1 if (i > j): i = 0 j += 1 return self