From a317c4c69deee6e72a8bb1d2f7d6c863b18430df Mon Sep 17 00:00:00 2001 From: stachir Date: Wed, 10 Jun 2026 10:41:28 +0200 Subject: [PATCH] init --- ev3-robotic-arm-master/.vscode/launch.json | 22 + .../robotarmmovement/tests/test01_master.py | 795 ++++++ .../robotarmmovement/tests/test01_slave.py | 629 +++++ .../tests/test_00_onlymaster.py | 2154 +++++++++++++++++ .../tests/test_00_onlyslave.py | 1715 +++++++++++++ 5 files changed, 5315 insertions(+) create mode 100644 ev3-robotic-arm-master/.vscode/launch.json create mode 100644 ev3-robotic-arm-master/robotarmmovement/tests/test01_master.py create mode 100644 ev3-robotic-arm-master/robotarmmovement/tests/test01_slave.py create mode 100644 ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlymaster.py create mode 100644 ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlyslave.py diff --git a/ev3-robotic-arm-master/.vscode/launch.json b/ev3-robotic-arm-master/.vscode/launch.json new file mode 100644 index 0000000..1db906a --- /dev/null +++ b/ev3-robotic-arm-master/.vscode/launch.json @@ -0,0 +1,22 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Download and Run current file", + "type": "ev3devBrowser", + "request": "launch", + "program": "/home/robot/${workspaceFolderBasename}/${relativeFile}", + "interactiveTerminal": true + }, + { + "name": "Download and Run my-program", + "type": "ev3devBrowser", + "request": "launch", + "program": "/home/robot/${workspaceFolderBasename}/my-program (replace 'my-program' with the actual path)", + "interactiveTerminal": true + } + ] +} \ No newline at end of file diff --git a/ev3-robotic-arm-master/robotarmmovement/tests/test01_master.py b/ev3-robotic-arm-master/robotarmmovement/tests/test01_master.py new file mode 100644 index 0000000..10df929 --- /dev/null +++ b/ev3-robotic-arm-master/robotarmmovement/tests/test01_master.py @@ -0,0 +1,795 @@ +#!/usr/bin/env pybricks-micropython + +from math import cos, pi + +from pybricks.hubs import EV3Brick +from pybricks.ev3devices import Motor +from pybricks.parameters import Port, Stop +from pybricks.tools import wait, StopWatch +from pybricks.messaging import BluetoothMailboxClient, TextMailbox + + +# ============================================================ +# MASTER BRICK PROGRAM +# ============================================================ +# +# Run slave_main.py FIRST. +# Then run this master program. +# +# Master: +# - connects to slave, +# - sends full slave-side movement plan, +# - waits for READY, +# - sends GO, +# - executes master-side local joints, +# - waits for slave DONE. +# +# This avoids sending per-joint commands during the movement itself. + + +# ============================================================ +# BLUETOOTH SETTINGS +# ============================================================ + +# Change this to the Bluetooth name of your slave EV3. +# By default many EV3 bricks are named 'ev3dev'. +SLAVE_BRICK_NAME = 'ev3armslave' + +# Optional delay after sending GO before master starts its own local movement. +# +# Bluetooth mailboxes are not hard real-time. If your measurements show that +# master starts earlier than slave, increase this value slightly. +# +# Start with 0. Later calibrate using camera/audio/LED timestamps. +MASTER_START_DELAY_AFTER_GO_MS = 0 + + +# ============================================================ +# MASTER JOINT PORTS +# ============================================================ + +BASE_PORT = Port.A +SHOULDER_LEFT_PORT = Port.B +SHOULDER_RIGHT_PORT = Port.C +ELBOW_PORT = Port.D + + +# ============================================================ +# MOTION SETTINGS +# ============================================================ + +DEFAULT_SPEED = 200 +PROFILE_UPDATE_MS = 20 +POLL_MS = 10 + +DEFAULT_EASING = 'linear' +CLAMP_EASING_OUTPUT = True + +DEADBAND_MOTOR_DEG = 1.5 +TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 + +SOUND_ENABLED = True + + +# ============================================================ +# MASTER JOINT CONFIGURATION +# ============================================================ + +GEAR_RATIOS = { + 'base': 7.5, + 'shoulder': 7.5, + 'elbow': 5.0, +} + +JOINT_LIMITS = { + 'base': None, + 'shoulder': None, + 'elbow': None, +} + +SHOULDER_SIGNS = [1, 1] + +STOP_MODE_BY_JOINT = { + 'base': Stop.BRAKE, + 'shoulder': Stop.BRAKE, + 'elbow': Stop.BRAKE, +} + + +# ============================================================ +# HARDWARE / BLUETOOTH +# ============================================================ + +ev3 = EV3Brick() + +base_motor = Motor(BASE_PORT) +shoulder_left_motor = Motor(SHOULDER_LEFT_PORT) +shoulder_right_motor = Motor(SHOULDER_RIGHT_PORT) +elbow_motor = Motor(ELBOW_PORT) + +client = BluetoothMailboxClient() +cmd_box = TextMailbox('command', client) +status_box = TextMailbox('status', client) + + +# ============================================================ +# UTILITIES +# ============================================================ + +def clamp(value, min_value, max_value): + if value < min_value: + return min_value + if value > max_value: + return max_value + return value + + +def abs_value(value): + if value < 0: + return -value + return value + + +def beep_ok(): + if SOUND_ENABLED: + ev3.speaker.beep(frequency=1000, duration=80) + + +def beep_error(): + if SOUND_ENABLED: + ev3.speaker.beep(frequency=220, duration=200) + wait(80) + ev3.speaker.beep(frequency=220, duration=200) + + +def beep_start_marker(): + if SOUND_ENABLED: + ev3.speaker.beep(frequency=1400, duration=60) + + +# ============================================================ +# EASING FUNCTIONS +# ============================================================ + +def ease_linear(x): + return x + + +def ease_in_out_sine(x): + return -(cos(pi * x) - 1) / 2 + + +def ease_in_out_quad(x): + if x < 0.5: + return 2 * x * x + return 1 - ((-2 * x + 2) ** 2) / 2 + + +def ease_in_out_cubic(x): + if x < 0.5: + return 4 * x * x * x + return 1 - ((-2 * x + 2) ** 3) / 2 + + +def ease_out_quad(x): + return 1 - (1 - x) * (1 - x) + + +EASINGS = { + 'linear': ease_linear, + 'easeInOutSine': ease_in_out_sine, + 'easeInOutQuad': ease_in_out_quad, + 'easeInOutCubic': ease_in_out_cubic, + 'easeOutQuad': ease_out_quad, +} + + +def get_easing(name): + if name is None or name == '': + name = DEFAULT_EASING + + if name in EASINGS: + return EASINGS[name] + + return EASINGS[DEFAULT_EASING] + + +# ============================================================ +# JOINT CLASS +# ============================================================ + +class Joint: + """ + Logical master-side joint. + + Supports one or more physical motors. + + Coordinate convention: + motor_angle = joint_angle * gear_ratio + """ + + def __init__(self, name, motors, signs, gear_ratio, joint_limits, stop_mode): + self.name = name + self.motors = motors + self.signs = signs + self.gear_ratio = gear_ratio + self.joint_limits = joint_limits + self.stop_mode_value = stop_mode + + def motor_angle(self): + total = 0 + count = len(self.motors) + + for i in range(count): + total += self.motors[i].angle() * self.signs[i] + + return total / count + + def joint_angle(self): + return self.motor_angle() / self.gear_ratio + + def joint_to_motor(self, joint_angle): + return joint_angle * self.gear_ratio + + def motor_to_joint(self, motor_angle): + return motor_angle / self.gear_ratio + + def reset_joint_angle(self, joint_angle): + motor_angle = self.joint_to_motor(joint_angle) + + for i in range(len(self.motors)): + self.motors[i].reset_angle(motor_angle * self.signs[i]) + + def check_joint_limit(self, joint_target): + if self.joint_limits is None: + return + + min_angle = self.joint_limits[0] + max_angle = self.joint_limits[1] + + if joint_target < min_angle or joint_target > max_angle: + raise ValueError( + '{} target {} outside limits {}..{}'.format( + self.name, joint_target, min_angle, max_angle + ) + ) + + def physical_target(self, logical_motor_target, motor_index): + return int(logical_motor_target * self.signs[motor_index]) + + def track_motor_target(self, logical_motor_target): + joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(joint_target) + + for i in range(len(self.motors)): + self.motors[i].track_target( + self.physical_target(logical_motor_target, i) + ) + + def apply_stop(self): + for motor in self.motors: + if self.stop_mode_value == Stop.HOLD: + motor.hold() + elif self.stop_mode_value == Stop.COAST: + motor.stop() + else: + motor.brake() + + +# ============================================================ +# MASTER JOINTS +# ============================================================ + +base = Joint( + 'base', + [base_motor], + [1], + GEAR_RATIOS['base'], + JOINT_LIMITS['base'], + STOP_MODE_BY_JOINT['base'] +) + +shoulder = Joint( + 'shoulder', + [shoulder_left_motor, shoulder_right_motor], + SHOULDER_SIGNS, + GEAR_RATIOS['shoulder'], + JOINT_LIMITS['shoulder'], + STOP_MODE_BY_JOINT['shoulder'] +) + +elbow = Joint( + 'elbow', + [elbow_motor], + [1], + GEAR_RATIOS['elbow'], + JOINT_LIMITS['elbow'], + STOP_MODE_BY_JOINT['elbow'] +) + +JOINTS = { + 'base': base, + 'shoulder': shoulder, + 'elbow': elbow, +} + +JOINT_ORDER = ['base', 'shoulder', 'elbow'] + + +# ============================================================ +# COMMAND HELPERS +# ============================================================ + +def ABS_J(joint, angle): + return { + 'joint': joint, + 'unit': 'joint', + 'mode': 'abs', + 'value': angle + } + + +def REL_J(joint, delta): + return { + 'joint': joint, + 'unit': 'joint', + 'mode': 'rel', + 'value': delta + } + + +def ABS_M(joint, angle): + return { + 'joint': joint, + 'unit': 'motor', + 'mode': 'abs', + 'value': angle + } + + +def REL_M(joint, delta): + return { + 'joint': joint, + 'unit': 'motor', + 'mode': 'rel', + 'value': delta + } + + +def STEP(label, master_cmds, slave_cmds, duration_ms, easing='linear', pause_ms=1000): + return { + 'label': label, + 'master_cmds': master_cmds, + 'slave_cmds': slave_cmds, + 'duration_ms': duration_ms, + 'easing': easing, + 'pause_ms': pause_ms + } + + +# ============================================================ +# LOCAL MASTER PLANNING +# ============================================================ + +def validate_parallel(commands): + seen = [] + + for cmd in commands: + name = cmd['joint'] + + if name in seen: + raise ValueError('Joint repeated in one parallel group: {}'.format(name)) + + seen.append(name) + + +def plan_master_commands(commands): + validate_parallel(commands) + + plans = [] + + for cmd in commands: + joint = JOINTS[cmd['joint']] + unit = cmd['unit'] + mode = cmd['mode'] + value = cmd['value'] + + start_motor = joint.motor_angle() + start_joint = joint.joint_angle() + + if mode == 'abs': + if unit == 'motor': + target_motor = value + target_joint = joint.motor_to_joint(target_motor) + elif unit == 'joint': + target_joint = value + target_motor = joint.joint_to_motor(target_joint) + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + elif mode == 'rel': + if unit == 'motor': + target_motor = start_motor + value + target_joint = joint.motor_to_joint(target_motor) + elif unit == 'joint': + target_joint = start_joint + value + target_motor = joint.joint_to_motor(target_joint) + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + else: + raise ValueError('Unknown mode: {}'.format(mode)) + + joint.check_joint_limit(target_joint) + + delta_motor = target_motor - start_motor + skip = abs_value(delta_motor) <= DEADBAND_MOTOR_DEG + + plans.append({ + 'joint': joint, + 'start_motor': start_motor, + 'target_motor': target_motor, + 'delta_motor': delta_motor, + 'last_sent': None, + 'skip': skip + }) + + return plans + + +def maybe_track(plan, target, force): + joint = plan['joint'] + last = plan['last_sent'] + + if force or last is None: + joint.track_motor_target(target) + plan['last_sent'] = target + return + + if abs_value(target - last) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: + joint.track_motor_target(target) + plan['last_sent'] = target + + +def execute_master_plan_timed(plans, duration_ms, easing_name): + if duration_ms <= 0: + return True + + easing = get_easing(easing_name) + active = [] + + for plan in plans: + if not plan['skip']: + active.append(plan) + + if len(active) == 0: + return True + + timer = StopWatch() + + while timer.time() < duration_ms: + ratio = timer.time() / duration_ms + ratio = clamp(ratio, 0, 1) + + eased = easing(ratio) + + if CLAMP_EASING_OUTPUT: + eased = clamp(eased, 0, 1) + + for plan in active: + target = plan['start_motor'] + plan['delta_motor'] * eased + maybe_track(plan, target, False) + + wait(PROFILE_UPDATE_MS) + + for plan in active: + maybe_track(plan, plan['target_motor'], True) + + for plan in active: + plan['joint'].apply_stop() + + return True + + +def reset_master_as_zero(): + for name in JOINT_ORDER: + JOINTS[name].reset_joint_angle(0) + + +def print_master_states(prefix): + print(prefix) + + for name in JOINT_ORDER: + joint = JOINTS[name] + print( + '{}: motor={} joint={}'.format( + name, joint.motor_angle(), joint.joint_angle() + ) + ) + + +# ============================================================ +# SLAVE MESSAGE ENCODING +# ============================================================ + +def encode_slave_commands(commands): + if len(commands) == 0: + return 'NONE' + + parts = [] + + for cmd in commands: + parts.append( + '{},{},{},{}'.format( + cmd['joint'], + cmd['unit'], + cmd['mode'], + cmd['value'] + ) + ) + + return ';'.join(parts) + + +def send_slave_prepare(step_id, duration_ms, easing, slave_cmds): + payload = encode_slave_commands(slave_cmds) + + msg = 'PREP|{}|{}|{}|{}'.format( + step_id, + duration_ms, + easing, + payload + ) + + print('MASTER TX: {}'.format(msg)) + cmd_box.send(msg) + + status_box.wait() + reply = status_box.read() + + print('MASTER RX: {}'.format(reply)) + + expected = 'READY|{}'.format(step_id) + + if reply != expected: + ev3.screen.clear() + ev3.screen.print('SLAVE PREP ERR') + ev3.screen.print(reply[:16]) + ev3.screen.print(reply[16:32]) + raise RuntimeError('Unexpected slave reply: {}'.format(reply)) + + +def send_slave_go(step_id): + msg = 'GO|{}'.format(step_id) + print('MASTER TX: {}'.format(msg)) + cmd_box.send(msg) + + +def wait_slave_done(step_id): + status_box.wait() + reply = status_box.read() + + print('MASTER RX: {}'.format(reply)) + + expected = 'DONE|{}'.format(step_id) + + if reply != expected: + ev3.screen.clear() + ev3.screen.print('SLAVE RUN ERR') + ev3.screen.print(reply[:16]) + ev3.screen.print(reply[16:32]) + raise RuntimeError('Unexpected slave completion reply: {}'.format(reply)) + + +# ============================================================ +# COORDINATED STEP EXECUTION +# ============================================================ + +def run_coordinated_step(step_id, step): + label = step['label'] + master_cmds = step['master_cmds'] + slave_cmds = step['slave_cmds'] + duration_ms = step['duration_ms'] + easing = step['easing'] + pause_ms = step['pause_ms'] + + ev3.screen.clear() + ev3.screen.print('STEP {}'.format(step_id)) + ev3.screen.print(label) + + print('') + print('=' * 70) + print('COORDINATED STEP {}: {}'.format(step_id, label)) + print('=' * 70) + + master_plans = plan_master_commands(master_cmds) + + # Prepare slave before movement. + if len(slave_cmds) > 0: + send_slave_prepare(step_id, duration_ms, easing, slave_cmds) + + # Start marker. + beep_start_marker() + + # Send GO to slave, then start local motion. + if len(slave_cmds) > 0: + send_slave_go(step_id) + + if MASTER_START_DELAY_AFTER_GO_MS > 0: + wait(MASTER_START_DELAY_AFTER_GO_MS) + + execute_master_plan_timed(master_plans, duration_ms, easing) + + # Wait for slave completion. + if len(slave_cmds) > 0: + wait_slave_done(step_id) + + print_master_states('MASTER after step:') + + beep_ok() + + if pause_ms > 0: + wait(pause_ms) + + +# ============================================================ +# TEST SCENARIO INVOLVING BOTH BRICKS +# ============================================================ +# +# Master joints: +# base, shoulder, elbow +# +# Slave joints: +# elbow_rotate, wrist_flexion_extension, wrist_pronation_supination, gripper +# +# Rename slave joints after you define their real mechanical meaning. + +SCENARIO = [ + + STEP( + 'Test one slave joint only', + master_cmds=[ + ABS_J('base', 0), + ], + slave_cmds=[ + ABS_J('elbow_rotate', 0), + ], + duration_ms=1000, + easing='linear', + pause_ms=1000 + ), + + + STEP( + 'All joints zero', + master_cmds=[ + ABS_J('base', 0), + ABS_J('shoulder', 0), + ABS_J('elbow', 0), + ], + slave_cmds=[ + ABS_J('elbow_rotate', 0), + ABS_J('wrist_flexion_extension', 0), + ABS_J('wrist_pronation_supination', 0), + ABS_J('gripper', 0), + ], + duration_ms=3000, + easing='linear', + pause_ms=1000 + ), + + STEP( + 'Base + elbow_rotate synchronized', + master_cmds=[ + ABS_J('base', 15), + ], + slave_cmds=[ + ABS_J('elbow_rotate', 15), + ], + duration_ms=3000, + easing='easeInOutSine', + pause_ms=1000 + ), + + STEP( + 'Shoulder + elbow + two slave joints', + master_cmds=[ + ABS_J('shoulder', 8), + ABS_J('elbow', 12), + ], + slave_cmds=[ + ABS_J('wrist_flexion_extension', 10), + ABS_J('wrist_pronation_supination', -10), + ], + duration_ms=4000, + easing='easeInOutQuad', + pause_ms=1000 + ), + + STEP( + 'All master and slave joints small negative pose', + master_cmds=[ + ABS_J('base', -12), + ABS_J('shoulder', -8), + ABS_J('elbow', -12), + ], + slave_cmds=[ + ABS_J('elbow_rotate', -10), + ABS_J('wrist_flexion_extension', -10), + ABS_J('wrist_pronation_supination', 10), + ABS_J('gripper', 5), + ], + duration_ms=4000, + easing='easeInOutCubic', + pause_ms=1000 + ), + + STEP( + 'Relative coordinated movement', + master_cmds=[ + REL_J('base', 10), + REL_J('elbow', 8), + ], + slave_cmds=[ + REL_J('elbow_rotate', 8), + REL_J('gripper', -5), + ], + duration_ms=3000, + easing='easeOutQuad', + pause_ms=1000 + ), + + STEP( + 'Return all joints to zero', + master_cmds=[ + ABS_J('base', 0), + ABS_J('shoulder', 0), + ABS_J('elbow', 0), + ], + slave_cmds=[ + ABS_J('elbow_rotate', 0), + ABS_J('wrist_flexion_extension', 0), + ABS_J('wrist_pronation_supination', 0), + ABS_J('gripper', 0), + ], + duration_ms=4000, + easing='easeInOutSine', + pause_ms=1000 + ), +] + + +# ============================================================ +# MAIN PROGRAM +# ============================================================ + +ev3.screen.clear() +ev3.screen.print('MASTER') +ev3.screen.print('Connecting') + +print('MASTER: connecting to slave {}'.format(SLAVE_BRICK_NAME)) +client.connect(SLAVE_BRICK_NAME) +print('MASTER: connected') + +ev3.screen.clear() +ev3.screen.print('MASTER') +ev3.screen.print('Connected') + +reset_master_as_zero() +print_master_states('MASTER initial zero:') + +try: + for i in range(len(SCENARIO)): + run_coordinated_step(i, SCENARIO[i]) + +except Exception as e: + print('MASTER ERROR: {}'.format(e)) + beep_error() + cmd_box.send('STOP') + +ev3.screen.clear() +ev3.screen.print('MASTER') +ev3.screen.print('DONE') + +print('MASTER: scenario complete') +beep_ok() \ No newline at end of file diff --git a/ev3-robotic-arm-master/robotarmmovement/tests/test01_slave.py b/ev3-robotic-arm-master/robotarmmovement/tests/test01_slave.py new file mode 100644 index 0000000..5fadd05 --- /dev/null +++ b/ev3-robotic-arm-master/robotarmmovement/tests/test01_slave.py @@ -0,0 +1,629 @@ +#!/usr/bin/env pybricks-micropython + +from math import sin, cos, pi + +from pybricks.hubs import EV3Brick +from pybricks.ev3devices import Motor +from pybricks.parameters import Port, Stop +from pybricks.tools import wait, StopWatch +from pybricks.messaging import BluetoothMailboxServer, TextMailbox + + +# ============================================================ +# SLAVE BRICK PROGRAM +# ============================================================ +# +# Run this program FIRST. +# +# The slave: +# 1. waits for master Bluetooth connection, +# 2. waits for PREP command, +# 3. prepares local motion plan, +# 4. replies READY, +# 5. waits for GO, +# 6. executes local slave joints, +# 7. replies DONE. +# +# Message format: +# PREP|step_id|duration_ms|easing|joint,unit,mode,value;joint,unit,mode,value +# GO|step_id +# STOP +# +# Example payload: +# PREP|3|3000|easeInOutSine|elbow_rotate,joint,abs,10;wrist_flexion_extension,joint,abs,-5 + + +# ============================================================ +# GENERAL SETTINGS +# ============================================================ + +DEFAULT_SPEED = 200 +PROFILE_UPDATE_MS = 20 +POLL_MS = 10 + +DEFAULT_EASING = 'linear' +CLAMP_EASING_OUTPUT = True + +STOP_MODE_DEFAULT = Stop.BRAKE + +DEADBAND_MOTOR_DEG = 1.5 +TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 + +SOUND_ENABLED = True + + +# ============================================================ +# SLAVE JOINT CONFIGURATION +# ============================================================ +# +# Replace names, gear ratios, signs, and limits with your real second-brick axes. +# +# ports: +# list of physical motor ports. Usually one port. +# +# signs: +# list of signs matching ports. Use -1 if motor direction is opposite. +# +# gear_ratio: +# motor_angle = joint_angle * gear_ratio +# +# joint_limits: +# None or (min_angle, max_angle), in JOINT degrees. + +SLAVE_JOINT_CONFIGS = [ + { + 'name': 'elbow_rotate', + 'ports': [Port.A], + 'signs': [1], + 'gear_ratio': 7.5, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + }, + { + 'name': 'wrist_flexion_extension', + 'ports': [Port.B], + 'signs': [1], + 'gear_ratio': 5.0, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + }, + { + 'name': 'wrist_pronation_supination', + 'ports': [Port.C], + 'signs': [1], + 'gear_ratio': 7.5, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + }, + { + 'name': 'gripper', + 'ports': [Port.D], + 'signs': [1], + 'gear_ratio': 1.0, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + }, +] + + +# ============================================================ +# HARDWARE / BLUETOOTH +# ============================================================ + +ev3 = EV3Brick() + +server = BluetoothMailboxServer() +cmd_box = TextMailbox('command', server) +status_box = TextMailbox('status', server) + + +# ============================================================ +# UTILITIES +# ============================================================ + +def clamp(value, min_value, max_value): + if value < min_value: + return min_value + if value > max_value: + return max_value + return value + + +def abs_value(value): + if value < 0: + return -value + return value + + +def beep_ok(): + if SOUND_ENABLED: + ev3.speaker.beep(frequency=1000, duration=80) + + +def beep_error(): + if SOUND_ENABLED: + ev3.speaker.beep(frequency=220, duration=200) + wait(80) + ev3.speaker.beep(frequency=220, duration=200) + + +# ============================================================ +# EASING FUNCTIONS +# ============================================================ + +def ease_linear(x): + return x + + +def ease_in_out_sine(x): + return -(cos(pi * x) - 1) / 2 + + +def ease_in_out_quad(x): + if x < 0.5: + return 2 * x * x + return 1 - ((-2 * x + 2) ** 2) / 2 + + +def ease_in_out_cubic(x): + if x < 0.5: + return 4 * x * x * x + return 1 - ((-2 * x + 2) ** 3) / 2 + + +def ease_out_quad(x): + return 1 - (1 - x) * (1 - x) + + +EASINGS = { + 'linear': ease_linear, + 'easeInOutSine': ease_in_out_sine, + 'easeInOutQuad': ease_in_out_quad, + 'easeInOutCubic': ease_in_out_cubic, + 'easeOutQuad': ease_out_quad, +} + + +def get_easing(name): + if name is None or name == '': + name = DEFAULT_EASING + + if name in EASINGS: + return EASINGS[name] + + return EASINGS[DEFAULT_EASING] + + +# ============================================================ +# JOINT CLASS +# ============================================================ + +class Joint: + """ + Logical robot-arm joint. + + Supports one or more physical motors. + + Coordinate convention: + motor_angle = joint_angle * gear_ratio + + For multiple motors: + logical motor angle is the signed average of all motor encoders. + """ + + def __init__(self, name, motors, signs, gear_ratio, joint_limits, stop_mode): + self.name = name + self.motors = motors + self.signs = signs + self.gear_ratio = gear_ratio + self.joint_limits = joint_limits + self.stop_mode_value = stop_mode + + def motor_angle(self): + total = 0 + count = len(self.motors) + + for i in range(count): + total += self.motors[i].angle() * self.signs[i] + + return total / count + + def joint_angle(self): + return self.motor_angle() / self.gear_ratio + + def joint_to_motor(self, joint_angle): + return joint_angle * self.gear_ratio + + def motor_to_joint(self, motor_angle): + return motor_angle / self.gear_ratio + + def reset_joint_angle(self, joint_angle): + motor_angle = self.joint_to_motor(joint_angle) + + for i in range(len(self.motors)): + self.motors[i].reset_angle(motor_angle * self.signs[i]) + + def check_joint_limit(self, joint_target): + if self.joint_limits is None: + return + + min_angle = self.joint_limits[0] + max_angle = self.joint_limits[1] + + if joint_target < min_angle or joint_target > max_angle: + raise ValueError( + '{} target {} outside limits {}..{}'.format( + self.name, joint_target, min_angle, max_angle + ) + ) + + def physical_target(self, logical_motor_target, motor_index): + return int(logical_motor_target * self.signs[motor_index]) + + def run_motor_target(self, logical_motor_target, speed): + joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(joint_target) + + for i in range(len(self.motors)): + self.motors[i].run_target( + speed, + self.physical_target(logical_motor_target, i), + then=self.stop_mode_value, + wait=False + ) + + def track_motor_target(self, logical_motor_target): + joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(joint_target) + + for i in range(len(self.motors)): + self.motors[i].track_target( + self.physical_target(logical_motor_target, i) + ) + + def apply_stop(self): + for motor in self.motors: + if self.stop_mode_value == Stop.HOLD: + motor.hold() + elif self.stop_mode_value == Stop.COAST: + motor.stop() + else: + motor.brake() + + def all_motors(self): + return self.motors + + +# ============================================================ +# JOINT CREATION +# ============================================================ + +JOINTS = {} +JOINT_ORDER = [] + + +def create_joints(): + for cfg in SLAVE_JOINT_CONFIGS: + motors = [] + + for port in cfg['ports']: + motors.append(Motor(port)) + + joint = Joint( + cfg['name'], + motors, + cfg['signs'], + cfg['gear_ratio'], + cfg['joint_limits'], + cfg['stop_mode'] + ) + + JOINTS[cfg['name']] = joint + JOINT_ORDER.append(cfg['name']) + + +def reset_all_as_zero(): + for name in JOINT_ORDER: + JOINTS[name].reset_joint_angle(0) + + +def stop_all(): + for name in JOINT_ORDER: + JOINTS[name].apply_stop() + + +# ============================================================ +# COMMAND PARSING / PLANNING +# ============================================================ + +def parse_command_payload(payload): + """ + Parses: + joint,unit,mode,value;joint,unit,mode,value + + Example: + elbow_rotate,joint,abs,10;wrist_flexion_extension,joint,rel,-5 + """ + commands = [] + + if payload == '' or payload == 'NONE': + return commands + + items = payload.split(';') + + for item in items: + parts = item.split(',') + + if len(parts) != 4: + raise ValueError('Invalid command item: {}'.format(item)) + + commands.append({ + 'joint': parts[0], + 'unit': parts[1], + 'mode': parts[2], + 'value': float(parts[3]) + }) + + return commands + + +def validate_parallel(commands): + seen = [] + + for cmd in commands: + joint_name = cmd['joint'] + + if joint_name in seen: + raise ValueError('Joint repeated in one parallel group: {}'.format(joint_name)) + + seen.append(joint_name) + + +def plan_commands(commands): + validate_parallel(commands) + + plans = [] + + for cmd in commands: + joint = JOINTS[cmd['joint']] + unit = cmd['unit'] + mode = cmd['mode'] + value = cmd['value'] + + start_motor = joint.motor_angle() + start_joint = joint.joint_angle() + + if mode == 'abs': + if unit == 'motor': + target_motor = value + target_joint = joint.motor_to_joint(target_motor) + elif unit == 'joint': + target_joint = value + target_motor = joint.joint_to_motor(target_joint) + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + elif mode == 'rel': + if unit == 'motor': + target_motor = start_motor + value + target_joint = joint.motor_to_joint(target_motor) + elif unit == 'joint': + target_joint = start_joint + value + target_motor = joint.joint_to_motor(target_joint) + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + else: + raise ValueError('Unknown mode: {}'.format(mode)) + + joint.check_joint_limit(target_joint) + + delta_motor = target_motor - start_motor + skip = abs_value(delta_motor) <= DEADBAND_MOTOR_DEG + + plans.append({ + 'joint': joint, + 'start_motor': start_motor, + 'target_motor': target_motor, + 'delta_motor': delta_motor, + 'last_sent': None, + 'skip': skip + }) + + return plans + + +# ============================================================ +# MOVEMENT EXECUTION +# ============================================================ + +def maybe_track(plan, target, force): + joint = plan['joint'] + last = plan['last_sent'] + + if force or last is None: + joint.track_motor_target(target) + plan['last_sent'] = target + return + + if abs_value(target - last) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: + joint.track_motor_target(target) + plan['last_sent'] = target + + +def execute_plan_timed(plans, duration_ms, easing_name): + if duration_ms <= 0: + return True + + easing = get_easing(easing_name) + active = [] + + for plan in plans: + if not plan['skip']: + active.append(plan) + + if len(active) == 0: + return True + + timer = StopWatch() + + while timer.time() < duration_ms: + ratio = timer.time() / duration_ms + ratio = clamp(ratio, 0, 1) + + eased = easing(ratio) + + if CLAMP_EASING_OUTPUT: + eased = clamp(eased, 0, 1) + + for plan in active: + target = plan['start_motor'] + plan['delta_motor'] * eased + maybe_track(plan, target, False) + + wait(PROFILE_UPDATE_MS) + + for plan in active: + maybe_track(plan, plan['target_motor'], True) + + for plan in active: + plan['joint'].apply_stop() + + return True + + +# ============================================================ +# STATUS / DISPLAY +# ============================================================ + +def print_states(prefix): + print(prefix) + + for name in JOINT_ORDER: + joint = JOINTS[name] + print( + '{}: motor={} joint={}'.format( + name, joint.motor_angle(), joint.joint_angle() + ) + ) + + +# ============================================================ +# MAIN LOOP +# ============================================================ + +create_joints() + +ev3.screen.clear() +ev3.screen.print('SLAVE') +ev3.screen.print('Waiting BT') + +print('SLAVE: waiting for Bluetooth connection...') +server.wait_for_connection() +print('SLAVE: connected') + +ev3.screen.clear() +ev3.screen.print('SLAVE') +ev3.screen.print('Connected') + +reset_all_as_zero() +print_states('SLAVE initial zero:') + +prepared_step_id = None +prepared_duration = 0 +prepared_easing = DEFAULT_EASING +prepared_plans = [] + +while True: + cmd_box.wait() + message = cmd_box.read() + + print('SLAVE RX: {}'.format(message)) + + try: + parts = message.split('|') + command_type = parts[0] + + print('SLAVE MSG PARTS COUNT: {}'.format(len(parts))) + + if command_type == 'STOP': + stop_all() + status_box.send('STOPPED') + beep_error() + + elif command_type == 'PREP': + if len(parts) < 5: + raise ValueError('Bad PREP parts: {}'.format(len(parts))) + + prepared_step_id = parts[1] + prepared_duration = int(parts[2]) + prepared_easing = parts[3] + payload = parts[4] + + commands = parse_command_payload(payload) + prepared_plans = plan_commands(commands) + + ev3.screen.clear() + ev3.screen.print('SLAVE PREP') + ev3.screen.print(prepared_step_id) + + status_box.send('READY|{}'.format(prepared_step_id)) + beep_ok() + + elif command_type == 'GO': + step_id = parts[1] + + if step_id != prepared_step_id: + status_box.send('ERR|{}|not_prepared'.format(step_id)) + beep_error() + else: + ev3.screen.clear() + ev3.screen.print('SLAVE GO') + ev3.screen.print(step_id) + + ok = execute_plan_timed( + prepared_plans, + prepared_duration, + prepared_easing + ) + + if ok: + status_box.send('DONE|{}'.format(step_id)) + beep_ok() + else: + status_box.send('ERR|{}|movement_failed'.format(step_id)) + beep_error() + + elif command_type == 'QUIT': + stop_all() + status_box.send('BYE') + break + + else: + status_box.send('ERR|unknown_command') + beep_error() + + except Exception as e: + stop_all() + + err = str(e) + + ev3.screen.clear() + ev3.screen.print('SLAVE ERROR') + ev3.screen.print(err[:16]) + ev3.screen.print(err[16:32]) + + print('SLAVE ERROR: {}'.format(err)) + + # Send actual error back to master. + # Keep it short because TextMailbox messages should not be too long. + status_box.send('ERR|{}|{}'.format( + prepared_step_id if prepared_step_id is not None else 'none', + err[:40] + )) + + beep_error() + +stop_all() +ev3.screen.clear() +ev3.screen.print('SLAVE END') +print('SLAVE: program ended') \ No newline at end of file diff --git a/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlymaster.py b/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlymaster.py new file mode 100644 index 0000000..8d61090 --- /dev/null +++ b/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlymaster.py @@ -0,0 +1,2154 @@ +#!/usr/bin/env pybricks-micropython + +from math import sin, cos, sqrt, pi + +from pybricks.hubs import EV3Brick +from pybricks.ev3devices import Motor +from pybricks.parameters import Port, Stop +from pybricks.tools import wait, StopWatch + + +# ============================================================ +# ROBOT ARM MOTION CONTROLLER +# ============================================================ +# +# Purpose: +# One-brick motion controller for an EV3 robotic arm used to generate +# repeatable phone/Cardboard XR movement trajectories. +# +# Supports: +# - absolute motor-angle commands +# - relative motor-angle commands +# - absolute joint-angle commands with gear-ratio conversion +# - relative joint-angle commands with gear-ratio conversion +# - sequential movement +# - simultaneous movement +# - timed movement profiles +# - easing functions based on easings.net naming +# - sound feedback after every step +# - low-angle jitter mitigation +# - configurable motor control limits and tolerances +# +# Coordinate convention: +# motor_angle = joint_angle * gear_ratio +# +# Example: +# ABS_J('base', 20) +# means: +# base joint target = 20 deg +# base motor target = 20 * 7.5 = 150 deg + + +# ============================================================ +# HARDWARE PORT CONFIGURATION +# ============================================================ + +BASE_PORT = Port.A +SHOULDER_LEFT_PORT = Port.B +SHOULDER_RIGHT_PORT = Port.C +ELBOW_PORT = Port.D + + +# ============================================================ +# BASIC MOTION CONFIGURATION +# ============================================================ + +# Speed used by non-timed run_target() movements. +# Unit: motor degrees per second. +DEFAULT_SPEED = 200 + +# Speed used for optional final settling after timed track_target() movement. +# Unit: motor degrees per second. +SETTLE_SPEED = 120 + +# Polling interval for waiting loops. +# Lower value = more responsive, but more CPU activity. +POLL_MS = 10 + +# Target update interval during timed/eased movements. +# Lower value = smoother target curve, but more controller updates. +# If too low, very small target changes may cause unnecessary jitter. +PROFILE_UPDATE_MS = 20 + +# If True, after a timed profile the robot performs an additional run_target() +# movement to exactly settle at the final target. +# +# Advantage: +# better final angle accuracy. +# +# Disadvantage: +# total motion time becomes longer than duration_ms. +# +# For tracking-drift experiments, False is usually better when time consistency +# is more important than final encoder accuracy. +SETTLE_AFTER_TIMED_MOVE = False + +# Optional dwell at final target after timed profile before applying final stop. +# Unit: ms. +# +# 0 preserves the requested timing most strictly. +# 50-150 can help if the motor needs a short moment to reach the last target. +TIMED_FINAL_TARGET_DWELL_MS = 0 + + +# ============================================================ +# GEAR RATIOS +# ============================================================ + +# Gear ratio convention: +# motor_angle = joint_angle * gear_ratio +# +# base: +# 8-tooth motor gear -> 60-tooth driven gear +# 60 / 8 = 7.5 +# +# shoulder: +# 8-tooth motor gear -> 60-tooth driven gear +# 60 / 8 = 7.5 +# +# elbow: +# ratio = 5.0 +GEAR_RATIOS = { + 'base': 7.5, + 'shoulder': 7.5, + 'elbow': 5.0, +} + + +# ============================================================ +# JOINT LIMITS +# ============================================================ + +# Limits are expressed in JOINT degrees, not motor degrees. +# +# Strong recommendation: +# Replace None with real safe limits before mounting the phone and tracker. +# +# Example: +# JOINT_LIMITS = { +# 'base': (-60, 60), +# 'shoulder': (-25, 25), +# 'elbow': (-35, 35), +# } +JOINT_LIMITS = { + 'base': None, + 'shoulder': None, + 'elbow': None, +} + + +# ============================================================ +# SHOULDER MOTOR DIRECTION +# ============================================================ + +# If the two shoulder motors mechanically fight each other, change this. +# +# Try [1, 1] first. +# If motors rotate against each other, try [1, -1]. +SHOULDER_SIGNS = [1, 1] + + +# ============================================================ +# STOPPING / HOLDING CONFIGURATION +# ============================================================ + +# Stop mode after a target movement. +# +# Stop.BRAKE: +# Passively resists motion. Often reduces active PID jitter. +# Good if the mechanism can hold itself mechanically. +# +# Stop.HOLD: +# Actively keeps the target angle. Better against gravity/load, but may jitter +# when there is backlash, low speed, small target error, or high load. +# +# Stop.COAST: +# Lets the motor spin freely. Usually not safe for loaded arm joints. +# +# For first tests with phone/tracker mounted: +# If the arm sags with Stop.BRAKE, use Stop.HOLD for shoulder/elbow. +# If it jitters with Stop.HOLD, try Stop.BRAKE or increase tolerances. +DEFAULT_STOP_MODE = Stop.BRAKE + +STOP_MODE_BY_JOINT = { + 'base': Stop.BRAKE, + 'shoulder': Stop.BRAKE, + 'elbow': Stop.BRAKE, +} + + +# ============================================================ +# LOW-ANGLE JITTER MITIGATION +# ============================================================ + +# Deadband for tiny requested movements. +# +# If requested target is closer than this number of MOTOR degrees, +# the movement is skipped. +# +# Reason: +# Very small movement commands may cause the controller to fight encoder +# quantization, gear backlash, load, or static friction. +# +# Use 0 to disable. +DEADBAND_MOTOR_DEG_BY_JOINT = { + 'base': 1.5, + 'shoulder': 1.5, + 'elbow': 1.5, +} + +# During timed/eased movement, a new track_target() command is sent only if +# the target has changed by at least this many MOTOR degrees. +# +# This reduces micro-updates that can cause low-speed jitter. +# +# Use 0 to send every profile update. +TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 + +# If True, skipped tiny moves still apply the configured final stop mode. +# This is useful when a previous command left the motor under active control. +APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE = True + + +# ============================================================ +# MOTOR CONTROL SETTINGS +# ============================================================ + +# If True, apply Motor.control settings at the beginning of the program. +# +# These settings are conservative starting values. They are not universal. +# If movement becomes too weak or stalls under payload, increase actuation. +APPLY_MOTOR_CONTROL_SETTINGS = True + +# Motor-control limits: +# speed = maximum motor speed, deg/s +# acceleration = maximum motor acceleration, deg/s/s +# actuation = maximum actuation, percent of full output +# +# Lower actuation can reduce aggressive correction and jitter, but can also +# increase stalling risk under load. +MOTOR_CONTROL_LIMITS_BY_JOINT = { + 'base': (500, 800, 70), + 'shoulder': (400, 600, 75), + 'elbow': (400, 600, 75), +} + +# Target tolerances: +# speed = allowed deviation from zero speed, motor deg/s +# position = allowed position error, motor deg +# +# Larger position tolerance can reduce jitter near the target. +# Too large tolerance reduces accuracy. +TARGET_TOLERANCES_BY_JOINT = { + 'base': (20, 2), + 'shoulder': (20, 2), + 'elbow': (20, 2), +} + +# Stall tolerances: +# speed = speed threshold, motor deg/s +# time = how long the controller must be below that speed before stall, ms +STALL_TOLERANCES_BY_JOINT = { + 'base': (20, 1000), + 'shoulder': (20, 1000), + 'elbow': (20, 1000), +} + +# Optional PID override. +# +# None means: keep Pybricks default PID settings. +# +# Format: +# (kp, ki, kd, integral_range, integral_rate, feed_forward) +# +# Use only after observing persistent oscillation and after testing: +# - Stop.BRAKE instead of Stop.HOLD +# - larger target tolerance +# - lower actuation +# - larger movement duration +# +# Example placeholder: +# PID_SETTINGS_BY_JOINT = { +# 'base': None, +# 'shoulder': None, +# 'elbow': None, +# } +PID_SETTINGS_BY_JOINT = { + 'base': None, + 'shoulder': None, + 'elbow': None, +} + + +# ============================================================ +# SPEED SAFETY CHECKS +# ============================================================ + +# If True, abort movement when average required motor speed exceeds the limit. +# If False, only print a warning. +ABORT_IF_REQUIRED_SPEED_TOO_HIGH = False + +# Maximum recommended average speed for timed movement. +# Unit: motor deg/s. +MAX_AVG_MOTOR_SPEED_BY_JOINT = { + 'base': 250, + 'shoulder': 200, + 'elbow': 200, +} + + +# ============================================================ +# EASING CONFIGURATION +# ============================================================ + +DEFAULT_EASING = 'linear' + +# If True, easing output is clamped to [0, 1]. +# +# This makes Back/Elastic overshoot safer, but removes their original overshoot. +# +# If False, Back and Elastic may command positions outside the start-target +# range. Use only after JOINT_LIMITS are configured. +CLAMP_EASING_OUTPUT = True + + +# ============================================================ +# SOUND CONFIGURATION +# ============================================================ + +SOUND_ENABLED = True + +# If True, short beep at step start. +SOUND_AT_STEP_START = False + +# If True, confirmation beep after each successfully completed step. +SOUND_AFTER_STEP = True + +# If True, error sound after failed/stalled step. +SOUND_ON_ERROR = True + +# Frequencies and durations. +# Frequencies are in Hz, durations are in ms. +SOUND_STEP_START = (500, 60) +SOUND_STEP_OK = (1000, 90) +SOUND_STEP_FAIL = (220, 250) +SOUND_COMPLETE = (1200, 120) + + +# ============================================================ +# HARDWARE INITIALIZATION +# ============================================================ + +ev3 = EV3Brick() + +base_motor = Motor(BASE_PORT) +shoulder_left_motor = Motor(SHOULDER_LEFT_PORT) +shoulder_right_motor = Motor(SHOULDER_RIGHT_PORT) +elbow_motor = Motor(ELBOW_PORT) + + +# ============================================================ +# BASIC UTILITIES +# ============================================================ + +def clamp(value, min_value, max_value): + """ + Returns value limited to the closed interval [min_value, max_value]. + """ + if value < min_value: + return min_value + + if value > max_value: + return max_value + + return value + + +def abs_value(value): + """ + MicroPython-safe absolute value wrapper. + """ + if value < 0: + return -value + + return value + + +# ============================================================ +# SOUND FUNCTIONS +# ============================================================ + +def beep_once(sound_tuple): + """ + Plays one beep if sound feedback is enabled. + + sound_tuple: + (frequency_hz, duration_ms) + """ + if not SOUND_ENABLED: + return + + frequency = sound_tuple[0] + duration = sound_tuple[1] + ev3.speaker.beep(frequency=frequency, duration=duration) + + +def sound_step_start(): + """ + Optional short sound before executing a step. + """ + if SOUND_AT_STEP_START: + beep_once(SOUND_STEP_START) + + +def sound_step_ok(): + """ + Sound after a successfully completed step. + """ + if SOUND_AFTER_STEP: + beep_once(SOUND_STEP_OK) + + +def sound_step_fail(): + """ + Sound after failed, stalled, or aborted movement. + """ + if SOUND_ON_ERROR: + beep_once(SOUND_STEP_FAIL) + wait(80) + beep_once(SOUND_STEP_FAIL) + + +def sound_all_complete(): + """ + Sound at the end of the full movement sequence. + """ + if not SOUND_ENABLED: + return + + beep_once(SOUND_COMPLETE) + wait(80) + beep_once(SOUND_COMPLETE) + + +# ============================================================ +# EASING FUNCTIONS +# ============================================================ +# +# Input: +# x in [0, 1] +# +# Output: +# eased progress value. +# +# Most easing functions stay in [0, 1]. +# Back and Elastic can overshoot unless CLAMP_EASING_OUTPUT is True. + + +def ease_linear(x): + return x + + +def ease_in_sine(x): + return 1 - cos((x * pi) / 2) + + +def ease_out_sine(x): + return sin((x * pi) / 2) + + +def ease_in_out_sine(x): + return -(cos(pi * x) - 1) / 2 + + +def ease_in_quad(x): + return x * x + + +def ease_out_quad(x): + return 1 - (1 - x) * (1 - x) + + +def ease_in_out_quad(x): + if x < 0.5: + return 2 * x * x + + return 1 - ((-2 * x + 2) ** 2) / 2 + + +def ease_in_cubic(x): + return x * x * x + + +def ease_out_cubic(x): + return 1 - ((1 - x) ** 3) + + +def ease_in_out_cubic(x): + if x < 0.5: + return 4 * x * x * x + + return 1 - ((-2 * x + 2) ** 3) / 2 + + +def ease_in_quart(x): + return x ** 4 + + +def ease_out_quart(x): + return 1 - ((1 - x) ** 4) + + +def ease_in_out_quart(x): + if x < 0.5: + return 8 * (x ** 4) + + return 1 - ((-2 * x + 2) ** 4) / 2 + + +def ease_in_quint(x): + return x ** 5 + + +def ease_out_quint(x): + return 1 - ((1 - x) ** 5) + + +def ease_in_out_quint(x): + if x < 0.5: + return 16 * (x ** 5) + + return 1 - ((-2 * x + 2) ** 5) / 2 + + +def ease_in_expo(x): + if x == 0: + return 0 + + return 2 ** (10 * x - 10) + + +def ease_out_expo(x): + if x == 1: + return 1 + + return 1 - 2 ** (-10 * x) + + +def ease_in_out_expo(x): + if x == 0: + return 0 + + if x == 1: + return 1 + + if x < 0.5: + return (2 ** (20 * x - 10)) / 2 + + return (2 - 2 ** (-20 * x + 10)) / 2 + + +def ease_in_circ(x): + return 1 - sqrt(1 - x * x) + + +def ease_out_circ(x): + return sqrt(1 - ((x - 1) ** 2)) + + +def ease_in_out_circ(x): + if x < 0.5: + return (1 - sqrt(1 - ((2 * x) ** 2))) / 2 + + return (sqrt(1 - ((-2 * x + 2) ** 2)) + 1) / 2 + + +def ease_in_back(x): + c1 = 1.70158 + c3 = c1 + 1 + + return c3 * x * x * x - c1 * x * x + + +def ease_out_back(x): + c1 = 1.70158 + c3 = c1 + 1 + + return 1 + c3 * ((x - 1) ** 3) + c1 * ((x - 1) ** 2) + + +def ease_in_out_back(x): + c1 = 1.70158 + c2 = c1 * 1.525 + + if x < 0.5: + return (((2 * x) ** 2) * ((c2 + 1) * 2 * x - c2)) / 2 + + return ((((2 * x - 2) ** 2) * ((c2 + 1) * (2 * x - 2) + c2)) + 2) / 2 + + +def ease_in_elastic(x): + c4 = (2 * pi) / 3 + + if x == 0: + return 0 + + if x == 1: + return 1 + + return -(2 ** (10 * x - 10)) * sin((x * 10 - 10.75) * c4) + + +def ease_out_elastic(x): + c4 = (2 * pi) / 3 + + if x == 0: + return 0 + + if x == 1: + return 1 + + return (2 ** (-10 * x)) * sin((x * 10 - 0.75) * c4) + 1 + + +def ease_in_out_elastic(x): + c5 = (2 * pi) / 4.5 + + if x == 0: + return 0 + + if x == 1: + return 1 + + if x < 0.5: + return -((2 ** (20 * x - 10)) * sin((20 * x - 11.125) * c5)) / 2 + + return ((2 ** (-20 * x + 10)) * sin((20 * x - 11.125) * c5)) / 2 + 1 + + +def ease_out_bounce(x): + n1 = 7.5625 + d1 = 2.75 + + if x < 1 / d1: + return n1 * x * x + + elif x < 2 / d1: + x = x - 1.5 / d1 + return n1 * x * x + 0.75 + + elif x < 2.5 / d1: + x = x - 2.25 / d1 + return n1 * x * x + 0.9375 + + else: + x = x - 2.625 / d1 + return n1 * x * x + 0.984375 + + +def ease_in_bounce(x): + return 1 - ease_out_bounce(1 - x) + + +def ease_in_out_bounce(x): + if x < 0.5: + return (1 - ease_out_bounce(1 - 2 * x)) / 2 + + return (1 + ease_out_bounce(2 * x - 1)) / 2 + + +EASINGS = { + 'linear': ease_linear, + + 'easeInSine': ease_in_sine, + 'easeOutSine': ease_out_sine, + 'easeInOutSine': ease_in_out_sine, + + 'easeInQuad': ease_in_quad, + 'easeOutQuad': ease_out_quad, + 'easeInOutQuad': ease_in_out_quad, + + 'easeInCubic': ease_in_cubic, + 'easeOutCubic': ease_out_cubic, + 'easeInOutCubic': ease_in_out_cubic, + + 'easeInQuart': ease_in_quart, + 'easeOutQuart': ease_out_quart, + 'easeInOutQuart': ease_in_out_quart, + + 'easeInQuint': ease_in_quint, + 'easeOutQuint': ease_out_quint, + 'easeInOutQuint': ease_in_out_quint, + + 'easeInExpo': ease_in_expo, + 'easeOutExpo': ease_out_expo, + 'easeInOutExpo': ease_in_out_expo, + + 'easeInCirc': ease_in_circ, + 'easeOutCirc': ease_out_circ, + 'easeInOutCirc': ease_in_out_circ, + + 'easeInBack': ease_in_back, + 'easeOutBack': ease_out_back, + 'easeInOutBack': ease_in_out_back, + + 'easeInElastic': ease_in_elastic, + 'easeOutElastic': ease_out_elastic, + 'easeInOutElastic': ease_in_out_elastic, + + 'easeInBounce': ease_in_bounce, + 'easeOutBounce': ease_out_bounce, + 'easeInOutBounce': ease_in_out_bounce, +} + +EASING_NAMES = [ + 'linear', + + 'easeInSine', + 'easeOutSine', + 'easeInOutSine', + + 'easeInQuad', + 'easeOutQuad', + 'easeInOutQuad', + + 'easeInCubic', + 'easeOutCubic', + 'easeInOutCubic', + + 'easeInQuart', + 'easeOutQuart', + 'easeInOutQuart', + + 'easeInQuint', + 'easeOutQuint', + 'easeInOutQuint', + + 'easeInExpo', + 'easeOutExpo', + 'easeInOutExpo', + + 'easeInCirc', + 'easeOutCirc', + 'easeInOutCirc', + + 'easeInBack', + 'easeOutBack', + 'easeInOutBack', + + 'easeInElastic', + 'easeOutElastic', + 'easeInOutElastic', + + 'easeInBounce', + 'easeOutBounce', + 'easeInOutBounce', +] + + +def get_easing(easing_name): + """ + Returns easing function by name. + + easing_name: + String such as: + - 'linear' + - 'easeInOutSine' + - 'easeOutQuad' + - 'easeInOutCubic' + - 'easeOutBounce' + """ + if easing_name is None: + easing_name = DEFAULT_EASING + + if easing_name in EASINGS: + return EASINGS[easing_name] + + key_lower = easing_name.lower() + + for name in EASING_NAMES: + if name.lower() == key_lower: + return EASINGS[name] + + raise ValueError('Unknown easing: {}'.format(easing_name)) + + +# ============================================================ +# JOINT CLASS +# ============================================================ + +class Joint: + """ + Represents one logical robot-arm joint. + + A joint can be controlled by: + - one motor, e.g. base or elbow + - two synchronized motors, e.g. shoulder + + The class separates: + - physical motor encoder angles + - logical motor-coordinate angles + - logical joint-coordinate angles + + Gear conversion: + motor_angle = joint_angle * gear_ratio + + signs: + Used when motors must rotate in opposite directions to produce the same + logical joint movement. + """ + + def __init__(self, name, motors, signs, gear_ratio): + self.name = name + self.motors = motors + self.signs = signs + self.gear_ratio = gear_ratio + + def motor_angle(self): + """ + Returns logical motor angle in motor degrees. + + For one motor: + signed encoder angle. + + For two motors: + signed average encoder angle. + """ + total = 0 + count = len(self.motors) + + for i in range(count): + total += self.motors[i].angle() * self.signs[i] + + return total / count + + def joint_angle(self): + """ + Returns logical joint angle in joint degrees. + """ + return self.motor_angle() / self.gear_ratio + + def joint_to_motor(self, joint_angle): + """ + Converts joint degrees to motor degrees. + """ + return joint_angle * self.gear_ratio + + def motor_to_joint(self, motor_angle): + """ + Converts motor degrees to joint degrees. + """ + return motor_angle / self.gear_ratio + + def reset_motor_angle(self, logical_motor_angle): + """ + Sets the current logical motor-coordinate angle. + + Use carefully: + this changes encoder reference, not physical position. + """ + for i in range(len(self.motors)): + self.motors[i].reset_angle(logical_motor_angle * self.signs[i]) + + def reset_joint_angle(self, logical_joint_angle): + """ + Sets the current logical joint-coordinate angle. + + Example: + reset_joint_angle(0) + makes the current physical pose become logical joint angle 0. + """ + logical_motor_angle = self.joint_to_motor(logical_joint_angle) + self.reset_motor_angle(logical_motor_angle) + + def check_joint_limit(self, logical_joint_target): + """ + Validates target joint angle against JOINT_LIMITS. + + Limits are in joint degrees. + """ + limits = JOINT_LIMITS.get(self.name) + + if limits is None: + return + + min_angle = limits[0] + max_angle = limits[1] + + if logical_joint_target < min_angle or logical_joint_target > max_angle: + raise ValueError( + 'Target joint angle {} for joint {} is outside limits {}..{}'.format( + logical_joint_target, + self.name, + min_angle, + max_angle + ) + ) + + def stop_mode(self): + """ + Returns final stop mode for this joint. + """ + if self.name in STOP_MODE_BY_JOINT: + return STOP_MODE_BY_JOINT[self.name] + + return DEFAULT_STOP_MODE + + def physical_motor_target(self, logical_motor_target, motor_index): + """ + Converts logical motor target to physical encoder target. + """ + return int(logical_motor_target * self.signs[motor_index]) + + def deadband_motor_deg(self): + """ + Returns movement deadband in motor degrees. + """ + if self.name in DEADBAND_MOTOR_DEG_BY_JOINT: + return DEADBAND_MOTOR_DEG_BY_JOINT[self.name] + + return 0 + + def is_small_motor_delta(self, motor_delta): + """ + Checks whether a requested movement is below configured deadband. + """ + return abs_value(motor_delta) <= self.deadband_motor_deg() + + def run_motor_target(self, logical_motor_target, speed): + """ + Moves to target expressed in motor degrees using run_target(). + """ + logical_joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(logical_joint_target) + + stop_mode = self.stop_mode() + + for i in range(len(self.motors)): + self.motors[i].run_target( + speed, + self.physical_motor_target(logical_motor_target, i), + then=stop_mode, + wait=False + ) + + def track_motor_target(self, logical_motor_target): + """ + Sends target expressed in motor degrees using track_target(). + + Used for timed/eased profiles where the target changes many times + during movement. + """ + logical_joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(logical_joint_target) + + for i in range(len(self.motors)): + self.motors[i].track_target( + self.physical_motor_target(logical_motor_target, i) + ) + + def apply_stop_mode(self): + """ + Applies the configured final stop behavior manually. + + Used after track_target() profiles and skipped tiny moves. + """ + stop_mode = self.stop_mode() + + for motor in self.motors: + if stop_mode == Stop.HOLD: + motor.hold() + + elif stop_mode == Stop.BRAKE: + motor.brake() + + elif stop_mode == Stop.COAST: + motor.stop() + + else: + motor.brake() + + def all_motors(self): + """ + Returns all physical motors belonging to this joint. + """ + return self.motors + + +# ============================================================ +# JOINT OBJECTS +# ============================================================ + +base = Joint( + 'base', + [base_motor], + [1], + GEAR_RATIOS['base'] +) + +shoulder = Joint( + 'shoulder', + [shoulder_left_motor, shoulder_right_motor], + SHOULDER_SIGNS, + GEAR_RATIOS['shoulder'] +) + +elbow = Joint( + 'elbow', + [elbow_motor], + [1], + GEAR_RATIOS['elbow'] +) + +JOINTS = { + 'base': base, + 'shoulder': shoulder, + 'elbow': elbow, +} + +JOINT_ORDER = ['base', 'shoulder', 'elbow'] + + +# ============================================================ +# MOTOR CONTROL SETUP +# ============================================================ + +def configure_joint_motor_control(joint): + """ + Applies optional Pybricks Motor.control settings for one joint. + + Settings are applied to all physical motors in that joint. + + Important: + Pybricks documentation says control settings should be changed while + the controller is stopped. This function stops motors before applying + settings. + """ + for motor in joint.all_motors(): + motor.stop() + + limits = MOTOR_CONTROL_LIMITS_BY_JOINT.get(joint.name) + target_tolerances = TARGET_TOLERANCES_BY_JOINT.get(joint.name) + stall_tolerances = STALL_TOLERANCES_BY_JOINT.get(joint.name) + pid_settings = PID_SETTINGS_BY_JOINT.get(joint.name) + + for motor in joint.all_motors(): + + if limits is not None: + speed = limits[0] + acceleration = limits[1] + actuation = limits[2] + motor.control.limits(speed, acceleration, actuation) + + if target_tolerances is not None: + speed_tol = target_tolerances[0] + position_tol = target_tolerances[1] + motor.control.target_tolerances(speed_tol, position_tol) + + if stall_tolerances is not None: + stall_speed = stall_tolerances[0] + stall_time = stall_tolerances[1] + motor.control.stall_tolerances(stall_speed, stall_time) + + if pid_settings is not None: + kp = pid_settings[0] + ki = pid_settings[1] + kd = pid_settings[2] + integral_range = pid_settings[3] + integral_rate = pid_settings[4] + feed_forward = pid_settings[5] + motor.control.pid( + kp, + ki, + kd, + integral_range, + integral_rate, + feed_forward + ) + + +def configure_all_motor_controls(): + """ + Applies configured motor-control parameters to all joints. + """ + if not APPLY_MOTOR_CONTROL_SETTINGS: + return + + for name in JOINT_ORDER: + configure_joint_motor_control(JOINTS[name]) + + +# ============================================================ +# COMMAND HELPERS +# ============================================================ + +def ABS_M(joint_name, motor_angle): + """ + Absolute target in motor degrees. + + Example: + ABS_M('base', 90) + """ + return { + 'joint': joint_name, + 'mode': 'abs', + 'unit': 'motor', + 'value': motor_angle + } + + +def REL_M(joint_name, motor_delta): + """ + Relative movement in motor degrees. + + Example: + REL_M('base', -45) + """ + return { + 'joint': joint_name, + 'mode': 'rel', + 'unit': 'motor', + 'value': motor_delta + } + + +def ABS_J(joint_name, joint_angle): + """ + Absolute target in joint degrees. + + Example: + ABS_J('base', 20) + With gear ratio 7.5, motor target becomes 150 deg. + """ + return { + 'joint': joint_name, + 'mode': 'abs', + 'unit': 'joint', + 'value': joint_angle + } + + +def REL_J(joint_name, joint_delta): + """ + Relative movement in joint degrees. + + Example: + REL_J('elbow', 10) + With gear ratio 5.0, motor target changes by 50 deg. + """ + return { + 'joint': joint_name, + 'mode': 'rel', + 'unit': 'joint', + 'value': joint_delta + } + + +# Backward-compatible aliases. +# These behave like the original program: values are motor degrees. +def ABS(joint_name, angle): + return ABS_M(joint_name, angle) + + +def REL(joint_name, delta): + return REL_M(joint_name, delta) + + +def PARALLEL(label, commands, duration_ms=None, pause_ms=0, + easing=None, clamp_easing=None): + """ + Creates a simultaneous movement step. + + commands: + List of commands executed at the same time. + + duration_ms: + None: + use run_target() with DEFAULT_SPEED. + integer: + use timed/eased track_target() profile. + + easing: + Name of easing function, e.g.: + 'linear' + 'easeInOutSine' + 'easeOutQuad' + + clamp_easing: + None: + use global CLAMP_EASING_OUTPUT. + True/False: + override global setting for this step. + """ + return { + 'type': 'parallel', + 'label': label, + 'commands': commands, + 'duration_ms': duration_ms, + 'pause_ms': pause_ms, + 'easing': easing, + 'clamp_easing': clamp_easing + } + + +def SERIES(label, commands, duration_ms=None, pause_ms=0, + easing=None, clamp_easing=None, duration_mode='total'): + """ + Creates a sequential movement step. + + commands can be: + + 1. Flat serial command list: + [ + ABS_J('base', 10), + ABS_J('base', 0) + ] + + 2. Serial list of parallel groups: + [ + [ABS_J('base', 10), ABS_J('shoulder', 5)], + [ABS_J('base', 0), ABS_J('shoulder', 0)] + ] + + duration_mode: + 'total': + duration_ms is divided equally across serial groups. + + 'per_group': + each serial group receives the full duration_ms. + """ + return { + 'type': 'series', + 'label': label, + 'commands': commands, + 'duration_ms': duration_ms, + 'pause_ms': pause_ms, + 'easing': easing, + 'clamp_easing': clamp_easing, + 'duration_mode': duration_mode + } + + +# ============================================================ +# COMMAND PLANNING +# ============================================================ + +def is_command_object(item): + """ + Returns True if item looks like a movement command. + """ + return type(item) == dict and 'joint' in item + + +def validate_parallel_commands(commands): + """ + Prevents commanding the same joint twice in one parallel group. + + Example of invalid group: + [ + ABS_J('base', 10), + REL_J('base', 5) + ] + + Use SERIES instead. + """ + seen = [] + + for command in commands: + joint_name = command['joint'] + + if joint_name in seen: + raise ValueError( + 'Joint {} appears more than once in one parallel group.'.format( + joint_name + ) + ) + + seen.append(joint_name) + + +def normalize_series_groups(commands): + """ + Converts flat serial command list into list of command groups. + + Input: + [cmd1, cmd2, cmd3] + + Output: + [[cmd1], [cmd2], [cmd3]] + """ + groups = [] + + for item in commands: + if is_command_object(item): + groups.append([item]) + else: + groups.append(item) + + return groups + + +def plan_commands(commands): + """ + Converts user commands into executable motor-coordinate plans. + + Each plan contains: + - joint object + - start motor angle + - target motor angle + - start joint angle + - target joint angle + - delta values + - skip flag for tiny movement deadband + """ + validate_parallel_commands(commands) + + plans = [] + + for command in commands: + joint = JOINTS[command['joint']] + mode = command['mode'] + unit = command.get('unit', 'motor') + value = command['value'] + + start_motor = joint.motor_angle() + start_joint = joint.joint_angle() + + if mode == 'abs': + if unit == 'motor': + target_motor = value + target_joint = joint.motor_to_joint(target_motor) + + elif unit == 'joint': + target_joint = value + target_motor = joint.joint_to_motor(target_joint) + + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + elif mode == 'rel': + if unit == 'motor': + target_motor = start_motor + value + target_joint = joint.motor_to_joint(target_motor) + + elif unit == 'joint': + target_joint = start_joint + value + target_motor = joint.joint_to_motor(target_joint) + + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + else: + raise ValueError('Unknown command mode: {}'.format(mode)) + + joint.check_joint_limit(target_joint) + + delta_motor = target_motor - start_motor + delta_joint = target_joint - start_joint + + skip_small_move = joint.is_small_motor_delta(delta_motor) + + plans.append({ + 'joint': joint, + 'start_motor': start_motor, + 'target_motor': target_motor, + 'start_joint': start_joint, + 'target_joint': target_joint, + 'delta_motor': delta_motor, + 'delta_joint': delta_joint, + 'skip_small_move': skip_small_move, + 'last_sent_motor_target': None, + 'command': command + }) + + return plans + + +def collect_motors(plans): + """ + Returns all physical motors used by a set of movement plans. + """ + motors = [] + + for plan in plans: + joint = plan['joint'] + + for motor in joint.all_motors(): + motors.append(motor) + + return motors + + +def active_plans_only(plans): + """ + Returns plans that are not skipped by the small-movement deadband. + """ + active = [] + + for plan in plans: + if not plan['skip_small_move']: + active.append(plan) + + return active + + +# ============================================================ +# DISPLAY AND LOGGING +# ============================================================ + +def display_step_header(step_num, label): + """ + Shows current step on EV3 screen and console. + """ + ev3.screen.clear() + ev3.screen.print('=== STEP {} ==='.format(step_num)) + ev3.screen.print(label) + + print('') + print('=' * 70) + print('STEP {}: {}'.format(step_num, label)) + print('=' * 70) + + +def print_joint_states(prefix): + """ + Prints current motor and joint angles for all joints. + """ + print(prefix) + + for name in JOINT_ORDER: + joint = JOINTS[name] + + print( + ' {}: motor={} deg, joint={} deg'.format( + name, + joint.motor_angle(), + joint.joint_angle() + ) + ) + + +def print_available_easings(): + """ + Prints all supported easing names. + """ + print('Available easing styles:') + + for name in EASING_NAMES: + print(' {}'.format(name)) + + +def print_motion_plan(plans): + """ + Prints planned movement before execution. + """ + for plan in plans: + joint = plan['joint'] + + print( + ' {}: motor {} -> {} deg, joint {} -> {} deg'.format( + joint.name, + plan['start_motor'], + plan['target_motor'], + plan['start_joint'], + plan['target_joint'] + ) + ) + + print( + ' delta: motor={} deg, joint={} deg, skip_small_move={}'.format( + plan['delta_motor'], + plan['delta_joint'], + plan['skip_small_move'] + ) + ) + + +def print_final_errors(plans): + """ + Prints final angle error after movement. + """ + print('Final errors:') + + for plan in plans: + joint = plan['joint'] + actual_motor = joint.motor_angle() + actual_joint = joint.joint_angle() + + motor_error = plan['target_motor'] - actual_motor + joint_error = plan['target_joint'] - actual_joint + + print( + ' {}: motor_error={} deg, joint_error={} deg'.format( + joint.name, + motor_error, + joint_error + ) + ) + + +# ============================================================ +# WAITING AND STALL DETECTION +# ============================================================ + +def wait_until_done(motors, timeout_ms): + """ + Waits until all motors report control.done(). + + Returns: + True if all motors completed movement. + False if timeout or stall occurred. + """ + timer = StopWatch() + + while True: + all_done = True + + for motor in motors: + if motor.control.stalled(): + print('WARNING: motor stalled') + motor.brake() + return False + + if not motor.control.done(): + all_done = False + + if all_done: + return True + + if timeout_ms is not None and timer.time() > timeout_ms: + print('WARNING: timeout while waiting for motors') + + for motor in motors: + motor.brake() + + return False + + wait(POLL_MS) + + +# ============================================================ +# SPEED SAFETY +# ============================================================ + +def check_required_speed(plan, duration_ms): + """ + Checks whether required average speed exceeds recommended limit. + + This does not measure peak speed. Easing profiles can have peak speeds + higher than average speed. + """ + joint = plan['joint'] + + if duration_ms <= 0: + raise ValueError('duration_ms must be greater than 0') + + required_avg_speed = abs_value(plan['delta_motor']) * 1000 / duration_ms + max_speed = MAX_AVG_MOTOR_SPEED_BY_JOINT.get(joint.name) + + if max_speed is None: + return True + + if required_avg_speed > max_speed: + message = ( + 'WARNING: required average motor speed for {} is {} deg/s, ' + 'recommended max is {} deg/s'.format( + joint.name, + required_avg_speed, + max_speed + ) + ) + + print(message) + + if ABORT_IF_REQUIRED_SPEED_TOO_HIGH: + return False + + return True + + +# ============================================================ +# MOVEMENT EXECUTION +# ============================================================ + +def handle_skipped_small_moves(plans): + """ + Logs skipped tiny moves and optionally applies final stop mode. + """ + for plan in plans: + if plan['skip_small_move']: + joint = plan['joint'] + + print( + ' SKIP tiny move for {}: delta_motor={} deg, deadband={} deg'.format( + joint.name, + plan['delta_motor'], + joint.deadband_motor_deg() + ) + ) + + if APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE: + joint.apply_stop_mode() + + +def move_parallel_target(commands, speed=DEFAULT_SPEED): + """ + Executes simultaneous target movement using run_target(). + + Used when duration_ms is None. + """ + plans = plan_commands(commands) + active_plans = active_plans_only(plans) + + print_joint_states('Before target move:') + print_motion_plan(plans) + handle_skipped_small_moves(plans) + + if len(active_plans) == 0: + print('No active movement after deadband filtering.') + print_joint_states('After skipped target move:') + return True + + for plan in active_plans: + joint = plan['joint'] + target_motor = plan['target_motor'] + joint.run_motor_target(target_motor, speed) + + motors = collect_motors(active_plans) + + ok = wait_until_done(motors, 30000) + + print_joint_states('After target move:') + print_final_errors(plans) + + return ok + + +def maybe_send_track_target(plan, motor_target, force_send): + """ + Sends track_target() only when target changed enough, unless force_send=True. + + This reduces small high-frequency target updates that can cause jitter. + """ + joint = plan['joint'] + last_sent = plan['last_sent_motor_target'] + + if force_send: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if last_sent is None: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if TRACK_TARGET_MIN_UPDATE_MOTOR_DEG <= 0: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if abs_value(motor_target - last_sent) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + + +def move_parallel_timed(commands, duration_ms, easing_name=None, + clamp_easing=None): + """ + Executes simultaneous timed/eased movement using track_target(). + + duration_ms: + Total intended duration of the movement. + + easing_name: + Name of easing function. If None, DEFAULT_EASING is used. + + clamp_easing: + Whether to clamp easing output to [0, 1]. + """ + if duration_ms is None: + return move_parallel_target(commands, DEFAULT_SPEED) + + if duration_ms <= 0: + raise ValueError('duration_ms must be greater than 0') + + if clamp_easing is None: + clamp_easing = CLAMP_EASING_OUTPUT + + easing_func = get_easing(easing_name) + + plans = plan_commands(commands) + active_plans = active_plans_only(plans) + + print_joint_states('Before timed profile:') + print('Timed profile duration: {} ms'.format(duration_ms)) + print('Easing: {}'.format(easing_name if easing_name is not None else DEFAULT_EASING)) + print('Clamp easing output: {}'.format(clamp_easing)) + print_motion_plan(plans) + handle_skipped_small_moves(plans) + + if len(active_plans) == 0: + print('No active movement after deadband filtering.') + print_joint_states('After skipped timed profile:') + return True + + for plan in active_plans: + ok_speed = check_required_speed(plan, duration_ms) + + if not ok_speed: + print('ERROR: required speed too high, movement aborted.') + return False + + timer = StopWatch() + + while timer.time() < duration_ms: + elapsed = timer.time() + + linear_ratio = elapsed / duration_ms + linear_ratio = clamp(linear_ratio, 0, 1) + + eased_ratio = easing_func(linear_ratio) + + if clamp_easing: + eased_ratio = clamp(eased_ratio, 0, 1) + + for plan in active_plans: + start_motor = plan['start_motor'] + target_motor = plan['target_motor'] + + current_motor_target = start_motor + (target_motor - start_motor) * eased_ratio + + maybe_send_track_target( + plan, + current_motor_target, + False + ) + + for plan in active_plans: + joint = plan['joint'] + + for motor in joint.all_motors(): + if motor.control.stalled(): + print('WARNING: motor stalled during timed profile') + motor.brake() + return False + + wait(PROFILE_UPDATE_MS) + + # Final exact target command. + for plan in active_plans: + maybe_send_track_target( + plan, + plan['target_motor'], + True + ) + + elapsed = timer.time() + + if TIMED_FINAL_TARGET_DWELL_MS > 0: + wait(TIMED_FINAL_TARGET_DWELL_MS) + + if SETTLE_AFTER_TIMED_MOVE: + for plan in active_plans: + joint = plan['joint'] + target_motor = plan['target_motor'] + joint.run_motor_target(target_motor, SETTLE_SPEED) + + motors = collect_motors(active_plans) + ok = wait_until_done(motors, 10000) + + if not ok: + return False + + else: + for plan in active_plans: + joint = plan['joint'] + joint.apply_stop_mode() + + print('Timed profile requested: {} ms'.format(duration_ms)) + print('Timed profile elapsed: {} ms'.format(elapsed)) + print_joint_states('After timed profile:') + print_final_errors(plans) + + return True + + +def execute_parallel_group(commands, duration_ms=None, + easing_name=None, clamp_easing=None): + """ + Executes one parallel group. + + If duration_ms is None: + use target movement with DEFAULT_SPEED. + + If duration_ms is set: + use timed/eased movement. + """ + if len(commands) == 0: + return True + + if duration_ms is None: + return move_parallel_target(commands, DEFAULT_SPEED) + + return move_parallel_timed( + commands, + duration_ms, + easing_name, + clamp_easing + ) + + +def run_step(step_num, step): + """ + Executes one high-level step. + + Step can be: + - PARALLEL(...) + - SERIES(...) + """ + display_step_header(step_num, step['label']) + sound_step_start() + + step_type = step['type'] + commands = step['commands'] + duration_ms = step.get('duration_ms', None) + pause_ms = step.get('pause_ms', 0) + easing_name = step.get('easing', None) + clamp_easing = step.get('clamp_easing', None) + + if len(commands) == 0: + print_joint_states('Current state:') + + if pause_ms > 0: + wait(pause_ms) + + sound_step_ok() + return True + + if step_type == 'parallel': + ok = execute_parallel_group( + commands, + duration_ms, + easing_name, + clamp_easing + ) + + elif step_type == 'series': + groups = normalize_series_groups(commands) + duration_mode = step.get('duration_mode', 'total') + + ok = True + + if duration_ms is None: + group_duration_ms = None + + for group in groups: + ok = execute_parallel_group( + group, + group_duration_ms, + easing_name, + clamp_easing + ) + + if not ok: + break + + else: + if duration_mode == 'total': + group_duration_ms = int(duration_ms / len(groups)) + + if group_duration_ms <= 0: + raise ValueError('Series duration is too short for the number of groups') + + for group in groups: + ok = execute_parallel_group( + group, + group_duration_ms, + easing_name, + clamp_easing + ) + + if not ok: + break + + elif duration_mode == 'per_group': + group_duration_ms = duration_ms + + for group in groups: + ok = execute_parallel_group( + group, + group_duration_ms, + easing_name, + clamp_easing + ) + + if not ok: + break + + else: + raise ValueError('Unknown duration_mode: {}'.format(duration_mode)) + + else: + raise ValueError('Unknown step type: {}'.format(step_type)) + + if pause_ms > 0: + wait(pause_ms) + + if ok: + sound_step_ok() + else: + sound_step_fail() + + return ok + + +def reset_all_current_positions_as_zero(): + """ + Makes current physical pose become logical 0 deg for all joints. + """ + for name in JOINT_ORDER: + JOINTS[name].reset_joint_angle(0) + + +def apply_stop_to_all_joints(): + """ + Applies configured stop mode to all joints. + """ + for name in JOINT_ORDER: + JOINTS[name].apply_stop_mode() + + +# ============================================================ +# TEST / RESEARCH MOVEMENT SEQUENCE +# ============================================================ + +TEST_STEPS = [ + + PARALLEL( + 'Reference pose: current physical position is logical zero', + [], + pause_ms=1000 + ), + + PARALLEL( + 'Homing: all joints to zero, linear', + [ + ABS_J('base', 0), + ABS_J('shoulder', 0), + ABS_J('elbow', 0), + ], + duration_ms=3000, + easing='linear', + pause_ms=1000 + ), + + SERIES( + 'Base joint-angle test, easeInOutSine, total 6s', + [ + ABS_J('base', 15), + ABS_J('base', -15), + ABS_J('base', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + SERIES( + 'Shoulder joint-angle test, easeInOutQuad, total 6s', + [ + ABS_J('shoulder', 8), + ABS_J('shoulder', -8), + ABS_J('shoulder', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutQuad', + pause_ms=1000 + ), + + SERIES( + 'Elbow joint-angle test, easeInOutCubic, total 6s', + [ + ABS_J('elbow', 15), + ABS_J('elbow', -15), + ABS_J('elbow', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutCubic', + pause_ms=1000 + ), + + PARALLEL( + 'Base + shoulder synchronized, easeInOutSine', + [ + ABS_J('base', 12), + ABS_J('shoulder', 8), + ], + duration_ms=3000, + easing='easeInOutSine', + pause_ms=1000 + ), + + PARALLEL( + 'Base + shoulder + elbow synchronized, easeInOutQuad', + [ + ABS_J('base', -12), + ABS_J('shoulder', -8), + ABS_J('elbow', 12), + ], + duration_ms=3000, + easing='easeInOutQuad', + pause_ms=1000 + ), + + SERIES( + 'Mixed serial groups, total 9s, easeInOutSine', + [ + [ + ABS_J('base', 10), + ABS_J('shoulder', 5), + ], + [ + ABS_J('base', -10), + ABS_J('elbow', 10), + ], + [ + ABS_J('base', 0), + ABS_J('shoulder', 0), + ABS_J('elbow', 0), + ], + ], + duration_ms=9000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + SERIES( + 'Relative joint movement, easeOutQuad, total 6s', + [ + REL_J('base', 10), + REL_J('base', 10), + REL_J('base', -20), + ], + duration_ms=6000, + duration_mode='total', + easing='easeOutQuad', + pause_ms=1000 + ), + + PARALLEL( + 'Small bounce-style test, clamped', + [ + ABS_J('base', 8), + ABS_J('elbow', 8), + ], + duration_ms=3000, + easing='easeOutBounce', + clamp_easing=True, + pause_ms=1000 + ), + + PARALLEL( + 'Return all joints to zero, easeInOutSine', + [ + ABS_J('base', 0), + ABS_J('shoulder', 0), + ABS_J('elbow', 0), + ], + duration_ms=3000, + easing='easeInOutSine', + pause_ms=1000 + ), + + # Example raw motor-angle calibration step. + # Uncomment only if needed. + # + # PARALLEL( + # 'Raw motor-angle calibration test', + # [ + # ABS_M('base', 90), # base joint = 90 / 7.5 = 12 deg + # ABS_M('shoulder', 90), # shoulder joint = 90 / 7.5 = 12 deg + # ABS_M('elbow', 90), # elbow joint = 90 / 5.0 = 18 deg + # ], + # duration_ms=3000, + # easing='linear', + # pause_ms=1000 + # ), +] + + +# ============================================================ +# MAIN PROGRAM +# ============================================================ + +ev3.screen.clear() +ev3.screen.print('=== MASTER BRICK ===') +ev3.screen.print('Motion test ready') + +print('') +print('#' * 70) +print('# ROBOTIC ARM MOTION TEST START') +print('#' * 70) +print('') + +print_available_easings() + +print('') +print('Gear ratios:') +print(' base: {}'.format(GEAR_RATIOS['base'])) +print(' shoulder: {}'.format(GEAR_RATIOS['shoulder'])) +print(' elbow: {}'.format(GEAR_RATIOS['elbow'])) + +print('') +print('Stop modes:') +print(' base: {}'.format(STOP_MODE_BY_JOINT['base'])) +print(' shoulder: {}'.format(STOP_MODE_BY_JOINT['shoulder'])) +print(' elbow: {}'.format(STOP_MODE_BY_JOINT['elbow'])) + +print('') +print('Jitter mitigation:') +print(' deadband motor deg by joint: {}'.format(DEADBAND_MOTOR_DEG_BY_JOINT)) +print(' track target min update motor deg: {}'.format(TRACK_TARGET_MIN_UPDATE_MOTOR_DEG)) +print(' apply motor control settings: {}'.format(APPLY_MOTOR_CONTROL_SETTINGS)) + +print('') +print('Default easing: {}'.format(DEFAULT_EASING)) +print('Clamp easing output: {}'.format(CLAMP_EASING_OUTPUT)) +print('Settle after timed move: {}'.format(SETTLE_AFTER_TIMED_MOVE)) +print('Timed final target dwell: {} ms'.format(TIMED_FINAL_TARGET_DWELL_MS)) + +# Apply gentler control settings before movement. +configure_all_motor_controls() + +# Current physical pose becomes logical joint angle 0. +# Place the arm in a known safe starting pose before running the program. +reset_all_current_positions_as_zero() + +print_joint_states('Initial logical zero state:') + +for i in range(len(TEST_STEPS)): + ok = run_step(i, TEST_STEPS[i]) + + if not ok: + print('ERROR: step {} failed or stalled'.format(i)) + sound_step_fail() + break + +apply_stop_to_all_joints() + +ev3.screen.clear() +ev3.screen.print('=== MASTER BRICK ===') +ev3.screen.print('ALL TESTS COMPLETE') + +sound_all_complete() + +print('') +print('#' * 70) +print('# ALL TESTS COMPLETE') +print('#' * 70) +print('') \ No newline at end of file diff --git a/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlyslave.py b/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlyslave.py new file mode 100644 index 0000000..5a8d0c9 --- /dev/null +++ b/ev3-robotic-arm-master/robotarmmovement/tests/test_00_onlyslave.py @@ -0,0 +1,1715 @@ +#!/usr/bin/env pybricks-micropython + +from math import sin, cos, sqrt, pi + +from pybricks.hubs import EV3Brick +from pybricks.ev3devices import Motor +from pybricks.parameters import Port, Stop +from pybricks.tools import wait, StopWatch + + +# ============================================================ +# SLAVE BRICK STANDALONE JOINT TEST PROGRAM +# ============================================================ +# +# Purpose: +# Test only the motors/joints connected to the second EV3 brick. +# +# This version is autonomous: +# - no Bluetooth +# - no master brick +# - no phone/XR logging integration yet +# +# It supports: +# - absolute motor-angle commands +# - relative motor-angle commands +# - absolute joint-angle commands with gear ratio +# - relative joint-angle commands with gear ratio +# - serial movement +# - parallel movement +# - timed movement +# - easing profiles +# - sound after each step +# - low-angle jitter mitigation +# - configurable stop modes and motor-control tolerances +# +# Important: +# The gear ratios for the second brick are currently placeholders. +# Replace them with the real gear ratios of your slave-side joints. + + +# ============================================================ +# GLOBAL MOTION SETTINGS +# ============================================================ + +DEFAULT_SPEED = 200 # motor deg/s for non-timed movement +SETTLE_SPEED = 120 # motor deg/s for optional final correction +POLL_MS = 10 +PROFILE_UPDATE_MS = 20 + +SETTLE_AFTER_TIMED_MOVE = False +TIMED_FINAL_TARGET_DWELL_MS = 0 + +DEFAULT_EASING = 'linear' +CLAMP_EASING_OUTPUT = True + + +# ============================================================ +# SOUND SETTINGS +# ============================================================ + +SOUND_ENABLED = True +SOUND_AT_STEP_START = False +SOUND_AFTER_STEP = True +SOUND_ON_ERROR = True + +SOUND_STEP_START = (500, 60) +SOUND_STEP_OK = (1000, 90) +SOUND_STEP_FAIL = (220, 250) +SOUND_COMPLETE = (1200, 120) + + +# ============================================================ +# JITTER MITIGATION SETTINGS +# ============================================================ + +# Movement smaller than this number of MOTOR degrees is skipped. +# Increase this if low-angle movements cause buzzing/jitter. +DEFAULT_DEADBAND_MOTOR_DEG = 1.5 + +# During timed movement, a new target is sent only if it differs from +# the previously sent target by at least this many MOTOR degrees. +TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 + +APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE = True + + +# ============================================================ +# MOTOR CONTROL SETTINGS +# ============================================================ + +APPLY_MOTOR_CONTROL_SETTINGS = True + +# Conservative defaults. +# Format: +# speed limit, acceleration limit, actuation limit +# +# Units: +# speed: deg/s +# acceleration: deg/s/s +# actuation: percent +DEFAULT_CONTROL_LIMITS = (400, 600, 75) + +# Format: +# speed tolerance, position tolerance +# +# Larger position tolerance can reduce target jitter. +DEFAULT_TARGET_TOLERANCES = (20, 2) + +# Format: +# stall speed, stall time +DEFAULT_STALL_TOLERANCES = (20, 1000) + +# Optional PID override. +# Keep None unless you deliberately tune PID. +# +# Format: +# (kp, ki, kd, integral_range, integral_rate, feed_forward) +DEFAULT_PID_SETTINGS = None + + +# ============================================================ +# SPEED SAFETY SETTINGS +# ============================================================ + +ABORT_IF_REQUIRED_SPEED_TOO_HIGH = False +DEFAULT_MAX_AVG_MOTOR_SPEED = 220 + + +# ============================================================ +# SLAVE JOINT CONFIGURATION +# ============================================================ +# +# Edit this section to match the real second-brick joints. +# +# Fields: +# name: +# Logical joint name used in movement commands. +# +# port: +# EV3 motor port. +# +# gear_ratio: +# motor_angle = joint_angle * gear_ratio +# +# If unknown, keep 1.0 temporarily and test in motor-angle mode first. +# +# sign: +# 1 or -1. +# Use -1 if the logical direction is opposite to physical motor direction. +# +# joint_limits: +# None or (min_angle, max_angle), in JOINT degrees. +# +# stop_mode: +# Stop.BRAKE usually reduces active jitter. +# Stop.HOLD is stronger but may jitter under backlash/load. +# +# deadband_motor_deg: +# Tiny movement skip threshold in MOTOR degrees. +# +# max_avg_motor_speed: +# Warning/abort threshold for timed movement. +# +# Example real naming could be: +# wrist_pitch, wrist_roll, wrist_yaw, gripper +# +# I use neutral names here because your exact second-brick mechanism +# was not specified. + +SLAVE_JOINT_CONFIGS = [ + + { + 'name': 'elbow_rotate', + 'port': Port.A, + 'gear_ratio': 7.5, + 'sign': 1, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, + 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, + }, + + { + 'name': 'wrist_flexion_extension', + 'port': Port.B, + 'gear_ratio': 5.0, + 'sign': 1, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, + 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, + }, + + { + 'name': 'wrist_pronation_supination', + 'port': Port.C, + 'gear_ratio': 7.5, + 'sign': 1, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, + 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, + }, + + { + 'name': 'gripper', + 'port': Port.D, + 'gear_ratio': 1.0, + 'sign': 1, + 'joint_limits': None, + 'stop_mode': Stop.BRAKE, + 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, + 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, + }, +] + + +# ============================================================ +# HARDWARE INITIALIZATION +# ============================================================ + +ev3 = EV3Brick() + + +# ============================================================ +# BASIC UTILITIES +# ============================================================ + +def clamp(value, min_value, max_value): + if value < min_value: + return min_value + + if value > max_value: + return max_value + + return value + + +def abs_value(value): + if value < 0: + return -value + + return value + + +# ============================================================ +# SOUND FUNCTIONS +# ============================================================ + +def beep_once(sound_tuple): + if not SOUND_ENABLED: + return + + frequency = sound_tuple[0] + duration = sound_tuple[1] + ev3.speaker.beep(frequency=frequency, duration=duration) + + +def sound_step_start(): + if SOUND_AT_STEP_START: + beep_once(SOUND_STEP_START) + + +def sound_step_ok(): + if SOUND_AFTER_STEP: + beep_once(SOUND_STEP_OK) + + +def sound_step_fail(): + if SOUND_ON_ERROR: + beep_once(SOUND_STEP_FAIL) + wait(80) + beep_once(SOUND_STEP_FAIL) + + +def sound_all_complete(): + if not SOUND_ENABLED: + return + + beep_once(SOUND_COMPLETE) + wait(80) + beep_once(SOUND_COMPLETE) + + +# ============================================================ +# EASING FUNCTIONS +# ============================================================ + +def ease_linear(x): + return x + + +def ease_in_sine(x): + return 1 - cos((x * pi) / 2) + + +def ease_out_sine(x): + return sin((x * pi) / 2) + + +def ease_in_out_sine(x): + return -(cos(pi * x) - 1) / 2 + + +def ease_in_quad(x): + return x * x + + +def ease_out_quad(x): + return 1 - (1 - x) * (1 - x) + + +def ease_in_out_quad(x): + if x < 0.5: + return 2 * x * x + + return 1 - ((-2 * x + 2) ** 2) / 2 + + +def ease_in_cubic(x): + return x * x * x + + +def ease_out_cubic(x): + return 1 - ((1 - x) ** 3) + + +def ease_in_out_cubic(x): + if x < 0.5: + return 4 * x * x * x + + return 1 - ((-2 * x + 2) ** 3) / 2 + + +def ease_in_quart(x): + return x ** 4 + + +def ease_out_quart(x): + return 1 - ((1 - x) ** 4) + + +def ease_in_out_quart(x): + if x < 0.5: + return 8 * (x ** 4) + + return 1 - ((-2 * x + 2) ** 4) / 2 + + +def ease_in_quint(x): + return x ** 5 + + +def ease_out_quint(x): + return 1 - ((1 - x) ** 5) + + +def ease_in_out_quint(x): + if x < 0.5: + return 16 * (x ** 5) + + return 1 - ((-2 * x + 2) ** 5) / 2 + + +def ease_in_expo(x): + if x == 0: + return 0 + + return 2 ** (10 * x - 10) + + +def ease_out_expo(x): + if x == 1: + return 1 + + return 1 - 2 ** (-10 * x) + + +def ease_in_out_expo(x): + if x == 0: + return 0 + + if x == 1: + return 1 + + if x < 0.5: + return (2 ** (20 * x - 10)) / 2 + + return (2 - 2 ** (-20 * x + 10)) / 2 + + +def ease_in_circ(x): + return 1 - sqrt(1 - x * x) + + +def ease_out_circ(x): + return sqrt(1 - ((x - 1) ** 2)) + + +def ease_in_out_circ(x): + if x < 0.5: + return (1 - sqrt(1 - ((2 * x) ** 2))) / 2 + + return (sqrt(1 - ((-2 * x + 2) ** 2)) + 1) / 2 + + +def ease_in_back(x): + c1 = 1.70158 + c3 = c1 + 1 + + return c3 * x * x * x - c1 * x * x + + +def ease_out_back(x): + c1 = 1.70158 + c3 = c1 + 1 + + return 1 + c3 * ((x - 1) ** 3) + c1 * ((x - 1) ** 2) + + +def ease_in_out_back(x): + c1 = 1.70158 + c2 = c1 * 1.525 + + if x < 0.5: + return (((2 * x) ** 2) * ((c2 + 1) * 2 * x - c2)) / 2 + + return ((((2 * x - 2) ** 2) * ((c2 + 1) * (2 * x - 2) + c2)) + 2) / 2 + + +def ease_in_elastic(x): + c4 = (2 * pi) / 3 + + if x == 0: + return 0 + + if x == 1: + return 1 + + return -(2 ** (10 * x - 10)) * sin((x * 10 - 10.75) * c4) + + +def ease_out_elastic(x): + c4 = (2 * pi) / 3 + + if x == 0: + return 0 + + if x == 1: + return 1 + + return (2 ** (-10 * x)) * sin((x * 10 - 0.75) * c4) + 1 + + +def ease_in_out_elastic(x): + c5 = (2 * pi) / 4.5 + + if x == 0: + return 0 + + if x == 1: + return 1 + + if x < 0.5: + return -((2 ** (20 * x - 10)) * sin((20 * x - 11.125) * c5)) / 2 + + return ((2 ** (-20 * x + 10)) * sin((20 * x - 11.125) * c5)) / 2 + 1 + + +def ease_out_bounce(x): + n1 = 7.5625 + d1 = 2.75 + + if x < 1 / d1: + return n1 * x * x + + elif x < 2 / d1: + x = x - 1.5 / d1 + return n1 * x * x + 0.75 + + elif x < 2.5 / d1: + x = x - 2.25 / d1 + return n1 * x * x + 0.9375 + + else: + x = x - 2.625 / d1 + return n1 * x * x + 0.984375 + + +def ease_in_bounce(x): + return 1 - ease_out_bounce(1 - x) + + +def ease_in_out_bounce(x): + if x < 0.5: + return (1 - ease_out_bounce(1 - 2 * x)) / 2 + + return (1 + ease_out_bounce(2 * x - 1)) / 2 + + +EASINGS = { + 'linear': ease_linear, + + 'easeInSine': ease_in_sine, + 'easeOutSine': ease_out_sine, + 'easeInOutSine': ease_in_out_sine, + + 'easeInQuad': ease_in_quad, + 'easeOutQuad': ease_out_quad, + 'easeInOutQuad': ease_in_out_quad, + + 'easeInCubic': ease_in_cubic, + 'easeOutCubic': ease_out_cubic, + 'easeInOutCubic': ease_in_out_cubic, + + 'easeInQuart': ease_in_quart, + 'easeOutQuart': ease_out_quart, + 'easeInOutQuart': ease_in_out_quart, + + 'easeInQuint': ease_in_quint, + 'easeOutQuint': ease_out_quint, + 'easeInOutQuint': ease_in_out_quint, + + 'easeInExpo': ease_in_expo, + 'easeOutExpo': ease_out_expo, + 'easeInOutExpo': ease_in_out_expo, + + 'easeInCirc': ease_in_circ, + 'easeOutCirc': ease_out_circ, + 'easeInOutCirc': ease_in_out_circ, + + 'easeInBack': ease_in_back, + 'easeOutBack': ease_out_back, + 'easeInOutBack': ease_in_out_back, + + 'easeInElastic': ease_in_elastic, + 'easeOutElastic': ease_out_elastic, + 'easeInOutElastic': ease_in_out_elastic, + + 'easeInBounce': ease_in_bounce, + 'easeOutBounce': ease_out_bounce, + 'easeInOutBounce': ease_in_out_bounce, +} + + +def get_easing(easing_name): + if easing_name is None: + easing_name = DEFAULT_EASING + + if easing_name in EASINGS: + return EASINGS[easing_name] + + key_lower = easing_name.lower() + + for name in EASINGS: + if name.lower() == key_lower: + return EASINGS[name] + + raise ValueError('Unknown easing: {}'.format(easing_name)) + + +# ============================================================ +# JOINT CLASS +# ============================================================ + +class Joint: + """ + Represents one logical slave-brick joint. + + This version assumes one physical motor per joint. If some slave-side + mechanism uses two motors for one joint, this class can be extended in the + same way as the master-brick shoulder joint. + + Coordinate convention: + motor_angle = joint_angle * gear_ratio + """ + + def __init__(self, name, motor, sign, gear_ratio, joint_limits, + stop_mode, deadband_motor_deg, max_avg_motor_speed): + self.name = name + self.motor = motor + self.sign = sign + self.gear_ratio = gear_ratio + self.joint_limits = joint_limits + self.stop_mode_value = stop_mode + self.deadband_motor_deg_value = deadband_motor_deg + self.max_avg_motor_speed = max_avg_motor_speed + + def motor_angle(self): + """ + Returns logical motor angle in motor degrees. + """ + return self.motor.angle() * self.sign + + def joint_angle(self): + """ + Returns logical joint angle in joint degrees. + """ + return self.motor_angle() / self.gear_ratio + + def joint_to_motor(self, joint_angle): + """ + Converts joint degrees to motor degrees. + """ + return joint_angle * self.gear_ratio + + def motor_to_joint(self, motor_angle): + """ + Converts motor degrees to joint degrees. + """ + return motor_angle / self.gear_ratio + + def reset_motor_angle(self, logical_motor_angle): + """ + Sets the current logical motor-coordinate angle. + """ + self.motor.reset_angle(logical_motor_angle * self.sign) + + def reset_joint_angle(self, logical_joint_angle): + """ + Sets the current logical joint-coordinate angle. + """ + self.reset_motor_angle(self.joint_to_motor(logical_joint_angle)) + + def check_joint_limit(self, logical_joint_target): + """ + Validates target joint angle against this joint's limits. + """ + if self.joint_limits is None: + return + + min_angle = self.joint_limits[0] + max_angle = self.joint_limits[1] + + if logical_joint_target < min_angle or logical_joint_target > max_angle: + raise ValueError( + 'Target joint angle {} for joint {} is outside limits {}..{}'.format( + logical_joint_target, + self.name, + min_angle, + max_angle + ) + ) + + def physical_motor_target(self, logical_motor_target): + """ + Converts logical motor-coordinate target into physical encoder target. + """ + return int(logical_motor_target * self.sign) + + def deadband_motor_deg(self): + return self.deadband_motor_deg_value + + def is_small_motor_delta(self, motor_delta): + return abs_value(motor_delta) <= self.deadband_motor_deg_value + + def run_motor_target(self, logical_motor_target, speed): + """ + Moves to target expressed in motor degrees. + """ + logical_joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(logical_joint_target) + + self.motor.run_target( + speed, + self.physical_motor_target(logical_motor_target), + then=self.stop_mode_value, + wait=False + ) + + def track_motor_target(self, logical_motor_target): + """ + Sends target expressed in motor degrees. + Used in timed/eased movement. + """ + logical_joint_target = self.motor_to_joint(logical_motor_target) + self.check_joint_limit(logical_joint_target) + + self.motor.track_target( + self.physical_motor_target(logical_motor_target) + ) + + def apply_stop_mode(self): + """ + Applies configured final stop behavior. + """ + if self.stop_mode_value == Stop.HOLD: + self.motor.hold() + + elif self.stop_mode_value == Stop.BRAKE: + self.motor.brake() + + elif self.stop_mode_value == Stop.COAST: + self.motor.stop() + + else: + self.motor.brake() + + def all_motors(self): + return [self.motor] + + +# ============================================================ +# CREATE JOINTS FROM CONFIGURATION +# ============================================================ + +JOINTS = {} +JOINT_ORDER = [] + + +def create_joints_from_config(): + """ + Creates Motor and Joint objects from SLAVE_JOINT_CONFIGS. + """ + for cfg in SLAVE_JOINT_CONFIGS: + name = cfg['name'] + port = cfg['port'] + + motor = Motor(port) + + joint = Joint( + name, + motor, + cfg['sign'], + cfg['gear_ratio'], + cfg['joint_limits'], + cfg['stop_mode'], + cfg['deadband_motor_deg'], + cfg['max_avg_motor_speed'] + ) + + JOINTS[name] = joint + JOINT_ORDER.append(name) + + +# ============================================================ +# MOTOR CONTROL SETUP +# ============================================================ + +def configure_joint_motor_control(joint): + """ + Applies conservative motor control settings to one joint. + """ + motor = joint.motor + motor.stop() + + if DEFAULT_CONTROL_LIMITS is not None: + speed = DEFAULT_CONTROL_LIMITS[0] + acceleration = DEFAULT_CONTROL_LIMITS[1] + actuation = DEFAULT_CONTROL_LIMITS[2] + motor.control.limits(speed, acceleration, actuation) + + if DEFAULT_TARGET_TOLERANCES is not None: + speed_tol = DEFAULT_TARGET_TOLERANCES[0] + position_tol = DEFAULT_TARGET_TOLERANCES[1] + motor.control.target_tolerances(speed_tol, position_tol) + + if DEFAULT_STALL_TOLERANCES is not None: + stall_speed = DEFAULT_STALL_TOLERANCES[0] + stall_time = DEFAULT_STALL_TOLERANCES[1] + motor.control.stall_tolerances(stall_speed, stall_time) + + if DEFAULT_PID_SETTINGS is not None: + kp = DEFAULT_PID_SETTINGS[0] + ki = DEFAULT_PID_SETTINGS[1] + kd = DEFAULT_PID_SETTINGS[2] + integral_range = DEFAULT_PID_SETTINGS[3] + integral_rate = DEFAULT_PID_SETTINGS[4] + feed_forward = DEFAULT_PID_SETTINGS[5] + motor.control.pid( + kp, + ki, + kd, + integral_range, + integral_rate, + feed_forward + ) + + +def configure_all_motor_controls(): + if not APPLY_MOTOR_CONTROL_SETTINGS: + return + + for name in JOINT_ORDER: + configure_joint_motor_control(JOINTS[name]) + + +# ============================================================ +# COMMAND HELPERS +# ============================================================ + +def ABS_M(joint_name, motor_angle): + """ + Absolute target in motor degrees. + """ + return { + 'joint': joint_name, + 'mode': 'abs', + 'unit': 'motor', + 'value': motor_angle + } + + +def REL_M(joint_name, motor_delta): + """ + Relative movement in motor degrees. + """ + return { + 'joint': joint_name, + 'mode': 'rel', + 'unit': 'motor', + 'value': motor_delta + } + + +def ABS_J(joint_name, joint_angle): + """ + Absolute target in joint degrees. + """ + return { + 'joint': joint_name, + 'mode': 'abs', + 'unit': 'joint', + 'value': joint_angle + } + + +def REL_J(joint_name, joint_delta): + """ + Relative movement in joint degrees. + """ + return { + 'joint': joint_name, + 'mode': 'rel', + 'unit': 'joint', + 'value': joint_delta + } + + +def PARALLEL(label, commands, duration_ms=None, pause_ms=0, + easing=None, clamp_easing=None): + """ + Simultaneous movement step. + """ + return { + 'type': 'parallel', + 'label': label, + 'commands': commands, + 'duration_ms': duration_ms, + 'pause_ms': pause_ms, + 'easing': easing, + 'clamp_easing': clamp_easing + } + + +def SERIES(label, commands, duration_ms=None, pause_ms=0, + easing=None, clamp_easing=None, duration_mode='total'): + """ + Sequential movement step. + + commands can be: + [cmd1, cmd2, cmd3] + + or serial groups: + [ + [cmd1, cmd2], + [cmd3, cmd4] + ] + + duration_mode: + 'total': + total duration is divided between groups. + + 'per_group': + each group receives full duration_ms. + """ + return { + 'type': 'series', + 'label': label, + 'commands': commands, + 'duration_ms': duration_ms, + 'pause_ms': pause_ms, + 'easing': easing, + 'clamp_easing': clamp_easing, + 'duration_mode': duration_mode + } + + +# ============================================================ +# COMMAND PLANNING +# ============================================================ + +def is_command_object(item): + return type(item) == dict and 'joint' in item + + +def normalize_series_groups(commands): + groups = [] + + for item in commands: + if is_command_object(item): + groups.append([item]) + else: + groups.append(item) + + return groups + + +def validate_parallel_commands(commands): + seen = [] + + for command in commands: + joint_name = command['joint'] + + if joint_name in seen: + raise ValueError( + 'Joint {} appears more than once in one parallel group.'.format( + joint_name + ) + ) + + seen.append(joint_name) + + +def plan_commands(commands): + """ + Converts user commands into motor-coordinate movement plans. + """ + validate_parallel_commands(commands) + + plans = [] + + for command in commands: + joint = JOINTS[command['joint']] + mode = command['mode'] + unit = command.get('unit', 'motor') + value = command['value'] + + start_motor = joint.motor_angle() + start_joint = joint.joint_angle() + + if mode == 'abs': + if unit == 'motor': + target_motor = value + target_joint = joint.motor_to_joint(target_motor) + + elif unit == 'joint': + target_joint = value + target_motor = joint.joint_to_motor(target_joint) + + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + elif mode == 'rel': + if unit == 'motor': + target_motor = start_motor + value + target_joint = joint.motor_to_joint(target_motor) + + elif unit == 'joint': + target_joint = start_joint + value + target_motor = joint.joint_to_motor(target_joint) + + else: + raise ValueError('Unknown unit: {}'.format(unit)) + + else: + raise ValueError('Unknown command mode: {}'.format(mode)) + + joint.check_joint_limit(target_joint) + + delta_motor = target_motor - start_motor + delta_joint = target_joint - start_joint + skip_small_move = joint.is_small_motor_delta(delta_motor) + + plans.append({ + 'joint': joint, + 'start_motor': start_motor, + 'target_motor': target_motor, + 'start_joint': start_joint, + 'target_joint': target_joint, + 'delta_motor': delta_motor, + 'delta_joint': delta_joint, + 'skip_small_move': skip_small_move, + 'last_sent_motor_target': None, + 'command': command + }) + + return plans + + +def active_plans_only(plans): + active = [] + + for plan in plans: + if not plan['skip_small_move']: + active.append(plan) + + return active + + +def collect_motors(plans): + motors = [] + + for plan in plans: + joint = plan['joint'] + + for motor in joint.all_motors(): + motors.append(motor) + + return motors + + +# ============================================================ +# DISPLAY AND LOGGING +# ============================================================ + +def display_step_header(step_num, label): + ev3.screen.clear() + ev3.screen.print('=== SLAVE ===') + ev3.screen.print('STEP {}'.format(step_num)) + ev3.screen.print(label) + + print('') + print('=' * 70) + print('SLAVE STEP {}: {}'.format(step_num, label)) + print('=' * 70) + + +def print_joint_states(prefix): + print(prefix) + + for name in JOINT_ORDER: + joint = JOINTS[name] + + print( + ' {}: motor={} deg, joint={} deg'.format( + name, + joint.motor_angle(), + joint.joint_angle() + ) + ) + + +def print_motion_plan(plans): + for plan in plans: + joint = plan['joint'] + + print( + ' {}: motor {} -> {} deg, joint {} -> {} deg'.format( + joint.name, + plan['start_motor'], + plan['target_motor'], + plan['start_joint'], + plan['target_joint'] + ) + ) + + print( + ' delta: motor={} deg, joint={} deg, skip_small_move={}'.format( + plan['delta_motor'], + plan['delta_joint'], + plan['skip_small_move'] + ) + ) + + +def print_final_errors(plans): + print('Final errors:') + + for plan in plans: + joint = plan['joint'] + + actual_motor = joint.motor_angle() + actual_joint = joint.joint_angle() + + motor_error = plan['target_motor'] - actual_motor + joint_error = plan['target_joint'] - actual_joint + + print( + ' {}: motor_error={} deg, joint_error={} deg'.format( + joint.name, + motor_error, + joint_error + ) + ) + + +def print_joint_configuration(): + print('Slave joint configuration:') + + for name in JOINT_ORDER: + joint = JOINTS[name] + + print( + ' {}: gear_ratio={}, limit={}, stop_mode={}, deadband_motor_deg={}'.format( + name, + joint.gear_ratio, + joint.joint_limits, + joint.stop_mode_value, + joint.deadband_motor_deg_value + ) + ) + + +# ============================================================ +# WAITING / SAFETY +# ============================================================ + +def wait_until_done(motors, timeout_ms): + timer = StopWatch() + + while True: + all_done = True + + for motor in motors: + if motor.control.stalled(): + print('WARNING: motor stalled') + motor.brake() + return False + + if not motor.control.done(): + all_done = False + + if all_done: + return True + + if timeout_ms is not None and timer.time() > timeout_ms: + print('WARNING: timeout while waiting for motors') + + for motor in motors: + motor.brake() + + return False + + wait(POLL_MS) + + +def check_required_speed(plan, duration_ms): + joint = plan['joint'] + + if duration_ms <= 0: + raise ValueError('duration_ms must be greater than 0') + + required_avg_speed = abs_value(plan['delta_motor']) * 1000 / duration_ms + + if required_avg_speed > joint.max_avg_motor_speed: + print( + 'WARNING: required average speed for {} is {} deg/s; recommended max is {} deg/s'.format( + joint.name, + required_avg_speed, + joint.max_avg_motor_speed + ) + ) + + if ABORT_IF_REQUIRED_SPEED_TOO_HIGH: + return False + + return True + + +# ============================================================ +# MOVEMENT EXECUTION +# ============================================================ + +def handle_skipped_small_moves(plans): + for plan in plans: + if plan['skip_small_move']: + joint = plan['joint'] + + print( + ' SKIP tiny move for {}: delta_motor={} deg, deadband={} deg'.format( + joint.name, + plan['delta_motor'], + joint.deadband_motor_deg() + ) + ) + + if APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE: + joint.apply_stop_mode() + + +def move_parallel_target(commands, speed=DEFAULT_SPEED): + plans = plan_commands(commands) + active_plans = active_plans_only(plans) + + print_joint_states('Before target move:') + print_motion_plan(plans) + handle_skipped_small_moves(plans) + + if len(active_plans) == 0: + print('No active movement after deadband filtering.') + print_joint_states('After skipped target move:') + return True + + for plan in active_plans: + joint = plan['joint'] + joint.run_motor_target(plan['target_motor'], speed) + + motors = collect_motors(active_plans) + ok = wait_until_done(motors, 30000) + + print_joint_states('After target move:') + print_final_errors(plans) + + return ok + + +def maybe_send_track_target(plan, motor_target, force_send): + joint = plan['joint'] + last_sent = plan['last_sent_motor_target'] + + if force_send: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if last_sent is None: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if TRACK_TARGET_MIN_UPDATE_MOTOR_DEG <= 0: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + return + + if abs_value(motor_target - last_sent) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: + joint.track_motor_target(motor_target) + plan['last_sent_motor_target'] = motor_target + + +def move_parallel_timed(commands, duration_ms, easing_name=None, + clamp_easing=None): + if duration_ms is None: + return move_parallel_target(commands, DEFAULT_SPEED) + + if duration_ms <= 0: + raise ValueError('duration_ms must be greater than 0') + + if clamp_easing is None: + clamp_easing = CLAMP_EASING_OUTPUT + + easing_func = get_easing(easing_name) + + plans = plan_commands(commands) + active_plans = active_plans_only(plans) + + print_joint_states('Before timed move:') + print('Timed profile duration: {} ms'.format(duration_ms)) + print('Easing: {}'.format(easing_name if easing_name is not None else DEFAULT_EASING)) + print('Clamp easing output: {}'.format(clamp_easing)) + print_motion_plan(plans) + handle_skipped_small_moves(plans) + + if len(active_plans) == 0: + print('No active movement after deadband filtering.') + print_joint_states('After skipped timed move:') + return True + + for plan in active_plans: + ok_speed = check_required_speed(plan, duration_ms) + + if not ok_speed: + print('ERROR: required speed too high, movement aborted.') + return False + + timer = StopWatch() + + while timer.time() < duration_ms: + elapsed = timer.time() + + linear_ratio = elapsed / duration_ms + linear_ratio = clamp(linear_ratio, 0, 1) + + eased_ratio = easing_func(linear_ratio) + + if clamp_easing: + eased_ratio = clamp(eased_ratio, 0, 1) + + for plan in active_plans: + start_motor = plan['start_motor'] + target_motor = plan['target_motor'] + + current_target = start_motor + (target_motor - start_motor) * eased_ratio + + maybe_send_track_target( + plan, + current_target, + False + ) + + for plan in active_plans: + joint = plan['joint'] + + for motor in joint.all_motors(): + if motor.control.stalled(): + print('WARNING: motor stalled during timed profile') + motor.brake() + return False + + wait(PROFILE_UPDATE_MS) + + for plan in active_plans: + maybe_send_track_target( + plan, + plan['target_motor'], + True + ) + + elapsed = timer.time() + + if TIMED_FINAL_TARGET_DWELL_MS > 0: + wait(TIMED_FINAL_TARGET_DWELL_MS) + + if SETTLE_AFTER_TIMED_MOVE: + for plan in active_plans: + joint = plan['joint'] + joint.run_motor_target(plan['target_motor'], SETTLE_SPEED) + + motors = collect_motors(active_plans) + ok = wait_until_done(motors, 10000) + + if not ok: + return False + + else: + for plan in active_plans: + joint = plan['joint'] + joint.apply_stop_mode() + + print('Timed profile requested: {} ms'.format(duration_ms)) + print('Timed profile elapsed: {} ms'.format(elapsed)) + print_joint_states('After timed move:') + print_final_errors(plans) + + return True + + +def execute_parallel_group(commands, duration_ms=None, + easing_name=None, clamp_easing=None): + if len(commands) == 0: + return True + + if duration_ms is None: + return move_parallel_target(commands, DEFAULT_SPEED) + + return move_parallel_timed( + commands, + duration_ms, + easing_name, + clamp_easing + ) + + +def run_step(step_num, step): + display_step_header(step_num, step['label']) + sound_step_start() + + step_type = step['type'] + commands = step['commands'] + duration_ms = step.get('duration_ms', None) + pause_ms = step.get('pause_ms', 0) + easing_name = step.get('easing', None) + clamp_easing = step.get('clamp_easing', None) + + if len(commands) == 0: + print_joint_states('Current state:') + + if pause_ms > 0: + wait(pause_ms) + + sound_step_ok() + return True + + if step_type == 'parallel': + ok = execute_parallel_group( + commands, + duration_ms, + easing_name, + clamp_easing + ) + + elif step_type == 'series': + groups = normalize_series_groups(commands) + duration_mode = step.get('duration_mode', 'total') + + ok = True + + if duration_ms is None: + for group in groups: + ok = execute_parallel_group( + group, + None, + easing_name, + clamp_easing + ) + + if not ok: + break + + else: + if duration_mode == 'total': + group_duration_ms = int(duration_ms / len(groups)) + + if group_duration_ms <= 0: + raise ValueError('Series duration is too short') + + for group in groups: + ok = execute_parallel_group( + group, + group_duration_ms, + easing_name, + clamp_easing + ) + + if not ok: + break + + elif duration_mode == 'per_group': + for group in groups: + ok = execute_parallel_group( + group, + duration_ms, + easing_name, + clamp_easing + ) + + if not ok: + break + + else: + raise ValueError('Unknown duration_mode: {}'.format(duration_mode)) + + else: + raise ValueError('Unknown step type: {}'.format(step_type)) + + if pause_ms > 0: + wait(pause_ms) + + if ok: + sound_step_ok() + else: + sound_step_fail() + + return ok + + +def reset_all_current_positions_as_zero(): + """ + Current physical position becomes logical joint angle 0 for all slave joints. + """ + for name in JOINT_ORDER: + JOINTS[name].reset_joint_angle(0) + + +def apply_stop_to_all_joints(): + for name in JOINT_ORDER: + JOINTS[name].apply_stop_mode() + + +# ============================================================ +# SLAVE TEST SEQUENCE +# ============================================================ +# +# Because the real slave-side gear ratios are not known yet, this sequence uses +# small movements. If gear_ratio remains 1.0, joint angle equals motor angle. +# +# Suggested workflow: +# 1. Test with ABS_M / REL_M first. +# 2. Measure or confirm gear ratios. +# 3. Replace gear_ratio values in SLAVE_JOINT_CONFIGS. +# 4. Then use ABS_J / REL_J as the main research-level interface. + +TEST_STEPS = [ + + PARALLEL( + 'Reference pose: current slave pose is logical zero', + [], + pause_ms=1000 + ), + + PARALLEL( + 'Homing all slave joints to zero', + [ + ABS_J('elbow_rotate', 0), + ABS_J('wrist_flexion_extension', 0), + ABS_J('wrist_pronation_supination', 0), + ABS_J('gripper', 0), + ], + duration_ms=3000, + easing='linear', + pause_ms=1000 + ), + + SERIES( + 'Slave joint 1 individual test', + [ + ABS_J('elbow_rotate', 20), + ABS_J('elbow_rotate', -20), + ABS_J('elbow_rotate', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + SERIES( + 'Slave joint 2 individual test', + [ + ABS_J('wrist_flexion_extension', 20), + ABS_J('wrist_flexion_extension', -20), + ABS_J('wrist_flexion_extension', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + SERIES( + 'Slave joint 3 individual test', + [ + ABS_J('wrist_pronation_supination', 20), + ABS_J('wrist_pronation_supination', -20), + ABS_J('wrist_pronation_supination', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + SERIES( + 'Slave joint 4 individual test', + [ + ABS_J('gripper', 20), + ABS_J('gripper', -20), + ABS_J('gripper', 0), + ], + duration_ms=6000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + PARALLEL( + 'Slave joints 1 + 2 synchronized test', + [ + ABS_J('elbow_rotate', 15), + ABS_J('wrist_flexion_extension', 15), + ], + duration_ms=3000, + easing='easeInOutQuad', + pause_ms=1000 + ), + + PARALLEL( + 'Slave joints 3 + 4 synchronized test', + [ + ABS_J('wrist_pronation_supination', 15), + ABS_J('gripper', 15), + ], + duration_ms=3000, + easing='easeInOutQuad', + pause_ms=1000 + ), + + PARALLEL( + 'All slave joints synchronized positive', + [ + ABS_J('elbow_rotate', 10), + ABS_J('wrist_flexion_extension', 10), + ABS_J('wrist_pronation_supination', 10), + ABS_J('gripper', 10), + ], + duration_ms=3000, + easing='easeInOutCubic', + pause_ms=1000 + ), + + PARALLEL( + 'All slave joints synchronized negative', + [ + ABS_J('elbow_rotate', -10), + ABS_J('wrist_flexion_extension', -10), + ABS_J('wrist_pronation_supination', -10), + ABS_J('gripper', -10), + ], + duration_ms=3000, + easing='easeInOutCubic', + pause_ms=1000 + ), + + SERIES( + 'Relative movement test on slave joint 1', + [ + REL_J('elbow_rotate', 10), + REL_J('elbow_rotate', 10), + REL_J('elbow_rotate', -20), + ], + duration_ms=6000, + duration_mode='total', + easing='easeOutQuad', + pause_ms=1000 + ), + + SERIES( + 'Mixed slave parallel groups', + [ + [ + ABS_J('elbow_rotate', 10), + ABS_J('wrist_flexion_extension', 5), + ], + [ + ABS_J('wrist_pronation_supination', 10), + ABS_J('gripper', 5), + ], + [ + ABS_J('elbow_rotate', 0), + ABS_J('wrist_flexion_extension', 0), + ABS_J('wrist_pronation_supination', 0), + ABS_J('gripper', 0), + ], + ], + duration_ms=9000, + duration_mode='total', + easing='easeInOutSine', + pause_ms=1000 + ), + + PARALLEL( + 'Return all slave joints to zero', + [ + ABS_J('elbow_rotate', 0), + ABS_J('wrist_flexion_extension', 0), + ABS_J('wrist_pronation_supination', 0), + ABS_J('gripper', 0), + ], + duration_ms=3000, + easing='easeInOutSine', + pause_ms=1000 + ), + + # Raw motor-angle example. + # Useful before gear ratios are known. + # + # PARALLEL( + # 'Raw motor-angle calibration test', + # [ + # ABS_M('elbow_rotate', 90), + # ABS_M('wrist_flexion_extension', 90), + # ABS_M('wrist_pronation_supination', 90), + # ABS_M('gripper', 90), + # ], + # duration_ms=3000, + # easing='linear', + # pause_ms=1000 + # ), +] + + +# ============================================================ +# MAIN PROGRAM +# ============================================================ + +ev3.screen.clear() +ev3.screen.print('=== SLAVE BRICK ===') +ev3.screen.print('Standalone test') + +print('') +print('#' * 70) +print('# SLAVE BRICK STANDALONE JOINT TEST START') +print('#' * 70) +print('') + +create_joints_from_config() +configure_all_motor_controls() + +print_joint_configuration() + +print('') +print('Default easing: {}'.format(DEFAULT_EASING)) +print('Clamp easing output: {}'.format(CLAMP_EASING_OUTPUT)) +print('Settle after timed move: {}'.format(SETTLE_AFTER_TIMED_MOVE)) +print('Track target min update: {} motor deg'.format(TRACK_TARGET_MIN_UPDATE_MOTOR_DEG)) +print('Default control limits: {}'.format(DEFAULT_CONTROL_LIMITS)) +print('Default target tolerances: {}'.format(DEFAULT_TARGET_TOLERANCES)) + +# Place the slave-side mechanism in a known safe pose before running. +reset_all_current_positions_as_zero() + +print_joint_states('Initial slave logical zero state:') + +for i in range(len(TEST_STEPS)): + ok = run_step(i, TEST_STEPS[i]) + + if not ok: + print('ERROR: slave step {} failed or stalled'.format(i)) + sound_step_fail() + break + +apply_stop_to_all_joints() + +ev3.screen.clear() +ev3.screen.print('=== SLAVE BRICK ===') +ev3.screen.print('ALL TESTS COMPLETE') + +sound_all_complete() + +print('') +print('#' * 70) +print('# SLAVE BRICK TEST COMPLETE') +print('#' * 70) +print('') \ No newline at end of file