1#[cfg(test)]
14mod test;
15
16mod inner;
17
18use std::cell::RefCell;
19use std::path::{Path, PathBuf};
20
21use bstr::ByteSlice;
22use gix::{Commit, ObjectId, Remote, Repository, Tree};
23use tokio::task::JoinSet;
24
25use super::error::git::Error;
26use super::{Content, PublishOutcome, Record};
27use crate::core::AtomPaths;
28use crate::store::git::Root;
29use crate::store::{NormalizeStorePath, QueryStore};
30use crate::{Atom, AtomId};
31
32type GitAtomId = AtomId<Root>;
33pub type GitOutcome = PublishOutcome<Root>;
35pub type GitResult<T> = Result<T, Error>;
37type GitRecord = Record<Root>;
38
39pub struct GitContext<'a> {
41 repo: &'a Repository,
43 tree: Tree<'a>,
45 commit: Commit<'a>,
47 remote: Remote<'a>,
49 remote_str: &'a str,
51 root: Root,
53 push_tasks: RefCell<JoinSet<Result<Vec<u8>, Error>>>,
55 transport: Box<dyn Transport + Send>,
57 progress: &'a tracing::Span,
59}
60
61struct AtomContext<'a> {
62 paths: AtomPaths<PathBuf>,
63 atom: FoundAtom,
64 git: &'a GitContext<'a>,
65}
66
67#[derive(Debug)]
68struct FoundAtom {
69 spec: Atom,
70 id: GitAtomId,
71 tree_id: ObjectId,
72 spec_id: ObjectId,
73}
74
75use gix::diff::object::Commit as AtomCommit;
76
77#[derive(Debug, Clone)]
79pub struct CommittedAtom {
80 commit: AtomCommit,
82 id: ObjectId,
84}
85
86enum RefKind {
87 Spec,
88 Content,
89 Origin,
90}
91
92use semver::Version;
93
94struct AtomRef<'a> {
95 tag: String,
96 kind: RefKind,
97 version: &'a Version,
98}
99
100use gix::Reference;
101
102#[derive(Debug, Clone)]
103pub(super) struct AtomReferences<'a> {
105 content: Reference<'a>,
107 spec: Reference<'a>,
109 origin: Reference<'a>,
111}
112
113#[derive(Debug)]
116pub struct GitContent {
117 spec: gix::refs::Reference,
118 content: gix::refs::Reference,
119 origin: gix::refs::Reference,
120 path: PathBuf,
121}
122
123use super::{Builder, ValidAtoms};
124
125pub struct GitPublisher<'a> {
127 repo: &'a Repository,
128 remote: Remote<'a>,
129 remote_str: &'a str,
130 spec: &'a str,
131 root: Root,
132 transport: Box<dyn Transport + Send>,
133 progress: &'a tracing::Span,
134}
135
136use gix::protocol::transport::client::Transport;
137impl<'a> GitPublisher<'a> {
138 pub fn new(
140 repo: &'a Repository,
141 remote_str: &'a str,
142 spec: &'a str,
143 progress: &'a tracing::Span,
144 ) -> GitResult<Self> {
145 use crate::store::Init;
146 let remote = repo.find_remote(remote_str).map_err(Box::new)?;
147 let mut transport = remote.get_transport().map_err(Box::new)?;
148 let root = remote.ekala_root(Some(&mut transport)).map_err(|e| {
149 e.warn();
150 Error::NotInitialized
151 })?;
152
153 Ok(GitPublisher {
154 repo,
155 remote,
156 remote_str,
157 spec,
158 transport,
159 root,
160 progress,
161 })
162 }
163}
164
165fn calculate_capacity(record_count: usize) -> usize {
166 let log_count = (record_count as f64).log2();
167 let base_multiplier = 20.0;
168 let scaling_factor = (log_count - 10.0).max(0.0).powf(2.0);
169 let multiplier = base_multiplier + scaling_factor * 10.0;
170 (log_count * multiplier).ceil() as usize
171}
172
173use super::StateValidator;
174
175impl<'a> StateValidator<Root> for GitPublisher<'a> {
176 type Error = Error;
177 type Publisher = GitContext<'a>;
178
179 fn validate(publisher: &Self::Publisher) -> Result<ValidAtoms, Self::Error> {
180 use gix::traverse::tree::Recorder;
181 let mut record = Recorder::default();
182
183 publisher
184 .tree()
185 .traverse()
186 .breadthfirst(&mut record)
187 .map_err(|_| Error::NotFound)?;
188
189 let cap = calculate_capacity(record.records.len());
190 let mut atoms: HashMap<AtomTag, PathBuf> = HashMap::with_capacity(cap);
191
192 for entry in record.records {
193 let path = PathBuf::from(entry.filepath.to_str_lossy().as_ref());
194 if entry.mode.is_blob() && path.file_name() == Some(crate::MANIFEST_NAME.as_ref()) {
195 if let Ok(obj) = publisher.repo.find_object(entry.oid) {
196 match publisher.verify_manifest(&obj, &path) {
197 Ok(atom) => {
198 if let Some(duplicate) = atoms.get(&atom.tag) {
199 tracing::warn!(
200 message = "Two atoms share the same ID",
201 duplicate.tag = %atom.tag,
202 fst = %path.display(),
203 snd = %duplicate.display(),
204 );
205 return Err(Error::Duplicates);
206 }
207 atoms.insert(atom.tag, path);
208 },
209 Err(e) => e.warn(),
210 }
211 }
212 }
213 }
214
215 tracing::trace!(repo.atoms.valid.count = atoms.len());
216
217 Ok(atoms)
218 }
219}
220
221impl<'a> Builder<'a, Root> for GitPublisher<'a> {
222 type Error = Error;
223 type Publisher = GitContext<'a>;
224
225 fn build(self) -> Result<(ValidAtoms, Self::Publisher), Self::Error> {
226 let publisher = GitContext::set(
227 self.repo,
228 self.remote.clone(),
229 self.remote_str,
230 self.spec,
231 self.root,
232 self.transport,
233 self.progress,
234 )?;
235 let atoms = GitPublisher::validate(&publisher)?;
236 Ok((atoms, publisher))
237 }
238}
239
240impl GitContent {
241 #[must_use]
243 pub fn spec(&self) -> &gix::refs::Reference {
244 &self.spec
245 }
246
247 #[must_use]
249 pub fn origin(&self) -> &gix::refs::Reference {
250 &self.origin
251 }
252
253 #[must_use]
255 pub fn content(&self) -> &gix::refs::Reference {
256 &self.content
257 }
258
259 #[must_use]
261 pub fn path(&self) -> &PathBuf {
262 &self.path
263 }
264}
265
266use std::collections::HashMap;
267
268use super::Publish;
269use crate::id::AtomTag;
270
271impl<'a> super::private::Sealed for GitContext<'a> {}
272
273impl<'a> Publish<Root> for GitContext<'a> {
274 type Error = Error;
275 type Id = ObjectId;
276
277 fn publish<C>(
309 &self,
310 paths: C,
311 remotes: HashMap<AtomTag, (Version, ObjectId)>,
312 ) -> Vec<GitResult<GitOutcome>>
313 where
314 C: IntoIterator<Item = PathBuf>,
315 {
316 use crate::store::git;
317 let iter = paths.into_iter();
318 iter.map(|path| {
319 let path = match self.repo.normalize(&path) {
320 Ok(path) => path,
321 Err(git::Error::NoWorkDir) => path,
322 Err(e) => return Err(Box::new(e).into()),
323 };
324 self.publish_atom(&path, &remotes)
325 })
326 .collect()
327 }
328
329 fn publish_atom<P: AsRef<Path>>(
330 &self,
331 path: P,
332 remotes: &HashMap<AtomTag, (Version, ObjectId)>,
333 ) -> GitResult<GitOutcome> {
334 use {Err as Skipped, Ok as Published};
335 let context = AtomContext::set(path.as_ref(), self)?;
336 let span = tracing::info_span!("publish atom", atom=%context.atom.id.tag());
337 crate::log::set_sub_task(&span, &format!("⚛️ `{}`", context.atom.id.tag(),));
338 let _enter = span.enter();
339
340 let r = &context.refs(RefKind::Content);
341 let lr = self.repo.find_reference(&r.to_string());
342
343 if let Ok(lr) = lr {
344 if let Some((v, id)) = remotes.get(context.atom.id.tag()) {
345 if r.version == v && lr.id().detach() == *id {
346 return Ok(Skipped(context.atom.spec.tag.clone()));
348 }
349 }
350 }
351
352 let refs = context
353 .write_atom_commit(context.atom.tree_id)?
354 .write_refs(&context)?
355 .push(&context);
356
357 Ok(Published(GitRecord {
358 id: context.atom.id.clone(),
359 content: Content::Git(refs),
360 }))
361 }
362}
363
364impl<'a> AtomContext<'a> {
365 fn set(path: &'a Path, git: &'a GitContext) -> GitResult<Self> {
366 let (atom, paths) = git.find_and_verify_atom(path)?;
367 Ok(Self { paths, atom, git })
368 }
369}
370
371impl<'a> GitContext<'a> {
372 fn set(
373 repo: &'a Repository,
374 remote: Remote<'a>,
375 remote_str: &'a str,
376 refspec: &str,
377 root: Root,
378 transport: Box<dyn Transport + Send>,
379 progress: &'a tracing::Span,
380 ) -> GitResult<Self> {
381 let commit = repo
382 .rev_parse_single(refspec)
383 .map(|s| repo.find_commit(s))
384 .map_err(Box::new)??;
385
386 let tree = commit.tree()?;
387
388 let push_tasks = RefCell::new(JoinSet::new());
389
390 Ok(Self {
391 repo,
392 root,
393 tree,
394 commit,
395 remote,
396 remote_str,
397 push_tasks,
398 transport,
399 progress,
400 })
401 }
402
403 pub async fn await_pushes(&self, errors: &mut Vec<Error>) {
408 use tokio::sync::Mutex;
409
410 let tasks = Mutex::new(self.push_tasks.borrow_mut());
411
412 while let Some(task) = tasks.lock().await.join_next().await {
413 match task {
414 Ok(Ok(output)) => {
415 if !output.is_empty() {
416 tracing::info!(output = %String::from_utf8_lossy(&output));
417 }
418 },
419 Ok(Err(e)) => {
420 errors.push(e);
421 },
422 Err(e) => {
423 errors.push(Error::JoinFailed(e));
424 },
425 }
426 }
427 }
428
429 pub fn tree(&self) -> Tree<'a> {
431 self.tree.clone()
432 }
433
434 pub fn remote(&self) -> Remote<'a> {
436 self.remote.clone()
437 }
438}