import argparse
import ast
import time

import torch
from PIL import Image, ImageDraw
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor

parser = argparse.ArgumentParser(description="Qwen-VL Inference")
parser.add_argument("--image_path", type=str, default="./test_screenshots/amazon.png", help="Path to the input image")
parser.add_argument("--query", type=str, default="Click on the 'Chairs'.", help="Text query or instruction")
parser.add_argument("--model_dir", type=str, default="./", help="Path to the local ShowUI model directory")
args = parser.parse_args()

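# Example invocation (the script filename here is illustrative):
#   python showui_inference.py --image_path ./test_screenshots/amazon.png \
#       --query "Click on the 'Chairs'." --model_dir ./
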
DEVICE = "cuda:0"

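# Resolution bounds passed to the Qwen2-VL processor, expressed in multiples of
# 28x28 visual patches, following the Qwen2-VL preprocessing convention.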
MIN_PIXELS = 256 * 28 * 28
MAX_PIXELS = 1344 * 28 * 28

def draw_point_on_image(image_path, position, output_path="output_image.png", radius=2, color="red"):
    """Draw a small dot at a normalized [x, y] position on the image and save the result."""
    image = Image.open(image_path).convert("RGB")
    draw = ImageDraw.Draw(image)
    width, height = image.size
    x = int(position[0] * width)
    y = int(position[1] * height)
    draw.ellipse([(x - radius, y - radius), (x + radius, y + radius)], fill=color, outline=color)
    image.save(output_path)

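# Load the ShowUI checkpoint in float16 on the CPU; it is moved to the GPU once
# the inputs have been prepared.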
model = Qwen2VLForConditionalGeneration.from_pretrained(
    args.model_dir,
    torch_dtype=torch.float16,
    device_map="cpu",
)
print("Model dtype:", model.dtype) |
|
|
|
|
|
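# The processor is taken from the base Qwen2-VL-2B-Instruct repo on the Hub;
# ShowUI is built on that model, so the same preprocessing applies.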
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct", min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS)

torch.cuda.empty_cache()

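# System prompt and action space for GUI navigation. The model is expected to reply
# with a Python-literal dict, e.g. {'action': 'CLICK', 'value': None, 'position': [0.49, 0.42]}
# (values illustrative).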
_NAV_SYSTEM = """You are an assistant trained to navigate the {_APP} screen.
Given a task instruction, a screen observation, and an action history sequence,
output the next action and wait for the next observation.
Here is the action space:
{_ACTION_SPACE}
"""

_ACTION_MAP = """
1. CLICK: Click on an element, value is not applicable and the position [x,y] is required.
2. INPUT: Type a string into an element, value is a string to type and the position [x,y] is required.
3. HOVER: Hover on an element, value is not applicable and the position [x,y] is required.
4. ENTER: Enter operation, value and position are not applicable.
5. SCROLL: Scroll the screen, value is the direction to scroll and the position is not applicable.
6. ESC: ESCAPE operation, value and position are not applicable.
7. PRESS: Long click on an element, value is not applicable and the position [x,y] is required.
"""

_SYSTEM = _NAV_SYSTEM.format(
    _APP="web",
    _ACTION_SPACE=_ACTION_MAP
)

if args.query:
    _QUERY = args.query
else:
    _QUERY = "Click on the 'Chairs'."

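# A single user turn interleaves the system prompt, the screenshot, and the query,
# matching the multimodal content format expected by the Qwen2-VL processor.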
messages = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": _SYSTEM},
            {"type": "image", "image": args.image_path, "min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS},
            {"type": "text", "text": _QUERY}
        ],
    }
]

text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

image = Image.open(args.image_path).convert("RGB")
inputs = processor(
    text=[text],
    images=[image],
    padding=True,
    return_tensors="pt"
)

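# Move the model and the tokenized inputs to the GPU only after preprocessing is done.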
model = model.to(DEVICE)
inputs = inputs.to(DEVICE)

print(f"Max CUDA memory after model load: {torch.cuda.max_memory_allocated(device=DEVICE)/1024**2:.2f} MB")

# Reset the peak-memory counter so the next reading reflects inference only.
torch.cuda.reset_peak_memory_stats(device=DEVICE)

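# Benchmark: repeat the same generation N_RUNS times and record per-run latency.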
N_RUNS = 10
times = []

model.eval()
with torch.no_grad():
    for i in range(N_RUNS):
        start_time = time.time()

        generated_ids = model.generate(**inputs, max_new_tokens=128)

        # Drop the prompt tokens so only the newly generated tokens are decoded.
        generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
        output_text = processor.batch_decode(
            generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]

        # The model should emit a Python-literal dict; fall back to the raw string if parsing fails.
        try:
            result = ast.literal_eval(output_text)
        except (ValueError, SyntaxError):
            result = output_text

        end_time = time.time()
        times.append(end_time - start_time)
        print(f"Run {i+1}/{N_RUNS} - Time: {end_time - start_time:.4f} s, Output: {result}")

# Annotate the screenshot with the click position predicted in the last run.
if isinstance(result, dict) and result.get('action', '').upper() == 'CLICK':
    x, y = result['position'][0], result['position'][1]
    draw_point_on_image(args.image_path, [x, y], output_path="./output_image.png")

avg_time = sum(times) / len(times)
print(f"Average time per inference: {avg_time:.4f} seconds")

print(f"Max CUDA memory after inference: {torch.cuda.max_memory_allocated(device=DEVICE)/1024**2:.2f} MB") |
|
|
|
|
|
print(f"Input image size: {Image.open(args.image_path).size}") |