8
n8n 中文网amn8n.com

多 CRM 数据同步至 Google Sheets 并附带 AI 去重

高级

这是一个CRM, AI Summarization领域的自动化工作流,包含 18 个节点。主要使用 Set, Code, Merge, Webhook, ErrorTrigger 等节点。 将 HubSpot、Pipedrive 和 Salesforce 数据同步至 Google Sheets,使用 OpenAI 进行去重

前置要求
  • HTTP Webhook 端点(n8n 会自动生成)
  • Google Sheets API 凭证
  • OpenAI API Key
工作流预览
可视化展示节点连接关系,支持缩放和平移
导出工作流
复制以下 JSON 配置到 n8n 导入,即可使用此工作流
{
  "id": "Si4R0nr5H78eQWI8",
  "meta": {
    "instanceId": "f31c8cf0f30c343fde4e229f596b53099ee0496367bfd39c53458e5afe95e91e",
    "templateCredsSetupCompleted": true
  },
  "name": "多 CRM 数据同步至 Google Sheets 并附带 AI 去重",
  "tags": [],
  "nodes": [
    {
      "id": "9ebee5e9-64a0-44d3-b2f7-e80a4288bede",
      "name": "每日同步计划",
      "type": "n8n-nodes-base.scheduleTrigger",
      "position": [
        -1088,
        -160
      ],
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "cronExpression",
              "expression": "0 2 * * *"
            }
          ]
        }
      },
      "typeVersion": 1.2
    },
    {
      "id": "120c27c5-c98e-4cc7-ac5d-e816d6b8b7a9",
      "name": "手动同步 Webhook",
      "type": "n8n-nodes-base.webhook",
      "position": [
        -1088,
        -16
      ],
      "webhookId": "crm-sync-manual",
      "parameters": {
        "path": "crm-sync-manual",
        "options": {},
        "httpMethod": "POST",
        "responseMode": "lastNode"
      },
      "typeVersion": 2
    },
    {
      "id": "fbf40d96-d15a-4de2-b14c-10dc01758ef8",
      "name": "触发器路由器",
      "type": "n8n-nodes-base.merge",
      "position": [
        -880,
        -96
      ],
      "parameters": {
        "mode": "chooseBranch",
        "output": "input1"
      },
      "typeVersion": 3.2
    },
    {
      "id": "7b87c097-0337-4dc5-b33c-35b0ceebc906",
      "name": "配置中心",
      "type": "n8n-nodes-base.set",
      "position": [
        -688,
        -96
      ],
      "parameters": {
        "options": {},
        "assignments": {
          "assignments": [
            {
              "id": "89af6d5e-7c8b-4f3e-a9d2-1b5e4c8f9a7d",
              "name": "runId",
              "type": "string",
              "value": "={{ $now.toFormat('yyyy-MM-dd-HHmmss') }}"
            },
            {
              "id": "a7c5e3b2-9d4f-4e8a-b6c1-8f3e5d7a9b2c",
              "name": "stagingSheetId",
              "type": "string",
              "value": "YOUR_STAGING_SHEET_ID"
            },
            {
              "id": "c3e7f9a5-2b1d-4c8e-a7f6-9e5d3b1c7a8f",
              "name": "masterSheetId",
              "type": "string",
              "value": "YOUR_MASTER_SHEET_ID"
            },
            {
              "id": "d9b3c7e2-8f5a-4e1d-9c7b-3a5e8d2f9c1b",
              "name": "hubspotApiKey",
              "type": "string",
              "value": "YOUR_HUBSPOT_API_KEY"
            },
            {
              "id": "e5f8a3c7-1b9d-4c2e-8a7f-5b3e9d1c8f7a",
              "name": "pipedriveApiKey",
              "type": "string",
              "value": "YOUR_PIPEDRIVE_API_KEY"
            },
            {
              "id": "f2c8e7a9-5d3b-4e1c-9a8f-7b5e3d2c9f1a",
              "name": "salesforceInstance",
              "type": "string",
              "value": "YOUR_SALESFORCE_INSTANCE"
            },
            {
              "id": "a8e5f3c7-9b1d-4e2a-8c7f-5a3e9b1d7c8f",
              "name": "salesforceAccessToken",
              "type": "string",
              "value": "YOUR_SALESFORCE_ACCESS_TOKEN"
            },
            {
              "id": "b7d9c3e5-8f2a-4c1e-9b7a-3e5f8d2a9c1b",
              "name": "batchSize",
              "type": "number",
              "value": 100
            },
            {
              "id": "c5e8f7a3-1b9d-4c2e-8a7f-9b3e5d1c8f7a",
              "name": "qualityThreshold",
              "type": "number",
              "value": 0.7
            },
            {
              "id": "d7b9e5c3-8f2a-4c1e-9a7f-2b3e5d1c8f7a",
              "name": "deduplicationKeys",
              "type": "json",
              "value": "[\"email\", \"companyName\", \"phone\"]"
            },
            {
              "id": "e9c7b5a3-1f8d-4c2e-8a7f-5b3e9d1c8f7a",
              "name": "requiredFields",
              "type": "json",
              "value": "[\"email\", \"firstName\", \"lastName\", \"companyName\"]"
            },
            {
              "id": "f5a8c7e3-9b1d-4e2a-8c7f-3b5e9d1c8f7a",
              "name": "slackWebhookUrl",
              "type": "string",
              "value": "YOUR_SLACK_WEBHOOK_URL"
            },
            {
              "id": "a3b7c9e5-8f2a-4c1e-9a7f-1b5e3d2c9f1a",
              "name": "mcpServerEndpoint",
              "type": "string",
              "value": "http://localhost:8000"
            }
          ]
        }
      },
      "typeVersion": 3.4
    },
    {
      "id": "edf6a60a-0853-4406-a261-0d2af0e2bd65",
      "name": "并行 CRM 数据获取器",
      "type": "n8n-nodes-base.code",
      "position": [
        -480,
        -96
      ],
      "parameters": {
        "jsCode": "// Combine all results\nconst allData = [...hubspotData, ...pipedriveData, ...salesforceData];\n\n// Export data for pandas processing\nconst csvData = allData.map(record => ({\n  source: record.source,\n  recordId: record.recordId,\n  firstName: record.firstName,\n  lastName: record.lastName,\n  email: record.email,\n  companyName: record.companyName,\n  phone: record.phone,\n  stage: record.stage,\n  lastModified: record.lastModified\n}));\n\n// Create CSV content manually (Papa Parse not needed)\nconst csvHeaders = ['source','recordId','firstName','lastName','email','companyName','phone','stage','lastModified'];\nconst csvRows = csvData.map(record => \n  csvHeaders.map(header => `\"${record[header] || ''}\"`).join(',')\n);\nconst csvContent = [csvHeaders.join(','), ...csvRows].join('\\n');\n\n// Add metadata and return\nconst output = {\n  runId: config.runId,\n  timestamp: new Date().toISOString(),\n  totalRecords: allData.length,\n  breakdown: {\n    hubspot: hubspotData.length,\n    pipedrive: pipedriveData.length,\n    salesforce: salesforceData.length\n  },\n  errors: errors,\n  data: allData,\n  csvData: csvContent,\n  csvFilePath: `/tmp/crm_data_${config.runId}.csv`\n};\n\nreturn output;"
      },
      "typeVersion": 2
    },
    {
      "id": "c8b8fd39-59f4-4a98-9a96-310887f8c46d",
      "name": "主数据库写入器",
      "type": "n8n-nodes-base.googleSheets",
      "position": [
        144,
        -320
      ],
      "parameters": {
        "columns": {
          "value": {},
          "schema": [
            {
              "id": "email",
              "type": "string",
              "display": true,
              "required": false,
              "displayName": "email",
              "defaultMatch": true,
              "canBeUsedToMatch": true
            }
          ],
          "mappingMode": "autoMapInputData",
          "matchingColumns": [
            "email"
          ]
        },
        "options": {
          "cellFormat": "USER_ENTERED"
        },
        "operation": "appendOrUpdate",
        "sheetName": {
          "__rl": true,
          "mode": "name",
          "value": "Master_CRM_Data"
        },
        "documentId": {
          "__rl": true,
          "mode": "id",
          "value": "={{ $('Configuration Center').item.json.masterSheetId }}"
        },
        "authentication": "serviceAccount"
      },
      "credentials": {
        "googleApi": {
          "id": "UD5OjKiVkvWF1KEV",
          "name": "Google Sheets account"
        }
      },
      "typeVersion": 4.4
    },
    {
      "id": "b5e1acea-e75d-43ec-aae9-838923e124e1",
      "name": "质量报告生成器",
      "type": "n8n-nodes-base.code",
      "position": [
        368,
        -96
      ],
      "parameters": {
        "jsCode": "// Quality Report Generator and Notifier\nconst config = $('Configuration Center').first().json;\nconst crmData = $('Parallel CRM Fetcher').first().json;\nconst processedData = $input.all();\n\n// Calculate statistics\nconst stats = {\n  runId: config.runId,\n  timestamp: new Date().toISOString(),\n  duration: Math.round((new Date() - new Date(crmData.timestamp)) / 1000),\n  \n  // Record counts\n  totalFetched: crmData.totalRecords,\n  totalProcessed: processedData.length,\n  recordsWritten: processedData.filter(r => !r.json.isDuplicate).length,\n  duplicatesFound: processedData.filter(r => r.json.isDuplicate).length,\n  \n  // Quality metrics\n  averageQuality: (processedData.reduce((sum, r) => sum + (r.json.dataQualityScore || 0), 0) / processedData.length).toFixed(3),\n  highQuality: processedData.filter(r => r.json.dataQualityScore >= config.qualityThreshold).length,\n  lowQuality: processedData.filter(r => r.json.dataQualityScore < config.qualityThreshold).length,\n  \n  // Source breakdown\n  breakdown: crmData.breakdown,\n  \n  // Issues summary\n  topIssues: {},\n  errors: crmData.errors\n};\n\n// Analyze quality issues\nprocessedData.forEach(record => {\n  if (record.json.qualityIssues) {\n    record.json.qualityIssues.forEach(issue => {\n      stats.topIssues[issue] = (stats.topIssues[issue] || 0) + 1;\n    });\n  }\n});\n\n// Sort issues by frequency\nstats.topIssues = Object.entries(stats.topIssues)\n  .sort((a, b) => b[1] - a[1])\n  .slice(0, 5)\n  .reduce((obj, [key, value]) => ({ ...obj, [key]: value }), {});\n\n// Create Slack notification\nconst slackMessage = {\n  blocks: [\n    {\n      type: \"header\",\n      text: {\n        type: \"plain_text\",\n        text: \"🔄 CRM Sync Completed\"\n      }\n    },\n    {\n      type: \"section\",\n      fields: [\n        { type: \"mrkdwn\", text: `*Run ID:* ${stats.runId}` },\n        { type: \"mrkdwn\", text: `*Duration:* ${stats.duration}s` },\n        { type: \"mrkdwn\", text: `*Records Processed:* ${stats.totalProcessed}` },\n        { type: \"mrkdwn\", text: `*Duplicates Found:* ${stats.duplicatesFound}` },\n        { type: \"mrkdwn\", text: `*Average Quality:* ${stats.averageQuality}` },\n        { type: \"mrkdwn\", text: `*High Quality:* ${stats.highQuality}` }\n      ]\n    }\n  ]\n};\n\n// Add error section if any\nif (stats.errors.length > 0) {\n  slackMessage.blocks.push({\n    type: \"section\",\n    text: {\n      type: \"mrkdwn\",\n      text: `⚠️ *Errors:* ${stats.errors.map(e => `${e.source}: ${e.error}`).join(', ')}`\n    }\n  });\n}\n\n// Send to Slack\nif (config.slackWebhookUrl) {\n  await $http.post(config.slackWebhookUrl, {\n    body: slackMessage,\n    headers: { 'Content-Type': 'application/json' }\n  });\n}\n\n// Write report to Google Sheets\nconst reportData = {\n  ...stats,\n  topIssues: JSON.stringify(stats.topIssues),\n  errors: JSON.stringify(stats.errors)\n};\n\n// Return report for sheet writing\nreturn reportData;"
      },
      "typeVersion": 2
    },
    {
      "id": "6cc65daf-89df-4142-a8d1-3b73c981d7b8",
      "name": "报告写入器",
      "type": "n8n-nodes-base.googleSheets",
      "position": [
        768,
        -96
      ],
      "parameters": {
        "columns": {
          "value": {},
          "mappingMode": "autoMapInputData"
        },
        "options": {},
        "operation": "append",
        "sheetName": {
          "__rl": true,
          "mode": "name",
          "value": "Quality_Reports"
        },
        "documentId": {
          "__rl": true,
          "mode": "id",
          "value": "={{ $('Configuration Center').item.json.masterSheetId }}"
        },
        "authentication": "serviceAccount"
      },
      "credentials": {
        "googleApi": {
          "id": "UD5OjKiVkvWF1KEV",
          "name": "Google Sheets account"
        }
      },
      "typeVersion": 4.4
    },
    {
      "id": "e936e062-489d-44d4-93d0-7b6f0b15790a",
      "name": "错误处理器",
      "type": "n8n-nodes-base.errorTrigger",
      "position": [
        -1088,
        288
      ],
      "parameters": {},
      "typeVersion": 1
    },
    {
      "id": "ddb1762a-799c-4552-948e-817d5014ae8a",
      "name": "错误处理器",
      "type": "n8n-nodes-base.code",
      "position": [
        -816,
        288
      ],
      "parameters": {
        "jsCode": "// Comprehensive Error Handler\nconst error = $input.first().json;\nconst config = $('Configuration Center').first()?.json || {};\n\n// Create detailed error log\nconst errorLog = {\n  timestamp: new Date().toISOString(),\n  runId: config.runId || 'unknown',\n  errorType: error.name || 'UnknownError',\n  errorMessage: error.message || 'No error message',\n  errorStack: error.stack || '',\n  nodeName: error.node?.name || 'Unknown node',\n  nodeType: error.node?.type || 'Unknown type',\n  workflowId: $execution.workflowId,\n  executionId: $execution.id,\n  context: JSON.stringify(error)\n};\n\n// Log to console for debugging\nconsole.error('Workflow Error:', errorLog);\n\n// Send error notification to Slack if configured\nif (config.slackWebhookUrl) {\n  const slackError = {\n    blocks: [\n      {\n        type: \"header\",\n        text: {\n          type: \"plain_text\",\n          text: \"❌ CRM Sync Error\"\n        }\n      },\n      {\n        type: \"section\",\n        text: {\n          type: \"mrkdwn\",\n          text: `*Error in node:* ${errorLog.nodeName}\\n*Message:* ${errorLog.errorMessage}\\n*Run ID:* ${errorLog.runId}\\n*Time:* ${errorLog.timestamp}`\n        }\n      }\n    ]\n  };\n  \n  try {\n    await $http.post(config.slackWebhookUrl, {\n      body: slackError,\n      headers: { 'Content-Type': 'application/json' }\n    });\n  } catch (notifyError) {\n    console.error('Failed to send Slack notification:', notifyError);\n  }\n}\n\n// Write to error log sheet if configured\nif (config.masterSheetId) {\n  return errorLog;\n}\n\n// Return error for further processing\nreturn { error: errorLog, handled: true };"
      },
      "typeVersion": 2
    },
    {
      "id": "82668ba0-bdee-4df3-9407-b250cd75cb90",
      "name": "CRM 数据处理代理",
      "type": "@n8n/n8n-nodes-langchain.agent",
      "position": [
        -256,
        -96
      ],
      "parameters": {
        "text": "You are a CRM Data Processing Expert. Your task is to:\n\n1. **Analyze the incoming CRM data** from multiple sources (HubSpot, Pipedrive, Salesforce)\n2. **Use the pandas MCP tools** to:\n   - Calculate data quality scores for each record\n   - Perform advanced deduplication using similarity matching\n   - Merge duplicate records intelligently\n   - Generate data cleansing reports\n\n3. **Process the data** by:\n   - Loading the CRM data into pandas for analysis\n   - Running deduplication algorithms on email, company name, and phone\n   - Calculating quality metrics based on completeness and accuracy\n   - Creating merged master records from duplicates\n\n4. **Return structured results** with:\n   - Deduplicated dataset\n   - Quality scores per record\n   - Merge history tracking\n   - Data quality report summary\n\nThe input data structure contains: runId, timestamp, totalRecords, breakdown (by source), errors, and data array with records containing: source, recordId, firstName, lastName, email, companyName, phone, stage, lastModified, rawData.\n\nProcess this data thoroughly and return clean, deduplicated CRM records ready for database insertion.",
        "options": {},
        "promptType": "define"
      },
      "typeVersion": 2.1
    },
    {
      "id": "0bdfa603-c570-49a0-949a-d3320d8097f1",
      "name": "pandas-mcp-server",
      "type": "@n8n/n8n-nodes-langchain.mcpClientTool",
      "position": [
        -32,
        112
      ],
      "parameters": {
        "sseEndpoint": "http://localhost:8000"
      },
      "typeVersion": 1
    },
    {
      "id": "d3a13c75-6461-4ee7-bfe0-c525119a419c",
      "name": "OpenAI 聊天模型",
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "position": [
        -256,
        80
      ],
      "parameters": {
        "model": {
          "__rl": true,
          "mode": "list",
          "value": "gpt-4o-mini",
          "cachedResultName": "gpt-4o-mini"
        },
        "options": {}
      },
      "credentials": {
        "openAiApi": {
          "id": "M2qgdRuw59w2LPE1",
          "name": "OpenAi account 2"
        }
      },
      "typeVersion": 1.2
    },
    {
      "id": "d0b1fe2d-c480-42c2-9bbf-adc0734e8456",
      "name": "便签",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -1520,
        -832
      ],
      "parameters": {
        "color": 3,
        "width": 560,
        "height": 640,
        "content": "🛠️ 多 CRM 数据同步与 AI 去重"
      },
      "typeVersion": 1
    },
    {
      "id": "4efb80f1-7722-47fd-8a4e-1225fb5a1116",
      "name": "便签 2",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -32,
        -768
      ],
      "parameters": {
        "width": 464,
        "height": 608,
        "content": "📊 Google Sheets 配置"
      },
      "typeVersion": 1
    },
    {
      "id": "d9d7a768-5f00-4a5f-9bca-e47e63537376",
      "name": "便签 3",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        608,
        -416
      ],
      "parameters": {
        "color": 6,
        "width": 512,
        "height": 496,
        "content": "📊 Google Sheets 配置"
      },
      "typeVersion": 1
    },
    {
      "id": "52b53665-89ec-488a-afe0-647d6d66b664",
      "name": "便签 4",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -944,
        -832
      ],
      "parameters": {
        "color": 5,
        "width": 528,
        "height": 640,
        "content": "🔧 必需的 MCP 服务器"
      },
      "typeVersion": 1
    },
    {
      "id": "0e0da7b2-ec42-48bd-b3b6-68e2db46b5da",
      "name": "便签 1",
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -1168,
        208
      ],
      "parameters": {
        "color": 4,
        "width": 576,
        "height": 256,
        "content": "## 🚨 错误管理系统"
      },
      "typeVersion": 1
    }
  ],
  "active": false,
  "pinData": {},
  "settings": {
    "executionOrder": "v1"
  },
  "versionId": "2d0b2e3a-2c29-470a-802a-bdc62f367239",
  "connections": {
    "Error Handler": {
      "main": [
        [
          {
            "node": "Error Processor",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Trigger Router": {
      "main": [
        [
          {
            "node": "Configuration Center",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "OpenAI Chat Model": {
      "ai_languageModel": [
        [
          {
            "node": "CRM Data Processing Agent",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "pandas-mcp-server": {
      "ai_tool": [
        [
          {
            "node": "CRM Data Processing Agent",
            "type": "ai_tool",
            "index": 0
          }
        ]
      ]
    },
    "Daily Sync Schedule": {
      "main": [
        [
          {
            "node": "Trigger Router",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Manual Sync Webhook": {
      "main": [
        [
          {
            "node": "Trigger Router",
            "type": "main",
            "index": 1
          }
        ]
      ]
    },
    "Configuration Center": {
      "main": [
        [
          {
            "node": "Parallel CRM Fetcher",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Parallel CRM Fetcher": {
      "main": [
        [
          {
            "node": "CRM Data Processing Agent",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Master Database Writer": {
      "main": [
        [
          {
            "node": "Quality Report Generator",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Quality Report Generator": {
      "main": [
        [
          {
            "node": "Report Writer",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "CRM Data Processing Agent": {
      "main": [
        [
          {
            "node": "Master Database Writer",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  }
}
常见问题

如何使用这个工作流?

复制上方的 JSON 配置代码,在您的 n8n 实例中创建新工作流并选择「从 JSON 导入」,粘贴配置后根据需要修改凭证设置即可。

这个工作流适合什么场景?

高级 - 客户关系管理, AI 摘要总结

需要付费吗?

本工作流完全免费,您可以直接导入使用。但请注意,工作流中使用的第三方服务(如 OpenAI API)可能需要您自行付费。

工作流信息
难度等级
高级
节点数量18
分类2
节点类型11
难度说明

适合高级用户,包含 16+ 个节点的复杂工作流

作者
Grace Gbadamosi

Grace Gbadamosi

@grace-bola

AI implementation consultant specializing in n8n workflows. I help freelancers, consultants, and software creators become the 'AI automation expert' in their local markets by building premium automation solutions for local businesses. As an n8n verified creator, I design sophisticated RAG, Voice and multi-agent systems that solve real business problems.

外部链接
在 n8n.io 查看

分享此工作流