使用scikit-learn K-Means聚类指定自定义距离函数的探讨

使用scikit-learn K-Means聚类指定自定义距离函数的探讨

技术背景

在聚类分析中,K-Means是一种广泛使用的算法。通常,K-Means算法默认使用欧几里得距离来计算数据点与聚类中心的距离。然而,在某些场景下,欧几里得距离可能不是最佳选择,用户可能希望使用自定义的距离函数,如曼哈顿距离、余弦距离等。但scikit-learn当前的K-Means实现存在一定限制,需要探讨能否以及如何指定自定义距离函数。

实现步骤

1. 检查scikit - learn原生支持

scikit-learn当前版本的K-Means实现默认仅使用欧几里得距离,不直接支持指定自定义距离函数。

2. 其他替代方案

方案一:自定义K - Means实现

可以自己实现一个K-Means算法,支持使用scipy.spatial.distance中的20多种距离度量,或用户自定义的函数。以下是一个示例代码:

from __future__ import division
import random
import numpy as np
from scipy.spatial.distance import cdist
from scipy.sparse import issparse

def kmeans(X, centres, delta=.001, maxiter=10, metric="euclidean", p=2, verbose=1):
    """Lloyd-style k-means with an arbitrary scipy.spatial.distance metric.

    Parameters
    ----------
    X : ndarray or scipy sparse matrix, shape (N, dim)
        Data points to cluster.
    centres : ndarray, shape (k, dim)
        Initial cluster centres; a copy is updated and returned.
    delta : float
        Stop when the mean point-to-centre distance improves by less
        than this relative factor between iterations.
    maxiter : int
        Maximum number of update iterations.
    metric : str or callable
        Any metric accepted by scipy.spatial.distance.cdist,
        e.g. "cityblock", "cosine", or a user-defined function.
    p : int or float
        Power parameter for the Minkowski metric.
    verbose : int
        0 silent, 1 progress lines, >= 2 per-iteration details.

    Returns
    -------
    (centres, xtoc, distances) : final centres, index of the nearest
    centre for each point, and each point's distance to its centre.
    """
    if not issparse(X):
        X = np.asanyarray(X)
    # never mutate the caller's centres array
    centres = centres.todense() if issparse(centres) else centres.copy()
    N, dim = X.shape
    k, cdim = centres.shape
    if dim != cdim:
        raise ValueError("kmeans: X %s and centres %s must have the same number of columns" % (
            X.shape, centres.shape))
    if verbose:
        print("kmeans: X %s centres %s delta=%.2g maxiter=%d metric=%s" % (
            X.shape, centres.shape, delta, maxiter, metric))
    allx = np.arange(N)
    prevdist = 0
    for jiter in range(1, maxiter + 1):
        D = cdist_sparse(X, centres, metric=metric, p=p)  # (N, k) distances
        xtoc = D.argmin(axis=1)  # nearest centre for each point
        distances = D[allx, xtoc]
        avdist = distances.mean()
        if verbose >= 2:
            print("kmeans: av |X - nearest centre| = %.4g" % avdist)
        # converged: mean distance no longer improves by a factor of delta
        if (1 - delta) * prevdist <= avdist <= prevdist or jiter == maxiter:
            break
        prevdist = avdist
        # move each centre to the mean of the points assigned to it
        for jc in range(k):
            c = np.where(xtoc == jc)[0]
            if len(c) > 0:
                centres[jc] = X[c].mean(axis=0)
    if verbose:
        print("kmeans: %d iterations cluster sizes:" % jiter, np.bincount(xtoc))
    if verbose >= 2:
        r50 = np.zeros(k)
        r90 = np.zeros(k)
        for j in range(k):
            dist = distances[xtoc == j]
            if len(dist) > 0:
                r50[j], r90[j] = np.percentile(dist, (50, 90))
        print("kmeans: cluster 50 % radius", r50.astype(int))
        print("kmeans: cluster 90 % radius", r90.astype(int))
    return centres, xtoc, distances

def kmeanssample(X, k, nsample=0, **kwargs):
    """Two-pass kmeans: cluster a random sample of X first, then refine on all of X.

    nsample defaults to max(2*sqrt(N), 10*k); **kwargs are forwarded to kmeans().
    Returns the same (centres, xtoc, distances) triple as kmeans().
    """
    N, dim = X.shape
    if nsample == 0:
        nsample = max(2 * np.sqrt(N), 10 * k)
    Xsample = randomsample(X, int(nsample))
    # pass 1: rough centres from a sample, seeded by k random points
    pass1centres = randomsample(X, int(k))
    samplecentres = kmeans(Xsample, pass1centres, **kwargs)[0]
    # pass 2: refine on the full data set
    return kmeans(X, samplecentres, **kwargs)

def cdist_sparse(X, Y, **kwargs):
    """cdist that also accepts scipy sparse X and/or Y (densified row by row).

    Returns a dense (X.shape[0], Y.shape[0]) float64 distance matrix.
    Slow for the sparse cases; fine for moderate sizes.
    """
    # encode which inputs are sparse: 0 neither, 1 Y only, 2 X only, 3 both
    sxy = 2 * issparse(X) + issparse(Y)
    if sxy == 0:
        return cdist(X, Y, **kwargs)
    d = np.empty((X.shape[0], Y.shape[0]), np.float64)
    if sxy == 2:  # sparse X, dense Y
        for j, x in enumerate(X):
            d[j] = cdist(x.todense(), Y, **kwargs)[0]
    elif sxy == 1:  # dense X, sparse Y
        for k, y in enumerate(Y):
            # cdist(X, y) is (N, 1); take the column (the original code took
            # row 0, broadcasting one scalar into the whole column)
            d[:, k] = cdist(X, y.todense(), **kwargs)[:, 0]
    else:  # both sparse
        for j, x in enumerate(X):
            for k, y in enumerate(Y):
                d[j, k] = cdist(x.todense(), y.todense(), **kwargs)[0]
    return d

def randomsample(X, n):
    """Return n rows of X, sampled uniformly without replacement."""
    # range() replaces Python-2-only xrange()
    sampleix = random.sample(range(X.shape[0]), int(n))
    return X[sampleix]

def nearestcentres( X, centres, metric="euclidean", p=2 ):
D = cdist( X, centres, metric=metric, p=p )
return D.argmin(axis=1)

def Lqmetric(x, y=None, q=.5):
    """Mean of |x - y| ** q (or of |x| ** q when y is None): an L_q-style dissimilarity."""
    diff = x if y is None else x - y
    return (np.abs(diff) ** q).mean()

class Kmeans:
    """Convenience wrapper: km = Kmeans(X, k) clusters X and stores the results.

    Attributes set by __init__:
      X         -- the data as passed in
      centres   -- final cluster centres
      Xtocentre -- index of the nearest centre for each row of X
      distances -- distance of each row of X to its centre

    Iterating yields (cluster_index, boolean_member_mask) pairs.
    """

    def __init__(self, X, k=0, centres=None, nsample=0, **kwargs):
        self.X = X
        if centres is None:
            # no explicit centres: sample-then-refine kmeans with k clusters
            self.centres, self.Xtocentre, self.distances = kmeanssample(
                X, k=k, nsample=nsample, **kwargs)
        else:
            self.centres, self.Xtocentre, self.distances = kmeans(
                X, centres, **kwargs)

    def __iter__(self):
        for jc in range(len(self.centres)):
            yield jc, (self.Xtocentre == jc)

方案二:使用其他库

  • nltk:可以使用nltk.cluster.kmeans.KMeansClusterer指定自定义距离函数。示例代码如下:
from nltk.cluster.kmeans import KMeansClusterer
# original referenced nltk.cluster.util.cosine_distance without importing nltk
from nltk.cluster.util import cosine_distance

NUM_CLUSTERS = 5
data = your_data.toarray()  # your_data: e.g. a scipy sparse feature matrix
kclusterer = KMeansClusterer(NUM_CLUSTERS, distance=cosine_distance, repeats=25)
assigned_clusters = kclusterer.cluster(data, assign_clusters=True)
  • pyclustering:该库是Python/C++实现,速度较快,支持指定自定义距离函数。示例代码如下:
from pyclustering.cluster.kmeans import kmeans
from pyclustering.utils.metric import type_metric, distance_metric

# any callable taking two points and returning a float can serve as the metric
user_function = lambda point1, point2: point1[0] + point2[0] + 2
metric = distance_metric(type_metric.USER_DEFINED, func=user_function)
# sample was undefined in the original snippet; any list of 2-D points works
sample = [[4.6, 5.8], [4.8, 6.0], [5.6, 6.4], [5.8, 6.6]]
start_centers = [[4.7, 5.9], [5.7, 6.5]]
kmeans_instance = kmeans(sample, start_centers, metric=metric)
kmeans_instance.process()
clusters = kmeans_instance.get_clusters()

方案三:修改scikit - learn源码

在scikit-learn 1.2.2版本中,可以替换sklearn.cluster._kmeans模块中的_euclidean_distances函数。示例代码如下:

import sklearn.cluster._kmeans as kmeans
from sklearn.metrics import pairwise_distances

def custom_distances(X, Y=None, Y_norm_squared=None, squared=False):
    """Drop-in replacement for sklearn's _euclidean_distances (1.2.2 signature)."""
    # NOTE(review): both branches return the same values. If KMeans requests
    # squared distances, returning unsquared ones changes inertia bookkeeping
    # (kept as in the original example) -- confirm this is acceptable.
    if squared:
        return pairwise_distances(X, Y, metric='minkowski', p=1.5)
    else:
        return pairwise_distances(X, Y, metric='minkowski', p=1.5)

# monkey-patch both the private and public names used inside _kmeans
kmeans._euclidean_distances = custom_distances
kmeans.euclidean_distances = custom_distances
clusters = 8  # was undefined in the original snippet; set to the desired k
km = kmeans.KMeans(init="k-means++", n_clusters=clusters, n_init=4, random_state=0)

方案四:继承KMeans类

在scikit-learn 1.1.3版本中,可以创建一个继承自sklearn.cluster.KMeans的类,并重写其_transform方法。示例代码如下:

import sklearn.cluster
import numpy as np

def anchor_iou(box_dims, centroid_box_dims):
    """Pairwise IoU between boxes and centroid boxes given as (w, h) pairs.

    Boxes are compared as if anchored at a common corner, so the
    intersection is min(w) * min(h).  Returns an (n_boxes, n_centroids) array.
    """
    bw, bh = box_dims[..., 0], box_dims[..., 1]
    cw, ch = centroid_box_dims[..., 0], centroid_box_dims[..., 1]
    # broadcast (n,) against (k,) to get an (n, k) grid of overlaps
    overlap = (np.minimum(bw[..., np.newaxis], cw[np.newaxis, ...])
               * np.minimum(bh[..., np.newaxis], ch[np.newaxis, ...]))
    union = (bw * bh)[..., np.newaxis] + (cw * ch)[np.newaxis, ...] - overlap
    return overlap / union

class IOUKMeans(sklearn.cluster.KMeans):
    """KMeans subclass whose _transform uses anchor IoU instead of Euclidean distance."""

    def __init__(self, n_clusters=8, *, init="k-means++", n_init=10,
                 max_iter=300, tol=1e-4, verbose=0, random_state=None,
                 copy_x=True, algorithm="lloyd"):
        # forward everything to KMeans unchanged; only _transform is overridden
        super().__init__(n_clusters=n_clusters, init=init, n_init=n_init,
                         max_iter=max_iter, tol=tol, verbose=verbose,
                         random_state=random_state, copy_x=copy_x,
                         algorithm=algorithm)

    def _transform(self, X):
        # distance-like matrix used by transform(); note IoU is a similarity,
        # not a metric, exactly as in the original example
        return anchor_iou(X, self.cluster_centers_)

rng = np.random.default_rng(12345)
num_boxes = 10
# low=1: a zero width/height from low=0 would give zero areas and a 0/0 IoU
bboxes = rng.integers(low=1, high=100, size=(num_boxes, 2))
kmeans = IOUKMeans(num_boxes).fit(bboxes)

最佳实践

  • 数据预处理:在使用自定义距离函数之前,对数据进行适当的预处理,如归一化等,以确保距离计算的有效性。
  • 距离函数选择:根据数据的特点和问题的需求选择合适的距离函数。例如,对于文本数据,余弦距离可能更合适;对于时间序列数据,动态时间规整(DTW)距离可能更合适。
  • 验证聚类结果:使用聚类评估指标(如轮廓系数、Calinski - Harabasz指数等)验证聚类结果的质量。

常见问题

  • 收敛问题:K - Means算法是基于欧几里得距离设计的,使用其他距离函数可能导致算法不收敛。在这种情况下,可以考虑使用K - Medoids等算法。
  • 性能问题:一些自定义距离函数的计算复杂度较高,可能会导致算法性能下降。可以通过优化代码或使用更高效的库来解决。
  • 中心计算问题:不同的距离函数可能需要不同的中心计算方法。例如,对于曼哈顿距离,需要使用中位数作为聚类中心,而不是平均值。

使用scikit - learn K - Means聚类指定自定义距离函数的探讨
https://119291.xyz/posts/2025-04-22.scikit-learn-kmeans-custom-distance-function/
作者
ww
发布于
2025年4月22日
许可协议