|
|
'''
|
|
|
Author: SJ2050
|
|
|
Date: 2021-11-07 15:13:40
|
|
|
LastEditTime: 2021-11-07 21:14:18
|
|
|
Version: v0.0.1
|
|
|
Description: Use a density-based clustering algorithm.
|
|
|
Copyright © 2021 SJ2050
|
|
|
'''
|
|
|
import numpy as np
|
|
|
import random
|
|
|
from basic_kMeans import read_csv_data, show_figure
|
|
|
|
|
|
def compute_distances(points):
    """Compute the Euclidean distance between every pair of points.

    Args:
        points: All points, as anything ``np.array`` can convert to a float
            array whose first axis indexes the points.

    Returns:
        distances: A symmetric ``(n, n)`` numpy array in which
            ``distances[i, j]`` is the distance between point ``i`` and
            point ``j``. The diagonal is zero. An empty input yields an
            array of shape ``(0, 0)``.
    """
    points = np.array(points, dtype=np.float64)
    n = len(points)
    distances = np.zeros((n, n))
    if n == 0:
        return distances

    # Flatten each point to a 1-d vector so that scalar points and
    # multi-dimensional points are handled the same way.
    flat = points.reshape(n, -1)

    # One vectorized norm per row instead of one call per pair. Only the
    # upper triangle is computed; the lower triangle is its mirror image.
    for i in range(n - 1):
        row = np.linalg.norm(flat[i + 1:] - flat[i], axis=1)
        distances[i, i + 1:] = row
        distances[i + 1:, i] = row

    return distances
|
|
|
|
|
|
def find_neighbors(distances, p_ind, eps):
    """Find the neighbors of point p.

    Two points are neighbors when the distance between them is strictly
    less than ``eps``. A point is never its own neighbor.

    Args:
        distances: The distances between every two points (square numpy
            2d array).
        p_ind: Reference point's index.
        eps: The distance between two neighbors should be less than eps.

    Returns:
        neighbors: A set of the reference point's neighbors' indices
            (plain Python ints).
    """
    # Compare the whole row at once instead of indexing element by element.
    close_indices = np.flatnonzero(distances[p_ind] < eps)

    # ``tolist`` converts numpy integers to built-in ints.
    neighbors = set(close_indices.tolist())
    neighbors.discard(p_ind)

    return neighbors
|
|
|
|
|
|
def DBSCAN(points, eps, min_Pts):
    """A density-based clustering algorithm.

    Args:
        points: All points.
        eps: The distance between two neighbors should be less than eps.
        min_Pts: Each core object should at least have `min_Pts` neighbors
            (the point itself is not counted as its own neighbor).

    Returns:
        clusters: A list of sets of point indices. Every entry except the
            last is one cluster, in the order the clusters were grown. The
            last entry holds the indices that were never reached from any
            core object; it is always present and may be empty.
    """
    from collections import deque

    # initialization
    n_points = len(points)
    distances = compute_distances(points)
    each_point_neighbors = [find_neighbors(distances, i, eps)
                            for i in range(n_points)]
    core_objects = {i for i, neighbors in enumerate(each_point_neighbors)
                    if len(neighbors) >= min_Pts}
    unvisited_indices = set(range(n_points))

    clusters = []
    while core_objects:
        # Snapshot so the new cluster can be recovered as a set difference
        # once the expansion below has finished.
        unvisited_before = unvisited_indices.copy()

        # The seed is picked at random, so the order of the clusters (and
        # which cluster a shared border point joins) can vary between runs.
        core_obj = random.choice(list(core_objects))
        queue = deque([core_obj])
        unvisited_indices.discard(core_obj)

        # Breadth-first expansion; deque gives O(1) pops from the front.
        while queue:
            q = queue.popleft()
            q_neighbors = each_point_neighbors[q]
            # Only core objects spread the cluster further; border points
            # are absorbed but not expanded.
            if len(q_neighbors) >= min_Pts:
                delta = q_neighbors & unvisited_indices
                queue.extend(delta)
                unvisited_indices -= delta

        cluster = unvisited_before - unvisited_indices
        clusters.append(cluster)
        core_objects -= cluster

    # Whatever was never reached from a core object goes in the final entry.
    clusters.append(unvisited_indices)
    return clusters
|
|
|
|
|
|
if __name__ == '__main__':
    # Demo: cluster the points stored in a CSV file and plot the result.
    data_file = 'dataset_circles.csv'
    original_data = read_csv_data(data_file)
    # Only the first two columns are used as point coordinates.
    points = original_data[:, :2]
    eps = 5
    min_Pts = 5

    clusters = DBSCAN(points, eps, min_Pts)

    # Tag each point with the position of its group in `clusters`. The last
    # group returned by DBSCAN holds the points no cluster reached.
    classified_points = [
        (points[j][0], points[j][1], label)
        for label, members in enumerate(clusters)
        for j in members
    ]
    # NOTE(review): the second argument is passed empty, as in the original;
    # confirm its meaning against basic_kMeans.show_figure.
    show_figure(classified_points, [])
|