atom/publish/git/
mod.rs

//! # Atom Publishing for a Git Store
//!
//! This module provides the types and logic necessary to efficiently publish Atoms
//! to a Git repository. Atoms are stored as orphaned git histories so they can be
//! efficiently fetched. For trivial verification, an Atom's commit hash is made
//! reproducible by using constants for the timestamps and meta-data.
//!
//! Additionally, a git reference is stored under the Atom's ref path to the original
//! source, ensuring it is never garbage collected and an Atom can always be verified.
//!
//! A hexadecimal representation of the source commit is also stored in the reproducible
//! Atom commit header, ensuring it is tied to its source in an unforgeable manner.
13#[cfg(test)]
14mod test;
15
16mod inner;
17
18use std::cell::RefCell;
19use std::path::{Path, PathBuf};
20
21use bstr::ByteSlice;
22use gix::{Commit, ObjectId, Remote, Repository, Tree};
23use tokio::task::JoinSet;
24
25use super::error::git::Error;
26use super::{Content, PublishOutcome, Record};
27use crate::core::AtomPaths;
28use crate::store::git::Root;
29use crate::store::{NormalizeStorePath, QueryStore};
30use crate::{Atom, AtomId};
31
/// An [`AtomId`] specialized to the Git store's [`Root`].
type GitAtomId = AtomId<Root>;
/// The Outcome of an Atom publish attempt to a Git store.
pub type GitOutcome = PublishOutcome<Root>;
/// The Result type used for various methods during publishing to a Git store.
pub type GitResult<T> = Result<T, Error>;
/// A publish [`Record`] specialized to the Git store's [`Root`].
type GitRecord = Record<Root>;
38
/// Holds the shared context needed for publishing Atoms.
pub struct GitContext<'a> {
    /// Reference to the repository we are publishing from.
    repo: &'a Repository,
    /// The repository tree object for the given commit.
    tree: Tree<'a>,
    /// The commit to publish from.
    commit: Commit<'a>,
    /// The given Git remote.
    remote: Remote<'a>,
    /// The given remote as a string, stored for convenient use.
    remote_str: &'a str,
    /// The reported root commit according to the remote.
    root: Root,
    /// A [`JoinSet`] of push tasks to avoid blocking on them. Each task yields
    /// either its output bytes or an [`Error`].
    push_tasks: RefCell<JoinSet<Result<Vec<u8>, Error>>>,
    /// A git server transport, kept to avoid connection overhead.
    transport: Box<dyn Transport + Send>,
    /// The Span representing the overall progress bar (for easy incrementing).
    progress: &'a tracing::Span,
}
60
/// Per-atom state used while publishing a single Atom.
struct AtomContext<'a> {
    /// The paths returned for this Atom by `GitContext::find_and_verify_atom`.
    paths: AtomPaths<PathBuf>,
    /// The Atom returned by `GitContext::find_and_verify_atom`.
    atom: FoundAtom,
    /// The shared Git publishing context this Atom is published through.
    git: &'a GitContext<'a>,
}
66
/// An Atom that was located for publishing, together with its identifiers.
#[derive(Debug)]
struct FoundAtom {
    /// The Atom's specification.
    spec: Atom,
    /// The Atom's id within the Git store.
    id: GitAtomId,
    /// The object id handed to `write_atom_commit` when publishing.
    // NOTE(review): presumably the tree holding the Atom's content — confirm.
    tree_id: ObjectId,
    // NOTE(review): presumably the object holding the Atom's spec (manifest) — confirm.
    spec_id: ObjectId,
}
74
75use gix::diff::object::Commit as AtomCommit;
76
/// Holds the result of writing an Atom commit.
#[derive(Debug, Clone)]
pub struct CommittedAtom {
    /// The raw structure representing the atom that was successfully committed.
    commit: AtomCommit,
    /// The object id of the Atom commit.
    id: ObjectId,
}
85
/// The kinds of Git refs associated with an Atom.
enum RefKind {
    /// The ref for the Atom's spec.
    Spec,
    /// The ref for the Atom's content.
    Content,
    /// The ref for the Atom's origin.
    Origin,
}
91
92use semver::Version;
93
/// Describes a single Git ref belonging to an Atom.
struct AtomRef<'a> {
    // NOTE(review): presumably the Atom's tag as used in the ref name — confirm.
    tag: String,
    /// Which kind of ref this is.
    kind: RefKind,
    /// The Atom version this ref belongs to.
    version: &'a Version,
}
99
100use gix::Reference;
101
#[derive(Debug, Clone)]
/// Struct representing the git refs pointing to the atom's parts.
pub(super) struct AtomReferences<'a> {
    /// The git ref pointing to the atom's content.
    content: Reference<'a>,
    /// The git ref pointing to the tree object containing the atom's manifest and lock.
    spec: Reference<'a>,
    /// The git ref pointing to the commit the atom was published from.
    origin: Reference<'a>,
}
112
/// The Git specific content which will be returned for presenting to the user after
/// an Atom is successfully published.
#[derive(Debug)]
pub struct GitContent {
    /// The Atom spec Git ref.
    spec: gix::refs::Reference,
    /// The Atom content Git ref.
    content: gix::refs::Reference,
    /// The Atom src (origin) Git ref.
    origin: gix::refs::Reference,
    /// The path to the Atom.
    path: PathBuf,
}
122
123use super::{Builder, ValidAtoms};
124
/// The type representing a Git specific Atom publisher.
pub struct GitPublisher<'a> {
    /// The repository to publish from.
    repo: &'a Repository,
    /// The remote found in `repo` under the name `remote_str`.
    remote: Remote<'a>,
    /// The name the remote was looked up by.
    remote_str: &'a str,
    /// The revision spec that is resolved to the commit to publish from.
    spec: &'a str,
    /// The root reported by the remote.
    root: Root,
    /// The transport obtained from the remote; handed on to the publishing context.
    transport: Box<dyn Transport + Send>,
    /// The tracing span handed on to the publishing context.
    progress: &'a tracing::Span,
}
135
136use gix::protocol::transport::client::Transport;
137impl<'a> GitPublisher<'a> {
138    /// Constructs a new [`GitPublisher`].
139    pub fn new(
140        repo: &'a Repository,
141        remote_str: &'a str,
142        spec: &'a str,
143        progress: &'a tracing::Span,
144    ) -> GitResult<Self> {
145        use crate::store::Init;
146        let remote = repo.find_remote(remote_str).map_err(Box::new)?;
147        let mut transport = remote.get_transport().map_err(Box::new)?;
148        let root = remote.ekala_root(Some(&mut transport)).map_err(|e| {
149            e.warn();
150            Error::NotInitialized
151        })?;
152
153        Ok(GitPublisher {
154            repo,
155            remote,
156            remote_str,
157            spec,
158            transport,
159            root,
160            progress,
161        })
162    }
163}
164
/// Estimates how many map slots to reserve for the atoms found among
/// `record_count` tree records.
///
/// The estimate grows with the base-2 logarithm of the record count: the
/// logarithm is multiplied by 20, and once the count exceeds 2^10 the multiplier
/// additionally grows with the square of the excess.
///
/// The result is clamped to `1..=record_count`, because there can never be more
/// atoms than records and a non-empty tree should reserve at least one slot.
/// An empty tree yields `0`.
fn calculate_capacity(record_count: usize) -> usize {
    // `log2(0)` is negative infinity; handle the empty case explicitly rather
    // than relying on the saturating float-to-int cast.
    if record_count == 0 {
        return 0;
    }

    let log_count = (record_count as f64).log2();
    let base_multiplier = 20.0;
    let scaling_factor = (log_count - 10.0).max(0.0).powi(2);
    let multiplier = base_multiplier + scaling_factor * 10.0;
    let estimate = (log_count * multiplier).ceil() as usize;

    // For small trees the raw estimate overshoots the record count (e.g. 20 for
    // 2 records) and for a single record it is 0; bound it on both sides.
    estimate.clamp(1, record_count)
}
172
173use super::StateValidator;
174
175impl<'a> StateValidator<Root> for GitPublisher<'a> {
176    type Error = Error;
177    type Publisher = GitContext<'a>;
178
179    fn validate(publisher: &Self::Publisher) -> Result<ValidAtoms, Self::Error> {
180        use gix::traverse::tree::Recorder;
181        let mut record = Recorder::default();
182
183        publisher
184            .tree()
185            .traverse()
186            .breadthfirst(&mut record)
187            .map_err(|_| Error::NotFound)?;
188
189        let cap = calculate_capacity(record.records.len());
190        let mut atoms: HashMap<AtomTag, PathBuf> = HashMap::with_capacity(cap);
191
192        for entry in record.records {
193            let path = PathBuf::from(entry.filepath.to_str_lossy().as_ref());
194            if entry.mode.is_blob() && path.file_name() == Some(crate::MANIFEST_NAME.as_ref()) {
195                if let Ok(obj) = publisher.repo.find_object(entry.oid) {
196                    match publisher.verify_manifest(&obj, &path) {
197                        Ok(atom) => {
198                            if let Some(duplicate) = atoms.get(&atom.tag) {
199                                tracing::warn!(
200                                    message = "Two atoms share the same ID",
201                                    duplicate.tag = %atom.tag,
202                                    fst = %path.display(),
203                                    snd = %duplicate.display(),
204                                );
205                                return Err(Error::Duplicates);
206                            }
207                            atoms.insert(atom.tag, path);
208                        },
209                        Err(e) => e.warn(),
210                    }
211                }
212            }
213        }
214
215        tracing::trace!(repo.atoms.valid.count = atoms.len());
216
217        Ok(atoms)
218    }
219}
220
221impl<'a> Builder<'a, Root> for GitPublisher<'a> {
222    type Error = Error;
223    type Publisher = GitContext<'a>;
224
225    fn build(self) -> Result<(ValidAtoms, Self::Publisher), Self::Error> {
226        let publisher = GitContext::set(
227            self.repo,
228            self.remote.clone(),
229            self.remote_str,
230            self.spec,
231            self.root,
232            self.transport,
233            self.progress,
234        )?;
235        let atoms = GitPublisher::validate(&publisher)?;
236        Ok((atoms, publisher))
237    }
238}
239
240impl GitContent {
241    /// Return a reference to the Atom spec Git ref.
242    #[must_use]
243    pub fn spec(&self) -> &gix::refs::Reference {
244        &self.spec
245    }
246
247    /// Return a reference to the Atom src Git ref.
248    #[must_use]
249    pub fn origin(&self) -> &gix::refs::Reference {
250        &self.origin
251    }
252
253    /// Return a reference to the Atom content ref.
254    #[must_use]
255    pub fn content(&self) -> &gix::refs::Reference {
256        &self.content
257    }
258
259    /// Return a reference to the path to the Atom.
260    #[must_use]
261    pub fn path(&self) -> &PathBuf {
262        &self.path
263    }
264}
265
266use std::collections::HashMap;
267
268use super::Publish;
269use crate::id::AtomTag;
270
// Opts `GitContext` into the `Sealed` marker trait from the parent module's `private` module.
// NOTE(review): presumably required so `Publish` can only be implemented inside this crate — confirm.
impl<'a> super::private::Sealed for GitContext<'a> {}
272
273impl<'a> Publish<Root> for GitContext<'a> {
274    type Error = Error;
275    type Id = ObjectId;
276
277    /// Publishes atoms.
278    ///
279    /// This function processes a collection of paths, each representing an atom to be published.
280    /// The publishing process includes path normalization, existence checks, and actual
281    /// publishing attempts.
282    ///
283    /// # Path Normalization
284    /// - First attempts to interpret each path as relative to the caller's current location inside
285    ///   the repository.
286    /// - If normalization fails (e.g., in a bare repository), falls back to treating the path as
287    ///   already relative to the repo root.
288    /// - The normalized path is used to search the Git history, not the file system.
289    ///
290    /// # Publishing Process
291    /// For each path:
292    /// 1. Normalizes the path (as described above).
293    /// 2. Checks if the atom already exists in the repository.
294    ///    - If it exists, the atom is skipped, and a log message is generated.
295    /// 3. Attempts to publish the atom.
296    ///    - If successful, the atom is added to the repository.
297    ///    - If any error occurs during publishing, the atom is skipped, and an error is logged.
298    ///
299    /// # Error Handling
300    /// - The function aims to process all provided paths, even if some fail.
301    /// - Errors and skipped atoms are collected as results but do not halt the overall process.
302    /// - The function continues until all the atoms have been processed.
303    ///
304    /// # Return Value
305    /// Returns a vector of results types (`Vec<Result<PublishOutcome<T>, Self::Error>>`), where the
306    /// outter result represents whether an atom has failed, and the inner result determines whether
307    /// an atom was safely skipped, e.g. because it already exists..
308    fn publish<C>(
309        &self,
310        paths: C,
311        remotes: HashMap<AtomTag, (Version, ObjectId)>,
312    ) -> Vec<GitResult<GitOutcome>>
313    where
314        C: IntoIterator<Item = PathBuf>,
315    {
316        use crate::store::git;
317        let iter = paths.into_iter();
318        iter.map(|path| {
319            let path = match self.repo.normalize(&path) {
320                Ok(path) => path,
321                Err(git::Error::NoWorkDir) => path,
322                Err(e) => return Err(Box::new(e).into()),
323            };
324            self.publish_atom(&path, &remotes)
325        })
326        .collect()
327    }
328
329    fn publish_atom<P: AsRef<Path>>(
330        &self,
331        path: P,
332        remotes: &HashMap<AtomTag, (Version, ObjectId)>,
333    ) -> GitResult<GitOutcome> {
334        use {Err as Skipped, Ok as Published};
335        let context = AtomContext::set(path.as_ref(), self)?;
336        let span = tracing::info_span!("publish atom", atom=%context.atom.id.tag());
337        crate::log::set_sub_task(&span, &format!("⚛️ `{}`", context.atom.id.tag(),));
338        let _enter = span.enter();
339
340        let r = &context.refs(RefKind::Content);
341        let lr = self.repo.find_reference(&r.to_string());
342
343        if let Ok(lr) = lr {
344            if let Some((v, id)) = remotes.get(context.atom.id.tag()) {
345                if r.version == v && lr.id().detach() == *id {
346                    // remote and local atoms are identical; skip
347                    return Ok(Skipped(context.atom.spec.tag.clone()));
348                }
349            }
350        }
351
352        let refs = context
353            .write_atom_commit(context.atom.tree_id)?
354            .write_refs(&context)?
355            .push(&context);
356
357        Ok(Published(GitRecord {
358            id: context.atom.id.clone(),
359            content: Content::Git(refs),
360        }))
361    }
362}
363
364impl<'a> AtomContext<'a> {
365    fn set(path: &'a Path, git: &'a GitContext) -> GitResult<Self> {
366        let (atom, paths) = git.find_and_verify_atom(path)?;
367        Ok(Self { paths, atom, git })
368    }
369}
370
371impl<'a> GitContext<'a> {
372    fn set(
373        repo: &'a Repository,
374        remote: Remote<'a>,
375        remote_str: &'a str,
376        refspec: &str,
377        root: Root,
378        transport: Box<dyn Transport + Send>,
379        progress: &'a tracing::Span,
380    ) -> GitResult<Self> {
381        let commit = repo
382            .rev_parse_single(refspec)
383            .map(|s| repo.find_commit(s))
384            .map_err(Box::new)??;
385
386        let tree = commit.tree()?;
387
388        let push_tasks = RefCell::new(JoinSet::new());
389
390        Ok(Self {
391            repo,
392            root,
393            tree,
394            commit,
395            remote,
396            remote_str,
397            push_tasks,
398            transport,
399            progress,
400        })
401    }
402
403    /// A method used to await the results of the concurrently running Git pushes,
404    /// which were offloaded to a seperate thread of execution of Tokio's runtime.
405    ///
406    /// An errors that occurred will be collected into a [`Vec`].
407    pub async fn await_pushes(&self, errors: &mut Vec<Error>) {
408        use tokio::sync::Mutex;
409
410        let tasks = Mutex::new(self.push_tasks.borrow_mut());
411
412        while let Some(task) = tasks.lock().await.join_next().await {
413            match task {
414                Ok(Ok(output)) => {
415                    if !output.is_empty() {
416                        tracing::info!(output = %String::from_utf8_lossy(&output));
417                    }
418                },
419                Ok(Err(e)) => {
420                    errors.push(e);
421                },
422                Err(e) => {
423                    errors.push(Error::JoinFailed(e));
424                },
425            }
426        }
427    }
428
429    /// Return a reference to the git tree object of the commit the Atom originates from.
430    pub fn tree(&self) -> Tree<'a> {
431        self.tree.clone()
432    }
433
434    /// Return a reference to the git remote.
435    pub fn remote(&self) -> Remote<'a> {
436        self.remote.clone()
437    }
438}