Skip to content

Library API Reference

The audiomancer.library module provides sample library management functionality.

Overview

library

Sample library management module.

Provides tools for managing sample packs from Google Drive (or other sources) with local caching, symlink management, and integration with pattern generation.

__all__ = ['LibraryStore', 'SampleLookup', 'LibraryManager', 'LibraryError', 'PackNotFoundError', 'SourceNotAvailableError', 'scan_source_packs', 'scan_pack_files', 'group_files_into_samples', 'detect_category', 'detect_bpm', 'detect_is_loop', 'abbreviate_pack_name', 'generate_sample_id', 'PackInfo', 'PackStatus', 'SampleInfo', 'CopyStats', 'EnableResult'] module-attribute

CopyStats

Bases: TypedDict

Statistics from a copy operation.

EnableResult

Bases: TypedDict

Result from enabling a pack.

LibraryError

Bases: AudiomancerError

Sample library management errors.

Base class for errors during library operations (enable/disable packs).

Example

raise LibraryError( ... "Failed to copy pack", ... details={"pack": "808 Drum Kit", "error": "disk full"} ... )

LibraryManager

Manages sample library from source (Google Drive) to local project.

Handles: - Scanning source for available packs - Copying packs to local cache (samples/) - Creating symlinks for enabled packs (library/) - Querying enabled samples for pattern generation

__init__(source_dir: Path, samples_dir: Path, library_dir: Path, ignore_patterns: Optional[set[str]] = None)

Initialize library manager.

Parameters:

Name Type Description Default
source_dir Path

Path to source samples (e.g., Google Drive)

required
samples_dir Path

Path to local cache directory

required
library_dir Path

Path to enabled samples directory (symlinks)

required
ignore_patterns Optional[set[str]]

Set of pack names to ignore

None

disable_pack(pack_name: str) -> int

Disable a pack (remove symlinks, keep cache).

Parameters:

Name Type Description Default
pack_name str

Name of pack to disable

required

Returns:

Type Description
int

Number of samples disabled

enable_pack(pack_name: str, max_size_mb: int = 10, workers: int = 16) -> EnableResult

Enable a pack (sync wrapper for async operation).

enable_pack_async(pack_name: str, max_size_mb: int = 10, workers: int = 16) -> EnableResult async

Enable a pack (copy from source and create symlinks).

Parameters:

Name Type Description Default
pack_name str

Name of pack to enable

required
max_size_mb int

Skip files larger than this (0 = no limit)

10
workers int

Number of parallel copy workers

16

Returns:

Type Description
EnableResult

EnableResult with copy stats and enabled sample IDs

Raises:

Type Description
PackNotFoundError

If pack doesn't exist

get_pack_status(pack_name: str) -> PackStatus

Get detailed status of a pack.

Parameters:

Name Type Description Default
pack_name str

Name of pack folder

required

Returns:

Type Description
PackStatus

PackStatus with enabled/cached/remote status for each sample

Raises:

Type Description
PackNotFoundError

If pack doesn't exist in source

get_samples_by_type(instrument_type: str, bpm: Optional[float] = None, is_loop: Optional[bool] = None, limit: int = 10) -> list[str]

Get sample IDs matching criteria (implements SampleLookup protocol).

Parameters:

Name Type Description Default
instrument_type str

Category to match (bd, sn, hh, etc.)

required
bpm Optional[float]

Optional BPM to filter by

None
is_loop Optional[bool]

Optional filter for loops vs one-shots

None
limit int

Maximum results

10

Returns:

Type Description
list[str]

List of sample IDs

list_enabled_samples() -> list[SampleInfo]

List all enabled sample IDs with their info.

Returns:

Type Description
list[SampleInfo]

List of SampleInfo for all enabled samples

list_packs() -> list[PackInfo]

List all available packs from source.

Returns:

Type Description
list[PackInfo]

List of PackInfo for each pack in source directory

purge_pack(pack_name: str) -> bool

Remove pack from local cache entirely.

Parameters:

Name Type Description Default
pack_name str

Name of pack to purge

required

Returns:

Type Description
bool

True if any files were removed

search_packs(pattern: str) -> list[PackInfo]

Search packs by name pattern.

Parameters:

Name Type Description Default
pattern str

Regex pattern to match against pack names

required

Returns:

Type Description
list[PackInfo]

List of matching PackInfo

LibraryStore

Bases: Protocol

Interface for library management operations.

disable_pack(pack_name: str) -> int

Disable a pack (remove symlinks, keep cache). Returns count disabled.

enable_pack(pack_name: str, max_size_mb: int = 10, workers: int = 16) -> EnableResult

Enable a pack (copy from source and create symlinks).

get_pack_status(pack_name: str) -> PackStatus

Get detailed status of a pack.

list_enabled_samples() -> list[SampleInfo]

List all enabled sample IDs with their info.

list_packs() -> list[PackInfo]

List all available packs from source.

purge_pack(pack_name: str) -> bool

Remove pack from cache entirely.

search_packs(pattern: str) -> list[PackInfo]

Search packs by name pattern.

PackInfo

Bases: TypedDict

Information about a sample pack from the source.

PackNotFoundError

Bases: LibraryError

Sample pack does not exist in source directory.

Example

raise PackNotFoundError("Nonexistent Pack", Path("/source"))

PackStatus

Bases: TypedDict

Detailed status of a sample pack.

SampleInfo

Bases: TypedDict

Information about a single sample folder in the library.

SampleLookup

Bases: Protocol

Interface for querying available samples by type.

Used by pattern generation to find real sample IDs.

get_samples_by_type(instrument_type: str, bpm: Optional[float] = None, is_loop: Optional[bool] = None, limit: int = 10) -> list[str]

Return sample IDs matching the criteria.

Parameters:

Name Type Description Default
instrument_type str

Category to match (bd, sn, hh, oh, perc, bass, etc.)

required
bpm Optional[float]

Optional BPM to filter by (matches samples with detected BPM)

None
is_loop Optional[bool]

Optional filter for loops vs one-shots

None
limit int

Maximum number of results

10

Returns:

Type Description
list[str]

List of sample IDs (e.g., ["808dk_bd", "absttex_bd_125"])

SourceNotAvailableError

Bases: LibraryError

Source directory (e.g., Google Drive) is not accessible.

Example

raise SourceNotAvailableError(Path("/mnt/google-drive/samples"))

abbreviate_pack_name(pack_name: str) -> str

Generate short abbreviation for pack name.

Parameters:

Name Type Description Default
pack_name str

Original pack folder name

required

Returns:

Type Description
str

Short abbreviation (max 8 chars) for use in sample IDs

detect_bpm(text: str) -> int | None

Detect BPM from text (filename/path).

Returns:

Type Description
int | None

BPM value if detected and in valid range (60-200), None otherwise

detect_category(text: str) -> tuple[str, str]

Detect sample category from text (filename/path).

Returns:

Type Description
tuple[str, str]

Tuple of (category_id, category_type)

detect_is_loop(text: str) -> bool

Detect if sample is a loop vs one-shot.

Returns:

Type Description
bool

True if detected as loop, False otherwise

generate_sample_id(pack_abbr: str, category: str, bpm: int | None, is_loop: bool) -> str

Generate a sample ID from components.

Format: {pack}[lp]{category}[_{bpm}]

Examples:

  • "808dk_bd" (808 Drum Kit, kick)
  • "absttex_lp_hh_125" (Abstract Techno, loop, hi-hat, 125 BPM)

group_files_into_samples(pack_name: str, files: list[dict[str, Any]]) -> dict[str, SampleInfo]

Group files into sample folders by category.

Parameters:

Name Type Description Default
pack_name str

Pack folder name

required
files list[dict[str, Any]]

List of file info dicts from scan_pack_files

required

Returns:

Type Description
dict[str, SampleInfo]

Dict mapping sample_id to SampleInfo

scan_pack_files(source_dir: Path, pack_name: str) -> list[dict[str, Any]]

Scan a pack and categorize all audio files.

Parameters:

Name Type Description Default
source_dir Path

Path to source directory

required
pack_name str

Name of pack folder to scan

required

Returns:

Type Description
list[dict[str, Any]]

List of file info dicts with path, category, bpm, is_loop, size

scan_source_packs(source_dir: Path) -> list[str]

Get list of top-level pack folders from source directory.

Parameters:

Name Type Description Default
source_dir Path

Path to source directory (e.g., Google Drive samples folder)

required

Returns:

Type Description
list[str]

Sorted list of pack folder names

Library Manager

manager

Library management for sample packs.

Provides LibraryManager class for enabling/disabling sample packs from Google Drive (or other source) with local caching and symlink management.

LibraryManager

Manages sample library from source (Google Drive) to local project.

Handles: - Scanning source for available packs - Copying packs to local cache (samples/) - Creating symlinks for enabled packs (library/) - Querying enabled samples for pattern generation

Source code in src/audiomancer/library/manager.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
class LibraryManager:
    """Manages sample library from source (Google Drive) to local project.

    Handles:
    - Scanning source for available packs
    - Copying packs to local cache (samples/)
    - Creating symlinks for enabled packs (library/)
    - Querying enabled samples for pattern generation
    """

    def __init__(
        self,
        source_dir: Path,
        samples_dir: Path,
        library_dir: Path,
        ignore_patterns: Optional[set[str]] = None,
    ):
        """Initialize library manager.

        Args:
            source_dir: Path to source samples (e.g., Google Drive)
            samples_dir: Path to local cache directory
            library_dir: Path to enabled samples directory (symlinks)
            ignore_patterns: Set of pack names to ignore
        """
        self.source_dir = Path(source_dir).expanduser().resolve()
        self.samples_dir = Path(samples_dir).expanduser().resolve()
        self.library_dir = Path(library_dir).expanduser().resolve()
        self.ignore_patterns = ignore_patterns or set()

    def _check_source(self) -> None:
        """Verify source directory is accessible."""
        if not self.source_dir.exists():
            raise SourceNotAvailableError(self.source_dir)

    def _should_ignore(self, pack_name: str) -> bool:
        """Check if pack should be ignored."""
        for pattern in self.ignore_patterns:
            if pattern == pack_name:
                return True
            if pattern.endswith("*") and pack_name.startswith(pattern[:-1]):
                return True
        return False

    def _get_cached_sample_ids(self) -> set[str]:
        """Get sample IDs in local cache."""
        if not self.samples_dir.exists():
            return set()
        return {
            d.name
            for d in self.samples_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        }

    def _get_enabled_sample_ids(self) -> set[str]:
        """Get sample IDs that are enabled (symlinked)."""
        if not self.library_dir.exists():
            return set()
        return {
            d.name
            for d in self.library_dir.iterdir()
            if d.is_symlink() or d.is_dir()
        }

    def list_packs(self) -> list[PackInfo]:
        """List all available packs from source.

        Returns:
            List of PackInfo for each pack in source directory
        """
        self._check_source()

        result = []
        for pack_name in scan_source_packs(self.source_dir):
            if self._should_ignore(pack_name):
                continue

            files = scan_pack_files(self.source_dir, pack_name)
            samples = group_files_into_samples(pack_name, files)
            total_size = sum(f["size"] for f in files)

            result.append(
                PackInfo(
                    name=pack_name,
                    file_count=len(files),
                    size_mb=total_size / (1024 * 1024),
                    sample_ids=list(samples.keys()),
                )
            )

        return result

    def search_packs(self, pattern: str) -> list[PackInfo]:
        """Search packs by name pattern.

        Args:
            pattern: Regex pattern to match against pack names

        Returns:
            List of matching PackInfo
        """
        self._check_source()

        regex = re.compile(pattern, re.IGNORECASE)
        packs = scan_source_packs(self.source_dir)
        matches = [p for p in packs if regex.search(p) and not self._should_ignore(p)]

        result = []
        for pack_name in matches:
            files = scan_pack_files(self.source_dir, pack_name)
            samples = group_files_into_samples(pack_name, files)
            total_size = sum(f["size"] for f in files)

            result.append(
                PackInfo(
                    name=pack_name,
                    file_count=len(files),
                    size_mb=total_size / (1024 * 1024),
                    sample_ids=list(samples.keys()),
                )
            )

        return result

    def get_pack_status(self, pack_name: str) -> PackStatus:
        """Get detailed status of a pack.

        Args:
            pack_name: Name of pack folder

        Returns:
            PackStatus with enabled/cached/remote status for each sample

        Raises:
            PackNotFoundError: If pack doesn't exist in source
        """
        self._check_source()

        pack_dir = self.source_dir / pack_name
        if not pack_dir.exists():
            raise PackNotFoundError(pack_name, self.source_dir)

        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)
        total_size = sum(f["size"] for f in files)

        cached_ids = self._get_cached_sample_ids()
        enabled_ids = self._get_enabled_sample_ids()

        sample_statuses = {}
        for sample_id in samples.keys():
            if sample_id in enabled_ids:
                sample_statuses[sample_id] = "enabled"
            elif sample_id in cached_ids:
                sample_statuses[sample_id] = "cached"
            else:
                sample_statuses[sample_id] = "remote"

        # Overall status
        if any(s == "enabled" for s in sample_statuses.values()):
            status = "enabled"
        elif any(s == "cached" for s in sample_statuses.values()):
            status = "cached"
        else:
            status = "remote"

        return PackStatus(
            name=pack_name,
            status=status,
            file_count=len(files),
            size_mb=total_size / (1024 * 1024),
            samples=sample_statuses,
        )

    def _enable_sample(self, sample_id: str) -> bool:
        """Create symlink in library/ for a cached sample."""
        source = self.samples_dir / sample_id
        target = self.library_dir / sample_id

        if not source.exists():
            return False

        self.library_dir.mkdir(parents=True, exist_ok=True)

        if target.exists() or target.is_symlink():
            return True

        target.symlink_to(source)
        return True

    def _disable_sample(self, sample_id: str) -> bool:
        """Remove symlink from library/."""
        target = self.library_dir / sample_id

        if target.is_symlink():
            target.unlink()
            return True
        return False

    def _file_already_copied(self, source: Path, target_dir: Path) -> bool:
        """Check if file already exists in target (by name and size)."""
        target = target_dir / source.name
        if not target.exists():
            return False
        try:
            return target.stat().st_size == source.stat().st_size
        except OSError:
            return False

    def _get_unique_filename(self, target_dir: Path, filename: str) -> str:
        """Generate unique filename if collision exists."""
        target = target_dir / filename
        if not target.exists():
            return filename

        stem = Path(filename).stem
        suffix = Path(filename).suffix
        counter = 2
        while (target_dir / f"{stem}_{counter}{suffix}").exists():
            counter += 1
        return f"{stem}_{counter}{suffix}"

    def _copy_file_sync(
        self,
        source: Path,
        target_dir: Path,
        max_size_bytes: int = 0,
    ) -> tuple[bool, str]:
        """Synchronous file copy. Returns (success, status)."""
        if max_size_bytes > 0:
            try:
                if source.stat().st_size > max_size_bytes:
                    return True, "too_large"
            except OSError:
                pass

        if self._file_already_copied(source, target_dir):
            return True, "skipped"

        filename = self._get_unique_filename(target_dir, source.name)
        target = target_dir / filename

        try:
            target_dir.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(source, target)
            return True, "copied"
        except Exception:
            return False, "error"

    async def _copy_file_async(
        self,
        source: Path,
        target_dir: Path,
        semaphore: asyncio.Semaphore,
        stats: _CopyStats,
        max_size_bytes: int = 0,
    ) -> bool:
        """Async wrapper for file copy with concurrency control."""
        async with semaphore:
            success, status = await asyncio.to_thread(
                self._copy_file_sync, source, target_dir, max_size_bytes
            )

            if status == "too_large":
                await stats.add_too_large()
            elif status == "skipped":
                await stats.add_skipped()
            elif status == "copied":
                await stats.add_copied()
            else:
                await stats.add_error()

            return success

    async def enable_pack_async(
        self,
        pack_name: str,
        max_size_mb: int = 10,
        workers: int = 16,
    ) -> EnableResult:
        """Enable a pack (copy from source and create symlinks).

        Args:
            pack_name: Name of pack to enable
            max_size_mb: Skip files larger than this (0 = no limit)
            workers: Number of parallel copy workers

        Returns:
            EnableResult with copy stats and enabled sample IDs

        Raises:
            PackNotFoundError: If pack doesn't exist
        """
        self._check_source()

        pack_dir = self.source_dir / pack_name
        if not pack_dir.exists():
            raise PackNotFoundError(pack_name, self.source_dir)

        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)
        max_size_bytes = max_size_mb * 1024 * 1024 if max_size_mb > 0 else 0

        # Build copy tasks
        copy_tasks: list[tuple[Path, Path]] = []
        for sample_id, sample_data in samples.items():
            target_dir = self.samples_dir / sample_id
            # Get files for this sample from the grouped data
            for f in files:
                # Check if file belongs to this sample
                from .scanner import detect_category, detect_bpm, detect_is_loop, generate_sample_id, abbreviate_pack_name

                pack_abbr = abbreviate_pack_name(pack_name)
                file_category, _ = detect_category(str(f["path"]))
                file_bpm = detect_bpm(str(f["path"]))
                file_is_loop = detect_is_loop(str(f["path"]))
                file_sample_id = generate_sample_id(pack_abbr, file_category, file_bpm, file_is_loop)

                if file_sample_id == sample_id:
                    copy_tasks.append((f["path"], target_dir))

        # Copy files in parallel
        stats = _CopyStats()
        if copy_tasks:
            semaphore = asyncio.Semaphore(workers)
            tasks = [
                self._copy_file_async(src, tgt, semaphore, stats, max_size_bytes)
                for src, tgt in copy_tasks
            ]
            await asyncio.gather(*tasks)

        # Enable (symlink) all samples
        enabled = 0
        sample_ids = []
        for sample_id in samples.keys():
            if self._enable_sample(sample_id):
                enabled += 1
                sample_ids.append(sample_id)

        return EnableResult(
            pack_name=pack_name,
            copy_stats=stats.to_dict(),
            samples_enabled=enabled,
            sample_ids=sample_ids,
        )

    def enable_pack(
        self,
        pack_name: str,
        max_size_mb: int = 10,
        workers: int = 16,
    ) -> EnableResult:
        """Enable a pack (sync wrapper for async operation)."""
        return asyncio.run(self.enable_pack_async(pack_name, max_size_mb, workers))

    def disable_pack(self, pack_name: str) -> int:
        """Disable a pack (remove symlinks, keep cache).

        Args:
            pack_name: Name of pack to disable

        Returns:
            Number of samples disabled
        """
        self._check_source()

        pack_dir = self.source_dir / pack_name
        if not pack_dir.exists():
            raise PackNotFoundError(pack_name, self.source_dir)

        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)

        disabled = 0
        for sample_id in samples.keys():
            if self._disable_sample(sample_id):
                disabled += 1

        return disabled

    def purge_pack(self, pack_name: str) -> bool:
        """Remove pack from local cache entirely.

        Args:
            pack_name: Name of pack to purge

        Returns:
            True if any files were removed
        """
        self._check_source()

        pack_dir = self.source_dir / pack_name
        if not pack_dir.exists():
            raise PackNotFoundError(pack_name, self.source_dir)

        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)

        removed_any = False
        for sample_id in samples.keys():
            # Disable first
            self._disable_sample(sample_id)

            # Remove from cache
            cache_dir = self.samples_dir / sample_id
            if cache_dir.exists():
                shutil.rmtree(cache_dir)
                removed_any = True

        return removed_any

    def list_enabled_samples(self) -> list[SampleInfo]:
        """List all enabled sample IDs with their info.

        Returns:
            List of SampleInfo for all enabled samples
        """
        enabled_ids = self._get_enabled_sample_ids()
        result = []

        for sample_id in sorted(enabled_ids):
            sample_dir = self.library_dir / sample_id

            # Count audio files
            file_count = 0
            if sample_dir.exists():
                for f in sample_dir.iterdir():
                    if f.suffix.lower() in AUDIO_EXTENSIONS:
                        file_count += 1

            # Parse sample ID to extract info
            # Format: {pack}[_lp]_{category}[_{bpm}]
            parts = sample_id.split("_")
            is_loop = "lp" in parts
            bpm = None
            category = "misc"

            # Find BPM (last numeric part)
            for part in reversed(parts):
                if part.isdigit() and 60 <= int(part) <= 200:
                    bpm = int(part)
                    break

            # Find category (common ones)
            categories = {
                "bd", "sn", "cp", "hh", "ch", "oh", "tom", "perc",
                "bass", "synth", "lead", "pad", "vox", "fx", "loop",
            }
            for part in parts:
                if part in categories:
                    category = part
                    break

            result.append(
                SampleInfo(
                    id=sample_id,
                    category=category,
                    category_type="unknown",  # Would need source scan to determine
                    bpm=bpm,
                    is_loop=is_loop,
                    file_count=file_count,
                    pack_name="",  # Would need source scan to determine
                    enabled=True,
                )
            )

        return result

    # SampleLookup protocol implementation
    def get_samples_by_type(
        self,
        instrument_type: str,
        bpm: Optional[float] = None,
        is_loop: Optional[bool] = None,
        limit: int = 10,
    ) -> list[str]:
        """Get sample IDs matching criteria (implements SampleLookup protocol).

        Args:
            instrument_type: Category to match (bd, sn, hh, etc.)
            bpm: Optional BPM to filter by
            is_loop: Optional filter for loops vs one-shots
            limit: Maximum results

        Returns:
            List of sample IDs
        """
        enabled = self.list_enabled_samples()
        matches = []

        # Map common instrument type names to categories
        type_mapping = {
            "kick": ["bd", "bd808", "bd909"],
            "snare": ["sn", "sn808", "sn909"],
            "clap": ["cp"],
            "hihat": ["hh", "ch", "oh"],
            "hat": ["hh", "ch", "oh"],
            "tom": ["tom", "htom", "mtom", "ltom"],
            "perc": ["perc", "conga", "bongo", "shaker", "rim"],
            "bass": ["bass", "sub", "bass808"],
            "synth": ["synth", "lead", "pad", "chord", "stab"],
            "vocal": ["vox", "speech", "chant"],
            "fx": ["fx", "riser", "sweep", "impact"],
        }

        # Get categories to match
        categories = type_mapping.get(instrument_type.lower(), [instrument_type.lower()])

        for sample in enabled:
            # Check category match
            if sample.get("category") not in categories:
                continue

            # Check BPM match (with tolerance)
            sample_bpm = sample.get("bpm")
            if bpm is not None and sample_bpm is not None:
                if abs(sample_bpm - bpm) > 10:
                    continue

            # Check loop/one-shot match
            if is_loop is not None and sample.get("is_loop") != is_loop:
                continue

            sample_id = sample.get("id")
            if sample_id:
                matches.append(sample_id)

            if len(matches) >= limit:
                break

        return matches

__init__(source_dir: Path, samples_dir: Path, library_dir: Path, ignore_patterns: Optional[set[str]] = None)

Initialize library manager.

Parameters:

Name Type Description Default
source_dir Path

Path to source samples (e.g., Google Drive)

required
samples_dir Path

Path to local cache directory

required
library_dir Path

Path to enabled samples directory (symlinks)

required
ignore_patterns Optional[set[str]]

Set of pack names to ignore

None
Source code in src/audiomancer/library/manager.py
def __init__(
    self,
    source_dir: Path,
    samples_dir: Path,
    library_dir: Path,
    ignore_patterns: Optional[set[str]] = None,
):
    """Initialize library manager.

    Args:
        source_dir: Path to source samples (e.g., Google Drive)
        samples_dir: Path to local cache directory
        library_dir: Path to enabled samples directory (symlinks)
        ignore_patterns: Set of pack names to ignore
    """
    self.source_dir = Path(source_dir).expanduser().resolve()
    self.samples_dir = Path(samples_dir).expanduser().resolve()
    self.library_dir = Path(library_dir).expanduser().resolve()
    self.ignore_patterns = ignore_patterns or set()

disable_pack(pack_name: str) -> int

Disable a pack (remove symlinks, keep cache).

Parameters:

Name Type Description Default
pack_name str

Name of pack to disable

required

Returns:

Type Description
int

Number of samples disabled

Source code in src/audiomancer/library/manager.py
def disable_pack(self, pack_name: str) -> int:
    """Disable a pack (remove symlinks, keep cache).

    Args:
        pack_name: Name of pack to disable

    Returns:
        Number of samples disabled
    """
    self._check_source()

    pack_dir = self.source_dir / pack_name
    if not pack_dir.exists():
        raise PackNotFoundError(pack_name, self.source_dir)

    files = scan_pack_files(self.source_dir, pack_name)
    samples = group_files_into_samples(pack_name, files)

    disabled = 0
    for sample_id in samples.keys():
        if self._disable_sample(sample_id):
            disabled += 1

    return disabled

enable_pack(pack_name: str, max_size_mb: int = 10, workers: int = 16) -> EnableResult

Enable a pack (sync wrapper for async operation).

Source code in src/audiomancer/library/manager.py
def enable_pack(
    self,
    pack_name: str,
    max_size_mb: int = 10,
    workers: int = 16,
) -> EnableResult:
    """Enable a pack (sync wrapper for async operation)."""
    return asyncio.run(self.enable_pack_async(pack_name, max_size_mb, workers))

enable_pack_async(pack_name: str, max_size_mb: int = 10, workers: int = 16) -> EnableResult async

Enable a pack (copy from source and create symlinks).

Parameters:

Name Type Description Default
pack_name str

Name of pack to enable

required
max_size_mb int

Skip files larger than this (0 = no limit)

10
workers int

Number of parallel copy workers

16

Returns:

Type Description
EnableResult

EnableResult with copy stats and enabled sample IDs

Raises:

Type Description
PackNotFoundError

If pack doesn't exist

Source code in src/audiomancer/library/manager.py
async def enable_pack_async(
    self,
    pack_name: str,
    max_size_mb: int = 10,
    workers: int = 16,
) -> EnableResult:
    """Enable a pack (copy from source and create symlinks).

    Args:
        pack_name: Name of pack to enable
        max_size_mb: Skip files larger than this (0 = no limit)
        workers: Number of parallel copy workers

    Returns:
        EnableResult with copy stats and enabled sample IDs

    Raises:
        PackNotFoundError: If pack doesn't exist
    """
    self._check_source()

    pack_dir = self.source_dir / pack_name
    if not pack_dir.exists():
        raise PackNotFoundError(pack_name, self.source_dir)

    files = scan_pack_files(self.source_dir, pack_name)
    samples = group_files_into_samples(pack_name, files)
    max_size_bytes = max_size_mb * 1024 * 1024 if max_size_mb > 0 else 0

    # Build copy tasks
    copy_tasks: list[tuple[Path, Path]] = []
    for sample_id, sample_data in samples.items():
        target_dir = self.samples_dir / sample_id
        # Get files for this sample from the grouped data
        for f in files:
            # Check if file belongs to this sample
            from .scanner import detect_category, detect_bpm, detect_is_loop, generate_sample_id, abbreviate_pack_name

            pack_abbr = abbreviate_pack_name(pack_name)
            file_category, _ = detect_category(str(f["path"]))
            file_bpm = detect_bpm(str(f["path"]))
            file_is_loop = detect_is_loop(str(f["path"]))
            file_sample_id = generate_sample_id(pack_abbr, file_category, file_bpm, file_is_loop)

            if file_sample_id == sample_id:
                copy_tasks.append((f["path"], target_dir))

    # Copy files in parallel
    stats = _CopyStats()
    if copy_tasks:
        semaphore = asyncio.Semaphore(workers)
        tasks = [
            self._copy_file_async(src, tgt, semaphore, stats, max_size_bytes)
            for src, tgt in copy_tasks
        ]
        await asyncio.gather(*tasks)

    # Enable (symlink) all samples
    enabled = 0
    sample_ids = []
    for sample_id in samples.keys():
        if self._enable_sample(sample_id):
            enabled += 1
            sample_ids.append(sample_id)

    return EnableResult(
        pack_name=pack_name,
        copy_stats=stats.to_dict(),
        samples_enabled=enabled,
        sample_ids=sample_ids,
    )

get_pack_status(pack_name: str) -> PackStatus

Get detailed status of a pack.

Parameters:

Name Type Description Default
pack_name str

Name of pack folder

required

Returns:

Type Description
PackStatus

PackStatus with enabled/cached/remote status for each sample

Raises:

Type Description
PackNotFoundError

If pack doesn't exist in source

Source code in src/audiomancer/library/manager.py
def get_pack_status(self, pack_name: str) -> PackStatus:
    """Get detailed status of a pack.

    Args:
        pack_name: Name of pack folder

    Returns:
        PackStatus with enabled/cached/remote status for each sample

    Raises:
        PackNotFoundError: If pack doesn't exist in source
    """
    self._check_source()

    pack_dir = self.source_dir / pack_name
    if not pack_dir.exists():
        raise PackNotFoundError(pack_name, self.source_dir)

    files = scan_pack_files(self.source_dir, pack_name)
    samples = group_files_into_samples(pack_name, files)
    total_size = sum(f["size"] for f in files)

    cached_ids = self._get_cached_sample_ids()
    enabled_ids = self._get_enabled_sample_ids()

    sample_statuses = {}
    for sample_id in samples.keys():
        if sample_id in enabled_ids:
            sample_statuses[sample_id] = "enabled"
        elif sample_id in cached_ids:
            sample_statuses[sample_id] = "cached"
        else:
            sample_statuses[sample_id] = "remote"

    # Overall status
    if any(s == "enabled" for s in sample_statuses.values()):
        status = "enabled"
    elif any(s == "cached" for s in sample_statuses.values()):
        status = "cached"
    else:
        status = "remote"

    return PackStatus(
        name=pack_name,
        status=status,
        file_count=len(files),
        size_mb=total_size / (1024 * 1024),
        samples=sample_statuses,
    )

get_samples_by_type(instrument_type: str, bpm: Optional[float] = None, is_loop: Optional[bool] = None, limit: int = 10) -> list[str]

Get sample IDs matching criteria (implements SampleLookup protocol).

Parameters:

Name Type Description Default
instrument_type str

Category to match (bd, sn, hh, etc.)

required
bpm Optional[float]

Optional BPM to filter by

None
is_loop Optional[bool]

Optional filter for loops vs one-shots

None
limit int

Maximum results

10

Returns:

Type Description
list[str]

List of sample IDs

Source code in src/audiomancer/library/manager.py
def get_samples_by_type(
    self,
    instrument_type: str,
    bpm: Optional[float] = None,
    is_loop: Optional[bool] = None,
    limit: int = 10,
) -> list[str]:
    """Get sample IDs matching criteria (implements SampleLookup protocol).

    Args:
        instrument_type: Category to match (bd, sn, hh, etc.)
        bpm: Optional BPM to filter by
        is_loop: Optional filter for loops vs one-shots
        limit: Maximum results

    Returns:
        List of sample IDs
    """
    enabled = self.list_enabled_samples()
    matches = []

    # Map common instrument type names to categories
    type_mapping = {
        "kick": ["bd", "bd808", "bd909"],
        "snare": ["sn", "sn808", "sn909"],
        "clap": ["cp"],
        "hihat": ["hh", "ch", "oh"],
        "hat": ["hh", "ch", "oh"],
        "tom": ["tom", "htom", "mtom", "ltom"],
        "perc": ["perc", "conga", "bongo", "shaker", "rim"],
        "bass": ["bass", "sub", "bass808"],
        "synth": ["synth", "lead", "pad", "chord", "stab"],
        "vocal": ["vox", "speech", "chant"],
        "fx": ["fx", "riser", "sweep", "impact"],
    }

    # Get categories to match
    categories = type_mapping.get(instrument_type.lower(), [instrument_type.lower()])

    for sample in enabled:
        # Check category match
        if sample.get("category") not in categories:
            continue

        # Check BPM match (with tolerance)
        sample_bpm = sample.get("bpm")
        if bpm is not None and sample_bpm is not None:
            if abs(sample_bpm - bpm) > 10:
                continue

        # Check loop/one-shot match
        if is_loop is not None and sample.get("is_loop") != is_loop:
            continue

        sample_id = sample.get("id")
        if sample_id:
            matches.append(sample_id)

        if len(matches) >= limit:
            break

    return matches

list_enabled_samples() -> list[SampleInfo]

List all enabled sample IDs with their info.

Returns:

Type Description
list[SampleInfo]

List of SampleInfo for all enabled samples

Source code in src/audiomancer/library/manager.py
def list_enabled_samples(self) -> list[SampleInfo]:
    """List all enabled sample IDs with their info.

    Returns:
        List of SampleInfo for all enabled samples
    """
    enabled_ids = self._get_enabled_sample_ids()
    result = []

    for sample_id in sorted(enabled_ids):
        sample_dir = self.library_dir / sample_id

        # Count audio files
        file_count = 0
        if sample_dir.exists():
            for f in sample_dir.iterdir():
                if f.suffix.lower() in AUDIO_EXTENSIONS:
                    file_count += 1

        # Parse sample ID to extract info
        # Format: {pack}[_lp]_{category}[_{bpm}]
        parts = sample_id.split("_")
        is_loop = "lp" in parts
        bpm = None
        category = "misc"

        # Find BPM (last numeric part)
        for part in reversed(parts):
            if part.isdigit() and 60 <= int(part) <= 200:
                bpm = int(part)
                break

        # Find category (common ones)
        categories = {
            "bd", "sn", "cp", "hh", "ch", "oh", "tom", "perc",
            "bass", "synth", "lead", "pad", "vox", "fx", "loop",
        }
        for part in parts:
            if part in categories:
                category = part
                break

        result.append(
            SampleInfo(
                id=sample_id,
                category=category,
                category_type="unknown",  # Would need source scan to determine
                bpm=bpm,
                is_loop=is_loop,
                file_count=file_count,
                pack_name="",  # Would need source scan to determine
                enabled=True,
            )
        )

    return result

list_packs() -> list[PackInfo]

List all available packs from source.

Returns:

Type Description
list[PackInfo]

List of PackInfo for each pack in source directory

Source code in src/audiomancer/library/manager.py
def list_packs(self) -> list[PackInfo]:
    """List all available packs from source.

    Returns:
        List of PackInfo for each pack in source directory
    """
    self._check_source()

    result = []
    for pack_name in scan_source_packs(self.source_dir):
        if self._should_ignore(pack_name):
            continue

        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)
        total_size = sum(f["size"] for f in files)

        result.append(
            PackInfo(
                name=pack_name,
                file_count=len(files),
                size_mb=total_size / (1024 * 1024),
                sample_ids=list(samples.keys()),
            )
        )

    return result

purge_pack(pack_name: str) -> bool

Remove pack from local cache entirely.

Parameters:

Name Type Description Default
pack_name str

Name of pack to purge

required

Returns:

Type Description
bool

True if any files were removed

Source code in src/audiomancer/library/manager.py
def purge_pack(self, pack_name: str) -> bool:
    """Remove pack from local cache entirely.

    Args:
        pack_name: Name of pack to purge

    Returns:
        True if any files were removed
    """
    self._check_source()

    pack_dir = self.source_dir / pack_name
    if not pack_dir.exists():
        raise PackNotFoundError(pack_name, self.source_dir)

    files = scan_pack_files(self.source_dir, pack_name)
    samples = group_files_into_samples(pack_name, files)

    removed_any = False
    for sample_id in samples.keys():
        # Disable first
        self._disable_sample(sample_id)

        # Remove from cache
        cache_dir = self.samples_dir / sample_id
        if cache_dir.exists():
            shutil.rmtree(cache_dir)
            removed_any = True

    return removed_any

search_packs(pattern: str) -> list[PackInfo]

Search packs by name pattern.

Parameters:

Name Type Description Default
pattern str

Regex pattern to match against pack names

required

Returns:

Type Description
list[PackInfo]

List of matching PackInfo

Source code in src/audiomancer/library/manager.py
def search_packs(self, pattern: str) -> list[PackInfo]:
    """Search packs by name pattern.

    Args:
        pattern: Regex pattern to match against pack names

    Returns:
        List of matching PackInfo
    """
    self._check_source()

    regex = re.compile(pattern, re.IGNORECASE)
    packs = scan_source_packs(self.source_dir)
    matches = [p for p in packs if regex.search(p) and not self._should_ignore(p)]

    result = []
    for pack_name in matches:
        files = scan_pack_files(self.source_dir, pack_name)
        samples = group_files_into_samples(pack_name, files)
        total_size = sum(f["size"] for f in files)

        result.append(
            PackInfo(
                name=pack_name,
                file_count=len(files),
                size_mb=total_size / (1024 * 1024),
                sample_ids=list(samples.keys()),
            )
        )

    return result

Scanner

scanner

Sample pack scanning and categorization.

Ported from library_manager.py with improvements for audiomancer integration.

abbreviate_pack_name(pack_name: str) -> str

Generate short abbreviation for pack name.

Parameters:

Name Type Description Default
pack_name str

Original pack folder name

required

Returns:

Type Description
str

Short abbreviation (max 8 chars) for use in sample IDs

Source code in src/audiomancer/library/scanner.py
def abbreviate_pack_name(pack_name: str) -> str:
    """Generate short abbreviation for pack name.

    Args:
        pack_name: Original pack folder name

    Returns:
        Short abbreviation (max 8 chars) for use in sample IDs
    """
    name = pack_name.lower()
    # Remove common suffixes
    name = re.sub(r"\s*\(sample pack\)\s*", "", name)
    name = re.sub(r"\s*sample\s*pack\s*", "", name)
    name = re.sub(r"\s*-\s*bandlab\s*$", "", name)
    name = re.sub(r"\s+\([a-f0-9]+\)\s*$", "", name)
    name = re.sub(r"[_\-]+", " ", name)

    # Apply abbreviations
    for full, abbr in PACK_ABBREVIATIONS.items():
        name = name.replace(full, abbr)

    # Clean up
    name = re.sub(r"[^a-z0-9\s]", "", name)
    name = re.sub(r"\s+", "", name)

    # Truncate if still too long
    if len(name) > 8:
        words = re.findall(r"[a-z]+", pack_name.lower())
        if words:
            name = "".join(w[:2] for w in words[:4])[:8]

    return name or "pack"

detect_bpm(text: str) -> int | None

Detect BPM from text (filename/path).

Returns:

Type Description
int | None

BPM value if detected and in valid range (60-200), None otherwise

Source code in src/audiomancer/library/scanner.py
def detect_bpm(text: str) -> int | None:
    """Detect BPM from text (filename/path).

    Returns:
        BPM value if detected and in valid range (60-200), None otherwise
    """
    text_lower = text.lower()
    for pattern in BPM_PATTERNS:
        match = re.search(pattern, text_lower)
        if match:
            bpm = int(match.group(1))
            if 60 <= bpm <= 200:
                return bpm
    return None

detect_category(text: str) -> tuple[str, str]

Detect sample category from text (filename/path).

Returns:

Type Description
tuple[str, str]

Tuple of (category_id, category_type)

Source code in src/audiomancer/library/scanner.py
def detect_category(text: str) -> tuple[str, str]:
    """Detect sample category from text (filename/path).

    Returns:
        Tuple of (category_id, category_type)
    """
    text_lower = text.lower()
    for pattern, category, cat_type in CATEGORY_PATTERNS:
        if re.search(pattern, text_lower, re.IGNORECASE):
            return category, cat_type
    return "misc", "misc"

detect_is_loop(text: str) -> bool

Detect if sample is a loop vs one-shot.

Returns:

Type Description
bool

True if detected as loop, False otherwise

Source code in src/audiomancer/library/scanner.py
def detect_is_loop(text: str) -> bool:
    """Detect if sample is a loop vs one-shot.

    Returns:
        True if detected as loop, False otherwise
    """
    text_lower = text.lower()
    # One-shot patterns take precedence
    for pattern in ONESHOT_PATTERNS:
        if re.search(pattern, text_lower):
            return False
    # Check for loop patterns
    for pattern in LOOP_PATTERNS:
        if re.search(pattern, text_lower):
            return True
    return False

generate_sample_id(pack_abbr: str, category: str, bpm: int | None, is_loop: bool) -> str

Generate a sample ID from components.

Format: {pack}[lp]{category}[_{bpm}]

Examples:

  • "808dk_bd" (808 Drum Kit, kick)
  • "absttex_lp_hh_125" (Abstract Techno, loop, hi-hat, 125 BPM)
Source code in src/audiomancer/library/scanner.py
def generate_sample_id(
    pack_abbr: str,
    category: str,
    bpm: int | None,
    is_loop: bool,
) -> str:
    """Generate a sample ID from components.

    Format: {pack}[_lp]_{category}[_{bpm}]

    Examples:
        - "808dk_bd" (808 Drum Kit, kick)
        - "absttex_lp_hh_125" (Abstract Techno, loop, hi-hat, 125 BPM)
    """
    parts = [pack_abbr] if pack_abbr else []
    if is_loop:
        parts.append("lp")
    parts.append(category)
    if bpm:
        parts.append(str(bpm))

    base_id = "_".join(parts)
    base_id = re.sub(r"[^a-z0-9_]", "", base_id.lower())
    base_id = re.sub(r"_+", "_", base_id).strip("_")

    return base_id if len(base_id) >= 2 else f"smp_{base_id}"

group_files_into_samples(pack_name: str, files: list[dict[str, Any]]) -> dict[str, SampleInfo]

Group files into sample folders by category.

Parameters:

Name Type Description Default
pack_name str

Pack folder name

required
files list[dict[str, Any]]

List of file info dicts from scan_pack_files

required

Returns:

Type Description
dict[str, SampleInfo]

Dict mapping sample_id to SampleInfo

Source code in src/audiomancer/library/scanner.py
def group_files_into_samples(
    pack_name: str,
    files: list[dict[str, Any]],
) -> dict[str, SampleInfo]:
    """Group files into sample folders by category.

    Args:
        pack_name: Pack folder name
        files: List of file info dicts from scan_pack_files

    Returns:
        Dict mapping sample_id to SampleInfo
    """
    pack_abbr = abbreviate_pack_name(pack_name)
    groups: dict[tuple, list[dict]] = defaultdict(list)

    for f in files:
        key = (f["category"], f["bpm"], f["is_loop"])
        groups[key].append(f)

    result: dict[str, SampleInfo] = {}
    for (category, bpm, is_loop), group_files in groups.items():
        sample_id = generate_sample_id(pack_abbr, category, bpm, is_loop)
        result[sample_id] = SampleInfo(
            id=sample_id,
            category=category,
            category_type=group_files[0]["cat_type"] if group_files else "misc",
            bpm=bpm,
            is_loop=is_loop,
            file_count=len(group_files),
            pack_name=pack_name,
            enabled=False,
        )

    return result

scan_pack_files(source_dir: Path, pack_name: str) -> list[dict[str, Any]]

Scan a pack and categorize all audio files.

Parameters:

Name Type Description Default
source_dir Path

Path to source directory

required
pack_name str

Name of pack folder to scan

required

Returns:

Type Description
list[dict[str, Any]]

List of file info dicts with path, category, bpm, is_loop, size

Source code in src/audiomancer/library/scanner.py
def scan_pack_files(source_dir: Path, pack_name: str) -> list[dict[str, Any]]:
    """Scan a pack and categorize all audio files.

    Args:
        source_dir: Path to source directory
        pack_name: Name of pack folder to scan

    Returns:
        List of file info dicts with path, category, bpm, is_loop, size
    """
    pack_dir = source_dir / pack_name
    files = []

    for root, _, filenames in os.walk(pack_dir):
        for f in filenames:
            if Path(f).suffix.lower() in AUDIO_EXTENSIONS:
                file_path = Path(root) / f
                rel_path = file_path.relative_to(source_dir)
                search_text = str(rel_path)

                category, cat_type = detect_category(search_text)
                bpm = detect_bpm(search_text)
                is_loop = detect_is_loop(search_text)

                try:
                    size = file_path.stat().st_size
                except OSError:
                    size = 0

                files.append(
                    {
                        "path": file_path,
                        "category": category,
                        "cat_type": cat_type,
                        "bpm": bpm,
                        "is_loop": is_loop,
                        "size": size,
                    }
                )

    return files

scan_source_packs(source_dir: Path) -> list[str]

Get list of top-level pack folders from source directory.

Parameters:

Name Type Description Default
source_dir Path

Path to source directory (e.g., Google Drive samples folder)

required

Returns:

Type Description
list[str]

Sorted list of pack folder names

Source code in src/audiomancer/library/scanner.py
def scan_source_packs(source_dir: Path) -> list[str]:
    """Get list of top-level pack folders from source directory.

    Args:
        source_dir: Path to source directory (e.g., Google Drive samples folder)

    Returns:
        Sorted list of pack folder names
    """
    if not source_dir.exists():
        return []

    return sorted(
        d.name
        for d in source_dir.iterdir()
        if d.is_dir() and not d.name.startswith(".")
    )