keos/thread/
mod.rs

//! Thread abstraction, an abstraction of a CPU core.
//!
//! ## The threading model
//!
//! An executing kernel consists of a collection of threads,
//! each with its own stack and local state. Threads can be named, and
//! provide some built-in support for low-level synchronization.
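//!
//! As a minimal sketch (the thread name and body below are illustrative), a
//! thread is created through [`ThreadBuilder`] and waited on through the
//! returned [`JoinHandle`]:
//!
//! ```ignore
//! use keos::thread::ThreadBuilder;
//!
//! // Spawn a named kernel thread and wait for it to finish.
//! let handle = ThreadBuilder::new("worker").spawn(|| {
//!     // ... do some work on this thread ...
//! });
//! // A thread body that returns normally exits with code 0.
//! assert_eq!(handle.join(), 0);
//! ```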
pub mod scheduler;

use crate::{KernelError, spinlock::SpinLock, task::Task};
use abyss::{
    addressing::{Kva, Pa},
    dev::x86_64::apic::{IPIDest, Mode},
    interrupt::InterruptGuard,
    x86_64::intrinsics::cpuid,
};
use alloc::{boxed::Box, collections::btree_map::BTreeMap, string::String, sync::Arc};
use core::{
    arch::{asm, naked_asm},
    panic::Location,
    sync::atomic::{AtomicI32, AtomicU64, Ordering},
};

/// Size of each thread's stack.
pub const STACK_SIZE: usize = 0x100000;
/// Thread magic to detect stack overflow.
pub const THREAD_MAGIC: usize = 0xdeadbeefcafebabe;

/// The thread stack.
///
/// DO NOT MODIFY THIS STRUCT.
#[repr(C, align(0x100000))]
#[doc(hidden)]
pub(crate) struct ThreadStack {
    pub(crate) thread: *mut Thread,
    pub(crate) magic: usize,
    /// Padding to fill up to [`STACK_SIZE`].
    pub(crate) _pad:
        [u8; STACK_SIZE - core::mem::size_of::<*mut Thread>() - core::mem::size_of::<usize>()],
    /// Marker for the top of the usable stack (the stack grows down from here).
    pub(crate) _usable_marker: [u8; 0],
    /// Pinned.
    _pin: core::marker::PhantomPinned,
}

/// A possible state of the thread.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ThreadState {
    /// Thread is runnable.
    Runnable,
    /// Thread is running.
    Running,
    /// Thread has exited with an exit code.
    Exited(i32),
    /// Thread is idle.
    Idle,
    /// Thread is parked.
    Parked,
}

pub(crate) struct TtyState {
    input: &'static [u8],
    idx: usize,
    output: String,
}

impl crate::teletype::Teletype for TtyState {
    fn write(&mut self, data: &[u8]) -> Result<usize, KernelError> {
        if let Ok(s) = String::from_utf8(data.to_vec()) {
            self.output.push_str(&s);
            Ok(data.len())
        } else {
            Err(KernelError::InvalidArgument)
        }
    }

    fn read(&mut self, data: &mut [u8]) -> Result<usize, KernelError> {
        let read_bytes = self.input.len().wrapping_sub(self.idx).min(data.len());
        data[..read_bytes].copy_from_slice(&self.input[self.idx..self.idx + read_bytes]);
        self.idx += read_bytes;
        Ok(read_bytes)
    }
}

fn load_pt(pa: Pa) {
    unsafe { abyss::x86_64::Cr3(pa.into_usize() as u64).apply() }
}

static EXIT_CODE_TABLE: SpinLock<BTreeMap<u64, Arc<AtomicU64>>> = SpinLock::new(BTreeMap::new());
static THREAD_STATE_TABLE: SpinLock<BTreeMap<u64, Arc<SpinLock<ThreadState>>>> =
    SpinLock::new(BTreeMap::new());

#[unsafe(no_mangle)]
#[doc(hidden)]
pub fn kill_current_thread() -> ! {
    unsafe {
        __do_exit(-1);
    }
}

#[unsafe(no_mangle)]
#[doc(hidden)]
pub unsafe fn __do_exit(exit_code: i32) -> ! {
    let _ = abyss::interrupt::InterruptGuard::new();
    with_current(|th| {
        let mut et = EXIT_CODE_TABLE.lock();
        et.remove(&th.tid);
        et.unlock();

        let mut tst = THREAD_STATE_TABLE.lock();
        tst.remove(&th.tid);
        tst.unlock();

        th.exit_status
            .store(0x8000_0000_0000_0000 | (exit_code as u64), Ordering::SeqCst);
        let mut state = th.state.lock();
        *state = ThreadState::Exited(exit_code);
        state.unlock();
        scheduler::scheduler().reschedule();
    });
    unreachable!()
}

/// Check for a pending kill signal on the current thread and perform an exit when signaled.
pub(crate) fn __check_for_signal() {
    let _ = __with_current(|th| {
        let exit_status = th.exit_status.load(Ordering::SeqCst);
        if (exit_status & 0x4000_0000_0000_0000) == 0x4000_0000_0000_0000 {
            unsafe {
                __do_exit(exit_status as i32);
            }
        }
    });
}

/// Kill the thread with the specified TID (Thread ID).
///
/// The request is recorded in the target thread's exit status; the thread
/// performs the exit with `exit_code` when it next checks for pending signals.
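///
/// A minimal usage sketch (the TID value is illustrative):
///
/// ```ignore
/// // Ask the thread with TID 3 to exit with code -1.
/// // Returns `Err(KernelError::InvalidArgument)` if no such thread exists.
/// keos::thread::kill_by_tid(3, -1).expect("unknown tid");
/// ```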
pub fn kill_by_tid(tid: u64, exit_code: i32) -> Result<(), KernelError> {
    let et = EXIT_CODE_TABLE.lock();
    let Some(exit_status) = et.get(&tid) else {
        et.unlock();
        return Err(KernelError::InvalidArgument);
    };
    let exit_status = exit_status.clone();
    et.unlock();

    debug_assert_eq!(
        exit_status.load(Ordering::SeqCst) & 0x4000_0000_FFFF_FFFF,
        0
    );

    exit_status.store(0x4000_0000_0000_0000 | exit_code as u64, Ordering::SeqCst);

    unsafe {
        abyss::dev::x86_64::apic::send_ipi(IPIDest::AllExcludingSelf, Mode::Fixed(0x7f));
    }

    Ok(())
}

/// Get the specified thread's [`ThreadState`] by TID (Thread ID).
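///
/// A minimal usage sketch (the TID value is illustrative):
///
/// ```ignore
/// use keos::thread::{get_state_by_tid, ThreadState};
///
/// match get_state_by_tid(3) {
///     Ok(ThreadState::Running) => { /* the thread is currently on a CPU */ }
///     Ok(_) => { /* runnable, parked, idle, or exited */ }
///     Err(_) => { /* no thread with that TID is registered */ }
/// }
/// ```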
pub fn get_state_by_tid(tid: u64) -> Result<ThreadState, KernelError> {
    let tst = THREAD_STATE_TABLE.lock();

    let Some(state) = tst.get(&tid) else {
        tst.unlock();
        return Err(KernelError::InvalidArgument);
    };

    let ts_lock = state.lock();
    let result = *ts_lock;

    ts_lock.unlock();
    tst.unlock();

    Ok(result)
}

#[repr(C)]
/// A thread abstraction.
pub struct Thread {
    /// A stack pointer saved at a context switch.
    ///
    /// ## WARNING
    /// DO NOT CHANGE THE OFFSET OF THIS FIELD.
    /// This offset is used by the context-switch code as a hard-coded value.
    /// You must add your own members **BELOW** this `sp` field.
    pub(crate) sp: usize,
    /// Thread stack.
    pub(crate) stack: Box<ThreadStack>,
    /// Thread id.
    pub tid: u64,
    /// Thread name.
    pub name: String,
    /// State of the thread.
    pub state: Arc<SpinLock<ThreadState>>,
    pub(crate) running_cpu: Arc<AtomicI32>,
    /// Mixture of exit state and exit code: bit 63 is set once the thread has
    /// exited, bit 62 is set when a kill has been requested, and the lower 32
    /// bits hold the exit code.
    pub exit_status: Arc<AtomicU64>,
    /// Interrupt frame if the thread was handling an interrupt.
    pub interrupt_frame: SpinLock<*const abyss::interrupt::Registers>,
    #[doc(hidden)]
    pub task: Option<Box<dyn Task>>,
    // Grading utils.
    pub(crate) tty_hook: SpinLock<Option<Arc<SpinLock<TtyState>>>>,
    pub(crate) allocations: SpinLock<Option<BTreeMap<Kva, &'static Location<'static>>>>,
}

impl Thread {
    #[doc(hidden)]
    pub fn new<I>(name: I) -> Box<Self>
    where
        alloc::string::String: core::convert::From<I>,
    {
        static TID: AtomicU64 = AtomicU64::new(0);
        let tid = TID.fetch_add(1, Ordering::SeqCst);
        let mut stack: Box<ThreadStack> = unsafe { Box::new_uninit().assume_init() };
        stack.magic = THREAD_MAGIC;

        let exit_status = Arc::new(AtomicU64::new(0));
        let mut et = EXIT_CODE_TABLE.lock();
        et.insert(tid, exit_status.clone());
        et.unlock();

        let state = Arc::new(SpinLock::new(ThreadState::Runnable));
        let mut tst = THREAD_STATE_TABLE.lock();
        tst.insert(tid, state.clone());
        tst.unlock();

        Box::new(Self {
            sp: 0,
            stack,
            tid,
            name: String::from(name),
            state,
            exit_status,
            interrupt_frame: SpinLock::new(core::ptr::null()),
            running_cpu: Arc::new(AtomicI32::new(-1)),
            task: None,
            tty_hook: SpinLock::new(
                __with_current(|th| {
                    let guard = th.tty_hook.lock();
                    let val = guard.as_ref().map(|n| n.clone());
                    guard.unlock();
                    val
                })
                .unwrap_or(None),
            ),
            allocations: SpinLock::new(None),
        })
    }

    #[doc(hidden)]
    pub fn track_alloc(&self) {
        let mut guard = self.allocations.lock();
        *guard = Some(BTreeMap::new());
        guard.unlock();
    }

    #[doc(hidden)]
    pub fn validate_alloc(&self) {
        let guard = self.allocations.lock();
        if let Some(allocs) = guard.as_ref()
            && !allocs.is_empty()
        {
            struct DebugAlloc<'a>(&'a BTreeMap<Kva, &'static Location<'static>>);
            impl core::fmt::Debug for DebugAlloc<'_> {
                fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                    writeln!(f, "List of non-freed pages:")?;
                    for (idx, (va, loc)) in self.0.iter().take(10).enumerate() {
                        writeln!(f, "  {idx}: {va:?}, allocated at {loc}")?;
                    }
                    if self.0.len() > 10 {
                        write!(f, "... and {:?} more allocations.", self.0.len() - 10)?;
                    }
                    Ok(())
                }
            }
            panic!(
                "Grader: Validating `Page` allocation state failed: Detected {} non-freed pages.\n{:?}",
                allocs.len(),
                DebugAlloc(allocs)
            );
        }
        guard.unlock();
    }

    pub(crate) unsafe fn do_run(&mut self) {
        unsafe {
            let _p = abyss::interrupt::InterruptGuard::new();
            if with_current(|current| current as *const _ as usize != self as *const _ as usize) {
                let next_sp = self.sp;
                let current_sp = with_current(|th| {
                    while self.running_cpu.load(Ordering::SeqCst) != -1 {
                        core::hint::spin_loop()
                    }
                    &mut th.sp as *mut usize
                });
                assert_eq!(
                    abyss::interrupt::InterruptState::current(),
                    abyss::interrupt::InterruptState::Off
                );
                context_switch_trampoline(current_sp, next_sp)
            }
        }
    }

    pub(crate) fn run(self: Box<Self>) {
        unsafe { Box::into_raw(self).as_mut().unwrap().do_run() }
    }

    /// Pin the current thread so it cannot be scheduled out, by blocking
    /// interrupts.
    ///
    /// When the [`ThreadPinGuard`] is dropped, the current thread is unpinned.
    /// When you hold multiple [`ThreadPinGuard`]s, you **MUST** drop them in
    /// the reverse order of creation.
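    ///
    /// A minimal sketch of the RAII usage:
    ///
    /// ```ignore
    /// let pin = Thread::pin();
    /// // ... interrupts are blocked here, so this thread stays on this CPU ...
    /// drop(pin); // dropping the guard unpins the current thread
    /// ```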
    pub fn pin() -> ThreadPinGuard {
        ThreadPinGuard::new()
    }

    #[doc(hidden)]
    pub fn hook_stdin(&self, b: &'static [u8]) {
        let mut guard = self.tty_hook.lock();
        if guard.is_some() {
            panic!("Failed to hook stdin: already hooked.");
        } else {
            *guard = Some(Arc::new(SpinLock::new(TtyState {
                input: b,
                idx: 0,
                output: String::new(),
            })));
            guard.unlock();
        }
    }

    #[doc(hidden)]
    pub fn finish_hook(&self) -> Option<String> {
        let mut guard = self.tty_hook.lock();
        let val = guard.take().map(|n| {
            let guard = n.lock();
            let val = alloc::borrow::ToOwned::to_owned(&(*guard.output));
            guard.unlock();
            val
        });
        guard.unlock();
        val
    }
}

/// An RAII implementation of thread pinning.
pub type ThreadPinGuard = InterruptGuard;

/// A handle to join a thread.
pub struct JoinHandle
where
    Self: 'static,
{
    /// Thread id of this handle.
    pub tid: u64,
    exit_status: Arc<AtomicU64>,
    running_cpu: Arc<AtomicI32>,
}

impl JoinHandle {
    /// Make a join handle for the thread `th`.
    pub fn new_for(th: &Thread) -> Self {
        Self {
            tid: th.tid,
            exit_status: th.exit_status.clone(),
            running_cpu: th.running_cpu.clone(),
        }
    }

    /// Join this handle and return the thread's exit code.
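    ///
    /// This busy-waits: it repeatedly yields via the scheduler until the
    /// target thread publishes its exit status (bit 63 of `exit_status`).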
    pub fn join(self) -> i32 {
        loop {
            let v = self.exit_status.load(Ordering::SeqCst);
            if v >= 0x8000_0000_0000_0000 {
                return v as i32;
            }
            crate::scheduler().reschedule();
        }
    }

    /// Get the scheduled CPU id of the underlying thread.
    ///
    /// If the thread is not running, returns `None`.
    pub fn try_get_running_cpu(&self) -> Option<usize> {
        match self.running_cpu.load(Ordering::SeqCst) {
            v if v < 0 => None,
            v => Some(v as usize),
        }
    }
}

unsafe impl Send for JoinHandle {}
unsafe impl Sync for JoinHandle {}

/// A handle that represents a parked thread.
pub struct ParkHandle {
    pub(crate) th: Box<Thread>,
}

impl ParkHandle {
    pub(crate) fn new_for(th: Box<Thread>) -> Self {
        Self { th }
    }

    /// Consume the handle and unpark the underlying thread.
    pub fn unpark(self) {
        // Wait until the context switch is finished.
        while self.th.running_cpu.load(Ordering::SeqCst) != -1 {
            core::hint::spin_loop()
        }
        let mut state = self.th.state.lock();
        *state = ThreadState::Runnable;
        state.unlock();

        scheduler::scheduler().push_to_queue(self.th);
    }
}

unsafe impl Send for ParkHandle {}
unsafe impl Sync for ParkHandle {}

// Context-switch related code.

/// The context-switch magic.
#[unsafe(naked)]
unsafe extern "C" fn context_switch_trampoline(_current_sp: *mut usize, _next_sp: usize) {
    // XXX: we don't need to save rflags because the rflags state is always the
    // same when threads enter this function.
    // RDI: current stack pointer storage.
    // RSI: next stack pointer.
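    //
    // Note: the callee-saved registers pushed/popped below are laid out in the
    // same order as `ContextSwitchFrame`, so a freshly prepared frame (see
    // `ThreadBuilder::into_thread`) can be switched into exactly like a stack
    // saved by an earlier call to this function.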
    naked_asm!(
        "push rbp",
        "push rbx",
        "push r12",
        "push r13",
        "push r14",
        "push r15",

        // Switch.
        "mov r8, rsp",
        "mov [rdi], r8",
        "mov rsp, rsi",

        "pop r15",
        "pop r14",
        "pop r13",
        "pop r12",
        "pop rbx",
        "pop rbp",

        // XXX: Tail-call optimization, pass prev thread to rdi
        "jmp {}",
        sym finish_context_switch
    );
}

unsafe extern "C" fn finish_context_switch(prev: &'static mut Thread) {
    unsafe {
        assert_eq!(
            abyss::interrupt::InterruptState::current(),
            abyss::interrupt::InterruptState::Off
        );

        let prev_state = {
            let lock = prev.state.lock();
            let result = *lock;
            lock.unlock();
            result
        };

        let mut prev_interrupt_frame = prev.interrupt_frame.lock();
        *prev_interrupt_frame = abyss::x86_64::kernel_gs::current().interrupt_frame;
        prev_interrupt_frame.unlock();

        let _dropped = match prev_state {
            ThreadState::Exited(_e) => Some(Box::from_raw(prev)),
            ThreadState::Idle => None,
            ThreadState::Running => {
                let mut prev_state = prev.state.lock();
                *prev_state = ThreadState::Runnable;
                prev_state.unlock();

                let th = Box::from_raw(prev);
                scheduler::scheduler().push_to_queue(th);
                None
            }
            ThreadState::Parked => None,
            ThreadState::Runnable => unreachable!("{:?} {:?}", prev as *const _, prev.name),
        };
        with_current(|th| {
            let mut state = th.state.lock();
            if *state != ThreadState::Idle {
                *state = ThreadState::Running
            }
            state.unlock();

            __check_for_signal();

            let interrupt_frame = th.interrupt_frame.lock();
            abyss::x86_64::kernel_gs::current().interrupt_frame = *interrupt_frame;
            interrupt_frame.unlock();

            abyss::x86_64::segmentation::SegmentTable::update_tss(
                th.stack.as_mut() as *mut _ as usize + STACK_SIZE,
            );
            th.running_cpu.store(cpuid() as i32, Ordering::SeqCst);

            if let Some(task) = th.task.as_mut() {
                task.with_page_table_pa(&(load_pt as fn(Pa)));
            } else {
                unsafe extern "C" {
                    static mut boot_pml4e: u64;
                }
                load_pt(Pa::new(boot_pml4e as usize).unwrap());
            }
        });
        prev.running_cpu.store(-1, Ordering::SeqCst);
        drop(_dropped);
    }
}

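// Recover the current `Thread` from the stack pointer: every thread stack is
// `STACK_SIZE`-aligned (see `ThreadStack`), so masking `rsp` down to that
// alignment yields the `ThreadStack` header, which records the owning
// `Thread` pointer and a magic value used to detect stack overflows.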
#[inline]
pub(crate) fn __with_current<R>(f: impl FnOnce(&mut Thread) -> R) -> Result<R, usize> {
    unsafe {
        let mut sp: usize;
        asm!("mov {}, rsp", out(reg) sp);
        if let Some(stack) = ((sp & !(STACK_SIZE - 1)) as *mut ThreadStack).as_mut()
            && stack.magic == THREAD_MAGIC
        {
            return Ok(f(stack.thread.as_mut().unwrap()));
        }
        Err(sp)
    }
}

/// An opaque structure representing the thread running on the current CPU.
pub struct Current {
    _p: (),
}

impl Current {
    /// Run a function `f` with a [`ParkHandle`] for the current thread, and
    /// then park the current thread.
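    ///
    /// A minimal usage sketch (`wait_queue` below is an illustrative place to
    /// publish the handle; another thread is expected to call `unpark()` on it
    /// later):
    ///
    /// ```ignore
    /// Current::park_with(|handle| {
    ///     wait_queue.push(handle);
    /// });
    /// // Execution resumes here once the handle is unparked.
    /// ```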
    pub fn park_with(f: impl FnOnce(ParkHandle)) {
        with_current(|th| {
            f(unsafe { scheduler::scheduler().park_thread(th).unwrap() });
        });
        assert!(
            abyss::interrupt::InterruptState::current() == abyss::interrupt::InterruptState::On,
            "Try to park a thread while holding a lock."
        );
        let _ = abyss::interrupt::InterruptGuard::new();
        scheduler::scheduler().reschedule();
    }

    /// Exit the current thread with `exit_code`.
    pub fn exit(exit_code: i32) -> ! {
        assert!(
            abyss::interrupt::InterruptState::current() == abyss::interrupt::InterruptState::On,
            "Try to exit a thread while holding a lock."
        );
        unsafe {
            __do_exit(exit_code);
        }
    }

    /// Get the current thread's id.
    pub fn get_tid() -> u64 {
        with_current(|th| th.tid)
    }
}

/// Run a function `f` with the current thread as an argument.
#[inline]
pub fn with_current<R>(f: impl FnOnce(&mut Thread) -> R) -> R {
    __with_current(f).unwrap_or_else(|sp| {
        panic!(
            "Stack overflow detected! You might be allocating large local variables. Stack: {:x}",
            sp
        )
    })
}

/// A struct that mimics the stack state at a context switch.
#[repr(C)]
struct ContextSwitchFrame<F: FnOnce() + Send> {
    _r15: usize,
    _r14: usize,
    _r13: usize,
    _r12: usize,
    _bx: usize,
    _bp: usize,
    ret_addr: usize,
    thread_fn: *mut F,
    end_of_stack: usize,
}

/// A struct to build a new thread.
pub struct ThreadBuilder {
    th: Box<Thread>,
}

impl ThreadBuilder {
    /// Create a new thread builder for a thread named `name`.
    pub fn new<I>(name: I) -> Self
    where
        alloc::string::String: core::convert::From<I>,
    {
        Self {
            th: Thread::new(name),
        }
    }

    /// Attach a task to the thread.
    pub fn attach_task(mut self, task: Box<dyn Task>) -> Self {
        self.th.task = Some(task);
        self
    }

    /// Spawn the thread in a parked state.
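    ///
    /// A minimal sketch: the returned [`ParkHandle`] keeps the thread off the
    /// run queue until [`ParkHandle::unpark`] is called.
    ///
    /// ```ignore
    /// let parked = ThreadBuilder::new("late-starter").spawn_as_parked(|| {
    ///     // ... runs only after `unpark()` ...
    /// });
    /// // Later, make the thread runnable:
    /// parked.unpark();
    /// ```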
    pub fn spawn_as_parked<F: FnOnce() + Send + 'static>(self, thread_fn: F) -> ParkHandle {
        let th = self.into_thread(thread_fn);
        ParkHandle::new_for(th)
    }

    /// Spawn the thread.
    pub fn spawn<F: FnOnce() + Send + 'static>(self, thread_fn: F) -> JoinHandle {
        let th = self.into_thread(thread_fn);
        let handle = JoinHandle::new_for(&th);
        scheduler::scheduler().push_to_queue(th);
        handle
    }

    /// Get the thread id of this thread.
    pub fn get_tid(&self) -> u64 {
        self.th.tid
    }

    fn into_thread<F: FnOnce() + Send + 'static>(self, thread_fn: F) -> Box<Thread> {
        /// The very beginning of the thread
        #[unsafe(naked)]
        unsafe extern "C" fn start<F: FnOnce() + Send>() -> ! {
            naked_asm!(
                "pop rdi",
                "sti",
                "jmp {}",
                sym thread_start::<F>
            );
        }

        fn thread_start<F: FnOnce() + Send>(thread_fn: *mut F) {
            unsafe {
                core::intrinsics::catch_unwind(
                    |o| {
                        let o = *Box::from_raw(o as *mut F);
                        o();
                        Current::exit(0);
                    },
                    thread_fn as *mut u8,
                    |_, _| {},
                );
            }
        }

        let Self { mut th } = self;
        let stack = th.stack.as_mut();
        let frame = unsafe {
            ((&mut stack._usable_marker as *mut _ as usize
                - core::mem::size_of::<ContextSwitchFrame<F>>())
                as *mut ContextSwitchFrame<F>)
                .as_mut()
                .unwrap()
        };
        frame.end_of_stack = 0;
        frame.thread_fn = Box::into_raw(Box::new(thread_fn));
        frame.ret_addr = start::<F> as usize;
        th.sp = frame as *mut _ as usize;
        th.stack.thread = th.as_mut() as *mut _;
        th
    }
}