Files
stachir a317c4c69d init
2026-06-10 10:41:28 +02:00

2154 lines
52 KiB
Python

#!/usr/bin/env pybricks-micropython
from math import sin, cos, sqrt, pi
from pybricks.hubs import EV3Brick
from pybricks.ev3devices import Motor
from pybricks.parameters import Port, Stop
from pybricks.tools import wait, StopWatch
# ============================================================
# ROBOT ARM MOTION CONTROLLER
# ============================================================
#
# Purpose:
# One-brick motion controller for an EV3 robotic arm used to generate
# repeatable phone/Cardboard XR movement trajectories.
#
# Supports:
# - absolute motor-angle commands
# - relative motor-angle commands
# - absolute joint-angle commands with gear-ratio conversion
# - relative joint-angle commands with gear-ratio conversion
# - sequential movement
# - simultaneous movement
# - timed movement profiles
# - easing functions based on easings.net naming
# - sound feedback after every step
# - low-angle jitter mitigation
# - configurable motor control limits and tolerances
#
# Coordinate convention:
# motor_angle = joint_angle * gear_ratio
#
# Example:
# ABS_J('base', 20)
# means:
# base joint target = 20 deg
# base motor target = 20 * 7.5 = 150 deg
# ============================================================
# HARDWARE PORT CONFIGURATION
# ============================================================
BASE_PORT = Port.A
SHOULDER_LEFT_PORT = Port.B
SHOULDER_RIGHT_PORT = Port.C
ELBOW_PORT = Port.D
# ============================================================
# BASIC MOTION CONFIGURATION
# ============================================================
# Speed used by non-timed run_target() movements.
# Unit: motor degrees per second.
DEFAULT_SPEED = 200
# Speed used for optional final settling after timed track_target() movement.
# Unit: motor degrees per second.
SETTLE_SPEED = 120
# Polling interval for waiting loops.
# Lower value = more responsive, but more CPU activity.
POLL_MS = 10
# Target update interval during timed/eased movements.
# Lower value = smoother target curve, but more controller updates.
# If too low, very small target changes may cause unnecessary jitter.
PROFILE_UPDATE_MS = 20
# If True, after a timed profile the robot performs an additional run_target()
# movement to exactly settle at the final target.
#
# Advantage:
# better final angle accuracy.
#
# Disadvantage:
# total motion time becomes longer than duration_ms.
#
# For tracking-drift experiments, False is usually better when time consistency
# is more important than final encoder accuracy.
SETTLE_AFTER_TIMED_MOVE = False
# Optional dwell at final target after timed profile before applying final stop.
# Unit: ms.
#
# 0 preserves the requested timing most strictly.
# 50-150 can help if the motor needs a short moment to reach the last target.
TIMED_FINAL_TARGET_DWELL_MS = 0
# ============================================================
# GEAR RATIOS
# ============================================================
# Gear ratio convention:
# motor_angle = joint_angle * gear_ratio
#
# base:
# 8-tooth motor gear -> 60-tooth driven gear
# 60 / 8 = 7.5
#
# shoulder:
# 8-tooth motor gear -> 60-tooth driven gear
# 60 / 8 = 7.5
#
# elbow:
# ratio = 5.0
GEAR_RATIOS = {
'base': 7.5,
'shoulder': 7.5,
'elbow': 5.0,
}
# ============================================================
# JOINT LIMITS
# ============================================================
# Limits are expressed in JOINT degrees, not motor degrees.
#
# Strong recommendation:
# Replace None with real safe limits before mounting the phone and tracker.
#
# Example:
# JOINT_LIMITS = {
# 'base': (-60, 60),
# 'shoulder': (-25, 25),
# 'elbow': (-35, 35),
# }
JOINT_LIMITS = {
'base': None,
'shoulder': None,
'elbow': None,
}
# ============================================================
# SHOULDER MOTOR DIRECTION
# ============================================================
# If the two shoulder motors mechanically fight each other, change this.
#
# Try [1, 1] first.
# If motors rotate against each other, try [1, -1].
SHOULDER_SIGNS = [1, 1]
# ============================================================
# STOPPING / HOLDING CONFIGURATION
# ============================================================
# Stop mode after a target movement.
#
# Stop.BRAKE:
# Passively resists motion. Often reduces active PID jitter.
# Good if the mechanism can hold itself mechanically.
#
# Stop.HOLD:
# Actively keeps the target angle. Better against gravity/load, but may jitter
# when there is backlash, low speed, small target error, or high load.
#
# Stop.COAST:
# Lets the motor spin freely. Usually not safe for loaded arm joints.
#
# For first tests with phone/tracker mounted:
# If the arm sags with Stop.BRAKE, use Stop.HOLD for shoulder/elbow.
# If it jitters with Stop.HOLD, try Stop.BRAKE or increase tolerances.
DEFAULT_STOP_MODE = Stop.BRAKE
STOP_MODE_BY_JOINT = {
'base': Stop.BRAKE,
'shoulder': Stop.BRAKE,
'elbow': Stop.BRAKE,
}
# ============================================================
# LOW-ANGLE JITTER MITIGATION
# ============================================================
# Deadband for tiny requested movements.
#
# If requested target is closer than this number of MOTOR degrees,
# the movement is skipped.
#
# Reason:
# Very small movement commands may cause the controller to fight encoder
# quantization, gear backlash, load, or static friction.
#
# Use 0 to disable.
DEADBAND_MOTOR_DEG_BY_JOINT = {
'base': 1.5,
'shoulder': 1.5,
'elbow': 1.5,
}
# During timed/eased movement, a new track_target() command is sent only if
# the target has changed by at least this many MOTOR degrees.
#
# This reduces micro-updates that can cause low-speed jitter.
#
# Use 0 to send every profile update.
TRACK_TARGET_MIN_UPDATE_MOTOR_DEG = 1.0
# If True, skipped tiny moves still apply the configured final stop mode.
# This is useful when a previous command left the motor under active control.
APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE = True
# ============================================================
# MOTOR CONTROL SETTINGS
# ============================================================
# If True, apply Motor.control settings at the beginning of the program.
#
# These settings are conservative starting values. They are not universal.
# If movement becomes too weak or stalls under payload, increase actuation.
APPLY_MOTOR_CONTROL_SETTINGS = True
# Motor-control limits:
# speed = maximum motor speed, deg/s
# acceleration = maximum motor acceleration, deg/s/s
# actuation = maximum actuation, percent of full output
#
# Lower actuation can reduce aggressive correction and jitter, but can also
# increase stalling risk under load.
MOTOR_CONTROL_LIMITS_BY_JOINT = {
'base': (500, 800, 70),
'shoulder': (400, 600, 75),
'elbow': (400, 600, 75),
}
# Target tolerances:
# speed = allowed deviation from zero speed, motor deg/s
# position = allowed position error, motor deg
#
# Larger position tolerance can reduce jitter near the target.
# Too large tolerance reduces accuracy.
TARGET_TOLERANCES_BY_JOINT = {
'base': (20, 2),
'shoulder': (20, 2),
'elbow': (20, 2),
}
# Stall tolerances:
# speed = speed threshold, motor deg/s
# time = how long the controller must be below that speed before stall, ms
STALL_TOLERANCES_BY_JOINT = {
'base': (20, 1000),
'shoulder': (20, 1000),
'elbow': (20, 1000),
}
# Optional PID override.
#
# None means: keep Pybricks default PID settings.
#
# Format:
# (kp, ki, kd, integral_range, integral_rate, feed_forward)
#
# Use only after observing persistent oscillation and after testing:
# - Stop.BRAKE instead of Stop.HOLD
# - larger target tolerance
# - lower actuation
# - larger movement duration
#
# Example placeholder:
# PID_SETTINGS_BY_JOINT = {
# 'base': None,
# 'shoulder': None,
# 'elbow': None,
# }
PID_SETTINGS_BY_JOINT = {
'base': None,
'shoulder': None,
'elbow': None,
}
# ============================================================
# SPEED SAFETY CHECKS
# ============================================================
# If True, abort movement when average required motor speed exceeds the limit.
# If False, only print a warning.
ABORT_IF_REQUIRED_SPEED_TOO_HIGH = False
# Maximum recommended average speed for timed movement.
# Unit: motor deg/s.
MAX_AVG_MOTOR_SPEED_BY_JOINT = {
'base': 250,
'shoulder': 200,
'elbow': 200,
}
# ============================================================
# EASING CONFIGURATION
# ============================================================
DEFAULT_EASING = 'linear'
# If True, easing output is clamped to [0, 1].
#
# This makes Back/Elastic overshoot safer, but removes their original overshoot.
#
# If False, Back and Elastic may command positions outside the start-target
# range. Use only after JOINT_LIMITS are configured.
CLAMP_EASING_OUTPUT = True
# ============================================================
# SOUND CONFIGURATION
# ============================================================
SOUND_ENABLED = True
# If True, short beep at step start.
SOUND_AT_STEP_START = False
# If True, confirmation beep after each successfully completed step.
SOUND_AFTER_STEP = True
# If True, error sound after failed/stalled step.
SOUND_ON_ERROR = True
# Frequencies and durations.
# Frequencies are in Hz, durations are in ms.
SOUND_STEP_START = (500, 60)
SOUND_STEP_OK = (1000, 90)
SOUND_STEP_FAIL = (220, 250)
SOUND_COMPLETE = (1200, 120)
# ============================================================
# HARDWARE INITIALIZATION
# ============================================================
ev3 = EV3Brick()
base_motor = Motor(BASE_PORT)
shoulder_left_motor = Motor(SHOULDER_LEFT_PORT)
shoulder_right_motor = Motor(SHOULDER_RIGHT_PORT)
elbow_motor = Motor(ELBOW_PORT)
# ============================================================
# BASIC UTILITIES
# ============================================================
def clamp(value, min_value, max_value):
"""
Returns value limited to the closed interval [min_value, max_value].
"""
if value < min_value:
return min_value
if value > max_value:
return max_value
return value
def abs_value(value):
"""
MicroPython-safe absolute value wrapper.
"""
if value < 0:
return -value
return value
# ============================================================
# SOUND FUNCTIONS
# ============================================================
def beep_once(sound_tuple):
"""
Plays one beep if sound feedback is enabled.
sound_tuple:
(frequency_hz, duration_ms)
"""
if not SOUND_ENABLED:
return
frequency = sound_tuple[0]
duration = sound_tuple[1]
ev3.speaker.beep(frequency=frequency, duration=duration)
def sound_step_start():
"""
Optional short sound before executing a step.
"""
if SOUND_AT_STEP_START:
beep_once(SOUND_STEP_START)
def sound_step_ok():
"""
Sound after a successfully completed step.
"""
if SOUND_AFTER_STEP:
beep_once(SOUND_STEP_OK)
def sound_step_fail():
"""
Sound after failed, stalled, or aborted movement.
"""
if SOUND_ON_ERROR:
beep_once(SOUND_STEP_FAIL)
wait(80)
beep_once(SOUND_STEP_FAIL)
def sound_all_complete():
"""
Sound at the end of the full movement sequence.
"""
if not SOUND_ENABLED:
return
beep_once(SOUND_COMPLETE)
wait(80)
beep_once(SOUND_COMPLETE)
# ============================================================
# EASING FUNCTIONS
# ============================================================
#
# Input:
# x in [0, 1]
#
# Output:
# eased progress value.
#
# Most easing functions stay in [0, 1].
# Back and Elastic can overshoot unless CLAMP_EASING_OUTPUT is True.
def ease_linear(x):
return x
def ease_in_sine(x):
return 1 - cos((x * pi) / 2)
def ease_out_sine(x):
return sin((x * pi) / 2)
def ease_in_out_sine(x):
return -(cos(pi * x) - 1) / 2
def ease_in_quad(x):
return x * x
def ease_out_quad(x):
return 1 - (1 - x) * (1 - x)
def ease_in_out_quad(x):
if x < 0.5:
return 2 * x * x
return 1 - ((-2 * x + 2) ** 2) / 2
def ease_in_cubic(x):
return x * x * x
def ease_out_cubic(x):
return 1 - ((1 - x) ** 3)
def ease_in_out_cubic(x):
if x < 0.5:
return 4 * x * x * x
return 1 - ((-2 * x + 2) ** 3) / 2
def ease_in_quart(x):
return x ** 4
def ease_out_quart(x):
return 1 - ((1 - x) ** 4)
def ease_in_out_quart(x):
if x < 0.5:
return 8 * (x ** 4)
return 1 - ((-2 * x + 2) ** 4) / 2
def ease_in_quint(x):
return x ** 5
def ease_out_quint(x):
return 1 - ((1 - x) ** 5)
def ease_in_out_quint(x):
if x < 0.5:
return 16 * (x ** 5)
return 1 - ((-2 * x + 2) ** 5) / 2
def ease_in_expo(x):
if x == 0:
return 0
return 2 ** (10 * x - 10)
def ease_out_expo(x):
if x == 1:
return 1
return 1 - 2 ** (-10 * x)
def ease_in_out_expo(x):
if x == 0:
return 0
if x == 1:
return 1
if x < 0.5:
return (2 ** (20 * x - 10)) / 2
return (2 - 2 ** (-20 * x + 10)) / 2
def ease_in_circ(x):
return 1 - sqrt(1 - x * x)
def ease_out_circ(x):
return sqrt(1 - ((x - 1) ** 2))
def ease_in_out_circ(x):
if x < 0.5:
return (1 - sqrt(1 - ((2 * x) ** 2))) / 2
return (sqrt(1 - ((-2 * x + 2) ** 2)) + 1) / 2
def ease_in_back(x):
c1 = 1.70158
c3 = c1 + 1
return c3 * x * x * x - c1 * x * x
def ease_out_back(x):
c1 = 1.70158
c3 = c1 + 1
return 1 + c3 * ((x - 1) ** 3) + c1 * ((x - 1) ** 2)
def ease_in_out_back(x):
c1 = 1.70158
c2 = c1 * 1.525
if x < 0.5:
return (((2 * x) ** 2) * ((c2 + 1) * 2 * x - c2)) / 2
return ((((2 * x - 2) ** 2) * ((c2 + 1) * (2 * x - 2) + c2)) + 2) / 2
def ease_in_elastic(x):
c4 = (2 * pi) / 3
if x == 0:
return 0
if x == 1:
return 1
return -(2 ** (10 * x - 10)) * sin((x * 10 - 10.75) * c4)
def ease_out_elastic(x):
c4 = (2 * pi) / 3
if x == 0:
return 0
if x == 1:
return 1
return (2 ** (-10 * x)) * sin((x * 10 - 0.75) * c4) + 1
def ease_in_out_elastic(x):
c5 = (2 * pi) / 4.5
if x == 0:
return 0
if x == 1:
return 1
if x < 0.5:
return -((2 ** (20 * x - 10)) * sin((20 * x - 11.125) * c5)) / 2
return ((2 ** (-20 * x + 10)) * sin((20 * x - 11.125) * c5)) / 2 + 1
def ease_out_bounce(x):
n1 = 7.5625
d1 = 2.75
if x < 1 / d1:
return n1 * x * x
elif x < 2 / d1:
x = x - 1.5 / d1
return n1 * x * x + 0.75
elif x < 2.5 / d1:
x = x - 2.25 / d1
return n1 * x * x + 0.9375
else:
x = x - 2.625 / d1
return n1 * x * x + 0.984375
def ease_in_bounce(x):
return 1 - ease_out_bounce(1 - x)
def ease_in_out_bounce(x):
if x < 0.5:
return (1 - ease_out_bounce(1 - 2 * x)) / 2
return (1 + ease_out_bounce(2 * x - 1)) / 2
EASINGS = {
'linear': ease_linear,
'easeInSine': ease_in_sine,
'easeOutSine': ease_out_sine,
'easeInOutSine': ease_in_out_sine,
'easeInQuad': ease_in_quad,
'easeOutQuad': ease_out_quad,
'easeInOutQuad': ease_in_out_quad,
'easeInCubic': ease_in_cubic,
'easeOutCubic': ease_out_cubic,
'easeInOutCubic': ease_in_out_cubic,
'easeInQuart': ease_in_quart,
'easeOutQuart': ease_out_quart,
'easeInOutQuart': ease_in_out_quart,
'easeInQuint': ease_in_quint,
'easeOutQuint': ease_out_quint,
'easeInOutQuint': ease_in_out_quint,
'easeInExpo': ease_in_expo,
'easeOutExpo': ease_out_expo,
'easeInOutExpo': ease_in_out_expo,
'easeInCirc': ease_in_circ,
'easeOutCirc': ease_out_circ,
'easeInOutCirc': ease_in_out_circ,
'easeInBack': ease_in_back,
'easeOutBack': ease_out_back,
'easeInOutBack': ease_in_out_back,
'easeInElastic': ease_in_elastic,
'easeOutElastic': ease_out_elastic,
'easeInOutElastic': ease_in_out_elastic,
'easeInBounce': ease_in_bounce,
'easeOutBounce': ease_out_bounce,
'easeInOutBounce': ease_in_out_bounce,
}
EASING_NAMES = [
'linear',
'easeInSine',
'easeOutSine',
'easeInOutSine',
'easeInQuad',
'easeOutQuad',
'easeInOutQuad',
'easeInCubic',
'easeOutCubic',
'easeInOutCubic',
'easeInQuart',
'easeOutQuart',
'easeInOutQuart',
'easeInQuint',
'easeOutQuint',
'easeInOutQuint',
'easeInExpo',
'easeOutExpo',
'easeInOutExpo',
'easeInCirc',
'easeOutCirc',
'easeInOutCirc',
'easeInBack',
'easeOutBack',
'easeInOutBack',
'easeInElastic',
'easeOutElastic',
'easeInOutElastic',
'easeInBounce',
'easeOutBounce',
'easeInOutBounce',
]
def get_easing(easing_name):
"""
Returns easing function by name.
easing_name:
String such as:
- 'linear'
- 'easeInOutSine'
- 'easeOutQuad'
- 'easeInOutCubic'
- 'easeOutBounce'
"""
if easing_name is None:
easing_name = DEFAULT_EASING
if easing_name in EASINGS:
return EASINGS[easing_name]
key_lower = easing_name.lower()
for name in EASING_NAMES:
if name.lower() == key_lower:
return EASINGS[name]
raise ValueError('Unknown easing: {}'.format(easing_name))
# ============================================================
# JOINT CLASS
# ============================================================
class Joint:
"""
Represents one logical robot-arm joint.
A joint can be controlled by:
- one motor, e.g. base or elbow
- two synchronized motors, e.g. shoulder
The class separates:
- physical motor encoder angles
- logical motor-coordinate angles
- logical joint-coordinate angles
Gear conversion:
motor_angle = joint_angle * gear_ratio
signs:
Used when motors must rotate in opposite directions to produce the same
logical joint movement.
"""
def __init__(self, name, motors, signs, gear_ratio):
self.name = name
self.motors = motors
self.signs = signs
self.gear_ratio = gear_ratio
def motor_angle(self):
"""
Returns logical motor angle in motor degrees.
For one motor:
signed encoder angle.
For two motors:
signed average encoder angle.
"""
total = 0
count = len(self.motors)
for i in range(count):
total += self.motors[i].angle() * self.signs[i]
return total / count
def joint_angle(self):
"""
Returns logical joint angle in joint degrees.
"""
return self.motor_angle() / self.gear_ratio
def joint_to_motor(self, joint_angle):
"""
Converts joint degrees to motor degrees.
"""
return joint_angle * self.gear_ratio
def motor_to_joint(self, motor_angle):
"""
Converts motor degrees to joint degrees.
"""
return motor_angle / self.gear_ratio
def reset_motor_angle(self, logical_motor_angle):
"""
Sets the current logical motor-coordinate angle.
Use carefully:
this changes encoder reference, not physical position.
"""
for i in range(len(self.motors)):
self.motors[i].reset_angle(logical_motor_angle * self.signs[i])
def reset_joint_angle(self, logical_joint_angle):
"""
Sets the current logical joint-coordinate angle.
Example:
reset_joint_angle(0)
makes the current physical pose become logical joint angle 0.
"""
logical_motor_angle = self.joint_to_motor(logical_joint_angle)
self.reset_motor_angle(logical_motor_angle)
def check_joint_limit(self, logical_joint_target):
"""
Validates target joint angle against JOINT_LIMITS.
Limits are in joint degrees.
"""
limits = JOINT_LIMITS.get(self.name)
if limits is None:
return
min_angle = limits[0]
max_angle = limits[1]
if logical_joint_target < min_angle or logical_joint_target > max_angle:
raise ValueError(
'Target joint angle {} for joint {} is outside limits {}..{}'.format(
logical_joint_target,
self.name,
min_angle,
max_angle
)
)
def stop_mode(self):
"""
Returns final stop mode for this joint.
"""
if self.name in STOP_MODE_BY_JOINT:
return STOP_MODE_BY_JOINT[self.name]
return DEFAULT_STOP_MODE
def physical_motor_target(self, logical_motor_target, motor_index):
"""
Converts logical motor target to physical encoder target.
"""
return int(logical_motor_target * self.signs[motor_index])
def deadband_motor_deg(self):
"""
Returns movement deadband in motor degrees.
"""
if self.name in DEADBAND_MOTOR_DEG_BY_JOINT:
return DEADBAND_MOTOR_DEG_BY_JOINT[self.name]
return 0
def is_small_motor_delta(self, motor_delta):
"""
Checks whether a requested movement is below configured deadband.
"""
return abs_value(motor_delta) <= self.deadband_motor_deg()
def run_motor_target(self, logical_motor_target, speed):
"""
Moves to target expressed in motor degrees using run_target().
"""
logical_joint_target = self.motor_to_joint(logical_motor_target)
self.check_joint_limit(logical_joint_target)
stop_mode = self.stop_mode()
for i in range(len(self.motors)):
self.motors[i].run_target(
speed,
self.physical_motor_target(logical_motor_target, i),
then=stop_mode,
wait=False
)
def track_motor_target(self, logical_motor_target):
"""
Sends target expressed in motor degrees using track_target().
Used for timed/eased profiles where the target changes many times
during movement.
"""
logical_joint_target = self.motor_to_joint(logical_motor_target)
self.check_joint_limit(logical_joint_target)
for i in range(len(self.motors)):
self.motors[i].track_target(
self.physical_motor_target(logical_motor_target, i)
)
def apply_stop_mode(self):
"""
Applies the configured final stop behavior manually.
Used after track_target() profiles and skipped tiny moves.
"""
stop_mode = self.stop_mode()
for motor in self.motors:
if stop_mode == Stop.HOLD:
motor.hold()
elif stop_mode == Stop.BRAKE:
motor.brake()
elif stop_mode == Stop.COAST:
motor.stop()
else:
motor.brake()
def all_motors(self):
"""
Returns all physical motors belonging to this joint.
"""
return self.motors
# ============================================================
# JOINT OBJECTS
# ============================================================
base = Joint(
'base',
[base_motor],
[1],
GEAR_RATIOS['base']
)
shoulder = Joint(
'shoulder',
[shoulder_left_motor, shoulder_right_motor],
SHOULDER_SIGNS,
GEAR_RATIOS['shoulder']
)
elbow = Joint(
'elbow',
[elbow_motor],
[1],
GEAR_RATIOS['elbow']
)
JOINTS = {
'base': base,
'shoulder': shoulder,
'elbow': elbow,
}
JOINT_ORDER = ['base', 'shoulder', 'elbow']
# ============================================================
# MOTOR CONTROL SETUP
# ============================================================
def configure_joint_motor_control(joint):
"""
Applies optional Pybricks Motor.control settings for one joint.
Settings are applied to all physical motors in that joint.
Important:
Pybricks documentation says control settings should be changed while
the controller is stopped. This function stops motors before applying
settings.
"""
for motor in joint.all_motors():
motor.stop()
limits = MOTOR_CONTROL_LIMITS_BY_JOINT.get(joint.name)
target_tolerances = TARGET_TOLERANCES_BY_JOINT.get(joint.name)
stall_tolerances = STALL_TOLERANCES_BY_JOINT.get(joint.name)
pid_settings = PID_SETTINGS_BY_JOINT.get(joint.name)
for motor in joint.all_motors():
if limits is not None:
speed = limits[0]
acceleration = limits[1]
actuation = limits[2]
motor.control.limits(speed, acceleration, actuation)
if target_tolerances is not None:
speed_tol = target_tolerances[0]
position_tol = target_tolerances[1]
motor.control.target_tolerances(speed_tol, position_tol)
if stall_tolerances is not None:
stall_speed = stall_tolerances[0]
stall_time = stall_tolerances[1]
motor.control.stall_tolerances(stall_speed, stall_time)
if pid_settings is not None:
kp = pid_settings[0]
ki = pid_settings[1]
kd = pid_settings[2]
integral_range = pid_settings[3]
integral_rate = pid_settings[4]
feed_forward = pid_settings[5]
motor.control.pid(
kp,
ki,
kd,
integral_range,
integral_rate,
feed_forward
)
def configure_all_motor_controls():
"""
Applies configured motor-control parameters to all joints.
"""
if not APPLY_MOTOR_CONTROL_SETTINGS:
return
for name in JOINT_ORDER:
configure_joint_motor_control(JOINTS[name])
# ============================================================
# COMMAND HELPERS
# ============================================================
def ABS_M(joint_name, motor_angle):
"""
Absolute target in motor degrees.
Example:
ABS_M('base', 90)
"""
return {
'joint': joint_name,
'mode': 'abs',
'unit': 'motor',
'value': motor_angle
}
def REL_M(joint_name, motor_delta):
"""
Relative movement in motor degrees.
Example:
REL_M('base', -45)
"""
return {
'joint': joint_name,
'mode': 'rel',
'unit': 'motor',
'value': motor_delta
}
def ABS_J(joint_name, joint_angle):
"""
Absolute target in joint degrees.
Example:
ABS_J('base', 20)
With gear ratio 7.5, motor target becomes 150 deg.
"""
return {
'joint': joint_name,
'mode': 'abs',
'unit': 'joint',
'value': joint_angle
}
def REL_J(joint_name, joint_delta):
"""
Relative movement in joint degrees.
Example:
REL_J('elbow', 10)
With gear ratio 5.0, motor target changes by 50 deg.
"""
return {
'joint': joint_name,
'mode': 'rel',
'unit': 'joint',
'value': joint_delta
}
# Backward-compatible aliases.
# These behave like the original program: values are motor degrees.
def ABS(joint_name, angle):
return ABS_M(joint_name, angle)
def REL(joint_name, delta):
return REL_M(joint_name, delta)
def PARALLEL(label, commands, duration_ms=None, pause_ms=0,
easing=None, clamp_easing=None):
"""
Creates a simultaneous movement step.
commands:
List of commands executed at the same time.
duration_ms:
None:
use run_target() with DEFAULT_SPEED.
integer:
use timed/eased track_target() profile.
easing:
Name of easing function, e.g.:
'linear'
'easeInOutSine'
'easeOutQuad'
clamp_easing:
None:
use global CLAMP_EASING_OUTPUT.
True/False:
override global setting for this step.
"""
return {
'type': 'parallel',
'label': label,
'commands': commands,
'duration_ms': duration_ms,
'pause_ms': pause_ms,
'easing': easing,
'clamp_easing': clamp_easing
}
def SERIES(label, commands, duration_ms=None, pause_ms=0,
easing=None, clamp_easing=None, duration_mode='total'):
"""
Creates a sequential movement step.
commands can be:
1. Flat serial command list:
[
ABS_J('base', 10),
ABS_J('base', 0)
]
2. Serial list of parallel groups:
[
[ABS_J('base', 10), ABS_J('shoulder', 5)],
[ABS_J('base', 0), ABS_J('shoulder', 0)]
]
duration_mode:
'total':
duration_ms is divided equally across serial groups.
'per_group':
each serial group receives the full duration_ms.
"""
return {
'type': 'series',
'label': label,
'commands': commands,
'duration_ms': duration_ms,
'pause_ms': pause_ms,
'easing': easing,
'clamp_easing': clamp_easing,
'duration_mode': duration_mode
}
# ============================================================
# COMMAND PLANNING
# ============================================================
def is_command_object(item):
"""
Returns True if item looks like a movement command.
"""
return type(item) == dict and 'joint' in item
def validate_parallel_commands(commands):
"""
Prevents commanding the same joint twice in one parallel group.
Example of invalid group:
[
ABS_J('base', 10),
REL_J('base', 5)
]
Use SERIES instead.
"""
seen = []
for command in commands:
joint_name = command['joint']
if joint_name in seen:
raise ValueError(
'Joint {} appears more than once in one parallel group.'.format(
joint_name
)
)
seen.append(joint_name)
def normalize_series_groups(commands):
"""
Converts flat serial command list into list of command groups.
Input:
[cmd1, cmd2, cmd3]
Output:
[[cmd1], [cmd2], [cmd3]]
"""
groups = []
for item in commands:
if is_command_object(item):
groups.append([item])
else:
groups.append(item)
return groups
def plan_commands(commands):
"""
Converts user commands into executable motor-coordinate plans.
Each plan contains:
- joint object
- start motor angle
- target motor angle
- start joint angle
- target joint angle
- delta values
- skip flag for tiny movement deadband
"""
validate_parallel_commands(commands)
plans = []
for command in commands:
joint = JOINTS[command['joint']]
mode = command['mode']
unit = command.get('unit', 'motor')
value = command['value']
start_motor = joint.motor_angle()
start_joint = joint.joint_angle()
if mode == 'abs':
if unit == 'motor':
target_motor = value
target_joint = joint.motor_to_joint(target_motor)
elif unit == 'joint':
target_joint = value
target_motor = joint.joint_to_motor(target_joint)
else:
raise ValueError('Unknown unit: {}'.format(unit))
elif mode == 'rel':
if unit == 'motor':
target_motor = start_motor + value
target_joint = joint.motor_to_joint(target_motor)
elif unit == 'joint':
target_joint = start_joint + value
target_motor = joint.joint_to_motor(target_joint)
else:
raise ValueError('Unknown unit: {}'.format(unit))
else:
raise ValueError('Unknown command mode: {}'.format(mode))
joint.check_joint_limit(target_joint)
delta_motor = target_motor - start_motor
delta_joint = target_joint - start_joint
skip_small_move = joint.is_small_motor_delta(delta_motor)
plans.append({
'joint': joint,
'start_motor': start_motor,
'target_motor': target_motor,
'start_joint': start_joint,
'target_joint': target_joint,
'delta_motor': delta_motor,
'delta_joint': delta_joint,
'skip_small_move': skip_small_move,
'last_sent_motor_target': None,
'command': command
})
return plans
def collect_motors(plans):
"""
Returns all physical motors used by a set of movement plans.
"""
motors = []
for plan in plans:
joint = plan['joint']
for motor in joint.all_motors():
motors.append(motor)
return motors
def active_plans_only(plans):
"""
Returns plans that are not skipped by the small-movement deadband.
"""
active = []
for plan in plans:
if not plan['skip_small_move']:
active.append(plan)
return active
# ============================================================
# DISPLAY AND LOGGING
# ============================================================
def display_step_header(step_num, label):
"""
Shows current step on EV3 screen and console.
"""
ev3.screen.clear()
ev3.screen.print('=== STEP {} ==='.format(step_num))
ev3.screen.print(label)
print('')
print('=' * 70)
print('STEP {}: {}'.format(step_num, label))
print('=' * 70)
def print_joint_states(prefix):
"""
Prints current motor and joint angles for all joints.
"""
print(prefix)
for name in JOINT_ORDER:
joint = JOINTS[name]
print(
' {}: motor={} deg, joint={} deg'.format(
name,
joint.motor_angle(),
joint.joint_angle()
)
)
def print_available_easings():
"""
Prints all supported easing names.
"""
print('Available easing styles:')
for name in EASING_NAMES:
print(' {}'.format(name))
def print_motion_plan(plans):
"""
Prints planned movement before execution.
"""
for plan in plans:
joint = plan['joint']
print(
' {}: motor {} -> {} deg, joint {} -> {} deg'.format(
joint.name,
plan['start_motor'],
plan['target_motor'],
plan['start_joint'],
plan['target_joint']
)
)
print(
' delta: motor={} deg, joint={} deg, skip_small_move={}'.format(
plan['delta_motor'],
plan['delta_joint'],
plan['skip_small_move']
)
)
def print_final_errors(plans):
"""
Prints final angle error after movement.
"""
print('Final errors:')
for plan in plans:
joint = plan['joint']
actual_motor = joint.motor_angle()
actual_joint = joint.joint_angle()
motor_error = plan['target_motor'] - actual_motor
joint_error = plan['target_joint'] - actual_joint
print(
' {}: motor_error={} deg, joint_error={} deg'.format(
joint.name,
motor_error,
joint_error
)
)
# ============================================================
# WAITING AND STALL DETECTION
# ============================================================
def wait_until_done(motors, timeout_ms):
"""
Waits until all motors report control.done().
Returns:
True if all motors completed movement.
False if timeout or stall occurred.
"""
timer = StopWatch()
while True:
all_done = True
for motor in motors:
if motor.control.stalled():
print('WARNING: motor stalled')
motor.brake()
return False
if not motor.control.done():
all_done = False
if all_done:
return True
if timeout_ms is not None and timer.time() > timeout_ms:
print('WARNING: timeout while waiting for motors')
for motor in motors:
motor.brake()
return False
wait(POLL_MS)
# ============================================================
# SPEED SAFETY
# ============================================================
def check_required_speed(plan, duration_ms):
"""
Checks whether required average speed exceeds recommended limit.
This does not measure peak speed. Easing profiles can have peak speeds
higher than average speed.
"""
joint = plan['joint']
if duration_ms <= 0:
raise ValueError('duration_ms must be greater than 0')
required_avg_speed = abs_value(plan['delta_motor']) * 1000 / duration_ms
max_speed = MAX_AVG_MOTOR_SPEED_BY_JOINT.get(joint.name)
if max_speed is None:
return True
if required_avg_speed > max_speed:
message = (
'WARNING: required average motor speed for {} is {} deg/s, '
'recommended max is {} deg/s'.format(
joint.name,
required_avg_speed,
max_speed
)
)
print(message)
if ABORT_IF_REQUIRED_SPEED_TOO_HIGH:
return False
return True
# ============================================================
# MOVEMENT EXECUTION
# ============================================================
def handle_skipped_small_moves(plans):
"""
Logs skipped tiny moves and optionally applies final stop mode.
"""
for plan in plans:
if plan['skip_small_move']:
joint = plan['joint']
print(
' SKIP tiny move for {}: delta_motor={} deg, deadband={} deg'.format(
joint.name,
plan['delta_motor'],
joint.deadband_motor_deg()
)
)
if APPLY_STOP_AFTER_SKIPPED_SMALL_MOVE:
joint.apply_stop_mode()
def move_parallel_target(commands, speed=DEFAULT_SPEED):
"""
Executes simultaneous target movement using run_target().
Used when duration_ms is None.
"""
plans = plan_commands(commands)
active_plans = active_plans_only(plans)
print_joint_states('Before target move:')
print_motion_plan(plans)
handle_skipped_small_moves(plans)
if len(active_plans) == 0:
print('No active movement after deadband filtering.')
print_joint_states('After skipped target move:')
return True
for plan in active_plans:
joint = plan['joint']
target_motor = plan['target_motor']
joint.run_motor_target(target_motor, speed)
motors = collect_motors(active_plans)
ok = wait_until_done(motors, 30000)
print_joint_states('After target move:')
print_final_errors(plans)
return ok
def maybe_send_track_target(plan, motor_target, force_send):
"""
Sends track_target() only when target changed enough, unless force_send=True.
This reduces small high-frequency target updates that can cause jitter.
"""
joint = plan['joint']
last_sent = plan['last_sent_motor_target']
if force_send:
joint.track_motor_target(motor_target)
plan['last_sent_motor_target'] = motor_target
return
if last_sent is None:
joint.track_motor_target(motor_target)
plan['last_sent_motor_target'] = motor_target
return
if TRACK_TARGET_MIN_UPDATE_MOTOR_DEG <= 0:
joint.track_motor_target(motor_target)
plan['last_sent_motor_target'] = motor_target
return
if abs_value(motor_target - last_sent) >= TRACK_TARGET_MIN_UPDATE_MOTOR_DEG:
joint.track_motor_target(motor_target)
plan['last_sent_motor_target'] = motor_target
def move_parallel_timed(commands, duration_ms, easing_name=None,
clamp_easing=None):
"""
Executes simultaneous timed/eased movement using track_target().
duration_ms:
Total intended duration of the movement.
easing_name:
Name of easing function. If None, DEFAULT_EASING is used.
clamp_easing:
Whether to clamp easing output to [0, 1].
"""
if duration_ms is None:
return move_parallel_target(commands, DEFAULT_SPEED)
if duration_ms <= 0:
raise ValueError('duration_ms must be greater than 0')
if clamp_easing is None:
clamp_easing = CLAMP_EASING_OUTPUT
easing_func = get_easing(easing_name)
plans = plan_commands(commands)
active_plans = active_plans_only(plans)
print_joint_states('Before timed profile:')
print('Timed profile duration: {} ms'.format(duration_ms))
print('Easing: {}'.format(easing_name if easing_name is not None else DEFAULT_EASING))
print('Clamp easing output: {}'.format(clamp_easing))
print_motion_plan(plans)
handle_skipped_small_moves(plans)
if len(active_plans) == 0:
print('No active movement after deadband filtering.')
print_joint_states('After skipped timed profile:')
return True
for plan in active_plans:
ok_speed = check_required_speed(plan, duration_ms)
if not ok_speed:
print('ERROR: required speed too high, movement aborted.')
return False
timer = StopWatch()
while timer.time() < duration_ms:
elapsed = timer.time()
linear_ratio = elapsed / duration_ms
linear_ratio = clamp(linear_ratio, 0, 1)
eased_ratio = easing_func(linear_ratio)
if clamp_easing:
eased_ratio = clamp(eased_ratio, 0, 1)
for plan in active_plans:
start_motor = plan['start_motor']
target_motor = plan['target_motor']
current_motor_target = start_motor + (target_motor - start_motor) * eased_ratio
maybe_send_track_target(
plan,
current_motor_target,
False
)
for plan in active_plans:
joint = plan['joint']
for motor in joint.all_motors():
if motor.control.stalled():
print('WARNING: motor stalled during timed profile')
motor.brake()
return False
wait(PROFILE_UPDATE_MS)
# Final exact target command.
for plan in active_plans:
maybe_send_track_target(
plan,
plan['target_motor'],
True
)
elapsed = timer.time()
if TIMED_FINAL_TARGET_DWELL_MS > 0:
wait(TIMED_FINAL_TARGET_DWELL_MS)
if SETTLE_AFTER_TIMED_MOVE:
for plan in active_plans:
joint = plan['joint']
target_motor = plan['target_motor']
joint.run_motor_target(target_motor, SETTLE_SPEED)
motors = collect_motors(active_plans)
ok = wait_until_done(motors, 10000)
if not ok:
return False
else:
for plan in active_plans:
joint = plan['joint']
joint.apply_stop_mode()
print('Timed profile requested: {} ms'.format(duration_ms))
print('Timed profile elapsed: {} ms'.format(elapsed))
print_joint_states('After timed profile:')
print_final_errors(plans)
return True
def execute_parallel_group(commands, duration_ms=None,
easing_name=None, clamp_easing=None):
"""
Executes one parallel group.
If duration_ms is None:
use target movement with DEFAULT_SPEED.
If duration_ms is set:
use timed/eased movement.
"""
if len(commands) == 0:
return True
if duration_ms is None:
return move_parallel_target(commands, DEFAULT_SPEED)
return move_parallel_timed(
commands,
duration_ms,
easing_name,
clamp_easing
)
def run_step(step_num, step):
"""
Executes one high-level step.
Step can be:
- PARALLEL(...)
- SERIES(...)
"""
display_step_header(step_num, step['label'])
sound_step_start()
step_type = step['type']
commands = step['commands']
duration_ms = step.get('duration_ms', None)
pause_ms = step.get('pause_ms', 0)
easing_name = step.get('easing', None)
clamp_easing = step.get('clamp_easing', None)
if len(commands) == 0:
print_joint_states('Current state:')
if pause_ms > 0:
wait(pause_ms)
sound_step_ok()
return True
if step_type == 'parallel':
ok = execute_parallel_group(
commands,
duration_ms,
easing_name,
clamp_easing
)
elif step_type == 'series':
groups = normalize_series_groups(commands)
duration_mode = step.get('duration_mode', 'total')
ok = True
if duration_ms is None:
group_duration_ms = None
for group in groups:
ok = execute_parallel_group(
group,
group_duration_ms,
easing_name,
clamp_easing
)
if not ok:
break
else:
if duration_mode == 'total':
group_duration_ms = int(duration_ms / len(groups))
if group_duration_ms <= 0:
raise ValueError('Series duration is too short for the number of groups')
for group in groups:
ok = execute_parallel_group(
group,
group_duration_ms,
easing_name,
clamp_easing
)
if not ok:
break
elif duration_mode == 'per_group':
group_duration_ms = duration_ms
for group in groups:
ok = execute_parallel_group(
group,
group_duration_ms,
easing_name,
clamp_easing
)
if not ok:
break
else:
raise ValueError('Unknown duration_mode: {}'.format(duration_mode))
else:
raise ValueError('Unknown step type: {}'.format(step_type))
if pause_ms > 0:
wait(pause_ms)
if ok:
sound_step_ok()
else:
sound_step_fail()
return ok
def reset_all_current_positions_as_zero():
"""
Makes current physical pose become logical 0 deg for all joints.
"""
for name in JOINT_ORDER:
JOINTS[name].reset_joint_angle(0)
def apply_stop_to_all_joints():
"""
Applies configured stop mode to all joints.
"""
for name in JOINT_ORDER:
JOINTS[name].apply_stop_mode()
# ============================================================
# TEST / RESEARCH MOVEMENT SEQUENCE
# ============================================================
TEST_STEPS = [
PARALLEL(
'Reference pose: current physical position is logical zero',
[],
pause_ms=1000
),
PARALLEL(
'Homing: all joints to zero, linear',
[
ABS_J('base', 0),
ABS_J('shoulder', 0),
ABS_J('elbow', 0),
],
duration_ms=3000,
easing='linear',
pause_ms=1000
),
SERIES(
'Base joint-angle test, easeInOutSine, total 6s',
[
ABS_J('base', 15),
ABS_J('base', -15),
ABS_J('base', 0),
],
duration_ms=6000,
duration_mode='total',
easing='easeInOutSine',
pause_ms=1000
),
SERIES(
'Shoulder joint-angle test, easeInOutQuad, total 6s',
[
ABS_J('shoulder', 8),
ABS_J('shoulder', -8),
ABS_J('shoulder', 0),
],
duration_ms=6000,
duration_mode='total',
easing='easeInOutQuad',
pause_ms=1000
),
SERIES(
'Elbow joint-angle test, easeInOutCubic, total 6s',
[
ABS_J('elbow', 15),
ABS_J('elbow', -15),
ABS_J('elbow', 0),
],
duration_ms=6000,
duration_mode='total',
easing='easeInOutCubic',
pause_ms=1000
),
PARALLEL(
'Base + shoulder synchronized, easeInOutSine',
[
ABS_J('base', 12),
ABS_J('shoulder', 8),
],
duration_ms=3000,
easing='easeInOutSine',
pause_ms=1000
),
PARALLEL(
'Base + shoulder + elbow synchronized, easeInOutQuad',
[
ABS_J('base', -12),
ABS_J('shoulder', -8),
ABS_J('elbow', 12),
],
duration_ms=3000,
easing='easeInOutQuad',
pause_ms=1000
),
SERIES(
'Mixed serial groups, total 9s, easeInOutSine',
[
[
ABS_J('base', 10),
ABS_J('shoulder', 5),
],
[
ABS_J('base', -10),
ABS_J('elbow', 10),
],
[
ABS_J('base', 0),
ABS_J('shoulder', 0),
ABS_J('elbow', 0),
],
],
duration_ms=9000,
duration_mode='total',
easing='easeInOutSine',
pause_ms=1000
),
SERIES(
'Relative joint movement, easeOutQuad, total 6s',
[
REL_J('base', 10),
REL_J('base', 10),
REL_J('base', -20),
],
duration_ms=6000,
duration_mode='total',
easing='easeOutQuad',
pause_ms=1000
),
PARALLEL(
'Small bounce-style test, clamped',
[
ABS_J('base', 8),
ABS_J('elbow', 8),
],
duration_ms=3000,
easing='easeOutBounce',
clamp_easing=True,
pause_ms=1000
),
PARALLEL(
'Return all joints to zero, easeInOutSine',
[
ABS_J('base', 0),
ABS_J('shoulder', 0),
ABS_J('elbow', 0),
],
duration_ms=3000,
easing='easeInOutSine',
pause_ms=1000
),
# Example raw motor-angle calibration step.
# Uncomment only if needed.
#
# PARALLEL(
# 'Raw motor-angle calibration test',
# [
# ABS_M('base', 90), # base joint = 90 / 7.5 = 12 deg
# ABS_M('shoulder', 90), # shoulder joint = 90 / 7.5 = 12 deg
# ABS_M('elbow', 90), # elbow joint = 90 / 5.0 = 18 deg
# ],
# duration_ms=3000,
# easing='linear',
# pause_ms=1000
# ),
]
# ============================================================
# MAIN PROGRAM
# ============================================================
ev3.screen.clear()
ev3.screen.print('=== MASTER BRICK ===')
ev3.screen.print('Motion test ready')
print('')
print('#' * 70)
print('# ROBOTIC ARM MOTION TEST START')
print('#' * 70)
print('')
print_available_easings()
print('')
print('Gear ratios:')
print(' base: {}'.format(GEAR_RATIOS['base']))
print(' shoulder: {}'.format(GEAR_RATIOS['shoulder']))
print(' elbow: {}'.format(GEAR_RATIOS['elbow']))
print('')
print('Stop modes:')
print(' base: {}'.format(STOP_MODE_BY_JOINT['base']))
print(' shoulder: {}'.format(STOP_MODE_BY_JOINT['shoulder']))
print(' elbow: {}'.format(STOP_MODE_BY_JOINT['elbow']))
print('')
print('Jitter mitigation:')
print(' deadband motor deg by joint: {}'.format(DEADBAND_MOTOR_DEG_BY_JOINT))
print(' track target min update motor deg: {}'.format(TRACK_TARGET_MIN_UPDATE_MOTOR_DEG))
print(' apply motor control settings: {}'.format(APPLY_MOTOR_CONTROL_SETTINGS))
print('')
print('Default easing: {}'.format(DEFAULT_EASING))
print('Clamp easing output: {}'.format(CLAMP_EASING_OUTPUT))
print('Settle after timed move: {}'.format(SETTLE_AFTER_TIMED_MOVE))
print('Timed final target dwell: {} ms'.format(TIMED_FINAL_TARGET_DWELL_MS))
# Apply gentler control settings before movement.
configure_all_motor_controls()
# Current physical pose becomes logical joint angle 0.
# Place the arm in a known safe starting pose before running the program.
reset_all_current_positions_as_zero()
print_joint_states('Initial logical zero state:')
for i in range(len(TEST_STEPS)):
ok = run_step(i, TEST_STEPS[i])
if not ok:
print('ERROR: step {} failed or stalled'.format(i))
sound_step_fail()
break
apply_stop_to_all_joints()
ev3.screen.clear()
ev3.screen.print('=== MASTER BRICK ===')
ev3.screen.print('ALL TESTS COMPLETE')
sound_all_complete()
print('')
print('#' * 70)
print('# ALL TESTS COMPLETE')
print('#' * 70)
print('')