Coverage Report

Created: 2024-02-20 21:15

/builds/xfbs/cindy/src/command/add.rs

Coverage highlights from this run: every function below was executed. Notable branches that never ran are marked inline with `// 0× in this run`, and every error branch behind a `?` operator likewise recorded zero hits, i.e. no fallible call actually failed under test.
use super::UPDATE_INTERVAL;
use crate::{
    cli::AddCommand,
    database::{Database, Handle},
    hash::{BoxHash, Hash, ReadDigester},
    Cindy, Tag,
};
use anyhow::{Context, Result};
use flume::{Receiver, Sender};
use futures::StreamExt;
use std::{
    collections::{BTreeMap, BTreeSet},
    fs::{create_dir_all, hard_link, File, Metadata},
    io::{stdout, ErrorKind, Write},
    path::{Path, PathBuf},
    time::Instant,
};
use tokio::{
    sync::mpsc::channel,
    task::{spawn_blocking, JoinHandle},
};

fn path_tags(path: &Path) -> impl Iterator<Item = Tag> + '_ {
    let path_tag = Tag::new("path".into(), format!("/{}", path.display()));
    let filename_tag = Tag::new(
        "filename".into(),
        path.file_name().unwrap().to_string_lossy().into_owned(),
    );
    let directory_tag = Tag::new(
        "directory".into(),
        format!("/{}", path.parent().unwrap().display()),
    );
    let pathprefix_tags = path
        .ancestors()
        .skip(1)
        .map(|ancestor| Tag::new("ancestor".into(), format!("/{}", ancestor.display())));
    [path_tag, directory_tag, filename_tag]
        .into_iter()
        .chain(pathprefix_tags)
}
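To make the ancestor derivation concrete: `Path::ancestors()` yields the path itself first, so `skip(1)` leaves only the enclosing directories down to the (empty) root. A minimal standalone sketch using only std, with plain strings standing in for the crate's `Tag` type:

```rust
use std::path::Path;

fn main() {
    let path = Path::new("images/vacation/boat.jpg");
    // ancestors() yields the path itself first; skip(1) drops it,
    // leaving every enclosing directory up to the (empty) root.
    let ancestors: Vec<String> = path
        .ancestors()
        .skip(1)
        .map(|a| format!("/{}", a.display()))
        .collect();
    assert_eq!(ancestors, ["/images/vacation", "/images", "/"]);
}
```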
fn add_file<H: Handle>(
    database: &Database<H>,
    hash: &Hash,
    tags: &[Tag],
    paths: &mut dyn Iterator<Item = &Path>,
) -> Result<()> {
    database.hash_add(hash)?;

    for tag in tags {
        database.tag_value_create(tag.name(), tag.value())?;
        database.hash_tag_add(hash, tag.name(), tag.value())?;
    }

    add_path_tags(database, hash, paths)?;

    Ok(())
}

fn add_path_tags<H: Handle>(
    database: &Database<H>,
    hash: &Hash,
    paths: &mut dyn Iterator<Item = &Path>,
) -> Result<()> {
    for path in paths {
        for tag in path_tags(path) {
            database.tag_value_create(tag.name(), tag.value())?;
            database.hash_tag_add(hash, tag.name(), tag.value())?;
        }
    }
    Ok(())
}
impl Cindy {
    pub async fn command_add(&self, command: &AddCommand) -> Result<()> {
        self.add_files(&command.paths, command.recursive).await
    }

    pub async fn add_files(&self, files: &[PathBuf], recursive: bool) -> Result<()> {
        let files = self
            .list_files(files, recursive)
            .await
            .context("Listing files")?;
        let hashes = self.hash_files(files).await.context("Hashing files")?;
        let hashes = self
            .skip_known(hashes)
            .await
            .context("Skipping known files")?;
        let hashes = self
            .scan_metadata(hashes)
            .await
            .context("Scanning metadata")?;
        let mut database = self.database().await;
        spawn_blocking(move || {
            let transaction = database.transaction()?;
            for (hash, (tags, paths)) in hashes.iter() {
                add_file(
                    &transaction,
                    Hash::new(&hash[..]),
                    tags,
                    &mut paths.iter().map(|p| p.as_path()),
                )?;
            }
            transaction.commit()?;
            Ok(()) as Result<()>
        })
        .await??;
        Ok(())
    }
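One detail worth calling out in `add_files` (it recurs throughout this file): the double `?` after awaiting `spawn_blocking`. Awaiting the `JoinHandle` yields a `Result<_, JoinError>` wrapping the closure's own `Result`, so one `?` handles a panicked or cancelled task and the second propagates the closure's error. A minimal standalone sketch:

```rust
use tokio::task::spawn_blocking;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Awaiting a JoinHandle yields Result<T, JoinError>; here T is itself
    // an anyhow::Result, so the first ? catches a panicked or cancelled
    // task and the second propagates the closure's own error.
    let value = spawn_blocking(|| Ok::<i32, anyhow::Error>(21)).await??;
    assert_eq!(value * 2, 42);
    Ok(())
}
```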
    /// Given a file, compute its hash.
    pub fn hash_file(&self, path: &Path) -> Result<BoxHash> {
        let mut file = File::open(self.root().join(path))?;
        let hash = self.hasher().hash_read(&mut file)?;
        Ok(hash)
    }

    /// Given a path and a hash, add it to the data index.
    pub fn data_add(&self, file: &Path, hash: &Hash) -> Result<()> {
        let path = self.hash_path(hash);
        if !path.exists() {
            let file = self.root().join(file);
            create_dir_all(path.parent().unwrap())?;

            // try to reflink first, if enabled
            #[cfg(feature = "reflink")]
            match reflink::reflink(&file, &path) {
                Ok(()) => return Ok(()), // 0× in this run: reflink never succeeded
                Err(err) => println!("Error reflinking: {err}"),
            }

            // fall back to a hard link, tolerating a link that already exists
            hard_link(file, path).or_else(|error| match error {
                // 0× in this run: the fallback never hit an error
                error if error.kind() == ErrorKind::AlreadyExists => Ok(()),
                error => Err(error),
            })?
        }
        Ok(())
    }

    fn launch_hasher_tasks(
        &self,
        files: Receiver<(PathBuf, Metadata)>,
        hashes: Sender<(PathBuf, Metadata, BoxHash)>,
        tasks: usize,
    ) -> Vec<JoinHandle<Result<()>>> {
        (0..tasks)
            .map(|_| {
                let files = files.clone();
                let hashes = hashes.clone();
                let cindy = self.clone();
                spawn_blocking(move || {
                    for (path, metadata) in files.iter() {
                        let hash = cindy.hash_file(&path)?;
                        cindy.data_add(&path, &hash)?;
                        hashes.send((path, metadata, hash))?;
                    }
                    Ok(()) as Result<()>
                })
            })
            .collect()
    }
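This fan-out works because flume channels are multi-producer multi-consumer: each cloned `Receiver` pops distinct items off the same queue, so the blocking workers share the load. A self-contained sketch of the same shape, where placeholder arithmetic stands in for hashing and the channel and worker names are illustrative:

```rust
use tokio::task::{spawn_blocking, JoinHandle};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (file_tx, file_rx) = flume::bounded::<u32>(1024);
    let (out_tx, out_rx) = flume::bounded::<u32>(1024);

    // Each worker clones the receiver; flume is MPMC, so every item
    // is delivered to exactly one of the blocking tasks.
    let workers: Vec<JoinHandle<anyhow::Result<()>>> = (0..4)
        .map(|_| {
            let file_rx = file_rx.clone();
            let out_tx = out_tx.clone();
            spawn_blocking(move || {
                for item in file_rx.iter() {
                    out_tx.send(item * 2)?; // stand-in for hash_file + data_add
                }
                Ok(())
            })
        })
        .collect();
    drop(out_tx); // the workers hold the remaining sender clones

    for i in 0..10 {
        file_tx.send_async(i).await?;
    }
    drop(file_tx); // closing the queue lets the workers' loops end

    for worker in workers {
        worker.await??;
    }
    assert_eq!(out_rx.iter().count(), 10);
    Ok(())
}
```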
    fn launch_scanner_tasks(
        &self,
        files: Receiver<(BoxHash, Metadata, BTreeSet<PathBuf>)>,
        hashes: Sender<(BoxHash, Vec<Tag>, BTreeSet<PathBuf>)>,
        tasks: usize,
    ) -> Vec<JoinHandle<Result<()>>> {
        (0..tasks)
            .map(|_| {
                let files = files.clone();
                let hashes = hashes.clone();
                let cindy = self.clone();
                spawn_blocking(move || {
                    for (hash, metadata, paths) in files.iter() {
                        let filesize = Tag::new("filesize".into(), metadata.len().to_string());
                        let mut tags = vec![filesize];
                        let path = cindy.hash_path(&hash);
                        #[cfg(feature = "ffmpeg")]
                        match crate::media::media_info(&path) {
                            Ok(info) => {
                                for tag in info.tags() {
                                    tags.push(tag);
                                }
                            }
                            Err(error) => {
                                println!("{paths:?}: {error:#}");
                            }
                        }
                        hashes.send((hash, tags, paths))?;
                    }
                    Ok(()) as Result<()>
                })
            })
            .collect()
    }

    /// Scan file metadata.
    pub async fn scan_metadata(
        &self,
        files: BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)>,
    ) -> Result<BTreeMap<BoxHash, (Vec<Tag>, BTreeSet<PathBuf>)>> {
        let total_files = files.len();

        // task submitting files to the queue
        let (file_sender, file_receiver) =
            flume::bounded::<(BoxHash, Metadata, BTreeSet<PathBuf>)>(1024);
        let sender = tokio::spawn(async move {
            for (hash, (metadata, paths)) in files.into_iter() {
                file_sender.send_async((hash, metadata, paths)).await?;
            }
            Ok(()) as Result<()>
        });

        // tasks to pop messages off the queue and scan their metadata
        let (tag_sender, tag_receiver) =
            flume::bounded::<(BoxHash, Vec<Tag>, BTreeSet<PathBuf>)>(1024);
        let scanner_tasks = self.launch_scanner_tasks(file_receiver, tag_sender, 16);

        // start collecting tags
        let collect = tokio::spawn(async move {
            let mut stream = tag_receiver.stream();
            let mut last_update = Instant::now();
            let mut current_files = 0;
            let mut files: BTreeMap<BoxHash, (Vec<Tag>, BTreeSet<PathBuf>)> = BTreeMap::new();
            while let Some((hash, tags, paths)) = stream.next().await {
                current_files += 1;

                if Instant::now().duration_since(last_update) > UPDATE_INTERVAL {
                    // 0× in this run: the update interval never elapsed
                    last_update = Instant::now();
                    print!("\r\x1B[2Kscanning {current_files}/{total_files} files");
                    stdout().flush().unwrap();
                }

                files.insert(hash, (tags, paths));
            }

            println!("\r\x1B[2Kscanning {current_files}/{total_files} files");
            Ok(files) as Result<BTreeMap<BoxHash, (Vec<Tag>, BTreeSet<PathBuf>)>>
        });

        // await the futures
        for task in scanner_tasks.into_iter() {
            task.await??;
        }
        sender.await??;
        collect.await?
    }
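The collector above drains the channel through flume's async bridge: `Receiver::stream()` adapts it into a futures `Stream`, which ends once every sender clone has been dropped. A minimal standalone sketch of that handoff:

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded::<u32>(8);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send_async(i).await.unwrap();
        }
        // tx is dropped here, which terminates the stream below
    });

    let mut stream = rx.stream();
    let mut total = 0;
    while let Some(i) = stream.next().await {
        total += i;
    }
    assert_eq!(total, 3); // 0 + 1 + 2
}
```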
    /// Skip known files.
    pub async fn skip_known(
        &self,
        mut files: BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)>,
    ) -> Result<BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)>> {
        let mut database = self.database().await;
        spawn_blocking(move || {
            let mut last_update = Instant::now();
            let total_files = files.len();
            let mut current_files = 0;

            // check for existing files
            let transaction = database.transaction()?;
            let mut exists = BTreeSet::new();
            for (hash, (_metadata, paths)) in &files {
                current_files += 1;
                if Instant::now().duration_since(last_update) > UPDATE_INTERVAL {
                    // 0× in this run: the update interval never elapsed
                    last_update = Instant::now();
                    print!("\r\x1B[2Kdeduplicating {current_files}/{total_files} files");
                    stdout().flush().unwrap();
                }

                // if a file already exists, just save the paths
                if transaction.hash_exists(hash)? {
                    // 0× in this run: no already-known hashes were encountered
                    exists.insert(hash.clone());
                    add_path_tags(&transaction, hash, &mut paths.iter().map(PathBuf::as_path))?;
                }
            }

            transaction.commit()?;

            // remove files that already exist
            for hash in exists {
                files.remove(&hash);
            }

            println!("\r\x1B[2Kdeduplicating {current_files}/{total_files} files");

            Ok(files) as Result<_>
        })
        .await?
    }

    /// Hash files.
    pub async fn hash_files(
        &self,
        files: BTreeMap<PathBuf, Metadata>,
    ) -> Result<BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)>> {
        let total_files = files.len();
        let total_bytes: u64 = files.values().map(|metadata| metadata.len()).sum();

        // task submitting files to the queue
        let (file_sender, file_receiver) = flume::bounded::<(PathBuf, Metadata)>(1024);
        let sender = tokio::spawn(async move {
            for (file, metadata) in files.into_iter() {
                file_sender.send_async((file, metadata)).await?;
            }
            Ok(()) as Result<()>
        });

        // tasks to pop messages off the queue and generate hashes
        let (hash_sender, hash_receiver) = flume::bounded::<(PathBuf, Metadata, BoxHash)>(1024);
        let hasher_tasks = self.launch_hasher_tasks(file_receiver, hash_sender, 16);

        // start collecting hashes
        let collect = tokio::spawn(async move {
            let mut stream = hash_receiver.stream();
            let mut last_update = Instant::now();
            let mut current_files = 0;
            let mut current_bytes = 0;
            let mut files: BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)> = BTreeMap::new();
            while let Some((path, metadata, hash)) = stream.next().await {
                current_files += 1;
                current_bytes += metadata.len();

                if Instant::now().duration_since(last_update) > UPDATE_INTERVAL {
                    last_update = Instant::now();
                    print!("\r\x1B[2Khashing {current_files}/{total_files} files ({current_bytes}/{total_bytes} bytes)");
                    stdout().flush().unwrap();
                }

                // identical hashes collapse into one entry; only the path set grows
                files
                    .entry(hash)
                    .or_insert_with(move || (metadata, BTreeSet::new()))
                    .1
                    .insert(path);
            }

            println!("\r\x1B[2Khashing {current_files}/{total_files} files ({current_bytes}/{total_bytes} bytes)");
            Ok(files) as Result<BTreeMap<BoxHash, (Metadata, BTreeSet<PathBuf>)>>
        });

        // await the futures
        for task in hasher_tasks.into_iter() {
            task.await??;
        }
        sender.await??;
        collect.await?
    }
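That `entry`/`or_insert_with` collection is where deduplication happens: two paths hashing to the same digest end up as one map entry with both paths in its set. A toy sketch of the shape, with string literals standing in for `BoxHash` and the paths:

```rust
use std::collections::{BTreeMap, BTreeSet};

fn main() {
    let hashed = [("ab12", "a.txt"), ("ab12", "copy-of-a.txt"), ("cd34", "b.txt")];
    let mut files: BTreeMap<&str, BTreeSet<&str>> = BTreeMap::new();
    for (hash, path) in hashed {
        // same digest, same entry: only the path set grows
        files.entry(hash).or_default().insert(path);
    }
    assert_eq!(files.len(), 2);
    assert_eq!(files["ab12"].len(), 2);
}
```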
    /// List files (recursively).
    pub async fn list_files(
        &self,
        paths: &[PathBuf],
        recursive: bool,
    ) -> Result<BTreeMap<PathBuf, Metadata>> {
        let (sender, mut receiver) = channel::<(PathBuf, Metadata)>(1024);
        let files = tokio::spawn(async move {
            let mut files: BTreeMap<PathBuf, Metadata> = BTreeMap::new();
            let mut last_update = Instant::now();
            let mut bytes = 0;
            while let Some((path, metadata)) = receiver.recv().await {
                if Instant::now().duration_since(last_update) > UPDATE_INTERVAL {
                    // 0× in this run: the update interval never elapsed
                    last_update = Instant::now();
                    print!("\r\x1B[2Klisting {} files ({bytes} bytes)", files.len());
                    stdout().flush().unwrap();
                }

                let file_len = metadata.len();
                if files.insert(path, metadata).is_none() {
                    bytes += file_len;
                }
            }

            println!("\r\x1B[2Klisting {} files ({bytes} bytes)", files.len());
            files
        });

        for path in paths {
            let path = std::fs::canonicalize(path)?;
            if recursive {
                let sender = sender.clone();
                let root = self.root().to_path_buf();
                let cindy = self.clone();
                spawn_blocking(move || {
                    // make sure we don't recurse into our own data or thumbs paths
                    let cindy_folder = cindy.cindy_folder();
                    let filter = |path: &Path| {
                        if path == cindy_folder {
                            return false;
                        }

                        true // 0× in this run
                    };

                    // scan files recursively
                    for result in scan_files(&path, &filter) {
                        let (path, metadata) = result?;
                        let path = path.strip_prefix(&root)?.to_path_buf();
                        let sender = sender.clone();
                        sender.blocking_send((path, metadata))?;
                    }

                    Ok(()) as Result<()>
                })
                .await??;
            } else {
                let metadata = tokio::fs::metadata(&path).await?;
                let path = path.strip_prefix(self.root())?.to_path_buf();
                sender.send((path, metadata)).await?;
            }
        }
        drop(sender);

        Ok(files.await?)
    }
}
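Note the channel switch in `list_files`: the collector uses tokio's mpsc, and inside `spawn_blocking` the sender cannot be awaited, so `blocking_send` bridges back from the sync context. A standalone sketch of that bridge (names are illustrative):

```rust
use tokio::sync::mpsc::channel;
use tokio::task::spawn_blocking;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = channel::<u32>(4);
    let producer = spawn_blocking(move || {
        for i in 0..3 {
            // blocking_send parks the OS thread (not a task) until there
            // is capacity; it is the sync-side counterpart of send().await
            sender.blocking_send(i).unwrap();
        }
        // sender drops here, closing the channel
    });

    let mut sum = 0;
    while let Some(i) = receiver.recv().await {
        sum += i;
    }
    producer.await.unwrap();
    assert_eq!(sum, 3);
}
```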
fn scan_files<'a>(
    path: &Path,
    filter: &'a dyn Fn(&Path) -> bool,
) -> impl Iterator<Item = Result<(PathBuf, Metadata)>> + 'a {
    // NOTE: panics if the directory itself cannot be read;
    // per-entry errors are surfaced through the iterator instead
    let files = std::fs::read_dir(path).unwrap();
    files
        .map(move |entry| {
            let entry = entry?;
            let path = entry.path();
            let metadata = entry.metadata()?;
            Ok((path, metadata))
        })
        .flat_map(|result| {
            let result: Box<dyn Iterator<Item = Result<(PathBuf, Metadata)>> + 'a> = match result {
                Ok((path, metadata)) if metadata.is_dir() && filter(&path) => {
                    Box::new(scan_files(&path, filter))
                }
                Ok((_, metadata)) if !metadata.is_file() => Box::new(std::iter::empty()),
                other => Box::new(std::iter::once(other)),
            };
            result
        })
}
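The boxing in that `flat_map` is what makes the recursion type-check: a recursive `impl Iterator` would have to name itself inside its own type, so each arm is erased to `Box<dyn Iterator>` instead, letting the three arms unify. The same trick, stripped down to integers:

```rust
// A recursive `impl Iterator` return type would have to contain itself,
// so both arms are erased behind Box<dyn Iterator>, just like scan_files.
fn countdown(n: u32) -> Box<dyn Iterator<Item = u32>> {
    match n {
        0 => Box::new(std::iter::empty()),
        n => Box::new(std::iter::once(n).chain(countdown(n - 1))),
    }
}

fn main() {
    let collected: Vec<u32> = countdown(3).collect();
    assert_eq!(collected, [3, 2, 1]);
}
```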
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::{create_dir, write};
    use tempfile::tempdir;

    #[test]
    fn test_path_tags_subfolder() {
        let tags: BTreeSet<_> = path_tags(Path::new("images/vacation/boat.jpg")).collect();
        assert!(tags.contains(&Tag::new("path".into(), "/images/vacation/boat.jpg".into())));
        assert!(tags.contains(&Tag::new("directory".into(), "/images/vacation".into())));
        assert!(tags.contains(&Tag::new("filename".into(), "boat.jpg".into())));
        assert!(tags.contains(&Tag::new("ancestor".into(), "/images/vacation".into())));
        assert!(tags.contains(&Tag::new("ancestor".into(), "/images".into())));
        assert!(tags.contains(&Tag::new("ancestor".into(), "/".into())));
    }

    #[test]
    fn test_path_tags_root() {
        let tags: BTreeSet<_> = path_tags(Path::new("boat.jpg")).collect();
        assert!(tags.contains(&Tag::new("path".into(), "/boat.jpg".into())));
        assert!(tags.contains(&Tag::new("directory".into(), "/".into())));
        assert!(tags.contains(&Tag::new("filename".into(), "boat.jpg".into())));
        assert!(tags.contains(&Tag::new("ancestor".into(), "/".into())));
    }

    #[test]
    fn can_scan_files() {
        let dir = tempdir().unwrap();
        let files = scan_files(dir.path(), &|_| true).count();
        assert_eq!(files, 0);
    }

    #[test]
    fn can_scan_files_many2() {
        let dir = tempdir().unwrap();
        write(dir.path().join("file1.txt"), "hello").unwrap();
        write(dir.path().join("file2.txt"), "world").unwrap();
        write(dir.path().join("file3.txt"), "test").unwrap();
        let files: BTreeMap<_, _> = scan_files(dir.path(), &|_| true)
            .map(|result| {
                result.map(|(path, metadata)| {
                    (
                        path.strip_prefix(dir.path()).unwrap().to_path_buf(),
                        metadata,
                    )
                })
            })
            .collect::<Result<BTreeMap<_, _>, _>>()
            .unwrap();
        assert_eq!(files.len(), 3);
        assert_eq!(files.get(Path::new("file1.txt")).unwrap().len(), 5);
        assert_eq!(files.get(Path::new("file2.txt")).unwrap().len(), 5);
        assert_eq!(files.get(Path::new("file3.txt")).unwrap().len(), 4);
    }

    #[test]
    fn can_scan_files_folder() {
        let dir = tempdir().unwrap();
        // create folders
        create_dir(dir.path().join("folder")).unwrap();
        create_dir(dir.path().join("other")).unwrap();
        create_dir(dir.path().join("hidden")).unwrap();

        // create files
        write(dir.path().join("folder").join("file1.txt"), "hello").unwrap();
        write(dir.path().join("folder").join("file2.txt"), "world").unwrap();
        write(dir.path().join("folder").join("file3.txt"), "test").unwrap();
        write(dir.path().join("other").join("taxes.txt"), "expensive").unwrap();
        write(dir.path().join("hidden").join("secrets.txt"), "shh").unwrap();

        // iterate, skipping the hidden folder
        let files: BTreeMap<_, _> = scan_files(dir.path(), &|path| {
            path.strip_prefix(dir.path())
                .map(|path| path != Path::new("hidden"))
                .unwrap_or(true)
        })
        .map(|result| {
            result.map(|(path, metadata)| {
                (
                    path.strip_prefix(dir.path()).unwrap().to_path_buf(),
                    metadata,
                )
            })
        })
        .collect::<Result<BTreeMap<_, _>, _>>()
        .unwrap();
        assert_eq!(files.len(), 4);
        assert_eq!(files.get(Path::new("folder/file1.txt")).unwrap().len(), 5);
        assert_eq!(files.get(Path::new("folder/file2.txt")).unwrap().len(), 5);
        assert_eq!(files.get(Path::new("folder/file3.txt")).unwrap().len(), 4);
    }
}
}