CAN Service: content-addressable storage with HTTP API, SQLite metadata, file-based blob storage, thumbnail generation, and integrity verification. can-sync v1: P2P sync sidecar using iroh-docs for encrypted peer-to-peer replication with library/filter-based selective sync. Fully builds but being superseded by v2 (simplified full-mirror approach). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
742 lines
22 KiB
Rust
742 lines
22 KiB
Rust
use reqwest::multipart;
|
|
use std::sync::Arc;
|
|
use tempfile::TempDir;
|
|
use tokio::net::TcpListener;
|
|
|
|
// We need to import the binary crate's internals.
|
|
// Since integration tests can't access `mod` items directly, we'll spin up
|
|
// the server using the library-like approach by duplicating setup logic.
|
|
// A cleaner approach is to test through HTTP.
|
|
|
|
/// Helper: spin up a test server and return its base URL + temp dir handle.
|
|
async fn spawn_test_server() -> (String, TempDir) {
|
|
let tmp = TempDir::new().unwrap();
|
|
let storage_root = tmp.path().to_path_buf();
|
|
|
|
// Create config.yaml in tempdir
|
|
let config_content = format!(
|
|
r#"storage_root: "{}"
|
|
admin_token: "test_token"
|
|
enable_thumbnail_cache: true
|
|
rebuild_error_threshold: 50
|
|
verify_interval_hours: 999
|
|
"#,
|
|
storage_root.to_string_lossy().replace('\\', "/")
|
|
);
|
|
|
|
let config_path = tmp.path().join("config.yaml");
|
|
std::fs::write(&config_path, &config_content).unwrap();
|
|
|
|
// Load config
|
|
let config: can_service::config::Config =
|
|
serde_yaml::from_str(&config_content).unwrap();
|
|
config.ensure_dirs().unwrap();
|
|
|
|
// Open DB
|
|
let db = can_service::db::open(&config.db_path()).unwrap();
|
|
let config = Arc::new(config);
|
|
|
|
let state = can_service::AppState {
|
|
config: config.clone(),
|
|
db,
|
|
};
|
|
|
|
// Build router
|
|
let app = axum::Router::new()
|
|
.merge(can_service::routes::router())
|
|
.with_state(state);
|
|
|
|
// Bind to random port
|
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
let base_url = format!("http://{}", addr);
|
|
|
|
tokio::spawn(async move {
|
|
axum::serve(listener, app).await.unwrap();
|
|
});
|
|
|
|
// Give the server a moment to start
|
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
|
|
(base_url, tmp)
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_ingest_and_retrieve_metadata() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest a file
|
|
let file_part = multipart::Part::bytes(b"hello world".to_vec())
|
|
.file_name("hello.txt")
|
|
.mime_str("text/plain")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new()
|
|
.part("file", file_part)
|
|
.text("application", "TestApp")
|
|
.text("user", "jason")
|
|
.text("tags", "greeting,test")
|
|
.text("description", "A test file")
|
|
.text("human_file_name", "hello.txt")
|
|
.text("human_readable_path", "/docs/");
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["status"], "success");
|
|
|
|
let hash = body["data"]["hash"].as_str().unwrap().to_string();
|
|
let timestamp = body["data"]["timestamp"].as_i64().unwrap();
|
|
assert!(!hash.is_empty());
|
|
assert!(timestamp > 0);
|
|
|
|
// Retrieve metadata
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}/meta", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["status"], "success");
|
|
assert_eq!(body["data"]["hash"], hash);
|
|
assert_eq!(body["data"]["mime_type"], "text/plain");
|
|
assert_eq!(body["data"]["application"], "TestApp");
|
|
assert_eq!(body["data"]["user"], "jason");
|
|
assert_eq!(body["data"]["description"], "A test file");
|
|
assert_eq!(body["data"]["human_filename"], "hello.txt");
|
|
assert_eq!(body["data"]["human_path"], "/docs/");
|
|
|
|
let tags = body["data"]["tags"].as_array().unwrap();
|
|
assert!(tags.contains(&serde_json::json!("greeting")));
|
|
assert!(tags.contains(&serde_json::json!("test")));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_retrieve_physical_asset() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
let file_content = b"binary content here";
|
|
let file_part = multipart::Part::bytes(file_content.to_vec())
|
|
.file_name("data.bin")
|
|
.mime_str("application/octet-stream")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new().part("file", file_part);
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash = body["data"]["hash"].as_str().unwrap();
|
|
|
|
// Download the asset
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let downloaded = resp.bytes().await.unwrap();
|
|
assert_eq!(downloaded.as_ref(), file_content);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_patch_metadata() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest
|
|
let file_part = multipart::Part::bytes(b"patch me".to_vec())
|
|
.file_name("patch.txt")
|
|
.mime_str("text/plain")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new()
|
|
.part("file", file_part)
|
|
.text("tags", "original")
|
|
.text("description", "original desc");
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash = body["data"]["hash"].as_str().unwrap().to_string();
|
|
|
|
// Patch
|
|
let patch_body = serde_json::json!({
|
|
"tags": ["updated", "new_tag"],
|
|
"description": "updated description"
|
|
});
|
|
|
|
let resp = client
|
|
.patch(format!("{}/api/v1/can/0/asset/{}", base_url, hash))
|
|
.json(&patch_body)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
|
|
// Verify
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}/meta", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["description"], "updated description");
|
|
let tags = body["data"]["tags"].as_array().unwrap();
|
|
assert!(tags.contains(&serde_json::json!("updated")));
|
|
assert!(tags.contains(&serde_json::json!("new_tag")));
|
|
assert!(!tags.contains(&serde_json::json!("original")));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_list_assets_pagination() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest 5 files
|
|
for i in 0..5 {
|
|
let content = format!("file content {}", i);
|
|
let file_part = multipart::Part::bytes(content.into_bytes())
|
|
.file_name(format!("file_{}.txt", i))
|
|
.mime_str("text/plain")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new()
|
|
.part("file", file_part)
|
|
.text("application", "ListTest");
|
|
|
|
client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
// Small delay so timestamps differ
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
}
|
|
|
|
// List with limit=2
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/list?limit=2&offset=0",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let items = body["data"]["items"].as_array().unwrap();
|
|
assert_eq!(items.len(), 2);
|
|
assert_eq!(body["data"]["pagination"]["total"], 5);
|
|
assert_eq!(body["data"]["pagination"]["limit"], 2);
|
|
|
|
// List with offset=3
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/list?limit=10&offset=3",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let items = body["data"]["items"].as_array().unwrap();
|
|
assert_eq!(items.len(), 2); // 5 total, offset 3 = 2 remaining
|
|
|
|
// List with application filter
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/list?application=ListTest",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 5);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_search_assets() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest files with different tags and metadata
|
|
let ingest = |name: &str, tags: &str, mime: &str| {
|
|
let base = base_url.clone();
|
|
let client = client.clone();
|
|
let name = name.to_string();
|
|
let tags = tags.to_string();
|
|
let mime = mime.to_string();
|
|
async move {
|
|
let file_part = multipart::Part::bytes(format!("content of {}", name).into_bytes())
|
|
.file_name(name.clone())
|
|
.mime_str(&mime)
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new()
|
|
.part("file", file_part)
|
|
.text("tags", tags)
|
|
.text("user", "tester");
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest", base))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
body["data"]["hash"].as_str().unwrap().to_string()
|
|
}
|
|
};
|
|
|
|
let _h1 = ingest("photo.jpg", "nature,landscape", "image/jpeg").await;
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
let _h2 = ingest("doc.pdf", "work,report", "application/pdf").await;
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
let _h3 = ingest("nature.png", "nature,macro", "image/png").await;
|
|
|
|
// Search by tags
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/search?tags=nature",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 2);
|
|
|
|
// Search by mime_type
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/search?mime_type=application/pdf",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 1);
|
|
|
|
// Search by user
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/search?user=tester",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 3);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_asset_not_found() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/asset/nonexistent_hash",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 404);
|
|
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/asset/nonexistent_hash/meta",
|
|
base_url
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 404);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_thumbnail_fallback_svg() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest a non-image file
|
|
let file_part = multipart::Part::bytes(b"not an image".to_vec())
|
|
.file_name("doc.txt")
|
|
.mime_str("text/plain")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new().part("file", file_part);
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash = body["data"]["hash"].as_str().unwrap();
|
|
|
|
// Request thumbnail - should get SVG fallback
|
|
let resp = client
|
|
.get(format!(
|
|
"{}/api/v1/can/0/asset/{}/thumb/128/128",
|
|
base_url, hash
|
|
))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let content_type = resp.headers().get("content-type").unwrap().to_str().unwrap();
|
|
assert!(content_type.contains("svg"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_list_order() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest 3 files with delays
|
|
for i in 0..3 {
|
|
let file_part = multipart::Part::bytes(format!("order test {}", i).into_bytes())
|
|
.file_name(format!("order_{}.txt", i))
|
|
.mime_str("text/plain")
|
|
.unwrap();
|
|
|
|
let form = multipart::Form::new().part("file", file_part);
|
|
|
|
client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(15)).await;
|
|
}
|
|
|
|
// List descending (default)
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/list?order=desc", base_url))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let items = body["data"]["items"].as_array().unwrap();
|
|
assert_eq!(items.len(), 3);
|
|
let ts0 = items[0]["timestamp"].as_i64().unwrap();
|
|
let ts1 = items[1]["timestamp"].as_i64().unwrap();
|
|
let ts2 = items[2]["timestamp"].as_i64().unwrap();
|
|
assert!(ts0 > ts1);
|
|
assert!(ts1 > ts2);
|
|
|
|
// List ascending
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/list?order=asc", base_url))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let items = body["data"]["items"].as_array().unwrap();
|
|
let ts0 = items[0]["timestamp"].as_i64().unwrap();
|
|
let ts1 = items[1]["timestamp"].as_i64().unwrap();
|
|
assert!(ts0 < ts1);
|
|
}
|
|
|
|
// ── JSON data ingest tests ──────────────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn test_data_ingest_minimal() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Minimal call: just data
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": { "key": "value", "count": 42 }
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["status"], "success");
|
|
|
|
let hash = body["data"]["hash"].as_str().unwrap();
|
|
let filename = body["data"]["filename"].as_str().unwrap();
|
|
assert!(!hash.is_empty());
|
|
assert!(filename.ends_with(".json"));
|
|
|
|
// Retrieve and verify it's stored as pretty JSON
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let stored: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(stored["key"], "value");
|
|
assert_eq!(stored["count"], 42);
|
|
|
|
// Verify metadata defaults to application/json
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}/meta", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let meta: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(meta["data"]["mime_type"], "application/json");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_data_ingest_with_all_metadata() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": {
|
|
"agent_id": "planner-v2",
|
|
"session": "abc-123",
|
|
"output": ["step1", "step2", "step3"]
|
|
},
|
|
"application": "AgentOrchestrator",
|
|
"user": "agent_planner",
|
|
"tags": "agent,plan,session",
|
|
"description": "Planning agent output for session abc-123",
|
|
"human_file_name": "plan_output.json",
|
|
"human_readable_path": "/agents/planner/"
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash = body["data"]["hash"].as_str().unwrap();
|
|
|
|
// Verify all metadata persisted
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}/meta", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let meta: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(meta["data"]["application"], "AgentOrchestrator");
|
|
assert_eq!(meta["data"]["user"], "agent_planner");
|
|
assert_eq!(meta["data"]["description"], "Planning agent output for session abc-123");
|
|
assert_eq!(meta["data"]["human_filename"], "plan_output.json");
|
|
assert_eq!(meta["data"]["human_path"], "/agents/planner/");
|
|
assert_eq!(meta["data"]["mime_type"], "application/json");
|
|
|
|
let tags = meta["data"]["tags"].as_array().unwrap();
|
|
assert!(tags.contains(&serde_json::json!("agent")));
|
|
assert!(tags.contains(&serde_json::json!("plan")));
|
|
assert!(tags.contains(&serde_json::json!("session")));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_data_ingest_various_json_types() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Store a plain string
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": "just a plain string log entry",
|
|
"tags": "log"
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash_str = body["data"]["hash"].as_str().unwrap();
|
|
|
|
// Retrieve and verify the string was stored
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}", base_url, hash_str))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let stored: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(stored, "just a plain string log entry");
|
|
|
|
// Store an array
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": [1, 2, 3, "four", null, true]
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let hash_arr = body["data"]["hash"].as_str().unwrap();
|
|
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}", base_url, hash_arr))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let stored: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(stored, serde_json::json!([1, 2, 3, "four", null, true]));
|
|
|
|
// Store a number
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": 99.5
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_data_ingest_shows_up_in_list_and_search() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Ingest via JSON data endpoint
|
|
client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": { "sensor": "temperature", "value": 22.5 },
|
|
"application": "IoTAgent",
|
|
"tags": "sensor,temperature"
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
// Also ingest via multipart
|
|
let file_part = multipart::Part::bytes(b"binary sensor log".to_vec())
|
|
.file_name("sensor.bin")
|
|
.mime_str("application/octet-stream")
|
|
.unwrap();
|
|
let form = multipart::Form::new()
|
|
.part("file", file_part)
|
|
.text("application", "IoTAgent")
|
|
.text("tags", "sensor,binary");
|
|
client
|
|
.post(format!("{}/api/v1/can/0/ingest", base_url))
|
|
.multipart(form)
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
|
|
|
// Both should show up in list
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/list?application=IoTAgent", base_url))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 2);
|
|
|
|
// Search by tag should find the JSON one
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/search?tags=temperature", base_url))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(body["data"]["pagination"]["total"], 1);
|
|
assert_eq!(body["data"]["items"][0]["mime_type"], "application/json");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_data_ingest_custom_mime_type() {
|
|
let (base_url, _tmp) = spawn_test_server().await;
|
|
let client = reqwest::Client::new();
|
|
|
|
// Agent stores data but overrides mime_type to text/plain
|
|
let resp = client
|
|
.post(format!("{}/api/v1/can/0/ingest/data", base_url))
|
|
.json(&serde_json::json!({
|
|
"data": "This is a plain text log line from the agent",
|
|
"mime_type": "text/plain",
|
|
"human_file_name": "agent.log"
|
|
}))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(resp.status(), 200);
|
|
let body: serde_json::Value = resp.json().await.unwrap();
|
|
let filename = body["data"]["filename"].as_str().unwrap();
|
|
assert!(filename.ends_with(".txt"), "Expected .txt extension, got {}", filename);
|
|
|
|
let hash = body["data"]["hash"].as_str().unwrap();
|
|
let resp = client
|
|
.get(format!("{}/api/v1/can/0/asset/{}/meta", base_url, hash))
|
|
.send()
|
|
.await
|
|
.unwrap();
|
|
|
|
let meta: serde_json::Value = resp.json().await.unwrap();
|
|
assert_eq!(meta["data"]["mime_type"], "text/plain");
|
|
assert_eq!(meta["data"]["human_filename"], "agent.log");
|
|
}
|