From 5165609ac86f9be0670b92605afdda611242ce83 Mon Sep 17 00:00:00 2001 From: "Casper V. Kristensen" Date: Fri, 5 Apr 2019 03:31:30 +0200 Subject: [PATCH] Commit intentionally unsigned for plausible deniability. --- .gitignore | 92 ++++++++ main.py | 590 +++++++----------------------------------------- opencv_video.py | 46 ++-- runner.py | 384 ++++++------------------------- util.py | 92 ++++++++ utils.py | 7 - 6 files changed, 359 insertions(+), 852 deletions(-) create mode 100644 .gitignore create mode 100644 util.py delete mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..652e12c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,92 @@ + +# Created by https://www.gitignore.io/api/pycharm+all +# Edit at https://www.gitignore.io/?templates=pycharm+all + +### PyCharm+all ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/modules.xml +# .idea/*.iml +# .idea/modules + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +# JetBrains templates +**___jb_tmp___ + +### PyCharm+all Patch ### +# Ignores the whole .idea folder and all .iml files +# See https://github.com/joeblau/gitignore.io/issues/186 and https://github.com/joeblau/gitignore.io/issues/360 + +.idea/ + +# Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-249601023 + +*.iml +modules.xml +.idea/misc.xml +*.ipr + +# Sonarlint plugin +.idea/sonarlint + +# End of https://www.gitignore.io/api/pycharm+all diff --git a/main.py b/main.py index 1b446aaf..c62167b2 100644 --- a/main.py +++ b/main.py @@ -1,82 +1,59 @@ -from functools import lru_cache - import cv2 -import runner -from sklearn.externals import joblib -import numpy as np -import operator -from matplotlib import pyplot as plt -import glob -import os -import heapq -import math -from datetime import datetime import sys +from collections import defaultdict +from datetime import datetime + +import numpy as np + +import runner +from util import load_classifier, PIECE, COLOR, POSITION, Board, Squares, PieceAndColor np.set_printoptions(threshold=sys.maxsize) -pieces = ['rook', 'knight'] -#pieces = ['rook', 'knight'] -#piece_to_symbol = {'rook': 1, 'knight': 2, 'empty': 0} -piece_to_symbol = {'rook': 1, 'knight': 2} -colors = ['black', 'white'] - - -def classify(image, sift : cv2.xfeatures2d_SIFT, file, rank, empty_bias=False): +def identify_piece(image: np.ndarray, sift : cv2.xfeatures2d_SIFT, empty_bias=False) -> PieceAndColor: centers = np.load("training_data/centers.npy") - probs = {'rook': {'black': 0, 'white': 0}, 'knight': {'black': 0, 'white': 0}, 'empty': {'black': 0, 'white': 0}} - #probs = {'rook': 0, 'knight': 0, 'empty': 0} - for piece in pieces: - for color in colors: + probs = defaultdict(lambda: defaultdict(float)) + best = 0 + best_piece = best_color = None + for piece in PIECE: + for color in COLOR: #color = runner.compute_color(file, rank) - classifier = joblib.load(f"classifiers/classifier_{piece}/{color}.pkl") + classifier = load_classifier(f"classifiers/classifier_{piece}/{color}.pkl") features = runner.generate_bag_of_words(image, centers, sift) prob = classifier.predict_proba(features) - - - probs[piece][color] = prob[0, 1] - + if prob[0, 1] > best: + best_piece, best_color = piece, color + print(probs) if empty_bias: - probs['empty'] *= 1.2 + probs[PIECE.EMPTY] *= 1.2 - return probs + return best_piece, best_color -def pred_test(file, rank, mystery_image=None, empty_bias=False): +def pred_test(position: POSITION, mystery_image=None, empty_bias=False): sift = cv2.xfeatures2d.SIFT_create() if mystery_image is None: mystery_image = cv2.imread("training_images/rook/white/rook_training_D4_2.png") - probs = classify(mystery_image, sift, file, rank, empty_bias=empty_bias) + probs = classify(mystery_image, sift, empty_bias=empty_bias) return probs - -def pre_process_and_train(): +def pre_process_and_train() -> None: runner.do_pre_processing() runner.train_pieces_svm() -def build_board_from_dict(board_dict : dict): +def build_board_from_squares(squares: Squares) -> Board: sift = cv2.xfeatures2d.SIFT_create() - board = [[0]*8 for _ in range(8)] + board = Board() counter = 0 - for idx, value in enumerate(board_dict.values()): - probs = classify(value, sift) - likely_piece = max(probs.items(), key=operator.itemgetter(1))[0] - symbol = piece_to_symbol[likely_piece] - - column = idx // 8 - row = (idx % 7) - - - board[row][column] = symbol - - print(probs) - - if likely_piece != 'empty': + for position, square in squares.values(): + likely_piece = identify_piece(square, sift) + board[position] = likely_piece + if likely_piece != PIECE.EMPTY: counter += 1 print(counter) @@ -84,333 +61,30 @@ def build_board_from_dict(board_dict : dict): return board +def test_entire_board() -> None: + board_img = cv2.imread("homo_pls_fuck.jpg") + warped = runner.warp_board(board_img) - -def detect_using_nn(spec_image): - probs = {'rook': 0, 'knight': 0} - for piece in pieces: - piece_class = piece_to_symbol[piece] - win_size = (64, 64) - classifier = joblib.load("classifiers/neural_net_" + piece + ".pkl") - - spec_image = cv2.resize(spec_image, (64, 128)) - features = np.reshape(spec_image, (1, np.product(spec_image.shape))) - prob = classifier.predict_proba(features) - print(piece) - print(prob[0,1]) - - - - -def test_entire_board(): - board = cv2.imread("homo_pls_fuck.jpg") - warped = runner.warp_board(board) - - board_dict = runner.get_squares(warped) - board = build_board_from_dict(board_dict) + squares = runner.get_squares(warped) + board = build_board_from_squares(squares) print(board) -def lel_test(): - # img = cv2.imread('training_images/rook/white/rook_training_D4_2.png') +def predict(square: np.ndarray, position: POSITION) -> PIECE: + y, x = np.histogram(square.ravel(), bins=256, range=[0, 256]) - counter = 0 - - for filename in glob.glob(os.path.join("training_images", "empty", "*", "*.png")): - - img = cv2.imread(filename) - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - ret = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 3, 3) - - # binarize the image - #ret, bw = cv2.threshold(gray, 100, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) - - # find connected components - connectivity = 4 - nb_components, output, stats, centroids = cv2.connectedComponentsWithStats(ret, connectivity, cv2.CV_32S) - sizes = stats[1:, -1] - nb_components = nb_components - 1 - min_size = 250 # threshhold value for objects in scene - img2 = np.zeros((img.shape), np.uint8) - - - for i in range(0, nb_components + 1): - # use if sizes[i] >= min_size: to identify your objects - color = np.random.randint(255, size=3) - # draw the bounding rectangele around each object - cv2.rectangle(img2, (stats[i][0], stats[i][1]), (stats[i][0] + stats[i][2], stats[i][1] + stats[i][3]), - (0, 255, 0), 2) - img2[output == i + 1] = color - - #print(nb_components+1) - - if nb_components+1 >= 4: - counter += 1 - print(filename) - cv2.imshow("lel", img2) - cv2.waitKey(0) - - - print(counter) - - -def selective_search(image, use_fast=False, use_slow=False): - # speed-up using multithreads - cv2.setUseOptimized(True) - cv2.setNumThreads(4) - - - if type(image) == str: - # read image - im = cv2.imread(image) - else: - im = image - # resize image - #newHeight = 200 - #newWidth = int(im.shape[1] * 150 / im.shape[0]) - #im = cv2.resize(im, (newWidth, newHeight)) - - #im = cv2.imread(image) - - - #lel, im = cv2.threshold(im, 128, 255, cv2.THRESH_BINARY) - - - - # create Selective Search Segmentation Object using default parameters - ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation() - - # set input image on which we will run segmentation - ss.setBaseImage(im) - - # Switch to fast but low recall Selective Search method - - ss.switchToSingleStrategy() - - if (use_fast): - ss.switchToSelectiveSearchFast() - - # Switch to high recall but slow Selective Search method - elif (use_slow): - ss.switchToSelectiveSearchQuality() - - # run selective search segmentation on input image - rects = ss.process() - #print('Total Number of Region Proposals: {}'.format(len(rects))) - - # number of region proposals to show - numShowRects = 150 - # increment to increase/decrease total number - # of reason proposals to be shown - increment = 1 - - best_proposals = [] - - while True: - # create a copy of original image - - # itereate over all the region proposals - for i, rect in enumerate(rects): - imOut = im.copy() - - # draw rectangle for region proposal till numShowRects - if (i < numShowRects): - x, y, w, h = rect - # cv2.rectangle(imOut, (x, y), (x + w, y + h), (0, 255, 0), 1, cv2.LINE_AA) - - # size = (max(w, x) - min(w, x)) * ((max(h, y) - min(h, y))) - top_left = (x,y) - bottom_left = (x, y+h) - top_right = (x+w, y) - bottom_right = (x+w, y+h) - - rect_width = bottom_right[0] - bottom_left[0] - rect_height = bottom_right[1] - top_right[1] - - - size = rect_width * rect_height - #print(f"({x}, {y}), ({w}, {h})\n Of size: { size }") - - #cv2.rectangle(imOut, (x, y), (x + w, y + h), (0, 255, 0), 1, cv2.LINE_AA) - #cv2.imshow("lel", imOut) - #cv2.waitKey(0) - - best_proposals.append((rect, size)) - - #if size > biggest_size: - # biggest_rect = (x, y, w, h) - # biggest_size = size - # print(f"New biggest: \n({x}, {y}), ({w}, {h})\nOf size: {biggest_size}") - - else: - break - - height, width, channels = im.shape - center_x = width // 2 - center_y = (height // 2)+5 - dists = [] - #print(f"Amount of best proposals:\n{len(best_proposals)}") - - #print(f"lel: {len(heapq.nlargest(10, best_proposals, key=lambda x: x[1]))}") - for i in heapq.nlargest(10, best_proposals, key=lambda x: x[1]): - width, height, channels = im.shape - #print(width * height) - #print(i[1]) - x, y, w, h = i[0] - - - - if i[1] <= (width*height)*0.8 and i[1] > (width*height)*0.25: - - - imCop = imOut.copy() - - #cv2.rectangle(imCop, (x, y), (x + w, y + h), (0, 255, 0), 2, cv2.LINE_AA) - #cv2.imshow("lel", imCop) - #cv2.waitKey(0) - - - #cv2.rectangle(imCop, (x, y), (x + w, y + h), (0, 255, 0), 4, cv2.LINE_AA) - - top_left = (x,y) - bottom_left = (x, y+h) - top_right = (x+w, y) - bottom_right = (x+w, y+h) - - box_center_x = (top_left[0]+bottom_left[0]+top_right[0]+bottom_right[0]) // 4 - box_center_y = (top_left[1]+bottom_left[1]+top_right[1]+bottom_right[1]) // 4 - - #print(f"{box_center_x}, {box_center_y}, {center_x}, {center_y}") - - dist = (center_x - box_center_x) ** 2 + (center_y - box_center_y) ** 2 - print(dist) - dists.append([i, dist]) - - - cv2.drawMarker(imCop, position=(x+w, h+y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x+w, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y+h), color=(255, 0, 0), thickness=3) - - cv2.drawMarker(imCop, position=(box_center_x, box_center_y), color=(0, 255, 0), thickness=3) - - cv2.drawMarker(imCop, position=(center_x, center_y), color=(0, 0, 255), thickness=3) - - - #cv2.imshow("lel", imCop) - #cv2.waitKey(0) - - #print("-------"*5) - - for pls in dists: - imCop = imOut.copy() - x, y, w, h = pls[0][0] - #print(x,y,w,h) - #print(pls[1]) - - top_left = (x, y) - bottom_left = (x, y + h) - top_right = (x + w, y) - bottom_right = (x + w, y + h) - - cv2.drawMarker(imCop, position=(x + w, h + y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x + w, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y + h), color=(255, 0, 0), thickness=3) - - box_center_x = (top_left[0] + bottom_left[0] + top_right[0] + bottom_right[0]) // 4 - box_center_y = (top_left[1] + bottom_left[1] + top_right[1] + bottom_right[1]) // 4 - - cv2.drawMarker(imCop, position=(box_center_x, box_center_y), color=(0, 255, 0), thickness=3) - - cv2.drawMarker(imCop, position=(center_y, center_x), color=(0, 0, 255), thickness=3) - - - cv2.rectangle(imCop, (x, y), (x + w, y + h), (0, 255, 0), 2, cv2.LINE_AA) - #cv2.imshow("lel", imCop) - #cv2.waitKey(0) - - imCop = imOut.copy() - - - - - best = heapq.nsmallest(1, dists, key=lambda x: x[1]) - - if (len(best) == 0): - return ((0, 0), (0, height), (width, 0), (width, height)) - - x, y, w, h = best[0][0][0] - cv2.rectangle(imCop, (x, y), (x + w, y + h), (0, 255, 0), 4, cv2.LINE_AA) - - top_left = (x, y) - bottom_left = (x, y + h) - top_right = (x + w, y) - bottom_right = (x + w, y + h) - - cv2.drawMarker(imCop, position=(x + w, h + y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x + w, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y), color=(255, 0, 0), thickness=3) - cv2.drawMarker(imCop, position=(x, y + h), color=(255, 0, 0), thickness=3) - - box_center_x = (top_left[0] + bottom_left[0] + top_right[0] + bottom_right[0]) // 4 - box_center_y = (top_left[1] + bottom_left[1] + top_right[1] + bottom_right[1]) // 4 - - cv2.drawMarker(imCop, position=(box_center_x, box_center_y), color=(0, 255, 0), thickness=3) - - cv2.drawMarker(imCop, position=(center_x, center_y), color=(0, 0, 255), thickness=3) - - - #cv2.imshow("lel", imCop) - #cv2.waitKey(0) - return (top_left, bottom_left, top_right, bottom_right) - - # show output - cv2.imshow("Output", imOut) - - # record key press - k = cv2.waitKey(0) & 0xFF - - # m is pressed - if k == 109: - # increase total number of rectangles to show by increment - numShowRects += increment - # l is pressed - elif k == 108 and numShowRects > increment: - # decrease total number of rectangles to show by increment - numShowRects -= increment - # q is pressed - elif k == 113: - break - # close image show window - cv2.destroyAllWindows() - - -def predict(square, file, rank): - square_color = runner.compute_color(file, rank) - - y, x = np.histogram(square.ravel(), bins=256, range=[0, 256]) # TODO: Maybe img.ravel() ? - - for color in ['black', 'white']: + for color in COLOR: empty_classifier = load_classifier(f"classifiers/classifier_empty/white_piece_on_{color}_square.pkl") prob = empty_classifier.predict_proba(np.array(y).reshape(1, -1)) print(f"{file}{rank}, {color}: {prob[0, 1]}") if prob[0, 1] > 0.5: - return 'empty' + return PIECE.EMPTY return None - -@lru_cache() -def load_classifier(filename): - print(f"loading {filename}") - return joblib.load(filename) - - if __name__ == '__main__': - - board = cv2.imread("whole_boards/boards_for_empty/board_1554286488.605142_.png") + board = cv2.imread("whole_boards/boards_for_empty/board_1554286488.605142_rank_3.png") warped = runner.warp_board(board) empty = 0 @@ -421,58 +95,53 @@ if __name__ == '__main__': non_empties = [] - for file in files: - for rank in ranks: - counter = 0 + for position in POSITION: + counter = 0 - src = runner.get_square(warped, file, rank) - width, height, _ = src.shape - src = src[width//25:, height//25:] - # src = src[:-width//200, :-height//200] - segmentator = cv2.ximgproc.segmentation.createGraphSegmentation(sigma=0.8, k=150, min_size=700) - segment = segmentator.processImage(src) + src = runner.get_square(warped, position) + width, height, _ = src.shape + src = src[width//25:, height//25:] + # src = src[:-width//200, :-height//200] + segmentator = cv2.ximgproc.segmentation.createGraphSegmentation(sigma=0.8, k=150, min_size=700) + segment = segmentator.processImage(src) - mask = segment.reshape(list(segment.shape) + [1]).repeat(3, axis=2) - masked = np.ma.masked_array(src, fill_value=0) - pls = [] - for i in range(np.max(segment)): - masked.mask = mask != i + mask = segment.reshape(list(segment.shape) + [1]).repeat(3, axis=2) + masked = np.ma.masked_array(src, fill_value=0) + pls = [] + for i in range(np.max(segment)): + masked.mask = mask != i - y, x = np.where(segment == i) + y, x = np.where(segment == i) - top, bottom, left, right = min(y), max(y), min(x), max(x) + top, bottom, left, right = min(y), max(y), min(x), max(x) - dst = masked.filled()[top: bottom + 1, left: right + 1] - lel = (bottom - top) * (right - left) - #print(f"this is lel: {lel} ") - #print(f"this is meh: {np.sum(mask[:,:,0])} ") - if f"{file}{rank}" == "H7": - print("--"*20) - print("H7") - print(lel) - print(len(y)) - print(np.max(segment)) + dst = masked.filled()[top: bottom + 1, left: right + 1] + lel = (bottom - top) * (right - left) + #print(f"this is lel: {lel} ") + #print(f"this is meh: {np.sum(mask[:,:,0])} ") + if position == POSITION.H7: + print("--"*20) + print("H7") + print(lel) + print(len(y)) + print(np.max(segment)) # print(lel) # print(np.sum(mask[:, :, 0])) - print("--"*20) + print("--"*20) - pls.append(len(y)) - if len(y) < (164**2)*0.65: - counter += 1 + pls.append(len(y)) + if len(y) < (164**2)*0.65: + counter += 1 - cv2.imwrite(f"segment_test/segment_{datetime.utcnow().timestamp()}_{file}{rank}.jpg", dst) + cv2.imwrite(f"segment_test/segment_{datetime.utcnow().timestamp()}_{position}.png", dst) - - - - - if np.max(segment) > 0 and not np.all([x < (164**2)*0.2 for x in pls]) and (np.max(segment) >= 3 or np.all([x < (164**2)*0.942 for x in pls])): - print(f"{file}{rank} is nonempty") - non_empties.append([f"{file}{rank}", src]) - print(counter) - print(np.max(segment)) - empty += 1 + if np.max(segment) > 0 and not np.all([x < (164**2)*0.2 for x in pls]) and (np.max(segment) >= 3 or np.all([x < (164**2)*0.942 for x in pls])): + print(f"{position} is nonempty") + non_empties.append([f"{position}", src]) + print(counter) + print(np.max(segment)) + empty += 1 print("++"*20) print(counter) print(64-empty) @@ -498,124 +167,35 @@ if __name__ == '__main__': for file in files: for rank in ranks: - square = runner.get_square(warped, file, rank) - if predict(square, file, rank) == 'empty': + square_img = runner.get_square(warped, file, rank) + if predict(square_img, file, rank) == 'empty': counter += 1 print(counter) exit() - square = runner.get_square(warped, "D", 2) + square_img = runner.get_square(warped, "D", 2) - gray_square = cv2.cvtColor(square, cv2.COLOR_BGR2GRAY) - print(cv2.meanStdDev(gray_square)[1]) - print(cv2.meanStdDev(square)[1]) - cv2.imshow("square", square) + gray_square_img = cv2.cvtColor(square_img, cv2.COLOR_BGR2GRAY) + print(cv2.meanStdDev(gray_square_img)[1]) + print(cv2.meanStdDev(square_img)[1]) + cv2.imshow("square", square_img) cv2.waitKey(0) - print(pred_test("C", 2, square)) + print(pred_test("C", 2, square_img)) sift: cv2.xfeatures2d_SIFT = cv2.xfeatures2d.SIFT_create() - gray = cv2.cvtColor(square, cv2.COLOR_BGR2GRAY) + gray = cv2.cvtColor(square_img, cv2.COLOR_BGR2GRAY) kp, desc = sift.detectAndCompute(gray, None) - cv2.drawKeypoints(square, kp, square) + cv2.drawKeypoints(square_img, kp, square_img) - cv2.imshow("kp", square) + cv2.imshow("kp", square_img) cv2.waitKey(0) - exit() - board = cv2.imread("whole_boards/board_202_1554154094.001122_.png") - - runner.fetch_empty_fields(board) - exit() - - - warped = runner.warp_board(board) - - counter = 0 - - #square = runner.get_square(warped, "A", 3) - - #top_left, bottom_left, top_right, bottom_right = selective_search(square, use_fast=True) - #cropped = square[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]] - - - - for file in files: - for rank in ranks: - - square = runner.get_square(warped, file, rank) - - - top_left, bottom_left, top_right, bottom_right = selective_search(square, use_fast=True) - cropped = square[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]] - - rect_width = bottom_right[0] - bottom_left[0] - rect_height = bottom_right[1] - top_right[1] - - size = rect_width * rect_height - - square_height, square_width, channels = square.shape - - empty_bias = (size == square_height*square_width) - if size == square_height*square_width: - print(f"{file}{rank} is likely empty") - - res = pred_test(file, rank, mystery_image=square, empty_bias=empty_bias) - print(res) - if (max(res.items(), key=operator.itemgetter(1))[0] == 'empty'): - counter += 1 - - - - print(f"Amount of empty fields: {counter}") - #print("Non-cropped:\t",pred_test(square)) - #print("Cropped:\t",pred_test(cropped)) - - #cv2.imshow("square", square) - #cv2.waitKey(0) - - - #runner.do_pre_processing() - #runner.train() - - - #img = "warped_square_B5.png" - #detect_using_nn(img) - - #selective_search("training_images/empty/colorless/warped_square_A6.png", use_fast=True) - #selective_search("warped_square_B5.png", use_fast=True) - img = "training_images/rook/white/rook_training_D4_7.png" - #img = "training_images/rook/white_square/rook_training_E4_10.png" - #img = "training_images/knight/white_square/training_D5_134.png" - - #top_left, bottom_left, top_right, bottom_right = selective_search(img, use_fast=True) - #cropped = cv2.imread(img)[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]] - - #cv2.imshow("output", cropped) - #print(pred_test(cropped)) - #cv2.waitKey(0) - - - - #lel_test() - - - # test_entire_board() - - #board = [[0, 0, 1, 2, 0, 0, 0, 2], [0, 1, 2, 2, 1, 0, 0, 1], [0, 0, 0, 0, 1, 0, 2, 0], [0, 2, 2, 1, 1, 2, 2, 0], [0, 1, 0, 0, 1, 2, 0, 0], [0, 0, 0, 0, 0, 2, 2, 0], [0, 0, 0, 2, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0]] - #for i in board: - # print(i) - - #warped = cv2.imread("homo_pls_fuck.jpg") - #square = runner.get_square(warped, "D", 4) - #print(pred_test(square)) - #cv2.imshow("lel", square) - #cv2.waitKey(0) diff --git a/opencv_video.py b/opencv_video.py index 4748b295..64bef30b 100644 --- a/opencv_video.py +++ b/opencv_video.py @@ -1,53 +1,41 @@ -import itertools -from pathlib import Path -from threading import Thread -from time import sleep -import numpy as np -import cv2 -import runner from datetime import datetime -import utils +import cv2 + +import runner +from util import FILE, RANK, PIECE, COLOR, imwrite, POSITION -#cap = cv2.VideoCapture(0) -#cap = cv2.VideoCapture("rtsp://10.192.49.108:8080/h264_ulaw.sdp") cap = cv2.VideoCapture(0) - -piece = "knight" -color = "black" - - -rank = 8 - +color = COLOR.BLACK +rank = RANK.EIGHT pieces = { - 'knight': [("E", rank), ("H", rank)], - 'rook': [("A", rank), ("F", rank)], - 'bishop': [("C", rank), ("D", rank)], - 'king': [("G", rank)], - 'queen': [("B", rank)] + PIECE.rook: [POSITION(FILE.A, rank), POSITION(FILE.F, rank)], + PIECE.knight: [POSITION(FILE.E, rank), POSITION(FILE.H, rank)], + PIECE.bishop: [POSITION(FILE.C, rank), POSITION(FILE.D, rank)], + PIECE.queen: [POSITION(FILE.B, rank)], + PIECE.king: [POSITION(FILE.G, rank)], } -while(True): +while True: # Capture frame-by-frame ret, frame = cap.read() # Display the resulting frame - cv2.imshow('frame', frame) + cv2.imshow("frame", frame) - if cv2.waitKey(100) & 0xFF == ord('c'): + if cv2.waitKey(100) & 0xFF == ord("c"): print(f"capturing frame") - # cv2.imwrite(f"single_frame_{counter}.png", frame) - utils.imwrite(f"whole_boards/boards_for_empty/board_{datetime.utcnow().timestamp()}_.png", frame) + imwrite(f"whole_boards/boards_for_empty/board_{datetime.utcnow().timestamp()}_.png", frame) warped = runner.warp_board(frame) runner.save_empty_fields(warped, skip_rank=rank) for piece, positions in pieces.items(): for position in positions: - square = runner.get_square(warped, position[0], position[1]) + square = runner.get_square(warped, *position) x, y = position - utils.imwrite(f"training_images/{piece}/{runner.compute_color(x, y)}_square/training_{x}{str(y)}_{datetime.utcnow().timestamp()}.png", square) + imwrite(f"training_images/{piece}/{position.color}_square/training_{x}{str(y)}_{datetime.utcnow().timestamp()}.png", square) # When everything done, release the capture diff --git a/runner.py b/runner.py index b4829b6e..efe2a8f4 100644 --- a/runner.py +++ b/runner.py @@ -1,173 +1,36 @@ -from functools import lru_cache - import cv2 -import numpy as np import glob import os -from sklearn import cluster -from sklearn import metrics -from sklearn import svm -from sklearn.externals import joblib -from sklearn import neural_network -import heapq from datetime import datetime +from typing import Tuple +import numpy as np +from sklearn import cluster, metrics, svm +from sklearn.externals import joblib from sklearn.pipeline import make_pipeline from sklearn.preprocessing import StandardScaler -import utils - -import random - -pieces = ["rook", "knight"] -colors = ['black', 'white'] +from util import RANK, POSITION, imwrite, PIECE, COLOR, Squares - - - - -def selective_search(image, use_fast=False, use_slow=False, image_name = None): - # speed-up using multithreads - cv2.setUseOptimized(True) - cv2.setNumThreads(4) - - im = image - - img_out = im.copy() - - # create Selective Search Segmentation Object using default parameters - ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation() - - # set input image on which we will run segmentation - ss.setBaseImage(im) - - ss.switchToSingleStrategy() - - if (use_fast): - ss.switchToSelectiveSearchFast() - - elif (use_slow): - ss.switchToSelectiveSearchQuality() - - # run selective search segmentation on input image - rects = ss.process() - - # number of region proposals to show - numShowRects = 150 - - best_proposals = [] - - while True: - # create a copy of original image - - # itereate over all the region proposals - for i, rect in enumerate(rects): - imOut = im.copy() - - # draw rectangle for region proposal till numShowRects - if (i < numShowRects): - x, y, w, h = rect - - top_left = (x,y) - bottom_left = (x, y+h) - top_right = (x+w, y) - bottom_right = (x+w, y+h) - - rect_width = bottom_right[0] - bottom_left[0] - rect_height = bottom_right[1] - top_right[1] - - size = rect_width * rect_height - - best_proposals.append((rect, size)) - - - else: - break - - height, width, channels = im.shape - center_x = width // 2 - center_y = (height // 2)+5 - dists = [] - - for i in heapq.nlargest(10, best_proposals, key=lambda x: x[1]): - width, height, channels = im.shape - - x, y, w, h = i[0] - - if i[1] < (width*height)*0.90 and i[1] > (width*height)*0.25: - - - top_left = (x,y) - bottom_left = (x, y+h) - top_right = (x+w, y) - bottom_right = (x+w, y+h) - - box_center_x = (top_left[0]+bottom_left[0]+top_right[0]+bottom_right[0]) // 4 - box_center_y = (top_left[1]+bottom_left[1]+top_right[1]+bottom_right[1]) // 4 - - dist = (center_x - box_center_x) ** 2 + (center_y - box_center_y) ** 2 - dists.append([i, dist]) - - - imCop = imOut.copy() - - - print(image_name) - best = heapq.nsmallest(1, dists, key=lambda x: x[1]) - x, y, w, h = best[0][0][0] - cv2.rectangle(imCop, (x, y), (x + w, y + h), (0, 255, 0), 4, cv2.LINE_AA) - - top_left = (x, y) - bottom_right = (x + w, y + h) - - - cropped = img_out[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]] - return cropped - - # show output - cv2.imshow("Output", imOut) - - # record key press - k = cv2.waitKey(0) & 0xFF - - # m is pressed - if k == 109: - # increase total number of rectangles to show by increment - numShowRects += increment - # l is pressed - elif k == 108 and numShowRects > increment: - # decrease total number of rectangles to show by increment - numShowRects -= increment - # q is pressed - elif k == 113: - break - # close image show window - cv2.destroyAllWindows() - - -def generate_centers(number_of_clusters, sift : cv2.xfeatures2d_SIFT): - features = None - for piece in pieces: - for color in colors: +def generate_centers(number_of_clusters, sift: cv2.xfeatures2d_SIFT): + features = [] + for piece in PIECE: + for color in COLOR: for filename in glob.glob(os.path.join("training_images", piece, f"{color}_square", "*.png")): image = cv2.imread(filename) #image = selective_search(image, use_fast=True) gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) kp, desc = sift.detectAndCompute(gray, None) - - if features is None: - features = np.array(desc) - else: - print(f"{piece}, {color}, {filename}") - features = np.vstack((features, desc)) - + print(f"{piece}, {color}, {filename}") + features.append(desc) + features = np.array(features) k_means = cluster.KMeans(number_of_clusters) k_means.fit(features) return k_means.cluster_centers_ -def generate_bag_of_words(image, centers, sift : cv2.xfeatures2d_SIFT): +def generate_bag_of_words(image, centers, sift: cv2.xfeatures2d_SIFT): num_centers = centers.shape[0] histogram = np.zeros((1, num_centers)) @@ -180,73 +43,58 @@ def generate_bag_of_words(image, centers, sift : cv2.xfeatures2d_SIFT): distances = metrics.pairwise.pairwise_distances(desc, centers) best_centers = np.argmin(distances, axis=1) - for i in best_centers: - histogram[0,i] = histogram[0,i] + 1 - histogram = histogram / np.sum(histogram) + for i in best_centers: # TODO: Could do this way faster in one line with numpy somehow + histogram[0, i] += + 1 - return histogram + return histogram / np.sum(histogram) -def do_pre_processing(): +def do_pre_processing() -> None: sift = cv2.xfeatures2d.SIFT_create() centers = generate_centers(8, sift) np.save("training_data/centers", centers) - for piece in pieces: - for color in colors: + for piece in PIECE: + for color in COLOR: for filename in glob.glob(os.path.join("training_images", piece, f"{color}_square", "*.png")): image = cv2.imread(filename) #image = selective_search(image, image_name=filename, use_fast=True) bow_features = generate_bag_of_words(image, centers, sift) - np.save(f"training_data/{piece}/{color}_square/" + os.path.basename(filename), bow_features) + np.save(f"training_data/{piece}/{color}_square/{os.path.basename(filename)}", bow_features) -def load_training_data(spec_piece, color): - X = None - Y = None - - for piece in pieces: - piece_class = int(spec_piece == piece) +def load_training_data(piece: PIECE, color: COLOR) -> Tuple[np.array, np.array]: + X = [] + Y = [] + for p in PIECE: for filename in glob.glob(os.path.join("training_data", piece, f"{color}_square", "*.npy")): data = np.load(filename) - if X is None: - X = np.array(data) - Y = np.array([piece_class]) - else: - X = np.vstack((X, data)) - Y = np.vstack((Y, [piece_class])) - return X, Y + X.append(data) + Y.append(p == piece) + return np.array(X), np.array(Y) -def train_empty_or_piece_var(): - for square_color in ["black", "white"]: - X = None - Y = None - for piece in ['empty', 'rook', 'knight']: - +def train_empty_or_piece_hist() -> None: + for square_color in COLOR: + X = [] + Y = [] + for piece in (PIECE.EMPTY, PIECE.ROOK, PIECE.KNIGHT): for filename in glob.glob(os.path.join("training_images", f"{piece}", f"{square_color}_square", "*.png")): - piece_class = 'empty' == piece img = cv2.imread(filename) - - y, x = np.histogram(img.ravel(), bins=256, range=[0, 256]) # TODO: Maybe img.ravel() ? - - if X is None: - X = np.array(y) - Y = np.array([piece_class]) - else: - X = np.vstack((X, y)) - Y = np.vstack((Y, [piece_class])) + y, x = np.histogram(img.ravel(), bins=256, range=[0, 256]) + X.append(y) + Y.append(piece == PIECE.EMPTY) classifier = make_pipeline(StandardScaler(), svm.SVC(C=10.0, gamma=0.01, probability=True)) - classifier.fit(X, Y) + classifier.fit(np.array(X), np.array(Y)) joblib.dump(classifier, f"classifiers/classifier_empty/white_piece_on_{square_color}_square.pkl") -def train_pieces_svm(): - for piece in pieces: - for color in colors: +def train_pieces_svm() -> None: + for piece in PIECE: + for color in COLOR: # TODO: Consider removing empty from total_weights, so all classifiers do not consider empty pieces total_weights = len(glob.glob(os.path.join("training_images", "*", f"{color}_square", "*.png"))) current_weight = len(glob.glob(os.path.join("training_images", piece, f"{color}_square", "*.png"))) @@ -258,21 +106,7 @@ def train_pieces_svm(): joblib.dump(classifier, f"classifiers/classifier_{piece}/{color}.pkl") - -def compute_features(training_image): - sift = cv2.xfeatures2d.SIFT_create() - gray_training_image = cv2.cvtColor(training_image, cv2.COLOR_BGR2GRAY) - kp = sift.detect(gray_training_image) - kp, desc = sift.compute(gray_training_image, kp) - - cv2.drawKeypoints(training_image, kp, training_image) - - return training_image - - -def warp_board(camera_image, debug_image=None): - #cv2.imwrite('camera_image.png', camera_image) - +def warp_board(camera_image, debug_image=None) -> np.ndarray: baseline = cv2.imread("new_baseline_board.png") camera_image_gray = cv2.cvtColor(camera_image, cv2.COLOR_BGR2GRAY) @@ -287,7 +121,7 @@ def warp_board(camera_image, debug_image=None): if debug_image is not None: cv2.drawKeypoints(camera_image, keypoints=camera_image_keypoints, outImage=debug_image) - cv2.imwrite('keypoints_img.jpg', camera_image) + cv2.imwrite("keypoints_img.jpg", camera_image) # FLANN parameters FLANN_INDEX_KDTREE = 0 @@ -298,28 +132,27 @@ def warp_board(camera_image, debug_image=None): matches = flann.knnMatch(des, des2, k=2) # Need to draw only good matches, so create a mask - matchesMask = [[0,0] for i in range(len(matches))] + matchesMask = [[0, 0] for _ in range(len(matches))] + # Ratio test as per Lowe's paper good_matches = [] - - # ratio test as per Lowe's paper - for i,(m,n) in enumerate(matches): + for i, (m, n) in enumerate(matches): if m.distance < 0.55*n.distance: - matchesMask[i]=[1,0] - good_matches.append([m,n]) + matchesMask[i] = [1, 0] + good_matches.append([m, n]) - draw_params = dict(matchColor=(0,255,0), - singlePointColor=(255,0,0), - matchesMask=matchesMask, - flags=0) - - img3 = cv2.drawMatchesKnn(camera_image, - camera_image_keypoints, - baseline, - baseline_keypoints, - matches, - None, - **draw_params) + img3 = cv2.drawMatchesKnn( + camera_image, + camera_image_keypoints, + baseline, + baseline_keypoints, + matches, + None, + matchColor=(0, 255, 0), + singlePointColor=(255, 0, 0), + matchesMask=matchesMask, + flags=0 + ) cv2.imwrite("matches.jpg", img3) # Extract location of good matches @@ -330,112 +163,41 @@ def warp_board(camera_image, debug_image=None): points1[i, :] = camera_image_keypoints[m.queryIdx].pt points2[i, :] = baseline_keypoints[m.trainIdx].pt - # print(len(points2)) - h, mask = cv2.findHomography(points1, points2, cv2.RANSAC) height, width, channels = baseline.shape - im1Reg = cv2.warpPerspective(camera_image, h, (width, height)) - - # cv2.imwrite('homo_pls_fuck.jpg', im1Reg) - return im1Reg + return cv2.warpPerspective(camera_image, h, (width, height)) -def get_square(warped_board, file, rank): - files = "ABCDEFGH" - file = files.index(file) - rank = 8 - rank +def get_square(warped_board: np.ndarray, position: POSITION) -> np.ndarray: width, _, _ = warped_board.shape # board is square anyway side = int(width * 0.045) size = width - 2 * side square_size = size // 8 - - padding = 0 - x1 = side + (square_size * file) + x1 = side + (square_size * position.file) x2 = x1 + square_size - y1 = max(0, side + (square_size * rank) - padding) + y1 = max(0, side + (square_size * (8 - position.rank)) - padding) # 8 - rank because chessboard is from 8 to 1 y2 = min(width, y1 + square_size + padding) square = warped_board[y1:y2, x1:x2] return square -def get_squares(warped_board): - result = {} - for file in "ABCDEFGH": - for rank in range(1, 9): - square = get_square(warped_board, file, rank) - result[f"{file}{rank}"] = square - # cv2.imwrite(f"warped_square_{file}{rank}.png", square) - return result - -def load_data_nn(spec_piece): - X = None - Y = None - for piece in pieces: - piece_class = int(spec_piece == piece) - for filename in glob.glob(os.path.join("training_images", piece, "*", "*.png")): - image = cv2.imread(filename) - image = cv2.resize(image, (64, 128)) - data = np.reshape(image, (1, np.product(image.shape))) - if X is None: - if piece_class == 1: - for _ in range(10): - X = np.array(data) - Y = np.array([piece_class]) - else: - X = np.array(data) - Y = np.array([piece_class]) - else: - if piece_class == 1: - for _ in range(10): - X = np.vstack((X, data)) - Y = np.vstack((Y, [piece_class])) - else: - X = np.vstack((X, data)) - Y = np.vstack((Y, [piece_class])) - return (X, Y) - -def train_nn(): - for piece in pieces: - X, Y = load_data_nn(piece) - classifier = neural_network.MLPClassifier(hidden_layer_sizes=64) - classifier.fit(X, Y) - joblib.dump(classifier, "classifiers/neural_net_" + piece + ".pkl") +def get_squares(warped_board: np.ndarray) -> Squares: + # cv2.imwrite(f"warped_square_{square}.png", square) + return {position: get_square(warped_board, position) + for position in POSITION} -def letter_to_int(letter): - alphabet = list('ABCDEFGH') - return alphabet.index(letter) + 1 - - -@lru_cache(maxsize=64) -def compute_color(file, rank): - if ((letter_to_int(file)+rank) % 2): - return 'white' - else: - return 'black' - - -def save_empty_fields(warped, skip_rank=None): - alpha = "ABCDEFGH" - ranks = [1, 2, 3, 4, 5, 6, 7, 8] - - if skip_rank is not None: - ranks.remove(skip_rank) - - for file in alpha: - for rank in ranks: - square = get_square(warped, file, rank) - color = compute_color(file, rank) - - utils.imwrite(f"training_images/empty/{color}_square/training_{file}{rank}_{datetime.utcnow().timestamp()}.png", square) +def save_empty_fields(warped_board: np.ndarray, skip_rank: RANK = None) -> None: + for position in POSITION: + if position.rank == skip_rank: + continue + square = get_square(warped_board, position) + imwrite(f"training_images/empty/{position.color}_square/training_{position}_{datetime.utcnow().timestamp()}.png", square) if __name__ == '__main__': - train_empty_or_piece_var() - - - + train_empty_or_piece_hist() diff --git a/util.py b/util.py new file mode 100644 index 00000000..92ced2a4 --- /dev/null +++ b/util.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import cv2 +from enum import Enum +from functools import lru_cache +from pathlib import Path +from typing import NewType, NamedTuple, Dict, Tuple + +import numpy as np +from sklearn.externals import joblib + + +class COLOR(Enum): + WHITE = "white" + BLACK = "black" + + def __str__(self) -> str: + return self.value + + +class PIECE(Enum): + PAWN = "pawn" + ROOK = "rook" + KNIGHT = "knight" + BISHOP = "bishop" + QUEEN = "queen" + KING = "king" + EMPTY = "empty" + + def __str__(self) -> str: + return self.value + + +PieceAndColor = Tuple[PIECE, COLOR] + + +class FILE(int, Enum): + A = 1 + B = 2 + C = 3 + D = 4 + E = 5 + F = 6 + G = 7 + H = 8 + + +class RANK(int, Enum): + EIGHT = 8 + SEVEN = 7 + SIX = 6 + FIVE = 5 + FOUR = 4 + THREE = 3 + TWO = 2 + ONE = 1 + + +class _Position(NamedTuple): + file: FILE + rank: RANK + + def __str__(self) -> str: + return f"{self.file.name}{self.rank}" + + @property + def color(self): + if (self.file + self.rank) % 2: + return COLOR.WHITE + return COLOR.BLACK + + +# POSITION.{A8, A7, ..., H1} +POSITION = Enum("POSITION", {str(_Position(f, r)): _Position(f, r) for f in FILE for r in RANK}, type=_Position) + +# Squares is a dict mapping positions to square images, i.e. a board container during image processing +Squares = NewType("Squares", Dict[POSITION, np.ndarray]) + + +class Board(Dict[POSITION, PIECE]): + """Board is a dict mapping positions to a piece, i.e. a board configuration after all image processing""" + + +def imwrite(*args, **kwargs): + Path(args[0]).parent.mkdir(parents=True, exist_ok=True) + return cv2.imwrite(*args, **kwargs) + + +@lru_cache() +def load_classifier(filename): + print(f"Loading classifier {filename}") + return joblib.load(filename) diff --git a/utils.py b/utils.py deleted file mode 100644 index 73de8dcb..00000000 --- a/utils.py +++ /dev/null @@ -1,7 +0,0 @@ -from pathlib import Path -import cv2 - - -def imwrite(*args, **kwargs): - Path(args[0]).parent.mkdir(parents=True, exist_ok=True) - return cv2.imwrite(*args, **kwargs)