使用 OpenTelemetry 和 Jaeger 对 MinIO 进行分布式追踪

Distributed Tracing with MinIO using OpenTelemetry and Jaeger

几年前,当您只有一个单体应用时,调试和诊断相当容易,因为可能只有一个服务和几个用户。如今,系统被分解成更小的微服务,部署在 Kubernetes 之上的容器中,分布在不同云环境的多个集群中。在这种分布式环境中,需要观察所有内容,既要了解整体情况,也要在需要时进行更细粒度的观察。

可观测性大致可以分为三个子类别:日志记录、指标和追踪。在本博客文章中,我们将向您展示在新的或现有的 MinIO 应用程序中设置追踪是多么简单。我们将构建一个执行一些基本请求的小型 MinIO 应用程序;这将是我们的基础应用程序,我们将添加追踪以更好地了解系统组件和函数如何交互。

追踪用于描述记录和观察应用程序发出的请求以及这些请求如何在系统中传播的活动。当系统以分布式方式设置时,我们称之为分布式追踪,因为它涉及通过多个系统观察应用程序及其交互。例如,作为开发人员,您的代码可能包含多个函数,但您更感兴趣的是 MinIO 函数执行所需的时间以及这些函数在应用程序中使用时的相互依赖关系。追踪将通过以下方式提供必要的洞察:

  • 识别性能和延迟瓶颈
  • 在重大事件后查找根本原因分析
  • 确定多服务架构中的依赖关系
  • 监控应用程序中的事务

MinIO

我们将从一个小型 MinIO Python 应用程序开始,该应用程序将展示几个简单的操作。稍后,我们将添加用于追踪的代码,以测量代码执行所需的时间。

安装 MinIO

有多种方法可以在各种环境中安装 MinIO。在本博客文章中,我们将使用 Docker 启动 MinIO,但在生产环境中,请务必以分布式设置进行安装。

  • 在本地机器上创建一个目录,MinIO 将在其中持久化数据

$ mkdir -p /minio/data

  • 使用 Docker 启动 MinIO 容器

$ docker run -d \
  -p 9000:9000 \
  -p 9001:9001 \
  --name minio \
  -v /minio/data:/data \
  -e "MINIO_ROOT_USER=minio" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  quay.io/minio/minio server /data --console-address ":9001"

注意:请保留上面使用的凭据副本,稍后您将需要它们来访问 MinIO。

  • 通过使用启动上述容器的凭据,使用浏览器登录 http://localhost:9001/ 来验证您是否可以访问 MinIO。

MinIO SDK

有几个受支持的 SDK 可供您将应用程序与 MinIO API 集成。在本例中,我们将使用 Python SDK

  • 安装 MinIO Python SDK

$ pip install minio

  • 将包含 MinIO 函数的整个 Python 脚本复制并粘贴到本地文本编辑器中,并将其另存为 minio_app.py。我们将在下面描述其功能时参考它。

from minio import Minio

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# Create a test object
file_path = "test_object.txt"
f = open(file_path, "w")
f.write("created test object")
f.close()

# Put an object inside the bucket
minio_client.fput_object(config["dest_bucket"], file_path, file_path)

# Get the object from the bucket
minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("Some objects here")

让我们逐步了解上面的脚本。我们正在使用在上一步中启动的 MinIO 容器调用一些基本操作。

在最顶部,我们导入了之前安装的 MinIO Python SDK,并使用以下默认值对其进行初始化:

  • MinIO 端点
  • MinIO 用户名
  • MinIO 密码
  • MinIO 中的目标存储桶名称

from minio import Minio

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

  • 检查特定的目标存储桶是否存在;如果不存在,则创建它。

# 如果目标桶不存在,则创建
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

  • 创建一个测试对象以执行基本操作。这里我们正在创建一个包含一些文本的文件

# 创建一个测试对象
file_path = "test_object.txt"
f = open(file_path, "w")
f.write("created test object")
f.close()

  • 将测试对象放入我们之前创建的存储桶中。

minio_client.fput_object(config["dest_bucket"], file_path, file_path)

  • 获取上一步添加的测试对象。在运行脚本的机器上,我们将文件放在 <存储桶名称>/<文件路径> 以避免与原始文件冲突。

minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)

  • 获取存储桶中对象的列表,以确认我们的文件已成功上传。

for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("这里有一些对象")

  • 输出结果类似如下。我们添加的文件显示为一个 Python 对象。现在我们知道对象在那里了。

<minio.datatypes.Object object at 0x109b1d3d0>
这里有一些对象

  • 使用以下命令运行脚本。您应该会在存储桶中看到新对象。

$ python minio_app.py

  • 现在,我们在运行脚本的机器上有了该对象,让我们从 MinIO 存储桶中将其删除,并验证那里没有其他对象。

minio_client.remove_object(config["dest_bucket"], file_path)

for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("No objects here")

  • 由于这是唯一的对象,现在我们可以删除之前创建的存储桶。

if minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.remove_bucket(config["dest_bucket"])
  print("目标存储桶 '%s' 已被移除" % (config["dest_bucket"]))

请记住,在本教程中,我们使用的是一个非常简单的应用程序,它使用了 MinIO SDK。由此,您可以轻松地了解如何在自己的应用程序中包含跟踪。虽然在构建应用程序时添加跟踪是理想的,但添加跟踪并利用它提供的洞察力永远不会太晚。

OpenTelemetry

OpenTelemetry 是一个框架,它允许您从应用程序中获取跟踪、指标和日志,并以一种可以被许多导出器(例如 Jaeger)使用的方式对其进行标准化。

安装 OpenTelemetry

与 MinIO 一样,OpenTelemetry 也支持许多SDK。有些 SDK 比其他的功能更丰富,但其中一个构建了所有功能的 SDK 是Python SDK。我们需要安装两个 Python 包

$ pip install opentelemetry-api
$ pip install opentelemetry-sdk

初始化 OpenTelemetry

安装所需的软件包后,将它们导入到上一节中启动的 minio_app.py 脚本中。

  • 导入以下软件包

opentelemetry 导入 trace
opentelemetry.sdk.trace 导入 TracerProvider
opentelemetry.sdk.trace.export 导入 ConsoleSpanExporter
opentelemetry.sdk.trace.export 导入 BatchSpanProcessor
opentelemetry.sdk.resources 导入 SERVICE_NAME, Resource

  • 在资源属性中设置服务名称,以便在搜索跟踪时更容易找到它们。我们将服务命名为 my-minio,但您可以随意命名。以下代码实现了这一点并初始化了跟踪。

resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

我们已经创建了所有构建块;现在让我们创建一个可以观察的span。但什么是span?简单来说,一个span不过是函数进行的单个请求的开始和结束。可以有父span和子span;它们一起构成一个跟踪。

创建一个span

我们发出的第一个请求之一是检查存储桶是否存在。让我们为它创建一个span。

使用 tracer.start_as_current_span("检查存储桶是否存在"):

此时,脚本应该如下所示

from minio import Minio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

# 初始化 OpenTelemetry 提供程序
resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("检查存储桶是否存在"):
# 如果目标存储桶不存在,则创建
  if not minio_client.bucket_exists(config["dest_bucket"]):
    minio_client.make_bucket(config["dest_bucket"])
    print("目标存储桶 '%s' 已创建" % (config["dest_bucket"]))



...TRUNCATED...

这些跨度可以发送到多个导出器,但我们先将跟踪发送到 CLI。如果您运行上面的脚本,最后应该会看到一个 JSON 输出,如下所示。这就是你的跟踪。

$ python3 minio_app.py
Destination Bucket 'processed' has been created
<minio.datatypes.Object object at 0x103f36eb0>
Some objects here
Destination Bucket 'processed' has been removed
{
    "name": "check if bucket exists",
    "context": {
        "trace_id": "0xef41e07cf082045a2fc4eea70fd1a6de",
        "span_id": "0x867c14fe1fd97590",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2022-09-14T20:49:15.569511Z",
    "end_time": "2022-09-14T20:49:15.599886Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {},
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "my-minio"
        },
        "schema_url": ""
    }
}

每个跨度都可以通过添加额外的属性以多种方式进行自定义。让我们添加一个属性 function.name,值为 CHECK_BUCKET

使用 tracer.start_as_current_span(“检查存储桶是否存在”):
  current_span = trace.get_current_span()
  current_span.set_attribute(“function.name”, “CHECK_BUCKET”)

如果再次运行脚本,您会在输出中注意到一个新的属性

...(内容截断)…

 

   },
    “attributes”: {
        “function.name”: “CHECK_BUCKET”
    },
    “events”: [],


...TRUNCATED...

您还可以添加事件来进一步丰富跟踪信息

  current_span.add_event(“正在检查存储桶是否存在。”)
  如果 minio_client.bucket_exists(config[“dest_bucket”]):
    current_span.add_event(“存储桶不存在,将创建它。”)

    minio_client.make_bucket(config[“dest_bucket”])

再次运行脚本,您会在 JSON 输出中注意到两个新事件

...(内容截断)… 


"events":[
        {
            "name": "检查存储桶是否存在。",
            "timestamp": "2022-09-14T21:09:48.505709Z",
            "attributes": {}
        },
        {
            "name": "存储桶不存在,将创建它。",
            "timestamp": "2022-09-14T21:09:48.514541Z",
            "attributes": {}
        }
    ],


...TRUNCATED...

添加更多跨度

到目前为止,我们只添加了一个跨度。为了使其更有用,让我们添加更多跨度以及其他属性和事件。通常,添加更多跨度不会降低应用程序的性能,因为这些是异步请求。

考虑到这一点,以下是更新后的脚本,其中包含一些额外的跨度

from minio import Minio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})


provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)


tracer = trace.get_tracer(__name__)


# Create destination bucket if it does not exist
with tracer.start_as_current_span("check if bucket exists"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CHECK_BUCKET")

  current_span.add_event("Checking if bucket exists.")
  if not minio_client.bucket_exists(config["dest_bucket"]):
    current_span.add_event("Bucket does not exist, going to create it.")

    with tracer.start_as_current_span("create bucket"):
      minio_client.make_bucket(config["dest_bucket"])
      current_span.add_event("Bucket has been created.")
      print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

with tracer.start_as_current_span("create object to add"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CREATE_OBJECT")

  # Create a test object
  file_path = "test_object.txt"
  f = open(file_path, "w")
  f.write("created test object")
  f.close()
  current_span.add_event("Test object has been created.")

# Put an object inside the bucket
with tracer.start_as_current_span("add created object to bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CREATE_OBJECT")

  minio_client.fput_object(config["dest_bucket"], file_path, file_path)
  current_span.add_event("Test object has been placed in bucket.")

# Get the object from the bucket
with tracer.start_as_current_span("fetch object from bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "FETCH_OBJECT")

  minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)
  current_span.add_event("Test object has been fetched from bucket.")

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("Some objects here")


# Remove the object from bucket
with tracer.start_as_current_span("remove object from bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "REMOVE_OBJECT")

  minio_client.remove_object(config["dest_bucket"], file_path)
  current_span.add_event("Test object has been removed from bucket.")

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("No objects here")

# Remove destination bucket if it does exist
with tracer.start_as_current_span("check if bucket exists"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "REMOVE_BUCKET")

  current_span.add_event("Checking if bucket exists.")
  if minio_client.bucket_exists(config["dest_bucket"]):
    current_span.add_event("Bucket exists, going to remove it.")

    with tracer.start_as_current_span("delete bucket"):
      minio_client.remove_bucket(config["dest_bucket"])
      current_span.add_event("Bucket has been removed.")
      print("Destination Bucket '%s' has been removed" % (config["dest_bucket"]))

这些只是一些示例;您可以根据需要详细说明,以使您的跟踪对您的团队有所帮助。您可以测量

  • 数据库调用的性能
  • AI/ML 作业的处理时间和性能
  • 连接到外部服务时的延迟

Jaeger

但存在一个问题:如果您现在尝试运行脚本,它会运行,但您会看到一大堆文本,这些文本可能比只有一个跨度时长得多且更复杂(因此没有那么有用)。为了理解这些跟踪,我们需要一个工具来收集和处理它们。

您可以使用许多工具,但 Jaeger 是最流行的工具之一。它很容易启动和运行,就像 MinIO 一样,而且像 MinIO 一样,功能非常丰富,可以帮助您进行根本原因分析和服务依赖性分析等。

安装 Jaeger

我们将 Jaeger 部署为 Docker 容器并公开必要的端口

  • 安装 Jaeger 容器

$ docker run -d --name jaeger -p 16686:16686 -p 6831:6831/udp jaegertracing/all-in-one

  • 以上命令会在 localhost 上暴露两个端口。

6831:Thrift 服务端口,用于接收追踪数据的入站端口。

16686:Jaeger UI 端口,用于可视化追踪数据。

配置 Jaeger

目前,我们的追踪数据正在输出到 CLI。现在,我们将稍微修改 Python 脚本,将其输出到我们刚刚创建的 Jaeger 容器。

  • 安装 OpenTelemetry 的 Jaeger 导出器。

$ pip install opentelemetry-exporter-jaeger

  • 通过替换以下行,在 Python 中导入包:

from opentelemetry.sdk.trace.export import ConsoleSpanExporter

替换为:

from opentelemetry.exporter.jaeger.thrift import JaegerExporter

  • 添加 Jaeger 导出器的主机信息:

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

  • 将以下行:

processor = BatchSpanProcessor(ConsoleSpanExporter())

替换为:

processor = BatchSpanProcessor(jaeger_exporter)

  • 最终结果如下所示:

...(内容截断)…


from opentelemetry.exporter.jaeger.thrift import JaegerExporter

...TRUNCATED...

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)


...TRUNCATED...

使用 Jaeger

重新运行脚本。现在不会看到输出的 JSON blob 数据(还记得我们提到的文本墙),而是转到 Jaeger UI,在左侧您将看到 my-minio 服务。选择它,然后单击“查找跟踪”(Find Traces)。

到目前为止,我们只发出了一次请求,并且应该从我们创建的几个 span 中看到一些跟踪。

单击其中一个显示“2 Spans”的跟踪;让我们选择显示“检查存储桶是否存在”的那个。您可以通过一种比 JSON blob 数据更易于理解的方式查看所有详细信息,从而永远告别文本墙并重新提高效率。

运行脚本五六次后,我们就可以开始看到模式的出现。上面看到的时间是不同 span 执行所花费的时间。我们在这里看到两个 span,是因为我们添加了两个父子 span。这不仅比巨大的 JSON blob 数据在视觉上更具吸引力,而且也更有帮助。

然后,您可以将这些数据从 Jaeger 发送到 Grafana,以便获得历史图表,甚至可以根据特定阈值设置警报。例如,如果 AI/ML 作业执行其功能的时间超过 10 毫秒,则会根据设置的阈值发出警报。无论您的应用程序在哪个环境中运行,您都可以确保通过单一平台进行监控。

获得跟踪后,您需要构建一个历史数据库,以便回顾以查看趋势和相关性。这时指标就派上用场了。OpenTelemetry 支持一整套值得一试的指标和日志记录框架。

分布式跟踪加速故障排除

跟踪只是实现可观测性的其中一步。通常,当事件发生时,我们不仅仅使用一个跟踪或一个日志或一个指标来确定和解决问题。通常需要理解这些因素的组合才能找到根本原因。

可观测性为自动化打开了大门。我们是自动化的忠实拥护者,但为了高效地在云规模上运行,您需要对应用程序的日志和性能有坚实的基础和可见性。

如果您需要有关 MinIO Python SDK 或如何在 MinIO 应用程序中实现跟踪的任何帮助,请随时通过Slack与我们联系。