#!/usr/bin/env pybricks-micropython from math import sin, cos, sqrt, pi from pybricks.hubs import EV3Brick from pybricks.ev3devices import Motor from pybricks.parameters import Port, Stop from pybricks.tools import wait, StopWatch # ============================================================ # SLAVE BRICK STANDALONE JOINT TEST PROGRAM # ============================================================ # # Purpose: # Test only the motors/joints connected to the second EV3 brick. # # This version is autonomous: # - no Bluetooth # - no master brick # - no phone/XR logging integration yet # # It supports: # - absolute motor-angle commands # - relative motor-angle commands # - absolute joint-angle commands with gear ratio # - relative joint-angle commands with gear ratio # - serial movement # - parallel movement # - timed movement # - easing profiles # - sound after each step # - low-angle jitter mitigation # - configurable stop modes and motor-control tolerances # # Important: # The gear ratios for the second brick are currently placeholders. # Replace them with the real gear ratios of your slave-side joints. # ============================================================ # GLOBAL MOTION SETTINGS # ============================================================ DEFAULT_SPEED = 200 # motor deg/s for non-timed movement SETTLE_SPEED = 120 # motor deg/s for optional final correction POLL_MS = 10 PROFILE_UPDATE_MS = 20 SETTLE_AFTER_TIMED_MOVE = False TIMED_FINAL_TARGET_DWELL_MS = 0 DEFAULT_EASING = 'linear' CLAMP_EASING_OUTPUT = True # ============================================================ # SOUND SETTINGS # ============================================================ SOUND_ENABLED = True SOUND_AT_STEP_START = False SOUND_AFTER_STEP = True SOUND_ON_ERROR = True SOUND_STEP_START = (500, 60) SOUND_STEP_OK = (1000, 90) SOUND_STEP_FAIL = (220, 250) SOUND_COMPLETE = (1200, 120) # ============================================================ # JITTER MITIGATION SETTINGS # ============================================================ # Movement smaller than this number of MOTOR degrees is skipped. # Increase this if low-angle movements cause buzzing/jitter. DEFAULT_DEADBAND_MOTOR_DEG = 1.5 # During timed movement, a new target is sent only if it differs from # the previously sent target by at least this many MOTOR degrees. TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE = True # ============================================================ # MOTOR CONTROL SETTINGS # ============================================================ APPLY_MOTOR_CONTROL_SETTINGS = True # Conservative defaults. # Format: # speed limit, acceleration limit, actuation limit # # Units: # speed: deg/s # acceleration: deg/s/s # actuation: percent DEFAULT_CONTROL_LIMITS = (400, 600, 75) # Format: # speed tolerance, position tolerance # # Larger position tolerance can reduce target jitter. DEFAULT_TARGET_TOLERANCES = (20, 2) # Format: # stall speed, stall time DEFAULT_STALL_TOLERANCES = (20, 1000) # Optional PID override. # Keep None unless you deliberately tune PID. # # Format: # (kp, ki, kd, integral_range, integral_rate, feed_forward) DEFAULT_PID_SETTINGS = None # ============================================================ # SPEED SAFETY SETTINGS # ============================================================ ABORT_IF_REQUIRED_SPEED_TOO_HIGH = False DEFAULT_MAX_AVG_MOTOR_SPEED = 220 # ============================================================ # SLAVE JOINT CONFIGURATION # ============================================================ # # Edit this section to match the real second-brick joints. # # Fields: # name: # Logical joint name used in movement commands. # # port: # EV3 motor port. # # gear_ratio: # motor_angle = joint_angle * gear_ratio # # If unknown, keep 1.0 temporarily and test in motor-angle mode first. # # sign: # 1 or -1. # Use -1 if the logical direction is opposite to physical motor direction. # # joint_limits: # None or (min_angle, max_angle), in JOINT degrees. # # stop_mode: # Stop.BRAKE usually reduces active jitter. # Stop.HOLD is stronger but may jitter under backlash/load. # # deadband_motor_deg: # Tiny movement skip threshold in MOTOR degrees. # # max_avg_motor_speed: # Warning/abort threshold for timed movement. # # Example real naming could be: # wrist_pitch, wrist_roll, wrist_yaw, gripper # # I use neutral names here because your exact second-brick mechanism # was not specified. SLAVE_JOINT_CONFIGS = [ { 'name': 'elbow_rotate', 'port': Port.A, 'gear_ratio': 7.5, 'sign': 1, 'joint_limits': None, 'stop_mode': Stop.BRAKE, 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, }, { 'name': 'wrist_flexion_extension', 'port': Port.B, 'gear_ratio': 5.0, 'sign': 1, 'joint_limits': None, 'stop_mode': Stop.BRAKE, 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, }, { 'name': 'wrist_pronation_supination', 'port': Port.C, 'gear_ratio': 7.5, 'sign': 1, 'joint_limits': None, 'stop_mode': Stop.BRAKE, 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, }, { 'name': 'gripper', 'port': Port.D, 'gear_ratio': 1.0, 'sign': 1, 'joint_limits': None, 'stop_mode': Stop.BRAKE, 'deadband_motor_deg': DEFAULT_DEADBAND_MOTOR_DEG, 'max_avg_motor_speed': DEFAULT_MAX_AVG_MOTOR_SPEED, }, ] # ============================================================ # HARDWARE INITIALIZATION # ============================================================ ev3 = EV3Brick() # ============================================================ # BASIC UTILITIES # ============================================================ def clamp(value, min_value, max_value): if value < min_value: return min_value if value > max_value: return max_value return value def abs_value(value): if value < 0: return -value return value # ============================================================ # SOUND FUNCTIONS # ============================================================ def beep_once(sound_tuple): if not SOUND_ENABLED: return frequency = sound_tuple[0] duration = sound_tuple[1] ev3.speaker.beep(frequency=frequency, duration=duration) def sound_step_start(): if SOUND_AT_STEP_START: beep_once(SOUND_STEP_START) def sound_step_ok(): if SOUND_AFTER_STEP: beep_once(SOUND_STEP_OK) def sound_step_fail(): if SOUND_ON_ERROR: beep_once(SOUND_STEP_FAIL) wait(80) beep_once(SOUND_STEP_FAIL) def sound_all_complete(): if not SOUND_ENABLED: return beep_once(SOUND_COMPLETE) wait(80) beep_once(SOUND_COMPLETE) # ============================================================ # EASING FUNCTIONS # ============================================================ def ease_linear(x): return x def ease_in_sine(x): return 1 - cos((x * pi) / 2) def ease_out_sine(x): return sin((x * pi) / 2) def ease_in_out_sine(x): return -(cos(pi * x) - 1) / 2 def ease_in_quad(x): return x * x def ease_out_quad(x): return 1 - (1 - x) * (1 - x) def ease_in_out_quad(x): if x < 0.5: return 2 * x * x return 1 - ((-2 * x + 2) ** 2) / 2 def ease_in_cubic(x): return x * x * x def ease_out_cubic(x): return 1 - ((1 - x) ** 3) def ease_in_out_cubic(x): if x < 0.5: return 4 * x * x * x return 1 - ((-2 * x + 2) ** 3) / 2 def ease_in_quart(x): return x ** 4 def ease_out_quart(x): return 1 - ((1 - x) ** 4) def ease_in_out_quart(x): if x < 0.5: return 8 * (x ** 4) return 1 - ((-2 * x + 2) ** 4) / 2 def ease_in_quint(x): return x ** 5 def ease_out_quint(x): return 1 - ((1 - x) ** 5) def ease_in_out_quint(x): if x < 0.5: return 16 * (x ** 5) return 1 - ((-2 * x + 2) ** 5) / 2 def ease_in_expo(x): if x == 0: return 0 return 2 ** (10 * x - 10) def ease_out_expo(x): if x == 1: return 1 return 1 - 2 ** (-10 * x) def ease_in_out_expo(x): if x == 0: return 0 if x == 1: return 1 if x < 0.5: return (2 ** (20 * x - 10)) / 2 return (2 - 2 ** (-20 * x + 10)) / 2 def ease_in_circ(x): return 1 - sqrt(1 - x * x) def ease_out_circ(x): return sqrt(1 - ((x - 1) ** 2)) def ease_in_out_circ(x): if x < 0.5: return (1 - sqrt(1 - ((2 * x) ** 2))) / 2 return (sqrt(1 - ((-2 * x + 2) ** 2)) + 1) / 2 def ease_in_back(x): c1 = 1.70158 c3 = c1 + 1 return c3 * x * x * x - c1 * x * x def ease_out_back(x): c1 = 1.70158 c3 = c1 + 1 return 1 + c3 * ((x - 1) ** 3) + c1 * ((x - 1) ** 2) def ease_in_out_back(x): c1 = 1.70158 c2 = c1 * 1.525 if x < 0.5: return (((2 * x) ** 2) * ((c2 + 1) * 2 * x - c2)) / 2 return ((((2 * x - 2) ** 2) * ((c2 + 1) * (2 * x - 2) + c2)) + 2) / 2 def ease_in_elastic(x): c4 = (2 * pi) / 3 if x == 0: return 0 if x == 1: return 1 return -(2 ** (10 * x - 10)) * sin((x * 10 - 10.75) * c4) def ease_out_elastic(x): c4 = (2 * pi) / 3 if x == 0: return 0 if x == 1: return 1 return (2 ** (-10 * x)) * sin((x * 10 - 0.75) * c4) + 1 def ease_in_out_elastic(x): c5 = (2 * pi) / 4.5 if x == 0: return 0 if x == 1: return 1 if x < 0.5: return -((2 ** (20 * x - 10)) * sin((20 * x - 11.125) * c5)) / 2 return ((2 ** (-20 * x + 10)) * sin((20 * x - 11.125) * c5)) / 2 + 1 def ease_out_bounce(x): n1 = 7.5625 d1 = 2.75 if x < 1 / d1: return n1 * x * x elif x < 2 / d1: x = x - 1.5 / d1 return n1 * x * x + 0.75 elif x < 2.5 / d1: x = x - 2.25 / d1 return n1 * x * x + 0.9375 else: x = x - 2.625 / d1 return n1 * x * x + 0.984375 def ease_in_bounce(x): return 1 - ease_out_bounce(1 - x) def ease_in_out_bounce(x): if x < 0.5: return (1 - ease_out_bounce(1 - 2 * x)) / 2 return (1 + ease_out_bounce(2 * x - 1)) / 2 EASINGS = { 'linear': ease_linear, 'easeInSine': ease_in_sine, 'easeOutSine': ease_out_sine, 'easeInOutSine': ease_in_out_sine, 'easeInQuad': ease_in_quad, 'easeOutQuad': ease_out_quad, 'easeInOutQuad': ease_in_out_quad, 'easeInCubic': ease_in_cubic, 'easeOutCubic': ease_out_cubic, 'easeInOutCubic': ease_in_out_cubic, 'easeInQuart': ease_in_quart, 'easeOutQuart': ease_out_quart, 'easeInOutQuart': ease_in_out_quart, 'easeInQuint': ease_in_quint, 'easeOutQuint': ease_out_quint, 'easeInOutQuint': ease_in_out_quint, 'easeInExpo': ease_in_expo, 'easeOutExpo': ease_out_expo, 'easeInOutExpo': ease_in_out_expo, 'easeInCirc': ease_in_circ, 'easeOutCirc': ease_out_circ, 'easeInOutCirc': ease_in_out_circ, 'easeInBack': ease_in_back, 'easeOutBack': ease_out_back, 'easeInOutBack': ease_in_out_back, 'easeInElastic': ease_in_elastic, 'easeOutElastic': ease_out_elastic, 'easeInOutElastic': ease_in_out_elastic, 'easeInBounce': ease_in_bounce, 'easeOutBounce': ease_out_bounce, 'easeInOutBounce': ease_in_out_bounce, } def get_easing(easing_name): if easing_name is None: easing_name = DEFAULT_EASING if easing_name in EASINGS: return EASINGS[easing_name] key_lower = easing_name.lower() for name in EASINGS: if name.lower() == key_lower: return EASINGS[name] raise ValueError('Unknown easing: {}'.format(easing_name)) # ============================================================ # JOINT CLASS # ============================================================ class Joint: """ Represents one logical slave-brick joint. This version assumes one physical motor per joint. If some slave-side mechanism uses two motors for one joint, this class can be extended in the same way as the master-brick shoulder joint. Coordinate convention: motor_angle = joint_angle * gear_ratio """ def __init__(self, name, motor, sign, gear_ratio, joint_limits, stop_mode, deadband_motor_deg, max_avg_motor_speed): self.name = name self.motor = motor self.sign = sign self.gear_ratio = gear_ratio self.joint_limits = joint_limits self.stop_mode_value = stop_mode self.deadband_motor_deg_value = deadband_motor_deg self.max_avg_motor_speed = max_avg_motor_speed def motor_angle(self): """ Returns logical motor angle in motor degrees. """ return self.motor.angle() * self.sign def joint_angle(self): """ Returns logical joint angle in joint degrees. """ return self.motor_angle() / self.gear_ratio def joint_to_motor(self, joint_angle): """ Converts joint degrees to motor degrees. """ return joint_angle * self.gear_ratio def motor_to_joint(self, motor_angle): """ Converts motor degrees to joint degrees. """ return motor_angle / self.gear_ratio def reset_motor_angle(self, logical_motor_angle): """ Sets the current logical motor-coordinate angle. """ self.motor.reset_angle(logical_motor_angle * self.sign) def reset_joint_angle(self, logical_joint_angle): """ Sets the current logical joint-coordinate angle. """ self.reset_motor_angle(self.joint_to_motor(logical_joint_angle)) def check_joint_limit(self, logical_joint_target): """ Validates target joint angle against this joint's limits. """ if self.joint_limits is None: return min_angle = self.joint_limits[0] max_angle = self.joint_limits[1] if logical_joint_target < min_angle or logical_joint_target > max_angle: raise ValueError( 'Target joint angle {} for joint {} is outside limits {}..{}'.format( logical_joint_target, self.name, min_angle, max_angle ) ) def physical_motor_target(self, logical_motor_target): """ Converts logical motor-coordinate target into physical encoder target. """ return int(logical_motor_target * self.sign) def deadband_motor_deg(self): return self.deadband_motor_deg_value def is_small_motor_delta(self, motor_delta): return abs_value(motor_delta) <= self.deadband_motor_deg_value def run_motor_target(self, logical_motor_target, speed): """ Moves to target expressed in motor degrees. """ logical_joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(logical_joint_target) self.motor.run_target( speed, self.physical_motor_target(logical_motor_target), then=self.stop_mode_value, wait=False ) def track_motor_target(self, logical_motor_target): """ Sends target expressed in motor degrees. Used in timed/eased movement. """ logical_joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(logical_joint_target) self.motor.track_target( self.physical_motor_target(logical_motor_target) ) def apply_stop_mode(self): """ Applies configured final stop behavior. """ if self.stop_mode_value == Stop.HOLD: self.motor.hold() elif self.stop_mode_value == Stop.BRAKE: self.motor.brake() elif self.stop_mode_value == Stop.COAST: self.motor.stop() else: self.motor.brake() def all_motors(self): return [self.motor] # ============================================================ # CREATE JOINTS FROM CONFIGURATION # ============================================================ JOINTS = {} JOINT_ORDER = [] def create_joints_from_config(): """ Creates Motor and Joint objects from SLAVE_JOINT_CONFIGS. """ for cfg in SLAVE_JOINT_CONFIGS: name = cfg['name'] port = cfg['port'] motor = Motor(port) joint = Joint( name, motor, cfg['sign'], cfg['gear_ratio'], cfg['joint_limits'], cfg['stop_mode'], cfg['deadband_motor_deg'], cfg['max_avg_motor_speed'] ) JOINTS[name] = joint JOINT_ORDER.append(name) # ============================================================ # MOTOR CONTROL SETUP # ============================================================ def configure_joint_motor_control(joint): """ Applies conservative motor control settings to one joint. """ motor = joint.motor motor.stop() if DEFAULT_CONTROL_LIMITS is not None: speed = DEFAULT_CONTROL_LIMITS[0] acceleration = DEFAULT_CONTROL_LIMITS[1] actuation = DEFAULT_CONTROL_LIMITS[2] motor.control.limits(speed, acceleration, actuation) if DEFAULT_TARGET_TOLERANCES is not None: speed_tol = DEFAULT_TARGET_TOLERANCES[0] position_tol = DEFAULT_TARGET_TOLERANCES[1] motor.control.target_tolerances(speed_tol, position_tol) if DEFAULT_STALL_TOLERANCES is not None: stall_speed = DEFAULT_STALL_TOLERANCES[0] stall_time = DEFAULT_STALL_TOLERANCES[1] motor.control.stall_tolerances(stall_speed, stall_time) if DEFAULT_PID_SETTINGS is not None: kp = DEFAULT_PID_SETTINGS[0] ki = DEFAULT_PID_SETTINGS[1] kd = DEFAULT_PID_SETTINGS[2] integral_range = DEFAULT_PID_SETTINGS[3] integral_rate = DEFAULT_PID_SETTINGS[4] feed_forward = DEFAULT_PID_SETTINGS[5] motor.control.pid( kp, ki, kd, integral_range, integral_rate, feed_forward ) def configure_all_motor_controls(): if not APPLY_MOTOR_CONTROL_SETTINGS: return for name in JOINT_ORDER: configure_joint_motor_control(JOINTS[name]) # ============================================================ # COMMAND HELPERS # ============================================================ def ABS_M(joint_name, motor_angle): """ Absolute target in motor degrees. """ return { 'joint': joint_name, 'mode': 'abs', 'unit': 'motor', 'value': motor_angle } def REL_M(joint_name, motor_delta): """ Relative movement in motor degrees. """ return { 'joint': joint_name, 'mode': 'rel', 'unit': 'motor', 'value': motor_delta } def ABS_J(joint_name, joint_angle): """ Absolute target in joint degrees. """ return { 'joint': joint_name, 'mode': 'abs', 'unit': 'joint', 'value': joint_angle } def REL_J(joint_name, joint_delta): """ Relative movement in joint degrees. """ return { 'joint': joint_name, 'mode': 'rel', 'unit': 'joint', 'value': joint_delta } def PARALLEL(label, commands, duration_ms=None, pause_ms=0, easing=None, clamp_easing=None): """ Simultaneous movement step. """ return { 'type': 'parallel', 'label': label, 'commands': commands, 'duration_ms': duration_ms, 'pause_ms': pause_ms, 'easing': easing, 'clamp_easing': clamp_easing } def SERIES(label, commands, duration_ms=None, pause_ms=0, easing=None, clamp_easing=None, duration_mode='total'): """ Sequential movement step. commands can be: [cmd1, cmd2, cmd3] or serial groups: [ [cmd1, cmd2], [cmd3, cmd4] ] duration_mode: 'total': total duration is divided between groups. 'per_group': each group receives full duration_ms. """ return { 'type': 'series', 'label': label, 'commands': commands, 'duration_ms': duration_ms, 'pause_ms': pause_ms, 'easing': easing, 'clamp_easing': clamp_easing, 'duration_mode': duration_mode } # ============================================================ # COMMAND PLANNING # ============================================================ def is_command_object(item): return type(item) == dict and 'joint' in item def normalize_series_groups(commands): groups = [] for item in commands: if is_command_object(item): groups.append([item]) else: groups.append(item) return groups def validate_parallel_commands(commands): seen = [] for command in commands: joint_name = command['joint'] if joint_name in seen: raise ValueError( 'Joint {} appears more than once in one parallel group.'.format( joint_name ) ) seen.append(joint_name) def plan_commands(commands): """ Converts user commands into motor-coordinate movement plans. """ validate_parallel_commands(commands) plans = [] for command in commands: joint = JOINTS[command['joint']] mode = command['mode'] unit = command.get('unit', 'motor') value = command['value'] start_motor = joint.motor_angle() start_joint = joint.joint_angle() if mode == 'abs': if unit == 'motor': target_motor = value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) elif mode == 'rel': if unit == 'motor': target_motor = start_motor + value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = start_joint + value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) else: raise ValueError('Unknown command mode: {}'.format(mode)) joint.check_joint_limit(target_joint) delta_motor = target_motor - start_motor delta_joint = target_joint - start_joint skip_small_move = joint.is_small_motor_delta(delta_motor) plans.append({ 'joint': joint, 'start_motor': start_motor, 'target_motor': target_motor, 'start_joint': start_joint, 'target_joint': target_joint, 'delta_motor': delta_motor, 'delta_joint': delta_joint, 'skip_small_move': skip_small_move, 'last_sent_motor_target': None, 'command': command }) return plans def active_plans_only(plans): active = [] for plan in plans: if not plan['skip_small_move']: active.append(plan) return active def collect_motors(plans): motors = [] for plan in plans: joint = plan['joint'] for motor in joint.all_motors(): motors.append(motor) return motors # ============================================================ # DISPLAY AND LOGGING # ============================================================ def display_step_header(step_num, label): ev3.screen.clear() ev3.screen.print('=== SLAVE ===') ev3.screen.print('STEP {}'.format(step_num)) ev3.screen.print(label) print('') print('=' * 70) print('SLAVE STEP {}: {}'.format(step_num, label)) print('=' * 70) def print_joint_states(prefix): print(prefix) for name in JOINT_ORDER: joint = JOINTS[name] print( ' {}: motor={} deg, joint={} deg'.format( name, joint.motor_angle(), joint.joint_angle() ) ) def print_motion_plan(plans): for plan in plans: joint = plan['joint'] print( ' {}: motor {} -> {} deg, joint {} -> {} deg'.format( joint.name, plan['start_motor'], plan['target_motor'], plan['start_joint'], plan['target_joint'] ) ) print( ' delta: motor={} deg, joint={} deg, skip_small_move={}'.format( plan['delta_motor'], plan['delta_joint'], plan['skip_small_move'] ) ) def print_final_errors(plans): print('Final errors:') for plan in plans: joint = plan['joint'] actual_motor = joint.motor_angle() actual_joint = joint.joint_angle() motor_error = plan['target_motor'] - actual_motor joint_error = plan['target_joint'] - actual_joint print( ' {}: motor_error={} deg, joint_error={} deg'.format( joint.name, motor_error, joint_error ) ) def print_joint_configuration(): print('Slave joint configuration:') for name in JOINT_ORDER: joint = JOINTS[name] print( ' {}: gear_ratio={}, limit={}, stop_mode={}, deadband_motor_deg={}'.format( name, joint.gear_ratio, joint.joint_limits, joint.stop_mode_value, joint.deadband_motor_deg_value ) ) # ============================================================ # WAITING / SAFETY # ============================================================ def wait_until_done(motors, timeout_ms): timer = StopWatch() while True: all_done = True for motor in motors: if motor.control.stalled(): print('WARNING: motor stalled') motor.brake() return False if not motor.control.done(): all_done = False if all_done: return True if timeout_ms is not None and timer.time() > timeout_ms: print('WARNING: timeout while waiting for motors') for motor in motors: motor.brake() return False wait(POLL_MS) def check_required_speed(plan, duration_ms): joint = plan['joint'] if duration_ms <= 0: raise ValueError('duration_ms must be greater than 0') required_avg_speed = abs_value(plan['delta_motor']) * 1000 / duration_ms if required_avg_speed > joint.max_avg_motor_speed: print( 'WARNING: required average speed for {} is {} deg/s; recommended max is {} deg/s'.format( joint.name, required_avg_speed, joint.max_avg_motor_speed ) ) if ABORT_IF_REQUIRED_SPEED_TOO_HIGH: return False return True # ============================================================ # MOVEMENT EXECUTION # ============================================================ def handle_skipped_small_moves(plans): for plan in plans: if plan['skip_small_move']: joint = plan['joint'] print( ' SKIP tiny move for {}: delta_motor={} deg, deadband={} deg'.format( joint.name, plan['delta_motor'], joint.deadband_motor_deg() ) ) if APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE: joint.apply_stop_mode() def move_parallel_target(commands, speed=DEFAULT_SPEED): plans = plan_commands(commands) active_plans = active_plans_only(plans) print_joint_states('Before target move:') print_motion_plan(plans) handle_skipped_small_moves(plans) if len(active_plans) == 0: print('No active movement after deadband filtering.') print_joint_states('After skipped target move:') return True for plan in active_plans: joint = plan['joint'] joint.run_motor_target(plan['target_motor'], speed) motors = collect_motors(active_plans) ok = wait_until_done(motors, 30000) print_joint_states('After target move:') print_final_errors(plans) return ok def maybe_send_track_target(plan, motor_target, force_send): joint = plan['joint'] last_sent = plan['last_sent_motor_target'] if force_send: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if last_sent is None: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if TRACK_TARGET_MIN_UPDATE_MOTOR_DEG <= 0: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target return if abs_value(motor_target - last_sent) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: joint.track_motor_target(motor_target) plan['last_sent_motor_target'] = motor_target def move_parallel_timed(commands, duration_ms, easing_name=None, clamp_easing=None): if duration_ms is None: return move_parallel_target(commands, DEFAULT_SPEED) if duration_ms <= 0: raise ValueError('duration_ms must be greater than 0') if clamp_easing is None: clamp_easing = CLAMP_EASING_OUTPUT easing_func = get_easing(easing_name) plans = plan_commands(commands) active_plans = active_plans_only(plans) print_joint_states('Before timed move:') print('Timed profile duration: {} ms'.format(duration_ms)) print('Easing: {}'.format(easing_name if easing_name is not None else DEFAULT_EASING)) print('Clamp easing output: {}'.format(clamp_easing)) print_motion_plan(plans) handle_skipped_small_moves(plans) if len(active_plans) == 0: print('No active movement after deadband filtering.') print_joint_states('After skipped timed move:') return True for plan in active_plans: ok_speed = check_required_speed(plan, duration_ms) if not ok_speed: print('ERROR: required speed too high, movement aborted.') return False timer = StopWatch() while timer.time() < duration_ms: elapsed = timer.time() linear_ratio = elapsed / duration_ms linear_ratio = clamp(linear_ratio, 0, 1) eased_ratio = easing_func(linear_ratio) if clamp_easing: eased_ratio = clamp(eased_ratio, 0, 1) for plan in active_plans: start_motor = plan['start_motor'] target_motor = plan['target_motor'] current_target = start_motor + (target_motor - start_motor) * eased_ratio maybe_send_track_target( plan, current_target, False ) for plan in active_plans: joint = plan['joint'] for motor in joint.all_motors(): if motor.control.stalled(): print('WARNING: motor stalled during timed profile') motor.brake() return False wait(PROFILE_UPDATE_MS) for plan in active_plans: maybe_send_track_target( plan, plan['target_motor'], True ) elapsed = timer.time() if TIMED_FINAL_TARGET_DWELL_MS > 0: wait(TIMED_FINAL_TARGET_DWELL_MS) if SETTLE_AFTER_TIMED_MOVE: for plan in active_plans: joint = plan['joint'] joint.run_motor_target(plan['target_motor'], SETTLE_SPEED) motors = collect_motors(active_plans) ok = wait_until_done(motors, 10000) if not ok: return False else: for plan in active_plans: joint = plan['joint'] joint.apply_stop_mode() print('Timed profile requested: {} ms'.format(duration_ms)) print('Timed profile elapsed: {} ms'.format(elapsed)) print_joint_states('After timed move:') print_final_errors(plans) return True def execute_parallel_group(commands, duration_ms=None, easing_name=None, clamp_easing=None): if len(commands) == 0: return True if duration_ms is None: return move_parallel_target(commands, DEFAULT_SPEED) return move_parallel_timed( commands, duration_ms, easing_name, clamp_easing ) def run_step(step_num, step): display_step_header(step_num, step['label']) sound_step_start() step_type = step['type'] commands = step['commands'] duration_ms = step.get('duration_ms', None) pause_ms = step.get('pause_ms', 0) easing_name = step.get('easing', None) clamp_easing = step.get('clamp_easing', None) if len(commands) == 0: print_joint_states('Current state:') if pause_ms > 0: wait(pause_ms) sound_step_ok() return True if step_type == 'parallel': ok = execute_parallel_group( commands, duration_ms, easing_name, clamp_easing ) elif step_type == 'series': groups = normalize_series_groups(commands) duration_mode = step.get('duration_mode', 'total') ok = True if duration_ms is None: for group in groups: ok = execute_parallel_group( group, None, easing_name, clamp_easing ) if not ok: break else: if duration_mode == 'total': group_duration_ms = int(duration_ms / len(groups)) if group_duration_ms <= 0: raise ValueError('Series duration is too short') for group in groups: ok = execute_parallel_group( group, group_duration_ms, easing_name, clamp_easing ) if not ok: break elif duration_mode == 'per_group': for group in groups: ok = execute_parallel_group( group, duration_ms, easing_name, clamp_easing ) if not ok: break else: raise ValueError('Unknown duration_mode: {}'.format(duration_mode)) else: raise ValueError('Unknown step type: {}'.format(step_type)) if pause_ms > 0: wait(pause_ms) if ok: sound_step_ok() else: sound_step_fail() return ok def reset_all_current_positions_as_zero(): """ Current physical position becomes logical joint angle 0 for all slave joints. """ for name in JOINT_ORDER: JOINTS[name].reset_joint_angle(0) def apply_stop_to_all_joints(): for name in JOINT_ORDER: JOINTS[name].apply_stop_mode() # ============================================================ # SLAVE TEST SEQUENCE # ============================================================ # # Because the real slave-side gear ratios are not known yet, this sequence uses # small movements. If gear_ratio remains 1.0, joint angle equals motor angle. # # Suggested workflow: # 1. Test with ABS_M / REL_M first. # 2. Measure or confirm gear ratios. # 3. Replace gear_ratio values in SLAVE_JOINT_CONFIGS. # 4. Then use ABS_J / REL_J as the main research-level interface. TEST_STEPS = [ PARALLEL( 'Reference pose: current slave pose is logical zero', [], pause_ms=1000 ), PARALLEL( 'Homing all slave joints to zero', [ ABS_J('elbow_rotate', 0), ABS_J('wrist_flexion_extension', 0), ABS_J('wrist_pronation_supination', 0), ABS_J('gripper', 0), ], duration_ms=3000, easing='linear', pause_ms=1000 ), SERIES( 'Slave joint 1 individual test', [ ABS_J('elbow_rotate', 20), ABS_J('elbow_rotate', -20), ABS_J('elbow_rotate', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), SERIES( 'Slave joint 2 individual test', [ ABS_J('wrist_flexion_extension', 20), ABS_J('wrist_flexion_extension', -20), ABS_J('wrist_flexion_extension', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), SERIES( 'Slave joint 3 individual test', [ ABS_J('wrist_pronation_supination', 20), ABS_J('wrist_pronation_supination', -20), ABS_J('wrist_pronation_supination', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), SERIES( 'Slave joint 4 individual test', [ ABS_J('gripper', 20), ABS_J('gripper', -20), ABS_J('gripper', 0), ], duration_ms=6000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), PARALLEL( 'Slave joints 1 + 2 synchronized test', [ ABS_J('elbow_rotate', 15), ABS_J('wrist_flexion_extension', 15), ], duration_ms=3000, easing='easeInOutQuad', pause_ms=1000 ), PARALLEL( 'Slave joints 3 + 4 synchronized test', [ ABS_J('wrist_pronation_supination', 15), ABS_J('gripper', 15), ], duration_ms=3000, easing='easeInOutQuad', pause_ms=1000 ), PARALLEL( 'All slave joints synchronized positive', [ ABS_J('elbow_rotate', 10), ABS_J('wrist_flexion_extension', 10), ABS_J('wrist_pronation_supination', 10), ABS_J('gripper', 10), ], duration_ms=3000, easing='easeInOutCubic', pause_ms=1000 ), PARALLEL( 'All slave joints synchronized negative', [ ABS_J('elbow_rotate', -10), ABS_J('wrist_flexion_extension', -10), ABS_J('wrist_pronation_supination', -10), ABS_J('gripper', -10), ], duration_ms=3000, easing='easeInOutCubic', pause_ms=1000 ), SERIES( 'Relative movement test on slave joint 1', [ REL_J('elbow_rotate', 10), REL_J('elbow_rotate', 10), REL_J('elbow_rotate', -20), ], duration_ms=6000, duration_mode='total', easing='easeOutQuad', pause_ms=1000 ), SERIES( 'Mixed slave parallel groups', [ [ ABS_J('elbow_rotate', 10), ABS_J('wrist_flexion_extension', 5), ], [ ABS_J('wrist_pronation_supination', 10), ABS_J('gripper', 5), ], [ ABS_J('elbow_rotate', 0), ABS_J('wrist_flexion_extension', 0), ABS_J('wrist_pronation_supination', 0), ABS_J('gripper', 0), ], ], duration_ms=9000, duration_mode='total', easing='easeInOutSine', pause_ms=1000 ), PARALLEL( 'Return all slave joints to zero', [ ABS_J('elbow_rotate', 0), ABS_J('wrist_flexion_extension', 0), ABS_J('wrist_pronation_supination', 0), ABS_J('gripper', 0), ], duration_ms=3000, easing='easeInOutSine', pause_ms=1000 ), # Raw motor-angle example. # Useful before gear ratios are known. # # PARALLEL( # 'Raw motor-angle calibration test', # [ # ABS_M('elbow_rotate', 90), # ABS_M('wrist_flexion_extension', 90), # ABS_M('wrist_pronation_supination', 90), # ABS_M('gripper', 90), # ], # duration_ms=3000, # easing='linear', # pause_ms=1000 # ), ] # ============================================================ # MAIN PROGRAM # ============================================================ ev3.screen.clear() ev3.screen.print('=== SLAVE BRICK ===') ev3.screen.print('Standalone test') print('') print('#' * 70) print('# SLAVE BRICK STANDALONE JOINT TEST START') print('#' * 70) print('') create_joints_from_config() configure_all_motor_controls() print_joint_configuration() print('') print('Default easing: {}'.format(DEFAULT_EASING)) print('Clamp easing output: {}'.format(CLAMP_EASING_OUTPUT)) print('Settle after timed move: {}'.format(SETTLE_AFTER_TIMED_MOVE)) print('Track target min update: {} motor deg'.format(TRACK_TARGET_MIN_UPDATE_MOTOR_DEG)) print('Default control limits: {}'.format(DEFAULT_CONTROL_LIMITS)) print('Default target tolerances: {}'.format(DEFAULT_TARGET_TOLERANCES)) # Place the slave-side mechanism in a known safe pose before running. reset_all_current_positions_as_zero() print_joint_states('Initial slave logical zero state:') for i in range(len(TEST_STEPS)): ok = run_step(i, TEST_STEPS[i]) if not ok: print('ERROR: slave step {} failed or stalled'.format(i)) sound_step_fail() break apply_stop_to_all_joints() ev3.screen.clear() ev3.screen.print('=== SLAVE BRICK ===') ev3.screen.print('ALL TESTS COMPLETE') sound_all_complete() print('') print('#' * 70) print('# SLAVE BRICK TEST COMPLETE') print('#' * 70) print('')