class_name RhythmConductor
extends AudioStreamPlayer
## Beat clock and timing judge for rhythm gameplay.
##
## Tracks the song position (from audio playback when a stream is assigned,
## otherwise from a wall-clock fallback), converts it to beat indices, emits
## beat signals, and rates how close a moment in time is to the nearest beat.

## Tempo in beats per minute. Converted to [member sec_per_beat] in [method _ready].
@export var bpm: float = 120.0
## Upper bound of the cyclic counter emitted through [signal measure].
@export var measures: int = 4
## Seconds added to every timestamp before it is mapped onto the beat grid.
@export var beat_offset: float = 0.0
## Maximum absolute distance (seconds) from a beat for a "perfect" rating.
@export var perfect_window: float = 0.060
## Maximum absolute distance (seconds) from a beat for a "good" rating.
@export var good_window: float = 0.120
## Maximum absolute distance (seconds) from a beat for a "bad" rating.
@export var bad_window: float = 0.200
## When true, [method start] is called at the end of [method _ready].
@export var starts_on_ready := true

## Last sampled song position in seconds (refreshed every physics frame while running).
var song_position := 0.0
## Index of the beat that [member song_position] (plus offset) falls into.
var song_position_in_beats := 0
## Length of one beat in seconds.
var sec_per_beat := 0.5
## Beat index most recently emitted; -1 means no beat has been reported yet.
var last_reported_beat := -1
## Cyclic counter in 1..measures, advanced once per reported beat.
var current_measure := 1
## True between [method start] and [method stop_conductor].
var running := false

var _start_time_usec := 0
var _uses_fallback_clock := false
# Event bus resolved by _event_bus(); cached so every call within this node
# talks to the same instance, including one whose add_child is still deferred.
var _event_bus_cache: Node = null

## Emitted once when a new beat index is reached.
signal beat(position: int)
## Emitted together with [signal beat], carrying [member current_measure].
signal measure(position: int)
## Emitted by [method judge_action] with the rating dictionary for the action.
signal action_judged(action_name: String, rating: Dictionary)


## Registers the node in the "rhythm_conductor" group, derives the beat
## length, subscribes to action requests on the event bus and optionally
## starts the clock.
func _ready() -> void:
	add_to_group("rhythm_conductor")
	_update_sec_per_beat()
	if is_inside_tree():
		var bus := _event_bus()
		if not bus.has_signal("rhythm_action_requested"):
			push_warning("RhythmConductor: event bus has no 'rhythm_action_requested' signal.")
		elif not bus.is_connected("rhythm_action_requested", _on_rhythm_action_requested):
			# Guarded so a second _ready (e.g. after request_ready()) does not
			# attempt a duplicate connection.
			bus.connect("rhythm_action_requested", _on_rhythm_action_requested)
	if starts_on_ready:
		start()


## Samples the song position each physics frame and reports a newly reached beat.
func _physics_process(_delta: float) -> void:
	if not running:
		return
	song_position = get_current_song_position()
	var adjusted_position: float = _apply_beat_offset(song_position)
	song_position_in_beats = int(floor(adjusted_position / sec_per_beat))
	_report_beat()


## Resets all counters and starts the clock. Plays the assigned stream, or
## uses the tick-based fallback clock when no stream is set.
func start() -> void:
	running = true
	_uses_fallback_clock = stream == null
	_start_time_usec = Time.get_ticks_usec()
	song_position = 0.0
	song_position_in_beats = 0
	last_reported_beat = -1
	current_measure = 1
	if not _uses_fallback_clock:
		play()


## Stops audio playback (if any) and halts beat reporting.
func stop_conductor() -> void:
	if playing:
		stop()
	running = false


## Returns the current song position in seconds.
##
## While audio is playing this is the playback position plus the time since
## the last mix, minus the output latency, clamped at zero. While running
## without audio playing it is the time elapsed since [method start]. When
## not running, the last sampled [member song_position] is returned.
func get_current_song_position() -> float:
	if running and playing:
		var current_position: float = get_playback_position() + AudioServer.get_time_since_last_mix()
		current_position -= AudioServer.get_output_latency()
		return maxf(0.0, current_position)
	if running:
		return float(Time.get_ticks_usec() - _start_time_usec) / 1000000.0
	return song_position


## Rates [param action_name] against the current song position, broadcasts
## the result locally and on the event bus, and returns the rating dictionary
## (with an extra "action" key).
func judge_action(action_name: String) -> Dictionary:
	var rating := get_current_rating()
	rating["action"] = action_name
	action_judged.emit(action_name, rating)
	var bus := _event_bus()
	# The bus receives the label and the signed offset converted to milliseconds.
	bus.emit_signal(
		"judgement_made",
		StringName(str(rating.get("label", "miss"))),
		float(rating.get("diff", INF)) * 1000.0
	)
	bus.emit_signal("action_judged", StringName(action_name), rating)
	return rating


## Returns the rating for the current song position.
func get_current_rating() -> Dictionary:
	return get_rating_for_time(get_current_song_position())


## Rates [param time_seconds] by its distance to the nearest beat.
##
## Returns a dictionary with "label" ("perfect", "good", "bad" or "miss"),
## "color", "nearest_beat", "nearest_beat_time", "diff" (signed seconds) and
## "abs_diff". Times that fall before zero after applying the beat offset are
## always a "miss" with infinite diff.
func get_rating_for_time(time_seconds: float) -> Dictionary:
	var adjusted_time: float = _apply_beat_offset(time_seconds)
	if adjusted_time < 0.0:
		return _rating_result("miss", Color("ff0055"), 0, 0.0, INF, INF)
	var nearest_beat: int = int(round(adjusted_time / sec_per_beat))
	var nearest_beat_time: float = nearest_beat * sec_per_beat
	var diff: float = adjusted_time - nearest_beat_time
	var abs_diff: float = absf(diff)
	if abs_diff <= perfect_window:
		return _rating_result("perfect", Color("00f2ff"), nearest_beat, nearest_beat_time, diff, abs_diff)
	if abs_diff <= good_window:
		return _rating_result("good", Color("ffffff"), nearest_beat, nearest_beat_time, diff, abs_diff)
	if abs_diff <= bad_window:
		return _rating_result("bad", Color("ffaa00"), nearest_beat, nearest_beat_time, diff, abs_diff)
	return _rating_result("miss", Color("ff0055"), nearest_beat, nearest_beat_time, diff, abs_diff)


## Returns how far the current song position is through its beat, in [0, 1).
func get_current_beat_progress() -> float:
	return get_beat_progress_for_time(get_current_song_position())


## Returns how far [param time_seconds] is through its beat, in [0, 1).
## Times before zero (after the beat offset) yield 0.0.
func get_beat_progress_for_time(time_seconds: float) -> float:
	var adjusted_time: float = _apply_beat_offset(time_seconds)
	if adjusted_time < 0.0:
		return 0.0
	return fposmod(adjusted_time, sec_per_beat) / sec_per_beat


## Returns the signed distance in seconds from [param time_seconds] to the
## nearest beat (positive = late), or INF for times before zero after the
## beat offset.
func get_time_to_nearest_beat(time_seconds: float) -> float:
	var adjusted_time: float = _apply_beat_offset(time_seconds)
	if adjusted_time < 0.0:
		return INF
	var nearest_beat_time: float = round(adjusted_time / sec_per_beat) * sec_per_beat
	return adjusted_time - nearest_beat_time


# Emits the beat signals when the beat index has advanced since the last report.
# NOTE(review): at most one beat is emitted per call, and current_measure is a
# plain incrementing counter, so beats skipped by a long frame are not replayed.
func _report_beat() -> void:
	if last_reported_beat < song_position_in_beats:
		if current_measure > measures:
			current_measure = 1
		beat.emit(song_position_in_beats)
		measure.emit(current_measure)
		_event_bus().emit_signal("beat_ticked", song_position_in_beats)
		last_reported_beat = song_position_in_beats
		current_measure += 1


# Event-bus callback: judges the requested action.
func _on_rhythm_action_requested(action_name: StringName) -> void:
	judge_action(str(action_name))


# Returns the node named "EventBus" under the scene root, creating it from
# res://autoload/event_bus.gd when it does not exist.
func _event_bus() -> Node:
	if is_instance_valid(_event_bus_cache):
		return _event_bus_cache
	var root := get_tree().root
	var bus := root.get_node_or_null("EventBus")
	if bus == null:
		bus = load("res://autoload/event_bus.gd").new()
		bus.name = "EventBus"
		# Deferred: a synchronous add_child() on the root is rejected while the
		# root is still setting up children, which is the case when this runs
		# from _ready during scene load.
		root.call_deferred("add_child", bus)
	_event_bus_cache = bus
	return bus


# Recomputes sec_per_beat from bpm. A non-positive or non-finite bpm would
# produce an infinite, negative or zero beat length, so it is rejected and the
# previous value is kept.
func _update_sec_per_beat() -> void:
	if bpm > 0.0 and is_finite(bpm):
		sec_per_beat = 60.0 / bpm
	else:
		push_warning("RhythmConductor: bpm must be a positive finite number (got %s); keeping sec_per_beat = %s." % [bpm, sec_per_beat])


# Shifts a raw timestamp by the configured beat offset.
func _apply_beat_offset(time_seconds: float) -> float:
	return time_seconds + beat_offset


# Packs the individual judgement values into the rating dictionary.
func _rating_result(label: String, color: Color, nearest_beat: int, nearest_beat_time: float, diff: float, abs_diff: float) -> Dictionary:
	return {
		"label": label,
		"color": color,
		"nearest_beat": nearest_beat,
		"nearest_beat_time": nearest_beat_time,
		"diff": diff,
		"abs_diff": abs_diff,
	}


# Formats seconds as signed whole milliseconds ("--" for infinite values).
func _format_signed_ms(seconds: float) -> String:
	if is_inf(seconds):
		return "--"
	return "%+.0f" % (seconds * 1000.0)