diff --git a/src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs b/src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs index 1def5aaf..557cdbb5 100644 --- a/src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs +++ b/src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs @@ -2,11 +2,111 @@ use super::{BashOutput, PIPE_BUFFER_CAPACITY}; use crate::error::{ToolError, ToolResult}; +use core::fmt::Write; +use parking_lot::Mutex; use process_wrap::tokio::*; use std::path::Path; +use std::pin::Pin; use std::process::Stdio; +use std::sync::Arc; use std::time::Duration; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::task::JoinHandle; + +/// Maximum time to wait for pipe drains after timeout kill. +const PIPE_DRAIN_GRACE_PERIOD: Duration = Duration::from_millis(100); +/// Read chunk size for async pipe draining. +const PIPE_DRAIN_READ_CHUNK: usize = 8 * 1024; + +type SharedPipeBuffer = Arc>>; + +struct PipeDrainTask { + handle: JoinHandle<()>, + buffer: SharedPipeBuffer, +} + +#[inline] +fn spawn_pipe_drain_task(mut pipe: R) -> PipeDrainTask +where + R: AsyncRead + Unpin + Send + 'static, +{ + let buffer: SharedPipeBuffer = Arc::new(Mutex::new(Vec::with_capacity(PIPE_BUFFER_CAPACITY))); + let task_buffer = Arc::clone(&buffer); + + let handle = tokio::spawn(async move { + let mut chunk = [0_u8; PIPE_DRAIN_READ_CHUNK]; + loop { + match pipe.read(&mut chunk).await { + Ok(0) => break, + Ok(read) => task_buffer.lock().extend_from_slice(&chunk[..read]), + Err(_) => break, + } + } + }); + + PipeDrainTask { handle, buffer } +} + +#[inline] +fn take_pipe_buffer(buffer: SharedPipeBuffer) -> Vec { + match Arc::try_unwrap(buffer) { + Ok(mutex) => mutex.into_inner(), + Err(shared) => shared.lock().clone(), + } +} + +#[inline] +async fn await_pipe_drain_task(task: PipeDrainTask) -> Vec { + let PipeDrainTask { handle, buffer } = task; + let _ = handle.await; + take_pipe_buffer(buffer) +} + +#[inline] +async fn await_pipe_drain_task_with_grace(task: PipeDrainTask, grace: Duration) -> Vec { + let PipeDrainTask { mut handle, buffer } = task; + + tokio::select! { + _ = &mut handle => {}, + _ = tokio::time::sleep(grace) => { + // Preserve strict timeout semantics while retaining buffered bytes. + // Buffer state is shared outside the task so abort cannot discard it. + handle.abort(); + let _ = handle.await; + } + } + + take_pipe_buffer(buffer) +} + +#[inline] +fn timeout_with_buffered_output( + timeout: Duration, + stdout_data: &[u8], + stderr_data: &[u8], +) -> ToolError { + let stdout = String::from_utf8_lossy(stdout_data); + let stderr = String::from_utf8_lossy(stderr_data); + + // Base message + outputs + stderr label. + let mut message = String::with_capacity(stdout.len() + stderr.len() + 64); + let _ = write!(message, "command timed out after {}ms", timeout.as_millis()); + + if !stdout.is_empty() { + message.push('\n'); + message.push_str(&stdout); + } + + if !stderr.is_empty() { + if stdout.is_empty() || !stdout.ends_with('\n') { + message.push('\n'); + } + message.push_str("[stderr]\n"); + message.push_str(&stderr); + } + + ToolError::Timeout(message) +} /// Executes a shell command with optional working directory and timeout. /// @@ -67,40 +167,29 @@ pub async fn execute_command( // Take stdout/stderr handles to drain them concurrently with process wait. // This prevents deadlock when output exceeds pipe buffer (~64KB Linux, ~4KB Windows). - let mut stdout_pipe = child.stdout().take().expect("stdout was piped"); - let mut stderr_pipe = child.stderr().take().expect("stderr was piped"); + let stdout_pipe = child.stdout().take().expect("stdout was piped"); + let stderr_pipe = child.stderr().take().expect("stderr was piped"); - // Race between timeout and (process completion + pipe draining). - // Using join! inside select! avoids tokio::spawn overhead while still - // providing concurrent I/O for the pipe reads. - tokio::select! { + // Keep output drains independent from timeout selection so timed-out + // commands can still return buffered stdout/stderr. + let stdout_task = spawn_pipe_drain_task(stdout_pipe); + let stderr_task = spawn_pipe_drain_task(stderr_pipe); + + // Race between timeout and process completion. Pipe drain tasks keep running + // regardless of which branch wins this select. + let wait_result = tokio::select! { biased; // Check timeout first for consistent behavior - _ = tokio::time::sleep(timeout) => { - // Timeout: explicitly kill the process tree (Job Object on Windows, process group on Unix) - let _ = Pin::from(child.kill()).await; - Err(ToolError::Timeout(format!( - "command timed out after {}ms", - timeout.as_millis() - ))) - } + _ = tokio::time::sleep(timeout) => None, + status = child.wait() => Some(status), + }; - result = async { - tokio::join!( - child.wait(), - async { - let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY); - let _ = stdout_pipe.read_to_end(&mut buf).await; - buf - }, - async { - let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY); - let _ = stderr_pipe.read_to_end(&mut buf).await; - buf - } - ) - } => { - let (status, stdout_data, stderr_data) = result; + match wait_result { + Some(status) => { + let (stdout_data, stderr_data) = tokio::join!( + await_pipe_drain_task(stdout_task), + await_pipe_drain_task(stderr_task) + ); let status = status.map_err(|e| ToolError::Execution(e.to_string()))?; Ok(BashOutput { @@ -109,6 +198,22 @@ pub async fn execute_command( stderr: String::from_utf8_lossy(&stderr_data).into_owned(), }) } + None => { + // Timeout: explicitly kill the process tree (Job Object on Windows, + // process group on Unix), then briefly await pipe drains for buffered output. + let _ = Pin::from(child.kill()).await; + + let (stdout_data, stderr_data) = tokio::join!( + await_pipe_drain_task_with_grace(stdout_task, PIPE_DRAIN_GRACE_PERIOD), + await_pipe_drain_task_with_grace(stderr_task, PIPE_DRAIN_GRACE_PERIOD) + ); + + Err(timeout_with_buffered_output( + timeout, + &stdout_data, + &stderr_data, + )) + } } } @@ -157,6 +262,50 @@ mod tests { assert!(matches!(result, Err(ToolError::Timeout(_)))); } + #[tokio::test] + async fn timeout_preserves_buffered_output() { + let cmd = if cfg!(target_os = "windows") { + "echo stdout-before-timeout & echo stderr-before-timeout 1>&2 & ping -n 10 127.0.0.1 >nul" + } else { + "echo stdout-before-timeout; echo stderr-before-timeout 1>&2; sleep 10" + }; + + let result = execute_command(cmd, None, Duration::from_millis(500)).await; + match result { + Err(ToolError::Timeout(message)) => { + assert!(message.contains("stdout-before-timeout")); + assert!(message.contains("stderr-before-timeout")); + } + other => panic!("expected timeout error, got: {other:?}"), + } + } + + #[tokio::test] + async fn grace_abort_retains_shared_pipe_buffer() { + use tokio::sync::oneshot; + + let buffer: SharedPipeBuffer = Arc::new(Mutex::new(Vec::with_capacity(32))); + let task_buffer = Arc::clone(&buffer); + let (written_tx, written_rx) = oneshot::channel(); + let (_block_tx, block_rx) = oneshot::channel::<()>(); + + let handle = tokio::spawn(async move { + task_buffer.lock().extend_from_slice(b"partial-output"); + let _ = written_tx.send(()); + let _ = block_rx.await; // block infinitely, task will be cancelled by grace period timeout + }); + + written_rx + .await + .expect("drain task should write buffered output before abort"); + + let data = + await_pipe_drain_task_with_grace(PipeDrainTask { handle, buffer }, Duration::ZERO) + .await; + + assert_eq!(data, b"partial-output"); + } + #[tokio::test] async fn invalid_workdir_returns_error() { let result = execute_command(