#!/usr/bin/env pybricks-micropython from math import sin, cos, pi from pybricks.hubs import EV3Brick from pybricks.ev3devices import Motor from pybricks.parameters import Port, Stop from pybricks.tools import wait, StopWatch from pybricks.messaging import BluetoothMailboxServer, TextMailbox # ============================================================ # SLAVE BRICK PROGRAM # ============================================================ # # Run this program FIRST. # # The slave: # 1. waits for master Bluetooth connection, # 2. waits for PREP command, # 3. prepares local motion plan, # 4. replies READY, # 5. waits for GO, # 6. executes local slave joints, # 7. replies DONE. # # Message format: # PREP|step_id|duration_ms|easing|joint,unit,mode,value;joint,unit,mode,value # GO|step_id # STOP # # Example payload: # PREP|3|3000|easeInOutSine|elbow_rotate,joint,abs,10;wrist_flexion_extension,joint,abs,-5 # ============================================================ # GENERAL SETTINGS # ============================================================ DEFAULT_SPEED = 200 PROFILE_UPDATE_MS = 20 POLL_MS = 10 DEFAULT_EASING = 'linear' CLAMP_EASING_OUTPUT = True STOP_MODE_DEFAULT = Stop.BRAKE DEADBAND_MOTOR_DEG = 1.5 TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0 SOUND_ENABLED = True # ============================================================ # SLAVE JOINT CONFIGURATION # ============================================================ # # Replace names, gear ratios, signs, and limits with your real second-brick axes. # # ports: # list of physical motor ports. Usually one port. # # signs: # list of signs matching ports. Use -1 if motor direction is opposite. # # gear_ratio: # motor_angle = joint_angle * gear_ratio # # joint_limits: # None or (min_angle, max_angle), in JOINT degrees. SLAVE_JOINT_CONFIGS = [ { 'name': 'elbow_rotate', 'ports': [Port.A], 'signs': [1], 'gear_ratio': 7.5, 'joint_limits': None, 'stop_mode': Stop.BRAKE, }, { 'name': 'wrist_flexion_extension', 'ports': [Port.B], 'signs': [1], 'gear_ratio': 5.0, 'joint_limits': None, 'stop_mode': Stop.BRAKE, }, { 'name': 'wrist_pronation_supination', 'ports': [Port.C], 'signs': [1], 'gear_ratio': 7.5, 'joint_limits': None, 'stop_mode': Stop.BRAKE, }, { 'name': 'gripper', 'ports': [Port.D], 'signs': [1], 'gear_ratio': 1.0, 'joint_limits': None, 'stop_mode': Stop.BRAKE, }, ] # ============================================================ # HARDWARE / BLUETOOTH # ============================================================ ev3 = EV3Brick() server = BluetoothMailboxServer() cmd_box = TextMailbox('command', server) status_box = TextMailbox('status', server) # ============================================================ # UTILITIES # ============================================================ def clamp(value, min_value, max_value): if value < min_value: return min_value if value > max_value: return max_value return value def abs_value(value): if value < 0: return -value return value def beep_ok(): if SOUND_ENABLED: ev3.speaker.beep(frequency=1000, duration=80) def beep_error(): if SOUND_ENABLED: ev3.speaker.beep(frequency=220, duration=200) wait(80) ev3.speaker.beep(frequency=220, duration=200) # ============================================================ # EASING FUNCTIONS # ============================================================ def ease_linear(x): return x def ease_in_out_sine(x): return -(cos(pi * x) - 1) / 2 def ease_in_out_quad(x): if x < 0.5: return 2 * x * x return 1 - ((-2 * x + 2) ** 2) / 2 def ease_in_out_cubic(x): if x < 0.5: return 4 * x * x * x return 1 - ((-2 * x + 2) ** 3) / 2 def ease_out_quad(x): return 1 - (1 - x) * (1 - x) EASINGS = { 'linear': ease_linear, 'easeInOutSine': ease_in_out_sine, 'easeInOutQuad': ease_in_out_quad, 'easeInOutCubic': ease_in_out_cubic, 'easeOutQuad': ease_out_quad, } def get_easing(name): if name is None or name == '': name = DEFAULT_EASING if name in EASINGS: return EASINGS[name] return EASINGS[DEFAULT_EASING] # ============================================================ # JOINT CLASS # ============================================================ class Joint: """ Logical robot-arm joint. Supports one or more physical motors. Coordinate convention: motor_angle = joint_angle * gear_ratio For multiple motors: logical motor angle is the signed average of all motor encoders. """ def __init__(self, name, motors, signs, gear_ratio, joint_limits, stop_mode): self.name = name self.motors = motors self.signs = signs self.gear_ratio = gear_ratio self.joint_limits = joint_limits self.stop_mode_value = stop_mode def motor_angle(self): total = 0 count = len(self.motors) for i in range(count): total += self.motors[i].angle() * self.signs[i] return total / count def joint_angle(self): return self.motor_angle() / self.gear_ratio def joint_to_motor(self, joint_angle): return joint_angle * self.gear_ratio def motor_to_joint(self, motor_angle): return motor_angle / self.gear_ratio def reset_joint_angle(self, joint_angle): motor_angle = self.joint_to_motor(joint_angle) for i in range(len(self.motors)): self.motors[i].reset_angle(motor_angle * self.signs[i]) def check_joint_limit(self, joint_target): if self.joint_limits is None: return min_angle = self.joint_limits[0] max_angle = self.joint_limits[1] if joint_target < min_angle or joint_target > max_angle: raise ValueError( '{} target {} outside limits {}..{}'.format( self.name, joint_target, min_angle, max_angle ) ) def physical_target(self, logical_motor_target, motor_index): return int(logical_motor_target * self.signs[motor_index]) def run_motor_target(self, logical_motor_target, speed): joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(joint_target) for i in range(len(self.motors)): self.motors[i].run_target( speed, self.physical_target(logical_motor_target, i), then=self.stop_mode_value, wait=False ) def track_motor_target(self, logical_motor_target): joint_target = self.motor_to_joint(logical_motor_target) self.check_joint_limit(joint_target) for i in range(len(self.motors)): self.motors[i].track_target( self.physical_target(logical_motor_target, i) ) def apply_stop(self): for motor in self.motors: if self.stop_mode_value == Stop.HOLD: motor.hold() elif self.stop_mode_value == Stop.COAST: motor.stop() else: motor.brake() def all_motors(self): return self.motors # ============================================================ # JOINT CREATION # ============================================================ JOINTS = {} JOINT_ORDER = [] def create_joints(): for cfg in SLAVE_JOINT_CONFIGS: motors = [] for port in cfg['ports']: motors.append(Motor(port)) joint = Joint( cfg['name'], motors, cfg['signs'], cfg['gear_ratio'], cfg['joint_limits'], cfg['stop_mode'] ) JOINTS[cfg['name']] = joint JOINT_ORDER.append(cfg['name']) def reset_all_as_zero(): for name in JOINT_ORDER: JOINTS[name].reset_joint_angle(0) def stop_all(): for name in JOINT_ORDER: JOINTS[name].apply_stop() # ============================================================ # COMMAND PARSING / PLANNING # ============================================================ def parse_command_payload(payload): """ Parses: joint,unit,mode,value;joint,unit,mode,value Example: elbow_rotate,joint,abs,10;wrist_flexion_extension,joint,rel,-5 """ commands = [] if payload == '' or payload == 'NONE': return commands items = payload.split(';') for item in items: parts = item.split(',') if len(parts) != 4: raise ValueError('Invalid command item: {}'.format(item)) commands.append({ 'joint': parts[0], 'unit': parts[1], 'mode': parts[2], 'value': float(parts[3]) }) return commands def validate_parallel(commands): seen = [] for cmd in commands: joint_name = cmd['joint'] if joint_name in seen: raise ValueError('Joint repeated in one parallel group: {}'.format(joint_name)) seen.append(joint_name) def plan_commands(commands): validate_parallel(commands) plans = [] for cmd in commands: joint = JOINTS[cmd['joint']] unit = cmd['unit'] mode = cmd['mode'] value = cmd['value'] start_motor = joint.motor_angle() start_joint = joint.joint_angle() if mode == 'abs': if unit == 'motor': target_motor = value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) elif mode == 'rel': if unit == 'motor': target_motor = start_motor + value target_joint = joint.motor_to_joint(target_motor) elif unit == 'joint': target_joint = start_joint + value target_motor = joint.joint_to_motor(target_joint) else: raise ValueError('Unknown unit: {}'.format(unit)) else: raise ValueError('Unknown mode: {}'.format(mode)) joint.check_joint_limit(target_joint) delta_motor = target_motor - start_motor skip = abs_value(delta_motor) <= DEADBAND_MOTOR_DEG plans.append({ 'joint': joint, 'start_motor': start_motor, 'target_motor': target_motor, 'delta_motor': delta_motor, 'last_sent': None, 'skip': skip }) return plans # ============================================================ # MOVEMENT EXECUTION # ============================================================ def maybe_track(plan, target, force): joint = plan['joint'] last = plan['last_sent'] if force or last is None: joint.track_motor_target(target) plan['last_sent'] = target return if abs_value(target - last) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG: joint.track_motor_target(target) plan['last_sent'] = target def execute_plan_timed(plans, duration_ms, easing_name): if duration_ms <= 0: return True easing = get_easing(easing_name) active = [] for plan in plans: if not plan['skip']: active.append(plan) if len(active) == 0: return True timer = StopWatch() while timer.time() < duration_ms: ratio = timer.time() / duration_ms ratio = clamp(ratio, 0, 1) eased = easing(ratio) if CLAMP_EASING_OUTPUT: eased = clamp(eased, 0, 1) for plan in active: target = plan['start_motor'] + plan['delta_motor'] * eased maybe_track(plan, target, False) wait(PROFILE_UPDATE_MS) for plan in active: maybe_track(plan, plan['target_motor'], True) for plan in active: plan['joint'].apply_stop() return True # ============================================================ # STATUS / DISPLAY # ============================================================ def print_states(prefix): print(prefix) for name in JOINT_ORDER: joint = JOINTS[name] print( '{}: motor={} joint={}'.format( name, joint.motor_angle(), joint.joint_angle() ) ) # ============================================================ # MAIN LOOP # ============================================================ create_joints() ev3.screen.clear() ev3.screen.print('SLAVE') ev3.screen.print('Waiting BT') print('SLAVE: waiting for Bluetooth connection...') server.wait_for_connection() print('SLAVE: connected') ev3.screen.clear() ev3.screen.print('SLAVE') ev3.screen.print('Connected') reset_all_as_zero() print_states('SLAVE initial zero:') prepared_step_id = None prepared_duration = 0 prepared_easing = DEFAULT_EASING prepared_plans = [] while True: cmd_box.wait() message = cmd_box.read() print('SLAVE RX: {}'.format(message)) try: parts = message.split('|') command_type = parts[0] print('SLAVE MSG PARTS COUNT: {}'.format(len(parts))) if command_type == 'STOP': stop_all() status_box.send('STOPPED') beep_error() elif command_type == 'PREP': if len(parts) < 5: raise ValueError('Bad PREP parts: {}'.format(len(parts))) prepared_step_id = parts[1] prepared_duration = int(parts[2]) prepared_easing = parts[3] payload = parts[4] commands = parse_command_payload(payload) prepared_plans = plan_commands(commands) ev3.screen.clear() ev3.screen.print('SLAVE PREP') ev3.screen.print(prepared_step_id) status_box.send('READY|{}'.format(prepared_step_id)) beep_ok() elif command_type == 'GO': step_id = parts[1] if step_id != prepared_step_id: status_box.send('ERR|{}|not_prepared'.format(step_id)) beep_error() else: ev3.screen.clear() ev3.screen.print('SLAVE GO') ev3.screen.print(step_id) ok = execute_plan_timed( prepared_plans, prepared_duration, prepared_easing ) if ok: status_box.send('DONE|{}'.format(step_id)) beep_ok() else: status_box.send('ERR|{}|movement_failed'.format(step_id)) beep_error() elif command_type == 'QUIT': stop_all() status_box.send('BYE') break else: status_box.send('ERR|unknown_command') beep_error() except Exception as e: stop_all() err = str(e) ev3.screen.clear() ev3.screen.print('SLAVE ERROR') ev3.screen.print(err[:16]) ev3.screen.print(err[16:32]) print('SLAVE ERROR: {}'.format(err)) # Send actual error back to master. # Keep it short because TextMailbox messages should not be too long. status_box.send('ERR|{}|{}'.format( prepared_step_id if prepared_step_id is not None else 'none', err[:40] )) beep_error() stop_all() ev3.screen.clear() ev3.screen.print('SLAVE END') print('SLAVE: program ended')