You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

105 lines
3.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

'''
Author: SJ2050
Date: 2021-11-07 15:13:40
LastEditTime: 2021-11-07 21:14:18
Version: v0.0.1
Description: Density-based clustering of points using the DBSCAN algorithm.
Copyright © 2021 SJ2050
'''
import random
from collections import deque

import numpy as np

from basic_kMeans import read_csv_data, show_figure
def compute_distances(points):
    """Compute the Euclidean distances between every two points.

    Args:
        points: All points. Converted to a float64 array whose first axis
            indexes the points; each point may be a scalar or a coordinate
            vector.
    Returns:
        distances: An (n, n) symmetric numpy array with a zero diagonal,
            where entry [i, j] is the distance between points i and j.
    """
    points = np.asarray(points, dtype=np.float64)
    n = len(points)
    distances = np.zeros((n, n))
    if n == 0:
        # Nothing to measure; also avoids an ambiguous reshape below.
        return distances

    # One row of coordinates per point, so scalar points work as well.
    coords = points.reshape(n, -1)
    for i in range(n - 1):
        # Distances from point i to every later point in a single call;
        # the matrix is symmetric, so mirror them into the lower triangle.
        row = np.linalg.norm(coords[i + 1:] - coords[i], axis=1)
        distances[i, i + 1:] = row
        distances[i + 1:, i] = row
    return distances
def find_neighbors(distances, p_ind, eps):
    """Find neighbors of point p.

    When the distance between two points is less than eps, they are
    considered as neighbors. The reference point is never its own neighbor.

    Args:
        distances: The distances between every two points (square numpy 2d
            array).
        p_ind: Reference point's index.
        eps: The distance between two neighbors should be less than eps.
    Returns:
        neighbors: A set of the reference point's neighbors' indices.
    """
    # Compare the whole row at once instead of element by element.
    within_eps = np.flatnonzero(np.asarray(distances[p_ind]) < eps)
    # tolist() yields plain Python ints, matching indices built from range().
    neighbors = set(within_eps.tolist())
    neighbors.discard(p_ind)
    return neighbors
def DBSCAN(points, eps, min_Pts):
    """A density-based clustering algorithm.

    A point is a core object when it has at least `min_Pts` neighbors, i.e.
    other points strictly closer than `eps`. Each cluster is grown from a
    core object picked with `random.choice`, absorbing the still-unvisited
    neighbors of every core object reached along the way. Because the seed
    is random, the order of the clusters may vary between runs unless the
    `random` module is seeded.

    Args:
        points: All points.
        eps: The distance between two neighbors should be less than eps.
        min_Pts: Each core object should at least have `min_Pts` neighbors.
    Returns:
        clusters: A list of sets of point indices. The last set holds the
            indices that were not absorbed into any cluster; it is always
            present and may be empty.
    """
    # initialization
    n_points = len(points)
    distances = compute_distances(points)
    each_point_neighbors = [find_neighbors(distances, i, eps)
                            for i in range(n_points)]
    core_objects = {i for i, neighbors in enumerate(each_point_neighbors)
                    if len(neighbors) >= min_Pts}
    unvisited_indices = set(range(n_points))
    clusters = []

    while core_objects:
        core_obj = random.choice(list(core_objects))
        # Collect the cluster directly rather than diffing two snapshots of
        # the unvisited set.
        cluster = {core_obj}
        unvisited_indices.discard(core_obj)
        # deque gives O(1) pops from the left; list.pop(0) is O(n).
        queue = deque([core_obj])
        while queue:
            q = queue.popleft()
            Nq = each_point_neighbors[q]
            # Only core objects spread the cluster; border points are
            # absorbed but do not expand it further.
            if len(Nq) >= min_Pts:
                delta = Nq & unvisited_indices
                queue.extend(delta)
                cluster |= delta
                unvisited_indices -= delta
        clusters.append(cluster)
        # Every core object reached above is already assigned.
        core_objects = core_objects - cluster

    # Whatever was never visited belongs to no cluster.
    clusters.append(unvisited_indices)
    return clusters
if __name__ == '__main__':
    # Load the sample dataset and keep only the first two columns as the
    # coordinates to cluster.
    data_file = 'dataset_circles.csv'
    original_data = read_csv_data(data_file)
    points = original_data[:, :2]

    # Clustering parameters: neighborhood radius and core-object threshold.
    eps = 5
    min_Pts = 5
    clusters = DBSCAN(points, eps, min_Pts)

    # Tag every point with the index of the cluster it was assigned to.
    classified_points = [
        (points[idx][0], points[idx][1], label)
        for label, members in enumerate(clusters)
        for idx in members
    ]
    show_figure(classified_points, [])