atom/manifest/sets/
mod.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2
3use either::Either;
4use gix::protocol::transport::client::Transport;
5use gix::{ObjectId, ThreadSafeRepository};
6use semver::{Version, VersionReq};
7use tokio::task::JoinSet;
8
9use crate::id::Tag;
10use crate::lock::{AtomDep, BoxError, GitDigest, SetDetails};
11use crate::manifest::deps::DocError;
12use crate::manifest::{AtomError, EkalaManager, SetMirror};
13use crate::store::UnpackedRef;
14use crate::store::git::{AtomQuery, Root};
15use crate::{AtomId, Manifest};
16
/// Errors raised while resolving and validating the package sets of a manifest.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A consistency check failed: a root/set-name pairing conflicted, or two mirrors
    /// advertised the same atom version at different revisions. The specifics are
    /// reported through `tracing::error!` at the point of failure.
    #[error("manifest is in an inconsistent state")]
    Inconsistent,
    /// A local (`::`) mirror was declared but the resolver has no local repository.
    #[error("You are not inside a structured local set, `::` has no meaning as a mirror")]
    NoLocal,
}
24
/// The validated outcome of [`SetResolver::get_and_check_sets`].
pub(crate) struct ResolvedSets {
    /// Atoms advertised by the remote mirrors, keyed by atom id and then by version.
    pub(crate) atoms: ResolvedAtoms<ObjectId, Root>,
    /// Root of each set, addressable by set tag (`Left`) or by mirror (`Right`).
    pub(crate) roots: HashMap<Either<Tag, SetMirror>, Root>,
    /// Transports obtained while querying remote mirrors, keyed by mirror URL.
    pub(crate) transports: HashMap<gix::Url, Box<dyn Transport + Send>>,
    /// Tag and known mirrors of each set, keyed by the digest of the set's root.
    details: BTreeMap<GitDigest, SetDetails>,
    /// The `EkalaManager` the resolver created in `SetResolver::new`.
    pub(crate) ekala: EkalaManager,
    /// Thread-local handle to the local repository, if one was supplied.
    pub(crate) repo: Option<gix::Repository>,
}
33
/// An atom reference together with the mirrors that advertised it.
pub(crate) struct ResolvedAtom<Id, R> {
    /// The atom reference as it was advertised (id, version and revision).
    unpacked: UnpackedRef<Id, R>,
    /// URLs of every mirror that advertised this atom version at the same revision.
    remotes: BTreeSet<gix::Url>,
}
38
/// Resolved atoms indexed first by atom id and then by version.
type ResolvedAtoms<Id, R> = HashMap<AtomId<R>, HashMap<Version, ResolvedAtom<Id, R>>>;
40
/// Working state used to validate the package sets declared in a manifest.
///
/// The resolver is consumed by `get_and_check_sets`, which turns it into a
/// `ResolvedSets` once every mirror has been checked.
pub(crate) struct SetResolver<'a> {
    /// The manifest whose `package.sets` are being validated.
    manifest: &'a Manifest,
    /// Local repository used to resolve `SetMirror::Local`; `None` when unavailable.
    repo: Option<gix::Repository>,
    /// Set tag recorded for each root; used to detect one root claimed by two tags.
    names: HashMap<Root, Tag>,
    /// Root recorded for each set tag (`Left`) and for each mirror (`Right`).
    roots: HashMap<Either<Tag, SetMirror>, Root>,
    /// In-flight queries against remote mirrors.
    tasks: JoinSet<MirrorResult>,
    /// Atoms collected from remote mirrors so far.
    atoms: ResolvedAtoms<ObjectId, Root>,
    /// Tag and mirrors of each set, keyed by the digest of its root.
    sets: BTreeMap<GitDigest, SetDetails>,
    /// Transports returned by finished remote queries, keyed by mirror URL.
    transports: HashMap<gix::Url, Box<dyn Transport + Send>>,
    /// Manager created from the same repository handle in `new`.
    ekala: EkalaManager,
}
52
/// Outcome of querying one remote mirror.
///
/// On success the tuple holds, in order: the transport used for the query (if one could
/// be obtained), the atoms the mirror advertised, the root calculated from those atoms,
/// the tag of the set the mirror was declared under, and the mirror's URL.
type MirrorResult = Result<
    (
        Option<Box<dyn Transport + Send>>,
        <Vec<AtomQuery> as IntoIterator>::IntoIter,
        Root,
        Tag,
        gix::Url,
    ),
    BoxError,
>;
63
64impl<'a> SetResolver<'a> {
65    /// Creates a new `SetResolver` to validate the package sets in a manifest.
66    pub(crate) fn new(
67        repo: Option<&ThreadSafeRepository>,
68        manifest: &'a Manifest,
69    ) -> Result<Self, AtomError> {
70        let len = manifest.package.sets.len();
71        let ekala = EkalaManager::new(repo)?;
72        Ok(Self {
73            manifest,
74            ekala,
75            repo: repo.map(|r| r.to_thread_local()),
76            names: HashMap::with_capacity(len),
77            roots: HashMap::with_capacity(len),
78            tasks: JoinSet::new(),
79            atoms: HashMap::with_capacity(len * 10),
80            transports: HashMap::with_capacity(len * 3),
81            sets: BTreeMap::new(),
82        })
83    }
84
85    /// Verifies the integrity of declared package sets and collects atom references.
86    ///
87    /// This function consumes the resolver and performs several critical checks to
88    /// ensure the consistency and integrity of the package sets defined in the manifest:
89    ///
90    /// 1. **Root Consistency**: It ensures that every URL within a named mirror set points to the
91    ///    same underlying repository by verifying their advertised root hashes.
92    /// 2. **Set Uniqueness**: It guarantees that a given repository URL does not belong to more
93    ///    than one mirror set, preventing ambiguity.
94    /// 3. **Version and Revision Coherency**: It aggregates all atoms from each mirror, ensuring
95    ///    that no two mirrors advertise the same atom version with a different Git revision, which
96    ///    could indicate tampering or misconfiguration.
97    ///
98    /// # Returns
99    ///
100    /// A `Result` containing a `ResolvedSets` struct on success, which holds the aggregated
101    /// results of the validation process.
102    ///
103    /// # Errors
104    ///
105    /// Returns a `BoxError` if any of the following conditions are met:
106    /// - A repository is found in more than one mirror set.
107    /// - The mirrors for a given set do not all point to the same root hash.
108    /// - An atom is advertised with the same version but different revisions across mirrors.
109    pub(crate) async fn get_and_check_sets(mut self) -> Result<ResolvedSets, BoxError> {
110        use crate::manifest::AtomSet;
111
112        for (set_tag, set) in self.manifest.package.sets.iter() {
113            match set {
114                AtomSet::Singleton(mirror) => self.process_mirror(set_tag, mirror)?,
115                AtomSet::Mirrors(mirrors) => {
116                    for m in mirrors.iter() {
117                        self.process_mirror(set_tag, m)?
118                    }
119                },
120            }
121        }
122
123        while let Some(res) = self.tasks.join_next().await {
124            self.process_remote_mirror_result(res?)?;
125        }
126
127        Ok(ResolvedSets {
128            atoms: self.atoms,
129            ekala: self.ekala,
130            transports: self.transports,
131            roots: self.roots,
132            details: self.sets,
133            repo: self.repo,
134        })
135    }
136
137    /// Processes a single mirror, either local or remote, and initiates consistency checks.
138    ///
139    /// For local mirrors, it calculates the root hash directly. For remote mirrors,
140    /// it spawns an asynchronous task to fetch repository data and perform checks.
141    fn process_mirror(
142        &mut self,
143        set_tag: &'a Tag,
144        mirror: &'a crate::manifest::SetMirror,
145    ) -> Result<(), BoxError> {
146        use crate::id::Origin;
147        use crate::manifest::SetMirror;
148        use crate::store::{QueryStore, QueryVersion};
149
150        match mirror {
151            SetMirror::Local => {
152                if let Some(repo) = self.repo.as_ref() {
153                    let root = {
154                        let commit = repo
155                            .rev_parse_single("HEAD")
156                            .map(|s| repo.find_commit(s))
157                            .map_err(Box::new)??;
158                        commit.calculate_origin()?
159                    };
160                    self.check_set_consistency(set_tag, root, &SetMirror::Local)?;
161                    self.update_sets(set_tag, root, SetMirror::Local);
162                } else {
163                    return Err(Error::NoLocal.into());
164                }
165                Ok(())
166            },
167            SetMirror::Url(url) => {
168                let url = url.to_owned();
169                let set_name = set_tag.to_owned();
170                self.tasks.spawn(async move {
171                    let mut transport = url.get_transport().ok();
172                    let atoms = url.get_atoms(transport.as_mut())?;
173                    let root = atoms.calculate_origin().inspect_err(|_| {
174                        tracing::warn!(
175                            set.tag = %set_name,
176                            set.mirror = %url,
177                            "remote advertised no atoms in:"
178                        )
179                    })?;
180                    Ok((transport, atoms, root, set_name, url))
181                });
182                Ok(())
183            },
184        }
185    }
186
187    fn update_sets(&mut self, name: &Tag, root: Root, set: SetMirror) {
188        let digest = GitDigest::from(*root);
189        self.sets
190            .entry(digest)
191            .and_modify(|e| {
192                e.mirrors.insert(set.to_owned());
193            })
194            .or_insert(SetDetails {
195                tag: name.to_owned(),
196                mirrors: BTreeSet::from([set]),
197            });
198    }
199
200    /// Handles the result of an asynchronous remote mirror check.
201    ///
202    /// This function processes the data fetched from a remote mirror, performs
203    /// consistency checks, and aggregates the results into the provided hashmaps.
204    fn process_remote_mirror_result(&mut self, result: MirrorResult) -> Result<(), BoxError> {
205        let (transport, atoms, root, set_name, url) = result?;
206        let mirror = SetMirror::Url(url.to_owned());
207        self.check_set_consistency(&set_name, root, &mirror)?;
208        self.update_sets(&set_name, root, SetMirror::Url(url.to_owned()));
209        if let Some(t) = transport {
210            self.transports.insert(url.to_owned(), t);
211        }
212
213        let cap = self.atoms.capacity();
214        let len = atoms.len();
215        if cap < len {
216            self.atoms.reserve(len - cap);
217        }
218        for atom in atoms {
219            self.check_and_insert_atom(atom, len, &url)?;
220        }
221
222        Ok(())
223    }
224
225    /// Verifies the consistency of a single atom against the existing set of resolved atoms.
226    ///
227    /// This check ensures that if an atom is advertised by multiple mirrors, it always
228    /// has the same revision for the same version.
229    fn check_and_insert_atom(
230        &mut self,
231        atom: AtomQuery,
232        size: usize,
233        mirror_url: &gix::Url,
234    ) -> Result<(), BoxError> {
235        use std::collections::hash_map::Entry;
236        let entry = self
237            .atoms
238            .entry(atom.id.to_owned())
239            .or_insert(HashMap::with_capacity(size));
240        match entry.entry(atom.version.to_owned()) {
241            Entry::Occupied(mut entry) => {
242                let existing = entry.get();
243                if existing.unpacked.rev == atom.rev {
244                    entry.get_mut().remotes.insert(mirror_url.to_owned());
245                } else {
246                    let existing_mirrors: Vec<_> =
247                        existing.remotes.iter().map(|url| url.to_string()).collect();
248                    tracing::error!(
249                        message = "mirrors for the same set are advertising an atom at \
250                                   the same version but different revisions. This could \
251                                   be the result of possible tampering. Remove the faulty \
252                                   mirror to continue.",
253                        existing.mirrors = %toml_edit::ser::to_string(&existing_mirrors)?,
254                        existing.rev = %existing.unpacked.rev,
255                        conflicting.url = %mirror_url.to_string(),
256                        conflicting.label = %atom.id,
257                        conflicting.version = %atom.version,
258                        conflicting.rev = %atom.rev,
259                    );
260                    return Err(Error::Inconsistent.into());
261                }
262            },
263            Entry::Vacant(entry) => {
264                entry.insert(ResolvedAtom {
265                    unpacked: atom,
266                    remotes: BTreeSet::from([mirror_url.to_owned()]),
267                });
268            },
269        }
270
271        Ok(())
272    }
273
274    /// Ensures that a given package set is consistent across all its mirrors.
275    ///
276    /// This check verifies two conditions:
277    /// 1. A repository root hash is not associated with more than one package set name.
278    /// 2. A package set name is not associated with more than one repository root hash.
279    fn check_set_consistency(
280        &mut self,
281        set_tag: &Tag,
282        root: Root,
283        mirror: &SetMirror,
284    ) -> Result<(), BoxError> {
285        let prev = self.names.insert(root, set_tag.to_owned());
286        if let Some(prev_tag) = &prev {
287            if prev_tag != set_tag {
288                tracing::error!(
289                    message = "the same mirror exists in more than one set",
290                    set.mirror = %mirror,
291                    set.conflict.a = %set_tag,
292                    set.conflict.b = %prev_tag,
293                );
294                return Err(Error::Inconsistent.into());
295            }
296        }
297        let prev = self.roots.insert(Either::Left(set_tag.to_owned()), root);
298        if let Some(prev) = &prev {
299            if prev != &root {
300                tracing::error!(
301                    message = "the mirrors in this set do not all point at the same set",
302                    set.name = %set_tag,
303                    set.mirror = %mirror,
304                    set.root.mirror = %*root,
305                    set.root.previous = %**prev,
306                );
307                return Err(Error::Inconsistent.into());
308            }
309        }
310        self.roots.insert(Either::Right(mirror.to_owned()), root);
311        Ok(())
312    }
313}
314
315impl ResolvedSets {
316    pub(crate) fn roots(&self) -> &HashMap<Either<Tag, SetMirror>, Root> {
317        &self.roots
318    }
319
320    pub(crate) fn atoms(&self) -> &ResolvedAtoms<ObjectId, Root> {
321        &self.atoms
322    }
323
324    pub(crate) fn details(&self) -> &BTreeMap<GitDigest, SetDetails> {
325        &self.details
326    }
327
328    pub(crate) fn resolve_atom(
329        &self,
330        id: &AtomId<Root>,
331        req: &VersionReq,
332    ) -> Result<AtomDep, DocError> {
333        use crate::store::git;
334        let versions = self
335            .atoms
336            .get(id)
337            .ok_or(DocError::Git(Box::new(git::Error::NoMatchingVersion)))?;
338        if let Some((_, atom)) = versions
339            .iter()
340            .filter(|(v, _)| req.matches(v))
341            .max_by_key(|(ref version, _)| version.to_owned())
342        {
343            Ok(AtomDep::from(atom.unpack().to_owned()))
344        } else {
345            Err(Box::new(git::Error::NoMatchingVersion).into())
346        }
347    }
348}
349
impl<Id, R> ResolvedAtom<Id, R> {
    /// Borrows the underlying atom reference, without the list of remotes.
    pub(crate) fn unpack(&self) -> &UnpackedRef<Id, R> {
        &self.unpacked
    }
}