#!/usr/bin/env pybricks-micropython from math import sin, cos, sqrt, pi from pybricks.hubs import EV3Brick from pybricks.ev3devices import Motor from pybricks.parameters import Port, Stop from pybricks.tools import wait, StopWatch # ============================================================ # ROBOT ARM MOTION CONTROLLER # ============================================================ # # Purpose: # One-brick motion controller for an EV3 robotic arm used to generate # repeatable phone/Cardboard XR movement trajectories. # # Supports: # - absolute motor-angle commands # - relative motor-angle commands # - absolute joint-angle commands with gear-ratio conversion # - relative joint-angle commands with gear-ratio conversion # - sequential movement # - simultaneous movement # - timed movement profiles # - easing functions based on easings.net naming # - sound feedback after every step # - low-angle jitter mitigation # - configurable motor control limits and tolerances # # Coordinate convention: # motor_angle = joint_angle * gear_ratio # # Example: # ABS_J('base', 20) # means: # base joint target = 20 deg # base motor target = 20 * 7.5 = 150 deg # ============================================================ # HARDWARE PORT CONFIGURATION # ============================================================ BASE_PORT = Port.A SHOULDER_LEFT_PORT = Port.B SHOULDER_RIGHT_PORT = Port.C ELBOW_PORT = Port.D # ============================================================ # BASIC MOTION CONFIGURATION # ============================================================ # Speed used by non-timed run_target() movements. # Unit: motor degrees per second. DEFAULT_SPEED = 200 # Speed used for optional final settling after timed track_target() movement. # Unit: motor degrees per second. SETTLE_SPEED = 120 # Polling interval for waiting loops. # Lower value = more responsive, but more CPU activity. POLL_MS = 10 # Target update interval during timed/eased movements. # Lower value = smoother target curve, but more controller updates. # If too low, very small target changes may cause unnecessary jitter. PROFILE_UPDATE_MS = 20 # If True, after a timed profile the robot performs an additional run_target() # movement to exactly settle at the final target. # # Advantage: # better final angle accuracy. # # Disadvantage: # total motion time becomes longer than duration_ms. # # For tracking-drift experiments, False is usually better when time consistency # is more important than final encoder accuracy. SETTLE_AFTER_TIMED_MOVE = False # Optional dwell at final target after timed profile before applying final stop. # Unit: ms. # # 0 preserves the requested timing most strictly. # 50-150 can help if the motor needs a short moment to reach the last target. TIMED_FINAL_TARGET_DWELL_MS = 0 # ============================================================ # GEAR RATIOS # ============================================================ # Gear ratio convention: # motor_angle = joint_angle * gear_ratio # # base: # 8-tooth motor gear -> 60-tooth driven gear # 60 / 8 = 7.5 # # shoulder: # 8-tooth motor gear -> 60-tooth driven gear # 60 / 8 = 7.5 # # elbow: # ratio = 5.0 GEAR_RATIOS = { 'base': 7.5, 'shoulder': 7.5, 'elbow': 5.0, } # ============================================================ # JOINT LIMITS # ============================================================ # Limits are expressed in JOINT degrees, not motor degrees. # # Strong recommendation: # Replace None with real safe limits before mounting the phone and tracker. # # Example: # JOINT_LIMITS = { # 'base': (-60, 60), # 'shoulder': (-25, 25), # 'elbow': (-35, 35), # } JOINT_LIMITS = { 'base': None, 'shoulder': None, 'elbow': None, } # ============================================================ # SHOULDER MOTOR DIRECTION # ============================================================ # If the two shoulder motors mechanically fight each other, change this. # # Try [1, 1] first. # If motors rotate against each other, try [1, -1]. SHOULDER_SIGNS = [1, 1] # ============================================================ # STOPPING / HOLDING CONFIGURATION # ============================================================ # Stop mode after a target movement. # # Stop.BRAKE: # Passively resists motion. Often reduces active PID jitter. # Good if the mechanism can hold itself mechanically. # # Stop.HOLD: # Actively keeps the target angle. Better against gravity/load, but may jitter # when there is backlash, low speed, small target error, or high load. # # Stop.COAST: # Lets the motor spin freely. Usually not safe for loaded arm joints. # # For first tests with phone/tracker mounted: # If the arm sags with Stop.BRAKE, use Stop.HOLD for shoulder/elbow. # If it jitters with Stop.HOLD, try Stop.BRAKE or increase tolerances. DEFAULT_STOP_MODE = Stop.BRAKE STOP_MODE_BY_JOINT = { 'base': Stop.BRAKE, 'shoulder': Stop.BRAKE, 'elbow': Stop.BRAKE, } # ============================================================ # LOW-ANGLE JITTER MITIGATION # ============================================================ # Deadband for tiny requested movements. # # If requested target is closer than this number of MOTOR degrees, # the movement is skipped. # # Reason: # Very small movement commands may cause the controller to fight encoder # quantization, gear backlash, load, or static friction. # # Use 0 to disable. DEADBAND_MOTOR_DEG_BY_JOINT = { 'base': 1.5, 'shoulder': 1.5, 'elbow': 1.5, } # During timed/eased movement, a new track_target() command is sent only if # the target has changed by at least this many MOTOR degrees. # # This reduces micro-updates that can cause low-speed jitter. # # Use 0 to send every profile update. TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 # If True, skipped tiny moves still apply the configured final stop mode. # This is useful when a previous command left the motor under active control. APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE = True # ============================================================ # MOTOR CONTROL SETTINGS # ============================================================ # If True, apply Motor.control settings at the beginning of the program. # # These settings are conservative starting values. They are not universal. # If movement becomes too weak or stalls under payload, increase actuation. APPLY_MOTOR_CONTROL_SETTINGS = True # Motor-control limits: # speed = maximum motor speed, deg/s # acceleration = maximum motor acceleration, deg/s/s # actuation = maximum actuation, percent of full output # # Lower actuation can reduce aggressive correction and jitter, but can also # increase stalling risk under load. MOTOR_CONTROL_LIMITS_BY_JOINT = { 'base': (500, 800, 70), 'shoulder': (400, 600, 75), 'elbow': (400, 600, 75), } # Target tolerances: # speed = allowed deviation from zero speed, motor deg/s # position = allowed position error, motor deg # # Larger position tolerance can reduce jitter near the target. # Too large tolerance reduces accuracy. TARGET_TOLERANCES_BY_JOINT = { 'base': (20, 2), 'shoulder': (20, 2), 'elbow': (20, 2), } # Stall tolerances: # speed = speed threshold, motor deg/s # time = how long the controller must be below that speed before stall, ms STALL_TOLERANCES_BY_JOINT = { 'base': (20, 1000), 'shoulder': (20, 1000), 'elbow': (20, 1000), } # Optional PID override. # # None means: keep Pybricks default PID settings. # # Format: # (kp, ki, kd, integral_range, integral_rate, feed_forward) # # Use only after observing persistent oscillation and after testing: # - Stop.BRAKE instead of Stop.HOLD # - larger target tolerance # - lower actuation # - larger movement duration # # Example placeholder: # PID_SETTINGS_BY_JOINT = { # 'base': None, # 'shoulder': None, # 'elbow': None, # } PID_SETTINGS_BY_JOINT = { 'base': None, 'shoulder': None, 'elbow': None, } # ============================================================ # SPEED SAFETY CHECKS # ============================================================ # If True, abort movement when average required motor speed exceeds the limit. # If False, only print a warning. ABORT_IF_REQUIRED_SPEED_TOO_HIGH = False # Maximum recommended average speed for timed movement. # Unit: motor deg/s. MAX_AVG_MOTOR_SPEED_BY_JOINT = { 'base': 250, 'shoulder': 200, 'elbow': 200, } # ============================================================ # EASING CONFIGURATION # ============================================================ DEFAULT_EASING = 'linear' # If True, easing output is clamped to [0, 1]. # # This makes Back/Elastic overshoot safer, but removes their original overshoot. # # If False, Back and Elastic may command positions outside the start-target # range. Use only after JOINT_LIMITS are configured. CLAMP_EASING_OUTPUT = True # ============================================================ # SOUND CONFIGURATION # ============================================================ SOUND_ENABLED = True # If True, short beep at step start. SOUND_AT_STEP_START = False # If True, confirmation beep after each successfully completed step. SOUND_AFTER_STEP = True # If True, error sound after failed/stalled step. SOUND_ON_ERROR = True # Frequencies and durations. # Frequencies are in Hz, durations are in ms. SOUND_STEP_START = (500, 60) SOUND_STEP_OK = (1000, 90) SOUND_STEP_FAIL = (220, 250) SOUND_COMPLETE = (1200, 120) # ============================================================ # HARDWARE INITIALIZATION # ============================================================ ev3 = EV3Brick() base_motor = Motor(BASE_PORT) shoulder_left_motor = Motor(SHOULDER_LEFT_PORT) shoulder_right_motor = Motor(SHOULDER_RIGHT_PORT) elbow_motor = Motor(ELBOW_PORT) # ============================================================ # BASIC UTILITIES # ============================================================ def clamp(value, min_value, max_value): """ Returns value limited to the closed interval [min_value, max_value]. """ if value < min_value: return min_value if value > max_value: return max_value return value def abs_value(value): """ MicroPython-safe absolute value wrapper. """ if value < 0: return -value return value # ============================================================ # SOUND FUNCTIONS # ============================================================ def beep_once(sound_tuple): """ Plays one beep if sound feedback is enabled. sound_tuple: (frequency_hz, duration_ms) """ if not SOUND_ENABLED: return frequency = sound_tuple[0] duration = sound_tuple[1] ev3.speaker.beep(frequency=frequency, duration=duration) def sound_step_start(): """ Optional short sound before executing a step. """ if SOUND_AT_STEP_START: beep_once(SOUND_STEP_START) def sound_step_ok(): """ Sound after a successfully completed step. """ if SOUND_AFTER_STEP: beep_once(SOUND_STEP_OK) def sound_step_fail(): """ Sound after failed, stalled, or aborted movement. """ if SOUND_ON_ERROR: beep_once(SOUND_STEP_FAIL) wait(80) beep_once(SOUND_STEP_FAIL) def sound_all_complete(): """ Sound at the end of the full movement sequence. """ if not SOUND_ENABLED: return beep_once(SOUND_COMPLETE) wait(80) beep_once(SOUND_COMPLETE) # ============================================================ # EASING FUNCTIONS # ============================================================ # # Input: # x in [0, 1] # # Output: # eased progress value. # # Most easing functions stay in [0, 1]. # Back and Elastic can overshoot unless CLAMP_EASING_OUTPUT is True. def ease_linear(x): return x def ease_in_sine(x): return 1 - cos((x * pi) / 2) def ease_out_sine(x): return sin((x * pi) / 2) def ease_in_out_sine(x): return -(cos(pi * x) - 1) / 2 def ease_in_quad(x): return x * x def ease_out_quad(x): return 1 - (1 - x) * (1 - x) def ease_in_out_quad(x): if x < 0.5: return 2 * x * x return 1 - ((-2 * x + 2) ** 2) / 2 def ease_in_cubic(x): return x * x * x def ease_out_cubic(x): return 1 - ((1 - x) ** 3) def ease_in_out_cubic(x): if x < 0.5: return 4 * x * x * x return 1 - ((-2 * x + 2) ** 3) / 2 def ease_in_quart(x): return x ** 4 def ease_out_quart(x): return 1 - ((1 - x) ** 4) def ease_in_out_quart(x): if x < 0.5: return 8 * (x ** 4) return 1 - ((-2 * x + 2) ** 4) / 2 def ease_in_quint(x): return x ** 5 def ease_out_quint(x): return 1 - ((1 - x) ** 5) def ease_in_out_quint(x): if x < 0.5: return 16 * (x ** 5) return 1 - ((-2 * x + 2) ** 5) / 2 def ease_in_expo(x): if x == 0: return 0 return 2 ** (10 * x - 10) def ease_out_expo(x): if x == 1: return 1 return 1 - 2 ** (-10 * x) def ease_in_out_expo(x): if x == 0: return 0 if x == 1: return 1 if x < 0.5: return (2 ** (20 * x - 10)) / 2 return (2 - 2 ** (-20 * x + 10)) / 2 def ease_in_circ(x): return 1 - sqrt(1 - x * x) def ease_out_circ(x): return sqrt(1 - ((x - 1) ** 2)) def ease_in_out_circ(x): if x < 0.5: return (1 - sqrt(1 - ((2 * x) ** 2))) / 2 return (sqrt(1 - ((-2 * x + 2) ** 2)) + 1) / 2 def ease_in_back(x): c1 = 1.70158 c3 = c1 + 1 return c3 * x * x * x - c1 * x * x def ease_out_back(x): c1 = 1.70158 c3 = c1 + 1 return 1 + c3 * ((x - 1) ** 3) + c1 * ((x - 1) ** 2) def ease_in_out_back(x): c1 = 1.70158 c2 = c1 * 1.525 if x < 0.5: return (((2 * x) ** 2) * ((c2 + 1) * 2 * x - c2)) / 2 return ((((2 * x - 2) ** 2) * ((c2 + 1) * (2 * x - 2) + c2)) + 2) / 2 def ease_in_elastic(x): c4 = (2 * pi) / 3 if x == 0: return 0 if x == 1: return 1 return -(2 ** (10 * x - 10)) * sin((x * 10 - 10.75) * c4) def ease_out_elastic(x): c4 = (2 * pi) / 3 if x == 0: return 0 if x == 1: return 1 return (2 ** (-10 * x)) * sin((x * 10 - 0.75) * c4) + 1 def ease_in_out_elastic(x): c5 = (2 * pi) / 4.5 if x == 0: return 0 if x == 1: return 1 if x < 0.5: return -((2 ** (20 * x - 10)) * sin((20 * x - 11.125) * c5)) / 2 return ((2 ** (-20 * x + 10)) * sin((20 * x - 11.125) * c5)) / 2 + 1 def ease_out_bounce(x): n1 = 7.5625 d1 = 2.75 if x < 1 / d1: return n1 * x * x elif x < 2 / d1: x = x - 1.5 / d1 return n1 * x * x + 0.75 elif x < 2.5 / d1: x = x - 2.25 / d1 return n1 * x * x + 0.9375 else: x = x - 2.625 / d1 return n1 * x * x + 0.984375 def ease_in_bounce(x): return 1 - ease_out_bounce(1 - x) def ease_in_out_bounce(x): if x < 0.5: return (1 - ease_out_bounce(1 - 2 * x)) / 2 return (1 + ease_out_bounce(2 * x - 1)) / 2 EASINGS = { 'linear': ease_linear, 'easeInSine': ease_in_sine, 'easeOutSine': ease_out_sine, 'easeInOutSine': ease_in_out_sine, 'easeInQuad': ease_in_quad, 'easeOutQuad': ease_out_quad, 'easeInOutQuad': ease_in_out_quad, 'easeInCubic': ease_in_cubic, 'easeOutCubic': ease_out_cubic, 'easeInOutCubic': ease_in_out_cubic, 'easeInQuart': ease_in_quart, 'easeOutQuart': ease_out_quart, 'easeInOutQuart': ease_in_out_quart, 'easeInQuint': ease_in_quint, 'easeOutQuint': ease_out_quint, 'easeInOutQuint': ease_in_out_quint, 'easeInExpo': ease_in_expo, 'easeOutExpo': ease_out_expo, 'easeInOutExpo': ease_in_out_expo, 'easeInCirc': ease_in_circ, 'easeOutCirc': ease_out_circ, 'easeInOutCirc': ease_in_out_circ, 'easeInBack': ease_in_back, 'easeOutBack': ease_out_back, 'easeInOutBack': ease_in_out_back, 'easeInElastic': ease_in_elastic, 'easeOutElastic': ease_out_elastic, 'easeInOutElastic': ease_in_out_elastic, 'easeInBounce': ease_in_bounce, 'easeOutBounce': ease_out_bounce, 'easeInOutBounce': ease_in_out_bounce, } EASING_NAMES = [ 'linear', 'easeInSine', 'easeOutSine', 'easeInOutSine', 'easeInQuad', 'easeOutQuad', 'easeInOutQuad', 'easeInCubic', 'easeOutCubic', 'easeInOutCubic', 'easeInQuart', 'easeOutQuart', 'easeInOutQuart', 'easeInQuint', 'easeOutQuint', 'easeInOutQuint', 'easeInExpo', 'easeOutExpo', 'easeInOutExpo', 'easeInCirc', 'easeOutCirc', 'easeInOutCirc', 'easeInBack', 'easeOutBack', 'easeInOutBack', 'easeInElastic', 'easeOutElastic', 'easeInOutElastic', 'easeInBounce', 'easeOutBounce', 'easeInOutBounce', ] def get_easing(easing_name): """ Returns easing function by name. easing_name: String such as: - 'linear' - 'easeInOutSine' - 'easeOutQuad' - 'easeInOutCubic' - 'easeOutBounce' """ if easing_name is None: easing_name = DEFAULT_EASING if easing_name in EASINGS: return EASINGS[easing_name] key_lower = easing_name.lower() for name in EASING_NAMES: if name.lower() == key_lower: return EASINGS[name] raise ValueError('Unknown easing: {}'.format(easing_name)) # ============================================================ # JOINT CLASS # ============================================================ class Joint: """ Represents one logical robot-arm joint. A joint can be controlled by: - one motor, e.g. base or elbow - two synchronized motors, e.g. shoulder The class separates: - physical motor encoder angles - logical motor-coordinate angles - logical joint-coordinate angles Gear conversion: motor_angle = joint_angle * gear_ratio signs: Used when motors must rotate in opposite directions to produce the same logical joint movement. """ def __init__(self, name, motors, signs, gear_ratio): self.name = name self.motors = motors self.signs = signs self.gear_ratio = gear_ratio def motor_angle(self): """ Returns logical motor angle in motor degrees. For one motor: signed encoder angle. For two motors: signed average encoder angle. """ total = 0 count = len(self.motors) for i in range(count): total += self.motors[i].angle() * self.signs[i] return total / count def joint_angle(self): """ Returns logical joint angle in joint degrees. """ return self.motor_angle() / self.gear_ratio def joint_to_motor(self, joint_angle): """ Converts joint degrees to motor degrees. """ return joint_angle * self.gear_ratio def motor_to_joint(self, motor_angle): """ Converts motor degrees to joint degrees. """ return motor_angle / self.gear_ratio def reset_motor_angle(self, logical_motor_angle): """ Sets the current logical motor-coordinate angle. Use carefully: this changes encoder reference, not physical position. """ for i in range(len(self.motors)): self.motors[i].reset_angle(logical_motor_angle * self.signs[i]) def reset_joint_angle(self, logical_joint_angle): """ Sets the current logical joint-coordinate angle. Example: reset_joint_angle(0) makes the current physical pose become logical joint angle 0. """ logical_motor_angle = self.joint_to_motor(logical_joint_angle) self.reset_motor_angle(logical_motor_angle) def check_joint_limit(self, logical_joint_target): """ Validates target joint angle against JOINT_LIMITS. Limits are in joint degrees. """ limits = JOINT_LIMITS.get(self.name) if limits is None: return min_angle = limits[0] max_angle = limits[1] if logical_joint_target < min_angle or logical_joint_target > max_angle: raise ValueError( 'Target joint angle {} for joint {} is outside limits {}..{}'.format( logical_joint_target, self.name, min_angle, max_angle ) ) def stop_mode(self): """ Returns final stop mode for this joint. """ if self.name in STOP_MODE_BY_JOINT: return STOP_MODE_BY_JOINT[self.name] return DEFAULT_STOP_MODE def physical_motor_target(self, logical_motor_target, motor_index): """ Converts logical motor target to physical encoder target. """ return int(logical_motor_target * self.signs[motor_index]) def deadband_motor_deg(self): """ Returns movement deadband in motor degrees. """ if self.name in DEADBAND_MOTOR_DEG_BY_JOINT: return DEADBAND_MOTOR_DEG_BY_JOINT[self.name] return 0 def is_small_motor_delta(self, motor_delta): """ Checks whether a requested movement is below configured deadband. """ return abs_value(motor_delta) <= self.deadband_motor_deg() def run_motor_target(self, logical_motor_target, speed): """ Moves to target expressed in motor degrees using run_target(). """ logical_joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(logical_joint_target) stop_mode = self.stop_mode() for i in range(len(self.motors)): self.motors[i].run_target( speed, self.physical_motor_target(logical_motor_target, i), then=stop_mode, wait=False ) def track_motor_target(self, logical_motor_target): """ Sends target expressed in motor degrees using track_target(). Used for timed/eased profiles where the target changes many times during movement. """ logical_joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(logical_joint_target) for i in range(len(self.motors)): self.motors[i].track_target( self.physical_motor_target(logical_motor_target, i) ) def apply_stop_mode(self): """ Applies the configured final stop behavior manually. Used after track_target() profiles and skipped tiny moves. """ stop_mode = self.stop_mode() for motor in self.motors: if stop_mode == Stop.HOLD: motor.hold() elif stop_mode == Stop.BRAKE: motor.brake() elif stop_mode == Stop.COAST: motor.stop() else: motor.brake() def all_motors(self): """ Returns all physical motors belonging to this joint. """ return self.motors # ============================================================ # JOINT OBJECTS # ============================================================ base = Joint( 'base', [base_motor], [1], GEAR_RATIOS['base'] ) shoulder = Joint( 'shoulder', [shoulder_left_motor, shoulder_right_motor], SHOULDER_SIGNS, GEAR_RATIOS['shoulder'] ) elbow = Joint( 'elbow', [elbow_motor], [1], GEAR_RATIOS['elbow'] ) JOINTS = { 'base': base, 'shoulder': shoulder, 'elbow': elbow, } JOINT_ORDER = ['base', 'shoulder', 'elbow'] # ============================================================ # MOTOR CONTROL SETUP # ============================================================ def configure_joint_motor_control(joint): """ Applies optional Pybricks Motor.control settings for one joint. Settings are applied to all physical motors in that joint. Important: Pybricks documentation says control settings should be changed while the controller is stopped. This function stops motors before applying settings. """ for motor in joint.all_motors(): motor.stop() limits = MOTOR_CONTROL_LIMITS_BY_JOINT.get(joint.name) target_tolerances = TARGET_TOLERANCES_BY_JOINT.get(joint.name) stall_tolerances = STALL_TOLERANCES_BY_JOINT.get(joint.name) pid_settings = PID_SETTINGS_BY_JOINT.get(joint.name) for motor in joint.all_motors(): if limits is not None: speed = limits[0] acceleration = limits[1] actuation = limits[2] motor.control.limits(speed, acceleration, actuation) if target_tolerances is not None: speed_tol = target_tolerances[0] position_tol = target_tolerances[1] motor.control.target_tolerances(speed_tol, position_tol) if stall_tolerances is not None: stall_speed = stall_tolerances[0] stall_time = stall_tolerances[1] motor.control.stall_tolerances(stall_speed, stall_time) if pid_settings is not None: kp = pid_settings[0] ki = pid_settings[1] kd = pid_settings[2] integral_range = pid_settings[3] integral_rate = pid_settings[4] feed_forward = pid_settings[5] motor.control.pid( kp, ki, kd, integral_range, integral_rate, feed_forward ) def configure_all_motor_controls(): """ Applies configured motor-control parameters to all joints. """ if not APPLY_MOTOR_CONTROL_SETTINGS: return for name in JOINT_ORDER: configure_joint_motor_control(JOINTS[name]) # ============================================================ # COMMAND HELPERS # ============================================================ def ABS_M(joint_name, motor_angle): """ Absolute target in motor degrees. Example: ABS_M('base', 90) """ return { 'joint': joint_name, 'mode': 'abs', 'unit': 'motor', 'value': motor_angle } def REL_M(joint_name, motor_delta): """ Relative movement in motor degrees. Example: REL_M('base', -45) """ return { 'joint': joint_name, 'mode': 'rel', 'unit': 'motor', 'value': motor_delta } def ABS_J(joint_name, joint_angle): """ Absolute target in joint degrees. Example: ABS_J('base', 20) With gear ratio 7.5, motor target becomes 150 deg. """ return { 'joint': joint_name, 'mode': 'abs', 'unit': 'joint', 'value': joint_angle } def REL_J(joint_name, joint_delta): """ Relative movement in joint degrees. Example: REL_J('elbow', 10) With gear ratio 5.0, motor target changes by 50 deg. """ return { 'joint': joint_name, 'mode': 'rel', 'unit': 'joint', 'value': joint_delta } # Backward-compatible aliases. # These behave like the original program: values are motor degrees. def ABS(joint_name, angle): return ABS_M(joint_name, angle) def REL(joint_name, delta): return REL_M(joint_name, delta) def PARALLEL(label, commands, duration_ms=None, pause_ms=0, easing=None, clamp_easing=None): """ Creates a simultaneous movement step. commands: List of commands executed at the same time. duration_ms: None: use run_target() with DEFAULT_SPEED. integer: use timed/eased track_target() profile. easing: Name of easing function, e.g.: 'linear' 'easeInOutSine' 'easeOutQuad' clamp_easing: None: use global CLAMP_EASING_OUTPUT. True/False: override global setting for this step. """ return { 'type': 'parallel', 'label': label, 'commands': commands, 'duration_ms': duration_ms, 'pause_ms': pause_ms, 'easing': easing, 'clamp_easing': clamp_easing } def SERIES(label, commands, duration_ms=None, pause_ms=0, easing=None, clamp_easing=None, duration_mode='total'): """ Creates a sequential movement step. commands can be: 1. Flat serial command list: [ ABS_J('base', 10), ABS_J('base', 0) ] 2. Serial list of parallel groups: [ [ABS_J('base', 10), ABS_J('shoulder', 5)], [ABS_J('base', 0), ABS_J('shoulder', 0)] ] duration_mode: 'total': duration_ms is divided equally across serial groups. 'per_group': each serial group receives the full duration_ms. """ return { 'type': 'series', 'label': label, 'commands': commands, 'duration_ms': duration_ms, 'pause_ms': pause_ms, 'easing': easing, 'clamp_easing': clamp_easing, 'duration_mode': duration_mode } # ============================================================ # COMMAND PLANNING # ============================================================ def is_command_object(item): """ Returns True if item looks like a movement command. """ return type(item) == dict and 'joint' in item def validate_parallel_commands(commands): """ Prevents commanding the same joint twice in one parallel group. Example of invalid group: [ ABS_J('base', 10), REL_J('base', 5) ] Use SERIES instead. """ seen = [] for command in commands: joint_name = command['joint'] if joint_name in seen: raise ValueError( 'Joint {} appears more than once in one parallel group.'.format( joint_name ) ) seen.append(joint_name) def normalize_series_groups(commands): """ Converts flat serial command list into list of command groups. Input: [cmd1, cmd2, cmd3] Output: [[cmd1], [cmd2], [cmd3]] """ groups = [] for item in commands: if is_command_object(item): groups.append([item]) else: groups.append(item) return groups def plan_commands(commands): """ Converts user commands into executable motor-coordinate plans. Each plan contains: - joint object - start motor angle - target motor angle - start joint angle - target joint angle - delta values - skip flag for tiny movement deadband """ validate_parallel_commands(commands) plans = [] for command in commands: joint = JOINTS[command['joint']] mode = command['mode'] unit = command.get('unit', 'motor') value = command['value'] start_motor = joint.motor_angle() start_joint = joint.joint_angle() if mode == 'abs': if unit == 'motor': target_motor = value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) elif mode == 'rel': if unit == 'motor': target_motor = start_motor + value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = start_joint + value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) else: raise ValueError('Unknown command mode: {}'.format(mode)) joint.check_joint_limit(target_joint) delta_motor = target_motor - start_motor delta_joint = target_joint - start_joint skip_small_move = joint.is_small_motor_delta(delta_motor) plans.append({ 'joint': joint, 'start_motor': start_motor, 'target_motor': target_motor, 'start_joint': start_joint, 'target_joint': target_joint, 'delta_motor': delta_motor, 'delta_joint': delta_joint, 'skip_small_move': skip_small_move, 'last_sent_motor_target': None, 'command': command }) return plans def collect_motors(plans): """ Returns all physical motors used by a set of movement plans. """ motors = [] for plan in plans: joint = plan['joint'] for motor in joint.all_motors(): motors.append(motor) return motors def active_plans_only(plans): """ Returns plans that are not skipped by the small-movement deadband. """ active = [] for plan in plans: if not plan['skip_small_move']: active.append(plan) return active # ============================================================ # DISPLAY AND LOGGING # ============================================================ def display_step_header(step_num, label): """ Shows current step on EV3 screen and console. """ ev3.screen.clear() ev3.screen.print('=== STEP {} ==='.format(step_num)) ev3.screen.print(label) print('') print('=' * 70) print('STEP {}: {}'.format(step_num, label)) print('=' * 70) def print_joint_states(prefix): """ Prints current motor and joint angles for all joints. """ print(prefix) for name in JOINT_ORDER: joint = JOINTS[name] print( ' {}: motor={} deg, joint={} deg'.format( name, joint.motor_angle(), joint.joint_angle() ) ) def print_available_easings(): """ Prints all supported easing names. """ print('Available easing styles:') for name in EASING_NAMES: print(' {}'.format(name)) def print_motion_plan(plans): """ Prints planned movement before execution. """ for plan in plans: joint = plan['joint'] print( ' {}: motor {} -> {} deg, joint {} -> {} deg'.format( joint.name, plan['start_motor'], plan['target_motor'], plan['start_joint'], plan['target_joint'] ) ) print( ' delta: motor={} deg, joint={} deg, skip_small_move={}'.format( plan['delta_motor'], plan['delta_joint'], plan['skip_small_move'] ) ) def print_final_errors(plans): """ Prints final angle error after movement. """ print('Final errors:') for plan in plans: joint = plan['joint'] actual_motor = joint.motor_angle() actual_joint = joint.joint_angle() motor_error = plan['target_motor'] - actual_motor joint_error = plan['target_joint'] - actual_joint print( ' {}: motor_error={} deg, joint_error={} deg'.format( joint.name, motor_error, joint_error ) ) # ============================================================ # WAITING AND STALL DETECTION # ============================================================ def wait_until_done(motors, timeout_ms): """ Waits until all motors report control.done(). Returns: True if all motors completed movement. False if timeout or stall occurred. """ timer = StopWatch() while True: all_done = True for motor in motors: if motor.control.stalled(): print('WARNING: motor stalled') motor.brake() return False if not motor.control.done(): all_done = False if all_done: return True if timeout_ms is not None and timer.time() > timeout_ms: print('WARNING: timeout while waiting for motors') for motor in motors: motor.brake() return False wait(POLL_MS) # ============================================================ # SPEED SAFETY # ============================================================ def check_required_speed(plan, duration_ms): """ Checks whether required average speed exceeds recommended limit. This does not measure peak speed. Easing profiles can have peak speeds higher than average speed. """ joint = plan['joint'] if duration_ms <= 0: raise ValueError('duration_ms must be greater than 0') required_avg_speed = abs_value(plan['delta_motor']) * 1000 / duration_ms max_speed = MAX_AVG_MOTOR_SPEED_BY_JOINT.get(joint.name) if max_speed is None: return True if required_avg_speed > max_speed: message = ( 'WARNING: required average motor speed for {} is {} deg/s, ' 'recommended max is {} deg/s'.format( joint.name, required_avg_speed, max_speed ) ) print(message) if ABORT_IF_REQUIRED_SPEED_TOO_HIGH: return False return True # ============================================================ # MOVEMENT EXECUTION # ============================================================ def handle_skipped_small_moves(plans): """ Logs skipped tiny moves and optionally applies final stop mode. """ for plan in plans: if plan['skip_small_move']: joint = plan['joint'] print( ' SKIP tiny move for {}: delta_motor={} deg, deadband={} deg'.format( joint.name, plan['delta_motor'], joint.deadband_motor_deg() ) ) if APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE: joint.apply_stop_mode() def move_parallel_target(commands, speed=DEFAULT_SPEED): """ Executes simultaneous target movement using run_target(). Used when duration_ms is None. """ plans = plan_commands(commands) active_plans = active_plans_only(plans) print_joint_states('Before target move:') print_motion_plan(plans) handle_skipped_small_moves(plans) if len(active_plans) == 0: print('No active movement after deadband filtering.') print_joint_states('After skipped target move:') return True for plan in active_plans: joint = plan['joint'] target_motor = plan['target_motor'] joint.run_motor_target(target_motor, speed) motors = collect_motors(active_plans) ok = wait_until_done(motors, 30000) print_joint_states('After target move:') print_final_errors(plans) return ok def maybe_send_track_target(plan, motor_target, force_send): """ Sends track_target() only when target changed enough, unless force_send=True. This reduces small high-frequency target updates that can cause jitter. """ joint = plan['joint'] last_sent = plan['last_sent_motor_target'] if force_send: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if last_sent is None: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if TRACK_TARGET_MIN_UPDATE_MOTOR_DEG <= 0: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if abs_value(motor_target - last_sent) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target def move_parallel_timed(commands, duration_ms, easing_name=None, clamp_easing=None): """ Executes simultaneous timed/eased movement using track_target(). duration_ms: Total intended duration of the movement. easing_name: Name of easing function. If None, DEFAULT_EASING is used. clamp_easing: Whether to clamp easing output to [0, 1]. """ if duration_ms is None: return move_parallel_target(commands, DEFAULT_SPEED) if duration_ms <= 0: raise ValueError('duration_ms must be greater than 0') if clamp_easing is None: clamp_easing = CLAMP_EASING_OUTPUT easing_func = get_easing(easing_name) plans = plan_commands(commands) active_plans = active_plans_only(plans) print_joint_states('Before timed profile:') print('Timed profile duration: {} ms'.format(duration_ms)) print('Easing: {}'.format(easing_name if easing_name is not None else DEFAULT_EASING)) print('Clamp easing output: {}'.format(clamp_easing)) print_motion_plan(plans) handle_skipped_small_moves(plans) if len(active_plans) == 0: print('No active movement after deadband filtering.') print_joint_states('After skipped timed profile:') return True for plan in active_plans: ok_speed = check_required_speed(plan, duration_ms) if not ok_speed: print('ERROR: required speed too high, movement aborted.') return False timer = StopWatch() while timer.time() < duration_ms: elapsed = timer.time() linear_ratio = elapsed / duration_ms linear_ratio = clamp(linear_ratio, 0, 1) eased_ratio = easing_func(linear_ratio) if clamp_easing: eased_ratio = clamp(eased_ratio, 0, 1) for plan in active_plans: start_motor = plan['start_motor'] target_motor = plan['target_motor'] current_motor_target = start_motor + (target_motor - start_motor) * eased_ratio maybe_send_track_target( plan, current_motor_target, False ) for plan in active_plans: joint = plan['joint'] for motor in joint.all_motors(): if motor.control.stalled(): print('WARNING: motor stalled during timed profile') motor.brake() return False wait(PROFILE_UPDATE_MS) # Final exact target command. for plan in active_plans: maybe_send_track_target( plan, plan['target_motor'], True ) elapsed = timer.time() if TIMED_FINAL_TARGET_DWELL_MS > 0: wait(TIMED_FINAL_TARGET_DWELL_MS) if SETTLE_AFTER_TIMED_MOVE: for plan in active_plans: joint = plan['joint'] target_motor = plan['target_motor'] joint.run_motor_target(target_motor, SETTLE_SPEED) motors = collect_motors(active_plans) ok = wait_until_done(motors, 10000) if not ok: return False else: for plan in active_plans: joint = plan['joint'] joint.apply_stop_mode() print('Timed profile requested: {} ms'.format(duration_ms)) print('Timed profile elapsed: {} ms'.format(elapsed)) print_joint_states('After timed profile:') print_final_errors(plans) return True def execute_parallel_group(commands, duration_ms=None, easing_name=None, clamp_easing=None): """ Executes one parallel group. If duration_ms is None: use target movement with DEFAULT_SPEED. If duration_ms is set: use timed/eased movement. """ if len(commands) == 0: return True if duration_ms is None: return move_parallel_target(commands, DEFAULT_SPEED) return move_parallel_timed( commands, duration_ms, easing_name, clamp_easing ) def run_step(step_num, step): """ Executes one high-level step. Step can be: - PARALLEL(...) - SERIES(...) """ display_step_header(step_num, step['label']) sound_step_start() step_type = step['type'] commands = step['commands'] duration_ms = step.get('duration_ms', None) pause_ms = step.get('pause_ms', 0) easing_name = step.get('easing', None) clamp_easing = step.get('clamp_easing', None) if len(commands) == 0: print_joint_states('Current state:') if pause_ms > 0: wait(pause_ms) sound_step_ok() return True if step_type == 'parallel': ok = execute_parallel_group( commands, duration_ms, easing_name, clamp_easing ) elif step_type == 'series': groups = normalize_series_groups(commands) duration_mode = step.get('duration_mode', 'total') ok = True if duration_ms is None: group_duration_ms = None for group in groups: ok = execute_parallel_group( group, group_duration_ms, easing_name, clamp_easing ) if not ok: break else: if duration_mode == 'total': group_duration_ms = int(duration_ms / len(groups)) if group_duration_ms <= 0: raise ValueError('Series duration is too short for the number of groups') for group in groups: ok = execute_parallel_group( group, group_duration_ms, easing_name, clamp_easing ) if not ok: break elif duration_mode == 'per_group': group_duration_ms = duration_ms for group in groups: ok = execute_parallel_group( group, group_duration_ms, easing_name, clamp_easing ) if not ok: break else: raise ValueError('Unknown duration_mode: {}'.format(duration_mode)) else: raise ValueError('Unknown step type: {}'.format(step_type)) if pause_ms > 0: wait(pause_ms) if ok: sound_step_ok() else: sound_step_fail() return ok def reset_all_current_positions_as_zero(): """ Makes current physical pose become logical 0 deg for all joints. """ for name in JOINT_ORDER: JOINTS[name].reset_joint_angle(0) def apply_stop_to_all_joints(): """ Applies configured stop mode to all joints. """ for name in JOINT_ORDER: JOINTS[name].apply_stop_mode() # ============================================================ # TEST / RESEARCH MOVEMENT SEQUENCE # ============================================================ TEST_STEPS = [ PARALLEL( 'Reference pose: current physical position is logical zero', [], pause_ms=1000 ), PARALLEL( 'Homing: all joints to zero, linear', [ ABS_J('base', 0), ABS_J('shoulder', 0), ABS_J('elbow', 0), ], duration_ms=3000, easing='linear', pause_ms=1000 ), SERIES( 'Base joint-angle test, easeInOutSine, total 6s', [ ABS_J('base', 15), ABS_J('base', -15), ABS_J('base', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), SERIES( 'Shoulder joint-angle test, easeInOutQuad, total 6s', [ ABS_J('shoulder', 8), ABS_J('shoulder', -8), ABS_J('shoulder', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutQuad', pause_ms=1000 ), SERIES( 'Elbow joint-angle test, easeInOutCubic, total 6s', [ ABS_J('elbow', 15), ABS_J('elbow', -15), ABS_J('elbow', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutCubic', pause_ms=1000 ), PARALLEL( 'Base + shoulder synchronized, easeInOutSine', [ ABS_J('base', 12), ABS_J('shoulder', 8), ], duration_ms=3000, easing='easeInOutSine', pause_ms=1000 ), PARALLEL( 'Base + shoulder + elbow synchronized, easeInOutQuad', [ ABS_J('base', -12), ABS_J('shoulder', -8), ABS_J('elbow', 12), ], duration_ms=3000, easing='easeInOutQuad', pause_ms=1000 ), SERIES( 'Mixed serial groups, total 9s, easeInOutSine', [ [ ABS_J('base', 10), ABS_J('shoulder', 5), ], [ ABS_J('base', -10), ABS_J('elbow', 10), ], [ ABS_J('base', 0), ABS_J('shoulder', 0), ABS_J('elbow', 0), ], ], duration_ms=9000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), SERIES( 'Relative joint movement, easeOutQuad, total 6s', [ REL_J('base', 10), REL_J('base', 10), REL_J('base', -20), ], duration_ms=6000, duration_mode='total', easing='easeOutQuad', pause_ms=1000 ), PARALLEL( 'Small bounce-style test, clamped', [ ABS_J('base', 8), ABS_J('elbow', 8), ], duration_ms=3000, easing='easeOutBounce', clamp_easing=True, pause_ms=1000 ), PARALLEL( 'Return all joints to zero, easeInOutSine', [ ABS_J('base', 0), ABS_J('shoulder', 0), ABS_J('elbow', 0), ], duration_ms=3000, easing='easeInOutSine', pause_ms=1000 ), # Example raw motor-angle calibration step. # Uncomment only if needed. # # PARALLEL( # 'Raw motor-angle calibration test', # [ # ABS_M('base', 90), # base joint = 90 / 7.5 = 12 deg # ABS_M('shoulder', 90), # shoulder joint = 90 / 7.5 = 12 deg # ABS_M('elbow', 90), # elbow joint = 90 / 5.0 = 18 deg # ], # duration_ms=3000, # easing='linear', # pause_ms=1000 # ), ] # ============================================================ # MAIN PROGRAM # ============================================================ ev3.screen.clear() ev3.screen.print('=== MASTER BRICK ===') ev3.screen.print('Motion test ready') print('') print('#' * 70) print('# ROBOTIC ARM MOTION TEST START') print('#' * 70) print('') print_available_easings() print('') print('Gear ratios:') print(' base: {}'.format(GEAR_RATIOS['base'])) print(' shoulder: {}'.format(GEAR_RATIOS['shoulder'])) print(' elbow: {}'.format(GEAR_RATIOS['elbow'])) print('') print('Stop modes:') print(' base: {}'.format(STOP_MODE_BY_JOINT['base'])) print(' shoulder: {}'.format(STOP_MODE_BY_JOINT['shoulder'])) print(' elbow: {}'.format(STOP_MODE_BY_JOINT['elbow'])) print('') print('Jitter mitigation:') print(' deadband motor deg by joint: {}'.format(DEADBAND_MOTOR_DEG_BY_JOINT)) print(' track target min update motor deg: {}'.format(TRACK_TARGET_MIN_UPDATE_MOTOR_DEG)) print(' apply motor control settings: {}'.format(APPLY_MOTOR_CONTROL_SETTINGS)) print('') print('Default easing: {}'.format(DEFAULT_EASING)) print('Clamp easing output: {}'.format(CLAMP_EASING_OUTPUT)) print('Settle after timed move: {}'.format(SETTLE_AFTER_TIMED_MOVE)) print('Timed final target dwell: {} ms'.format(TIMED_FINAL_TARGET_DWELL_MS)) # Apply gentler control settings before movement. configure_all_motor_controls() # Current physical pose becomes logical joint angle 0. # Place the arm in a known safe starting pose before running the program. reset_all_current_positions_as_zero() print_joint_states('Initial logical zero state:') for i in range(len(TEST_STEPS)): ok = run_step(i, TEST_STEPS[i]) if not ok: print('ERROR: step {} failed or stalled'.format(i)) sound_step_fail() break apply_stop_to_all_joints() ev3.screen.clear() ev3.screen.print('=== MASTER BRICK ===') ev3.screen.print('ALL TESTS COMPLETE') sound_all_complete() print('') print('#' * 70) print('# ALL TESTS COMPLETE') print('#' * 70) print('')