mirror of
https://github.com/hongwt/hekili-rotation-bot.git
synced 2026-05-06 21:41:10 +08:00
将线程修改为进程,避免python全局锁的限制
This commit is contained in:
61
bot.py
61
bot.py
@@ -5,23 +5,23 @@ import pyautogui
|
||||
|
||||
import numpy as np
|
||||
|
||||
from threading import Thread, Lock
|
||||
from PIL import ImageGrab
|
||||
|
||||
import multiprocessing
|
||||
|
||||
from vision import Vision
|
||||
|
||||
import config
|
||||
|
||||
class WowBot:
|
||||
|
||||
# threading properties
|
||||
stopped = True
|
||||
lock = None
|
||||
|
||||
capture = None
|
||||
vision = None
|
||||
|
||||
def __init__(self, capture, vision):
|
||||
# create a thread lock object
|
||||
self.lock = Lock()
|
||||
|
||||
self.capture = capture
|
||||
self.vision = vision
|
||||
def __init__(self):
|
||||
self.vision = Vision()
|
||||
|
||||
def press_ability_key(self, key, cooldown):
|
||||
# delay = random.uniform(0.1, 0.2)
|
||||
@@ -32,27 +32,30 @@ class WowBot:
|
||||
print(f'Casting ability {key} with cooldown {cooldown} seconds.')
|
||||
pyautogui.press(key)
|
||||
|
||||
|
||||
def runBot(self):
|
||||
while not self.stopped:
|
||||
screenshot = ImageGrab.grab(bbox=(config.HEKILI_X,
|
||||
config.HEKILI_Y,
|
||||
config.HEKILI_X + config.HEKILI_W,
|
||||
config.HEKILI_Y + config.HEKILI_H))
|
||||
if screenshot is None:
|
||||
continue
|
||||
|
||||
screenshot_np = np.array(screenshot)
|
||||
loop_time = time.time()
|
||||
key = self.vision.get_ability_key(screenshot_np)
|
||||
print(f'vision FPS {1 / (time.time() - loop_time)}')
|
||||
if (key and key != ''):
|
||||
self.press_ability_key(key, 0)
|
||||
|
||||
def start(self):
|
||||
self.stopped = False
|
||||
t = Thread(target=self.run)
|
||||
t.start()
|
||||
self.process = multiprocessing.Process(target=self.runBot)
|
||||
self.process.start()
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
|
||||
# main logic controller
|
||||
def run(self):
|
||||
loop_time = time.time()
|
||||
while not self.stopped:
|
||||
if self.capture.screenshot is None:
|
||||
continue
|
||||
screenshot_np = np.array(self.capture.screenshot)
|
||||
ability_key = self.vision.get_ability_key(screenshot_np)
|
||||
if (ability_key and ability_key != ''):
|
||||
# ability_cooldown = self.vision.get_ability_cooldown(screenshot_np)
|
||||
# if (ability_cooldown > 1):
|
||||
# print(f'Ability {ability_key} is on cooldown for {ability_cooldown} seconds.')
|
||||
# else:
|
||||
self.press_ability_key(ability_key, 0)
|
||||
print(f'WoW Bot FPS {1 / (time.time() - loop_time)}')
|
||||
loop_time = time.time()
|
||||
if self.process:
|
||||
self.process.terminate()
|
||||
self.process.join()
|
||||
self.stopped = True
|
||||
28
main.py
28
main.py
@@ -1,13 +1,14 @@
|
||||
import os
|
||||
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtWidgets import *
|
||||
from PyQt5.QtGui import QPixmap, QImage, qRgb, QPainter, QPen
|
||||
|
||||
import win32gui
|
||||
from PIL import ImageGrab
|
||||
|
||||
import config
|
||||
from windowcapture import WindowCapture
|
||||
from vision import Vision
|
||||
from bot import WowBot
|
||||
|
||||
def list_window_names():
|
||||
@@ -15,11 +16,15 @@ def list_window_names():
|
||||
if win32gui.IsWindowVisible(hwnd):
|
||||
print(hex(hwnd), win32gui.GetWindowText(hwnd))
|
||||
win32gui.EnumWindows(winEnumHandler, None)
|
||||
|
||||
class WinGUI(QWidget):
|
||||
|
||||
# threading properties
|
||||
stopped = True
|
||||
lock = None
|
||||
|
||||
# properties
|
||||
hwnd = None
|
||||
vision = None
|
||||
bot = None
|
||||
|
||||
def __init__(self):
|
||||
@@ -35,18 +40,16 @@ class WinGUI(QWidget):
|
||||
# find the handle for the window we want to capture.
|
||||
# if no window name is given, capture the entire screen
|
||||
self.hwnd = win32gui.FindWindow(None, config.WOW_WINDOW_NAME)
|
||||
self.capture = WindowCapture(config.HEKILI_X,
|
||||
config.HEKILI_Y,
|
||||
config.HEKILI_X + config.HEKILI_W,
|
||||
config.HEKILI_Y + config.HEKILI_H)
|
||||
self.capture = WindowCapture()
|
||||
self.capture.closeEvent = self.handleWidgetClose
|
||||
self.vision = Vision()
|
||||
self.bot = WowBot(self.capture, self.vision)
|
||||
|
||||
self.bot = WowBot()
|
||||
|
||||
self.canvas_hekili_zone = self.__create_canvas_hekili_zone(self)
|
||||
|
||||
self.frame_hekili_zone = self.__create_frame_hekili_zone(self)
|
||||
self.label_hekili = self.__create_label_hekili(self.frame_hekili_zone)
|
||||
|
||||
self.label_hekili_x = self.__create_label_hekili_x(self.frame_hekili_zone)
|
||||
self.input_hekili_x = self.__create_input_hekili_x(self.frame_hekili_zone)
|
||||
self.label_hekili_y = self.__create_label_hekili_y(self.frame_hekili_zone)
|
||||
@@ -354,7 +357,10 @@ class WinGUI(QWidget):
|
||||
self.paintImage(event)
|
||||
|
||||
def paintImage(self, event):
|
||||
screenshot = self.capture.get_screenshot()
|
||||
screenshot = ImageGrab.grab(bbox=(config.HEKILI_X,
|
||||
config.HEKILI_Y,
|
||||
config.HEKILI_X + config.HEKILI_W,
|
||||
config.HEKILI_Y + config.HEKILI_H))
|
||||
# 将截图转换为 QImage 对象
|
||||
qt_image = QImage(screenshot.tobytes(), screenshot.width, screenshot.height, screenshot.width * 3, QImage.Format_RGB888)
|
||||
# Convert qt_image to QPixmap
|
||||
@@ -374,17 +380,15 @@ class WinGUI(QWidget):
|
||||
self.canvas_hekili_zone.setPixmap(pixmap)
|
||||
|
||||
def startRotation(self):
|
||||
if (self.capture.stopped):
|
||||
if (self.bot.stopped):
|
||||
print("开始...")
|
||||
# 将窗口设置为前置
|
||||
if self.hwnd:
|
||||
win32gui.SetForegroundWindow(self.hwnd)
|
||||
self.capture.start()
|
||||
self.bot.start()
|
||||
self.buttonStart.setText("结束")
|
||||
else:
|
||||
print("结束...")
|
||||
self.capture.stop()
|
||||
self.bot.stop()
|
||||
self.buttonStart.setText("开始")
|
||||
|
||||
|
||||
56
vision.py
56
vision.py
@@ -1,37 +1,44 @@
|
||||
import cv2 as cv
|
||||
import time
|
||||
import numpy as np
|
||||
from PIL import Image, ImageGrab
|
||||
import torch
|
||||
from torchvision import transforms as T
|
||||
from PIL import Image
|
||||
|
||||
import config
|
||||
|
||||
class Vision:
|
||||
|
||||
# properties
|
||||
model = None
|
||||
models = ['parseq', 'parseq_tiny', 'abinet', 'crnn', 'trba', 'vitstr']
|
||||
|
||||
def __init__(self):
|
||||
# load the trained model
|
||||
# 2 指定运行设备,这里为单块GPU
|
||||
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
||||
|
||||
self.model = torch.hub.load('baudm/parseq', 'parseq_tiny', pretrained=True).eval()
|
||||
self._model_cache = {}
|
||||
self._preprocess = T.Compose([
|
||||
T.Resize((32, 128), T.InterpolationMode.BICUBIC),
|
||||
T.ToTensor(),
|
||||
T.Normalize(0.5, 0.5)
|
||||
])
|
||||
|
||||
def convertToText(self, image):
|
||||
def _get_model(self, name):
|
||||
if name in self._model_cache:
|
||||
return self._model_cache[name]
|
||||
model = torch.hub.load('baudm/parseq', name, pretrained=True, trust_repo=True).eval()
|
||||
self._model_cache[name] = model
|
||||
return model
|
||||
|
||||
@torch.inference_mode()
|
||||
def convertToText(self, model_name, image):
|
||||
if image is None:
|
||||
return ''
|
||||
image = Image.fromarray(image).convert('RGB')
|
||||
image = self._preprocess(image).unsqueeze(0)
|
||||
return '', []
|
||||
|
||||
image = Image.fromarray(image)
|
||||
|
||||
model = self._get_model(model_name)
|
||||
image = self._preprocess(image.convert('RGB')).unsqueeze(0)
|
||||
# Greedy decoding
|
||||
pred = self.model(image).softmax(-1)
|
||||
label, _ = self.model.tokenizer.decode(pred)
|
||||
raw_label, raw_confidence = self.model.tokenizer.decode(pred, raw=True)
|
||||
pred = model(image).softmax(-1)
|
||||
label, _ = model.tokenizer.decode(pred)
|
||||
raw_label, raw_confidence = model.tokenizer.decode(pred, raw=True)
|
||||
# Format confidence values
|
||||
max_len = len(label[0]) + 1
|
||||
conf = list(map('{:0.1f}'.format, raw_confidence[0][:max_len].tolist()))
|
||||
@@ -44,16 +51,14 @@ class Vision:
|
||||
else:
|
||||
return ''
|
||||
|
||||
def get_ability_key(self, screenshot_np):
|
||||
def get_ability_key(self, screenshot_np=None):
|
||||
# 技能按键区域
|
||||
ability_key_image = screenshot_np[config.ABILITY_KEY_Y:config.ABILITY_KEY_Y+config.ABILITY_KEY_H,
|
||||
config.ABILITY_KEY_X:config.ABILITY_KEY_X+config.ABILITY_KEY_W]
|
||||
if ability_key_image.size == 0:
|
||||
print("技能按键区域图像为空,可能是配置的区域超出了原图的范围。")
|
||||
return ''
|
||||
loop_time = time.time()
|
||||
key_text = self.convertToText(ability_key_image)
|
||||
print(f"convertToText take time: {time.time() - loop_time}...")
|
||||
key_text = self.convertToText('parseq_tiny', ability_key_image)
|
||||
# if config.DEBUG:
|
||||
# cv.imwrite('images/Key_{}_{}.jpg'.format(key_text, time.time()), ability_key_image)
|
||||
|
||||
@@ -81,3 +86,16 @@ class Vision:
|
||||
return int(cooldown_text)
|
||||
else:
|
||||
return -1
|
||||
|
||||
if __name__ == '__main__':
|
||||
vision = Vision()
|
||||
while True:
|
||||
screenshot_np = ImageGrab.grab(bbox=(config.HEKILI_X,
|
||||
config.HEKILI_Y,
|
||||
config.HEKILI_X + config.HEKILI_W,
|
||||
config.HEKILI_Y + config.HEKILI_H))
|
||||
screenshot_np = np.array(screenshot_np)
|
||||
loop_time = time.time()
|
||||
key = vision.get_ability_key(screenshot_np)
|
||||
print(f'vision FPS {1 / (time.time() - loop_time)}')
|
||||
print('key output: ', key)
|
||||
|
||||
@@ -1,38 +1,17 @@
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from PIL import ImageGrab
|
||||
import win32gui
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtGui import QPixmap, QPainter, QPen
|
||||
from PyQt5.QtWidgets import QApplication, QLabel, QWidget
|
||||
import ctypes
|
||||
|
||||
from threading import Thread, Lock
|
||||
from PyQt5.QtGui import QPainter, QPen
|
||||
from PyQt5.QtWidgets import QApplication, QWidget
|
||||
|
||||
class WindowCapture(QWidget):
|
||||
|
||||
# threading properties
|
||||
stopped = True
|
||||
lock = None
|
||||
|
||||
# properties
|
||||
screenshot = None
|
||||
# Properties
|
||||
x1, y1, x2, y2 = 0, 0, 0, 0
|
||||
|
||||
def __init__(self, x1, y1, x2, y2):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.begin = None
|
||||
self.end = None
|
||||
|
||||
# create a thread lock object
|
||||
self.lock = Lock()
|
||||
|
||||
self.x1 = x1
|
||||
self.y1 = y1
|
||||
self.x2 = x2
|
||||
self.y2 = y2
|
||||
|
||||
# Qt.WindowStaysOnTopHint 置顶窗口
|
||||
# Qt.FramelessWindowHint 产生一个无窗口边框的窗口,此时用户无法移动该窗口和改变它的大小
|
||||
self.setWindowFlags(Qt.WindowStaysOnTopHint | Qt.FramelessWindowHint)
|
||||
@@ -74,31 +53,6 @@ class WindowCapture(QWidget):
|
||||
painter.drawRect(self.begin.x(), self.begin.y(),
|
||||
self.end.x() - self.begin.x(), self.end.y() - self.begin.y())
|
||||
|
||||
def get_screenshot(self):
|
||||
# 获取屏幕截图
|
||||
return ImageGrab.grab(bbox=(self.x1, self.y1, self.x2, self.y2))
|
||||
|
||||
def start(self):
|
||||
self.stopped = False
|
||||
t = Thread(target=self.run)
|
||||
t.start()
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
|
||||
def run(self):
|
||||
# TODO: you can write your own time/iterations calculation to determine how fast this is
|
||||
loop_time = time.time()
|
||||
while not self.stopped:
|
||||
# get an updated image of the game
|
||||
screenshot = self.get_screenshot()
|
||||
# lock the thread while updating the results
|
||||
self.lock.acquire()
|
||||
self.screenshot = screenshot
|
||||
self.lock.release()
|
||||
# print(f'Window Capture FPS {1 / (time.time() - loop_time)}')
|
||||
loop_time = time.time()
|
||||
|
||||
def main():
|
||||
app = QApplication([])
|
||||
widget = WindowCapture()
|
||||
|
||||
Reference in New Issue
Block a user