📦 データSiloDetection
建設業界の組織内でバラバラに管理されているデータ(
📺 まず動画で見る(YouTube)
▶ 【Claude Code完全入門】誰でも使える/Skills活用法/経営者こそ使うべき ↗
※ jpskill.com 編集部が参考用に選んだ動画です。動画の内容と Skill の挙動は厳密には一致しないことがあります。
📜 元の英語説明(参考)
Detect and map data silos in construction organizations. Identify disconnected data sources and integration opportunities
🇯🇵 日本人クリエイター向け解説
建設業界の組織内でバラバラに管理されているデータ(
※ jpskill.com 編集部が日本のビジネス現場向けに補足した解説です。Skill本体の挙動とは独立した参考情報です。
下記のコマンドをコピーしてターミナル(Mac/Linux)または PowerShell(Windows)に貼り付けてください。 ダウンロード → 解凍 → 配置まで全自動。
mkdir -p ~/.claude/skills && cd ~/.claude/skills && curl -L -o data-silo-detection.zip https://jpskill.com/download/4705.zip && unzip -o data-silo-detection.zip && rm data-silo-detection.zip
$d = "$env:USERPROFILE\.claude\skills"; ni -Force -ItemType Directory $d | Out-Null; iwr https://jpskill.com/download/4705.zip -OutFile "$d\data-silo-detection.zip"; Expand-Archive "$d\data-silo-detection.zip" -DestinationPath $d -Force; ri "$d\data-silo-detection.zip"
完了後、Claude Code を再起動 → 普通に「動画プロンプト作って」のように話しかけるだけで自動発動します。
💾 手動でダウンロードしたい(コマンドが難しい人向け)
- 1. 下の青いボタンを押して
data-silo-detection.zipをダウンロード - 2. ZIPファイルをダブルクリックで解凍 →
data-silo-detectionフォルダができる - 3. そのフォルダを
C:\Users\あなたの名前\.claude\skills\(Win)または~/.claude/skills/(Mac)へ移動 - 4. Claude Code を再起動
⚠️ ダウンロード・利用は自己責任でお願いします。当サイトは内容・動作・安全性について責任を負いません。
🎯 このSkillでできること
下記の説明文を読むと、このSkillがあなたに何をしてくれるかが分かります。Claudeにこの分野の依頼をすると、自動で発動します。
📦 インストール方法 (3ステップ)
- 1. 上の「ダウンロード」ボタンを押して .skill ファイルを取得
- 2. ファイル名の拡張子を .skill から .zip に変えて展開(macは自動展開可)
- 3. 展開してできたフォルダを、ホームフォルダの
.claude/skills/に置く- · macOS / Linux:
~/.claude/skills/ - · Windows:
%USERPROFILE%\.claude\skills\
- · macOS / Linux:
Claude Code を再起動すれば完了。「このSkillを使って…」と話しかけなくても、関連する依頼で自動的に呼び出されます。
詳しい使い方ガイドを見る →- 最終更新
- 2026-05-17
- 取得日時
- 2026-05-17
- 同梱ファイル
- 1
💬 こう話しかけるだけ — サンプルプロンプト
- › Data Silo Detection の使い方を教えて
- › Data Silo Detection で何ができるか具体例で見せて
- › Data Silo Detection を初めて使う人向けにステップを案内して
これをClaude Code に貼るだけで、このSkillが自動発動します。
📖 Claude が読む原文 SKILL.md(中身を展開)
この本文は AI(Claude)が読むための原文(英語または中国語)です。日本語訳は順次追加中。
Data Silo Detection
Overview
Based on DDC methodology (Chapter 1.2), this skill detects and maps data silos in construction organizations, identifying disconnected data sources, duplicate data, and integration opportunities.
Book Reference: "Технологии и системы управления в современном строительстве" / "Technologies and Management Systems in Modern Construction"
Quick Start
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Set, Tuple
from datetime import datetime
import json
from collections import defaultdict
class DataDomain(Enum):
"""Construction data domains"""
DESIGN = "design"
COST = "cost"
SCHEDULE = "schedule"
QUALITY = "quality"
SAFETY = "safety"
PROCUREMENT = "procurement"
SITE = "site"
DOCUMENT = "document"
FINANCIAL = "financial"
HR = "hr"
class SiloSeverity(Enum):
"""Severity level of data silo"""
CRITICAL = "critical" # Major business impact
HIGH = "high" # Significant inefficiency
MEDIUM = "medium" # Noticeable issues
LOW = "low" # Minor inconvenience
class DataSourceType(Enum):
"""Types of data sources"""
DATABASE = "database"
SPREADSHEET = "spreadsheet"
FILE_SHARE = "file_share"
CLOUD_APP = "cloud_app"
DESKTOP_APP = "desktop_app"
PAPER = "paper"
EMAIL = "email"
PERSONAL = "personal"
@dataclass
class DataSource:
"""Represents a data source in the organization"""
id: str
name: str
type: DataSourceType
domain: DataDomain
owner: str
department: str
users: List[str]
data_entities: List[str]
connections: List[str] = field(default_factory=list)
update_frequency: str = "unknown"
access_level: str = "department" # personal, department, organization
has_api: bool = False
last_modified: Optional[datetime] = None
@dataclass
class DataSilo:
"""Detected data silo"""
id: str
sources: List[DataSource]
domain: DataDomain
severity: SiloSeverity
issue_type: str
description: str
impact: str
affected_users: int
affected_processes: List[str]
recommendations: List[str]
estimated_cost: Optional[float] = None
@dataclass
class DuplicateData:
"""Detected duplicate data across sources"""
entity_name: str
sources: List[str]
discrepancy_rate: float # 0-1
master_source: Optional[str] = None
issues: List[str] = field(default_factory=list)
@dataclass
class SiloAnalysis:
"""Complete silo analysis results"""
organization: str
analysis_date: datetime
total_sources: int
silos_detected: List[DataSilo]
duplicates: List[DuplicateData]
connectivity_score: float
data_flow_gaps: List[Dict]
priority_actions: List[str]
integration_roadmap: Dict
class DataSiloDetector:
"""
Detect and analyze data silos in construction organizations.
Based on DDC methodology Chapter 1.2.
"""
def __init__(self):
self.domain_relationships = self._define_domain_relationships()
self.critical_entities = self._define_critical_entities()
def _define_domain_relationships(self) -> Dict[DataDomain, List[DataDomain]]:
"""Define expected relationships between domains"""
return {
DataDomain.DESIGN: [
DataDomain.COST, DataDomain.SCHEDULE,
DataDomain.PROCUREMENT, DataDomain.QUALITY
],
DataDomain.COST: [
DataDomain.DESIGN, DataDomain.SCHEDULE,
DataDomain.FINANCIAL, DataDomain.PROCUREMENT
],
DataDomain.SCHEDULE: [
DataDomain.DESIGN, DataDomain.COST,
DataDomain.SITE, DataDomain.HR
],
DataDomain.PROCUREMENT: [
DataDomain.COST, DataDomain.DESIGN,
DataDomain.SITE, DataDomain.FINANCIAL
],
DataDomain.SITE: [
DataDomain.SCHEDULE, DataDomain.SAFETY,
DataDomain.QUALITY, DataDomain.HR
],
DataDomain.QUALITY: [
DataDomain.DESIGN, DataDomain.SITE,
DataDomain.DOCUMENT
],
DataDomain.SAFETY: [
DataDomain.SITE, DataDomain.HR,
DataDomain.DOCUMENT
],
DataDomain.FINANCIAL: [
DataDomain.COST, DataDomain.PROCUREMENT,
DataDomain.HR
]
}
def _define_critical_entities(self) -> Dict[str, List[DataDomain]]:
"""Define entities that should be shared across domains"""
return {
"project": [DataDomain.DESIGN, DataDomain.COST, DataDomain.SCHEDULE],
"budget": [DataDomain.COST, DataDomain.FINANCIAL, DataDomain.PROCUREMENT],
"schedule": [DataDomain.SCHEDULE, DataDomain.SITE, DataDomain.PROCUREMENT],
"material": [DataDomain.DESIGN, DataDomain.COST, DataDomain.PROCUREMENT],
"labor": [DataDomain.HR, DataDomain.COST, DataDomain.SCHEDULE],
"subcontractor": [DataDomain.PROCUREMENT, DataDomain.COST, DataDomain.SCHEDULE],
"rfi": [DataDomain.DESIGN, DataDomain.DOCUMENT, DataDomain.SITE],
"change_order": [DataDomain.COST, DataDomain.DESIGN, DataDomain.SCHEDULE]
}
def detect_silos(
self,
organization: str,
data_sources: List[DataSource],
process_flows: Optional[List[Dict]] = None
) -> SiloAnalysis:
"""
Detect data silos in the organization.
Args:
organization: Organization name
data_sources: List of data sources to analyze
process_flows: Optional business process flows
Returns:
Complete silo analysis
"""
# Build connectivity graph
connectivity = self._build_connectivity_graph(data_sources)
# Detect isolated sources
isolated_silos = self._detect_isolated_sources(
data_sources, connectivity
)
# Detect domain silos
domain_silos = self._detect_domain_silos(data_sources)
# Detect duplicate data
duplicates = self._detect_duplicates(data_sources)
# Detect data flow gaps
flow_gaps = self._detect_flow_gaps(
data_sources, process_flows
)
# Calculate connectivity score
connectivity_score = self._calculate_connectivity_score(
data_sources, connectivity
)
# Combine all silos
all_silos = isolated_silos + domain_silos
# Prioritize silos
prioritized_silos = self._prioritize_silos(all_silos)
# Generate priority actions
priority_actions = self._generate_priority_actions(
prioritized_silos, duplicates
)
# Create integration roadmap
roadmap = self._create_integration_roadmap(
prioritized_silos, flow_gaps
)
return SiloAnalysis(
organization=organization,
analysis_date=datetime.now(),
total_sources=len(data_sources),
silos_detected=prioritized_silos,
duplicates=duplicates,
connectivity_score=connectivity_score,
data_flow_gaps=flow_gaps,
priority_actions=priority_actions,
integration_roadmap=roadmap
)
def _build_connectivity_graph(
self,
sources: List[DataSource]
) -> Dict[str, Set[str]]:
"""Build graph of source connections"""
graph = defaultdict(set)
for source in sources:
for connection in source.connections:
graph[source.id].add(connection)
graph[connection].add(source.id)
return graph
def _detect_isolated_sources(
self,
sources: List[DataSource],
connectivity: Dict[str, Set[str]]
) -> List[DataSilo]:
"""Detect sources with no connections"""
silos = []
for source in sources:
connections = len(connectivity.get(source.id, set()))
if connections == 0:
severity = SiloSeverity.CRITICAL if source.domain in [
DataDomain.COST, DataDomain.SCHEDULE
] else SiloSeverity.HIGH
silos.append(DataSilo(
id=f"isolated_{source.id}",
sources=[source],
domain=source.domain,
severity=severity,
issue_type="isolated_source",
description=f"{source.name} has no connections to other systems",
impact="Data must be manually transferred, risking errors and delays",
affected_users=len(source.users),
affected_processes=self._get_affected_processes(source.domain),
recommendations=[
f"Connect {source.name} via API or ETL to related systems",
"Establish data synchronization schedule",
"Define master data source for shared entities"
]
))
elif connections == 1 and source.access_level == "personal":
silos.append(DataSilo(
id=f"personal_{source.id}",
sources=[source],
domain=source.domain,
severity=SiloSeverity.MEDIUM,
issue_type="personal_silo",
description=f"{source.name} is a personal data store with limited access",
impact="Data not accessible to team, knowledge loss risk",
affected_users=1,
affected_processes=self._get_affected_processes(source.domain),
recommendations=[
"Move data to shared organizational repository",
"Implement access controls instead of isolation",
"Document data structure and usage"
]
))
return silos
def _detect_domain_silos(
self,
sources: List[DataSource]
) -> List[DataSilo]:
"""Detect silos between domains that should be connected"""
silos = []
# Group sources by domain
domain_sources = defaultdict(list)
for source in sources:
domain_sources[source.domain].append(source)
# Check for missing domain connections
for domain, related_domains in self.domain_relationships.items():
domain_srcs = domain_sources.get(domain, [])
for related in related_domains:
related_srcs = domain_sources.get(related, [])
if domain_srcs and related_srcs:
# Check if any connections exist between domains
has_connection = False
for src in domain_srcs:
for rel_src in related_srcs:
if rel_src.id in src.connections:
has_connection = True
break
if not has_connection:
silos.append(DataSilo(
id=f"domain_gap_{domain.value}_{related.value}",
sources=domain_srcs + related_srcs,
domain=domain,
severity=SiloSeverity.HIGH,
issue_type="domain_disconnect",
description=f"No data flow between {domain.value} and {related.value}",
impact="Related information not synchronized, decision delays",
affected_users=sum(len(s.users) for s in domain_srcs + related_srcs),
affected_processes=self._get_affected_processes(domain) +
self._get_affected_processes(related),
recommendations=[
f"Establish integration between {domain.value} and {related.value} systems",
"Define shared data entities and master sources",
"Implement automated data synchronization"
]
))
return silos
def _detect_duplicates(
self,
sources: List[DataSource]
) -> List[DuplicateData]:
"""Detect duplicate data across sources"""
duplicates = []
# Map entities to sources
entity_sources = defaultdict(list)
for source in sources:
for entity in source.data_entities:
entity_sources[entity].append(source.id)
# Find duplicates
for entity, source_ids in entity_sources.items():
if len(source_ids) > 1:
# Check if it's a critical entity
is_critical = entity.lower() in self.critical_entities
duplicate = DuplicateData(
entity_name=entity,
sources=source_ids,
discrepancy_rate=0.0, # Would need actual data to calculate
issues=[]
)
if is_critical and len(source_ids) > 2:
duplicate.issues.append(
"Critical entity duplicated in multiple systems"
)
if not any(s for s in sources if s.id in source_ids and "master" in s.name.lower()):
duplicate.issues.append("No clear master source defined")
duplicates.append(duplicate)
return duplicates
def _detect_flow_gaps(
self,
sources: List[DataSource],
process_flows: Optional[List[Dict]]
) -> List[Dict]:
"""Detect gaps in expected data flows"""
gaps = []
# Check critical entity coverage
for entity, required_domains in self.critical_entities.items():
entity_domains = set()
for source in sources:
if entity in [e.lower() for e in source.data_entities]:
entity_domains.add(source.domain)
missing = set(required_domains) - entity_domains
if missing:
gaps.append({
"entity": entity,
"missing_domains": [d.value for d in missing],
"impact": f"{entity} data not available in {len(missing)} domains"
})
return gaps
def _calculate_connectivity_score(
self,
sources: List[DataSource],
connectivity: Dict[str, Set[str]]
) -> float:
"""Calculate overall connectivity score"""
if not sources:
return 0.0
# Calculate average connections per source
total_connections = sum(len(conns) for conns in connectivity.values())
avg_connections = total_connections / len(sources)
# Ideal connections per source
ideal_connections = 3
# Score based on average connections
connection_score = min(1.0, avg_connections / ideal_connections)
# Penalize for isolated sources
isolated = sum(1 for s in sources if s.id not in connectivity or not connectivity[s.id])
isolation_penalty = isolated / len(sources)
# API availability bonus
api_count = sum(1 for s in sources if s.has_api)
api_bonus = (api_count / len(sources)) * 0.2
return max(0, min(1.0, connection_score - isolation_penalty + api_bonus))
def _get_affected_processes(self, domain: DataDomain) -> List[str]:
"""Get business processes affected by domain"""
process_map = {
DataDomain.DESIGN: ["Design Review", "RFI Processing", "Drawing Distribution"],
DataDomain.COST: ["Budgeting", "Cost Tracking", "Invoice Processing"],
DataDomain.SCHEDULE: ["Planning", "Progress Tracking", "Resource Allocation"],
DataDomain.PROCUREMENT: ["Vendor Selection", "Purchase Orders", "Material Tracking"],
DataDomain.SITE: ["Daily Reports", "Progress Photos", "Issue Management"],
DataDomain.QUALITY: ["Inspections", "Defect Tracking", "Compliance"],
DataDomain.SAFETY: ["Incident Reporting", "Safety Inspections", "Training"],
DataDomain.FINANCIAL: ["Billing", "Payments", "Financial Reporting"],
DataDomain.HR: ["Timekeeping", "Resource Management", "Certifications"]
}
return process_map.get(domain, [])
def _prioritize_silos(
self,
silos: List[DataSilo]
) -> List[DataSilo]:
"""Prioritize silos by severity and impact"""
severity_order = {
SiloSeverity.CRITICAL: 0,
SiloSeverity.HIGH: 1,
SiloSeverity.MEDIUM: 2,
SiloSeverity.LOW: 3
}
return sorted(
silos,
key=lambda s: (severity_order[s.severity], -s.affected_users)
)
def _generate_priority_actions(
self,
silos: List[DataSilo],
duplicates: List[DuplicateData]
) -> List[str]:
"""Generate prioritized action items"""
actions = []
# Critical silos first
critical_silos = [s for s in silos if s.severity == SiloSeverity.CRITICAL]
for silo in critical_silos[:3]:
actions.append(f"URGENT: {silo.recommendations[0]}")
# Duplicate data issues
critical_dups = [d for d in duplicates if d.issues]
for dup in critical_dups[:2]:
actions.append(
f"Define master source for '{dup.entity_name}' "
f"(currently in {len(dup.sources)} sources)"
)
# High priority silos
high_silos = [s for s in silos if s.severity == SiloSeverity.HIGH]
for silo in high_silos[:3]:
if silo.recommendations:
actions.append(silo.recommendations[0])
return actions[:10]
def _create_integration_roadmap(
self,
silos: List[DataSilo],
gaps: List[Dict]
) -> Dict:
"""Create phased integration roadmap"""
roadmap = {
"Phase 1 - Quick Wins (0-3 months)": [],
"Phase 2 - Core Integration (3-6 months)": [],
"Phase 3 - Advanced Integration (6-12 months)": [],
"Phase 4 - Optimization (12+ months)": []
}
# Phase 1: Address personal silos and easy integrations
for silo in silos:
if silo.issue_type == "personal_silo":
roadmap["Phase 1 - Quick Wins (0-3 months)"].append(
f"Migrate {silo.sources[0].name} to shared repository"
)
# Phase 2: Core domain integrations
domain_gaps = [s for s in silos if s.issue_type == "domain_disconnect"]
for silo in domain_gaps[:3]:
roadmap["Phase 2 - Core Integration (3-6 months)"].append(
silo.recommendations[0] if silo.recommendations else silo.description
)
# Phase 3: Critical entity master data
roadmap["Phase 3 - Advanced Integration (6-12 months)"].extend([
"Implement master data management for shared entities",
"Deploy integration middleware/ESB",
"Establish data governance policies"
])
# Phase 4: Optimization
roadmap["Phase 4 - Optimization (12+ months)"].extend([
"Implement real-time data synchronization",
"Deploy integration monitoring and alerting",
"Continuous improvement based on metrics"
])
return roadmap
def generate_report(self, analysis: SiloAnalysis) -> str:
"""Generate silo analysis report"""
report = f"""
# Data Silo Analysis Report
## {analysis.organization}
**Analysis Date:** {analysis.analysis_date.strftime('%Y-%m-%d')}
**Data Sources Analyzed:** {analysis.total_sources}
**Connectivity Score:** {analysis.connectivity_score:.0%}
## Executive Summary
Detected **{len(analysis.silos_detected)}** data silos and **{len(analysis.duplicates)}** duplicate data issues.
### Silos by Severity
"""
severity_counts = defaultdict(int)
for silo in analysis.silos_detected:
severity_counts[silo.severity.value] += 1
for severity in ["critical", "high", "medium", "low"]:
count = severity_counts.get(severity, 0)
if count > 0:
report += f"- **{severity.title()}**: {count}\n"
report += "\n## Priority Actions\n\n"
for i, action in enumerate(analysis.priority_actions, 1):
report += f"{i}. {action}\n"
report += "\n## Detected Silos\n\n"
for silo in analysis.silos_detected[:5]:
report += f"""
### {silo.id}
- **Type:** {silo.issue_type}
- **Severity:** {silo.severity.value}
- **Impact:** {silo.impact}
- **Affected Users:** {silo.affected_users}
"""
report += "\n## Integration Roadmap\n"
for phase, items in analysis.integration_roadmap.items():
report += f"\n### {phase}\n"
for item in items:
report += f"- {item}\n"
return report
Common Use Cases
Detect Data Silos
detector = DataSiloDetector()
# Define data sources
sources = [
DataSource(
id="revit",
name="Revit Models",
type=DataSourceType.DESKTOP_APP,
domain=DataDomain.DESIGN,
owner="Design Team",
department="Engineering",
users=["architect1", "engineer1", "engineer2"],
data_entities=["building_model", "drawings", "schedules"],
connections=["navisworks"],
has_api=True
),
DataSource(
id="excel_estimates",
name="Excel Cost Estimates",
type=DataSourceType.SPREADSHEET,
domain=DataDomain.COST,
owner="Estimator",
department="Pre-construction",
users=["estimator1"],
data_entities=["costs", "quantities", "labor_rates"],
connections=[], # No connections - silo!
access_level="personal"
),
DataSource(
id="procore",
name="Procore",
type=DataSourceType.CLOUD_APP,
domain=DataDomain.SITE,
owner="Project Manager",
department="Operations",
users=["pm1", "pm2", "super1"],
data_entities=["daily_reports", "photos", "punch_list"],
connections=["primavera"],
has_api=True
)
]
analysis = detector.detect_silos(
organization="ABC Construction",
data_sources=sources
)
print(f"Silos detected: {len(analysis.silos_detected)}")
print(f"Connectivity score: {analysis.connectivity_score:.0%}")
Generate Silo Report
report = detector.generate_report(analysis)
print(report)
# Save to file
with open("silo_report.md", "w") as f:
f.write(report)
View Priority Actions
print("Priority Actions:")
for i, action in enumerate(analysis.priority_actions, 1):
print(f"{i}. {action}")
print("\nIntegration Roadmap:")
for phase, items in analysis.integration_roadmap.items():
print(f"\n{phase}:")
for item in items:
print(f" - {item}")
Quick Reference
| Component | Purpose |
|---|---|
DataSiloDetector |
Main detection engine |
DataSource |
Data source definition |
DataSilo |
Detected silo with details |
DuplicateData |
Duplicate data detection |
SiloAnalysis |
Complete analysis results |
SiloSeverity |
Severity classification |
Resources
- Book: "Data-Driven Construction" by Artem Boiko, Chapter 1.2
- Website: https://datadrivenconstruction.io
Next Steps
- Use erp-integration-analysis for system integration
- Use data-evolution-analysis for maturity assessment
- Use etl-pipeline to connect silos