
How to do a LEFT OUTER JOIN without a foreign key using the Django ORM

suresh madaparthi 2 months ago



I have the following models:

class User(Model):
    ...  # user fields

class CaseModel(Model):
    id = models.AutoField(primary_key=True)  # Django's built-in id field
    owner = models.ForeignKey('User')
    ...  # other fields


class DraftMessageModel(Model):
    entity_id = models.IntegerField(null=True, blank=True)  # CaseModel.id is linked to this
    ...  # other fields

I need to build a queryset that generates the following SQL:

SELECT 
  C.id, 
  C.some_field, 
  D.another_field, 
  U.username,
  < some extra fields > 
FROM 
  CaseModel AS C 
  LEFT OUTER JOIN User AS U ON (C.owner_id = U.id) 
  LEFT OUTER JOIN DraftMessageModel AS D ON (C.id = D.entity_id) 
WHERE 
  C.owner_id = 100 
  AND < some extra condition >

I know this is a small problem, but I could not get it to work with queryset.extra or with queryset.annotate(draft_message=FilteredRelation()).

Here is what I have tried so far:

  • Queryset annotate
queryset = CaseModel.objects.select_related('owner').filter(owner_id=100)

queryset = queryset.annotate(
    draft_message=FilteredRelation(
        'draftmessagemodel',
        condition=Q(draftmessagemodel__entity_id=F('id'))
    )
)
# error: Cannot resolve keyword 'draftmessagemodel' into field. Choices are: <CaseModel's other fields>
  • Queryset extra
queryset = CaseModel.objects.select_related('owner').filter(owner_id=100)

queryset = queryset.extra(
    select={
        'draftmessagemodel__id': 'draftmessagemodel.id',
        'draftmessagemodel__text': 'draftmessagemodel.text',
    },
    tables=['draftmessagemodel'],
    where=[
        'casemodel.id = draftmessagemodel.entity_id'
    ]
)

which generates SQL I do not want:

SELECT 
  "casemodel"."id", 
  "user"."username",
  (draftmessagemodel.id) AS "draftmessagemodel__id", 
  (draftmessagemodel.text) AS "draftmessagemodel__text",
  <some other fields>
  
FROM 
  "casemodel" 
  LEFT OUTER JOIN "user" ON (
    "casemodel"."owner_id" = "user"."id"
  ), -- I didn't understand where this comma comes from
  "draftmessagemodel" 
WHERE 
  (
    "casemodel"."owner_id" = 100 
    AND (
      casemodel.id = draftmessagemodel.entity_id
    )
  )
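
For reference, a minimal sketch of one alternative that avoids the missing relation entirely: a pair of correlated subqueries via Subquery/OuterRef. It does not emit a literal LEFT OUTER JOIN (each draft column becomes a subselect), but cases without a draft still come back, with NULL in the annotations; the id and text field names are taken from the attempts above.

from django.db.models import OuterRef, Subquery

# Correlate DraftMessageModel rows to the outer CaseModel row via entity_id.
drafts = DraftMessageModel.objects.filter(entity_id=OuterRef('id'))

queryset = (
    CaseModel.objects
    .select_related('owner')                             # joins User via the real FK
    .filter(owner_id=100)
    .annotate(
        draft_id=Subquery(drafts.values('id')[:1]),      # NULL when no draft exists
        draft_text=Subquery(drafts.values('text')[:1]),
    )
)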

Latest replies
  • I don't know exactly what you want (since you have only provided part of the information), but roughly you could do something like this:

    a = DraftMessageModel.objects.get({filter})
    b = User.objects.get({filter})
    CaseModel.objects.only('id', 'username', 'draftmessagemodel_id').filter(owner_id=b.id, id=a.entity_id).annotate(draftmessagemodel__text='draftmessagemodel__text')
    
  • Ben 2 months ago

    If you want to use raw SQL queries directly, see this link: docs.djangoproject.com/en/5.0/topics/db/sql/...
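
    A minimal sketch of what that raw-SQL route could look like for the models in the question (table and column names are taken from the SQL shown above); Manager.raw() maps the rows back onto CaseModel instances, and extra columns such as draft_text and username become plain attributes on each object:

    # Sketch only: the hand-written query with both LEFT OUTER JOINs,
    # parameterised on the owner id.
    cases = CaseModel.objects.raw(
        """
        SELECT C.id, C.some_field, D.text AS draft_text, U.username
        FROM casemodel AS C
        LEFT OUTER JOIN user AS U ON (C.owner_id = U.id)
        LEFT OUTER JOIN draftmessagemodel AS D ON (C.id = D.entity_id)
        WHERE C.owner_id = %s
        """,
        [100],
    )
    for case in cases:
        print(case.id, case.some_field, case.draft_text, case.username)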

  • Please note that everything on Stack Overflow is required to be in English. Also, do not add greetings to your posts; see: .com/help/behavior

  • Ebad 2 months ago

    How to hot reload a Scala.js application during development

    I reload my Scala.js application manually during development. However, I need the ability to reload and immediately see the changes I make, similar to how React hot reloading works.

  • Please google for any answers and try them. If that doesn't help, describe what you have tried so far and the community will help you.

  • My understanding is that "hot reloading" has a very specific meaning: preserving the program's state by swapping modules at runtime, so that the program's behaviour changes without losing that state. I don't think this kind of "true hot reloading" is achievable (at least not easily) in Scala.js.

    If what you want is "fast refresh on change", i.e. a page reload (but a fast one, ca. 50 ms), you can try pasting this into a terminal (if you already have scala-cli and coursier it should take <5 seconds to get going) and following the breadcrumbs:

    scala-cli --version && \
    cs version && \
    git clone https://github.com/Quafadas/viteless.git && \
    cd viteless && \
    cs launch io.github.quafadas:live-server-scala-cli-js_3:0.1.4
    

    Full disclosure: this is my own "home-made" solution.

    The wider Scala.js community uses vite - there are many templates available. For example, you could try this as a starting point: https://github.com/keynmol/scalajs-scala-cli-vite-template

  • We are building a data lake and are upgrading our Databricks runtime from 12.2 LTS to 14.3 LTS to support Python 3.10. We can write to our Iceberg tables, but reading those tables hits the following blocker, where the Hadoop byte-buffer stream attempts to read:

    Error: UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.139.64.20 executor 1): java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream

    With the following runtime configuration:

    Azure Databricks: 14.3 LTS runtime

    Apache Iceberg: 1.5.2

    Spark: 3.5.0

    Scala: 2.12

    azure-hadoop: 3.4.0

    Has anyone run into this problem?

    The Databricks notebook code, reduced to a test of our actual code:

    #paragraph 1
    pip install azure-storage-blob pymssql pymysql snowflake-connector-python
    
    #paragraph 2
    import datetime
    import pymysql
    import pymssql
    from pyspark.sql.functions import *
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql.types import *
    import json
    import re
    import pandas as pd
    import gzip
    import shutil
    from azure.storage.blob import BlobClient, ContainerClient, BlobServiceClient
    from enum import Enum
    
    # metadata info - all these vars are set in run
    metadata_info = {
        "workflow_id": ,
        "workflow_name": "",
        "Terra_version": "",
        "stage_connection_name": "",
        "stage_schema": "",
        "host": "c", #cs.metadata_host, 
        "database": "",             #cs.metadata_database,
        "port": 3306,                                        #cs.metadata_port,
        "user_name": "",                        #cs.metadata_user_name,
        "password": "",                      #cs.metadata_passwd  
    }
    
    # cluster configuration
    def init_spark_config(
        catalog_container: str,
        catalog_account: str,
        catalog_sas_token: str,
        catalog_name: str = "spark_catalog",
    ):
        conf = SparkConf()
        # following are needed for Spark/Hadoop/AzureBlob configuration
        # example setup https://medium.com/@rvaid.29/reading-and-writing-data-to-azure-blob-storage-using-pyspark-cc8ce2fd3470
    
    
        # in Databricks, these have to be set on compute cluster and appropriate jars needs to be loaded to cluster via init script or linked from maven repository
        conf.set(
            "spark.jars.packages",
            "org.apache.hadoop:hadoop-azure:3.4.0,com.microsoft.azure:azure-storage:8.6.6,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2",
        )
        conf.set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        conf.set("spark.sql.catalogImplementation", "hive")
        conf.set(
            "spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog",
        )
        conf.set(
            f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"
        )
        conf.set(f"spark.sql.catalog.{catalog_name}.type", "hadoop")
    
        # seems these can be adjusted later and per session
        conf.set(
            f"spark.sql.catalog.{catalog_name}.warehouse",
            f"wasbs://{catalog_container}@{catalog_account}.blob.core.windows.net/{catalog_name}",
        )
        conf.set(
            f"spark.hadoop.fs.azure.sas.{catalog_container}.{catalog_account}.blob.core.windows.net",
            catalog_sas_token,
        )
        conf.set(
            f"fs.azure.sas.{catalog_container}.{catalog_account}.blob.core.windows.net",
            catalog_sas_token,
        )
    
        return conf
    
    def get_storage_info(metadata, purpose: str):
      # query storage info
      if purpose == 'stage':
        query = f"""
          select c.connection_type, c.connection_details, c.password, '{{"stage_schema":"{metadata['stage_schema']}"}}' as storage_details
          from connection_details c
          where c.connection_name = '{metadata['stage_connection_name']}'
        """
      else:
        storageDetailsField = 'w.source_details'
        connectionIdField = 'connection_id'
        if purpose == 'target': 
          storageDetailsField = 'w.target_details'
          connectionIdField = 'target_connection_id'
        query = f"""
          select c.connection_type, c.connection_details, c.password, {storageDetailsField} as storage_details
          from connection_details c
          left join workflow_details w on json_value({storageDetailsField} , '$.{connectionIdField}') = c.connection_id
          where w.workflow_id = {metadata['workflow_id']}
        """
      conn = pymysql.connect(host = metadata['host']
                            ,port=metadata['port']
                            ,user = metadata['user_name']
                            ,password = metadata['password']
                            ,db = metadata['database']
                            ,charset='utf8mb4'
                            ,cursorclass=pymysql.cursors.DictCursor)
      cursor = conn.cursor()  
      cursor.execute(query)
      conn_details=json.dumps(cursor.fetchone())
      conn.commit()
      cursor.close()
      conn.close()
      return json.loads(conn_details)
    
    metadata = metadata_info
    start_time = datetime.datetime.now()  # the datetime module (not the class) is imported above
    source = get_storage_info(metadata_info, purpose="source")
    stage = get_storage_info(metadata_info, purpose="stage")
    target = get_storage_info(metadata_info, purpose="target")
    
    
    sourceAccount = json.loads(source["connection_details"])[
        "Storage_Account_Name"
    ]
    sourceContainer = json.loads(source["connection_details"])[
        "Container_Name"
    ]
    sourceType = json.loads(source["storage_details"])["file_type"]
    sourcePass = json.loads(source["connection_details"])["sas_token"]
    
    stageAccount = json.loads(stage["connection_details"])[
        "Storage_Account_Name"
    ]
    stageContainer = json.loads(stage["connection_details"])[
        "Container_Name"
    ]
    stageSchema = json.loads(stage["storage_details"])["stage_schema"]
    stagePass = json.loads(stage["connection_details"])["sas_token"]
    
    targetType = json.loads(target["storage_details"])["target_type"]
    targetSchema = json.loads(target["storage_details"])["target_schema"]
    targetDatabase = json.loads(target["storage_details"])[
        "target_database"
    ]
    targetServer = json.loads(target["connection_details"])["host"]
    targetPort = json.loads(target["connection_details"])["port"]
    
    
    conf = init_spark_config(
            catalog_container=stageContainer,
            catalog_account=stageAccount,
            catalog_sas_token=stagePass,
            catalog_name=stageSchema,
        )
    
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    # paragraph 3 
    data = [("Raj", "10"), ("Mitch", "20")]
    
    # Define schema
    schema = StructType([
        StructField("Name", StringType(), True),
        StructField("Age", StringType(), True)
    ])
    
    # Create DataFrame
    df = spark.createDataFrame(data, schema=schema)
    
    # Show DataFrame
    df.show()
    
    #paragraph 4
    catalog_name = 'data_vault_lake.test'
    
    def setup_spark_catalog(
        container_name: str, account_name: str, sas_token: str, catalog_name: str
    ):
        spark.conf.set(
            f"fs.azure.sas.{container_name}.{account_name}.blob.core.windows.net", sas_token
        )
        spark.conf.set(
            f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"
        )
        spark.conf.set(f"spark.sql.catalog.{catalog_name}.type", "hadoop")
        spark.conf.set(
            f"spark.sql.catalog.{catalog_name}.warehouse",
            f"wasbs://{container_name}@{account_name}.blob.core.windows.net/{catalog_name}",
        )
    
    #paragraph 5
    
    tableName = 'writeTest1'
    df.writeTo(f"{catalog_name}.{tableName}").using("iceberg").create()
    
    #paragraph 6
    dfOut = spark.read.format("iceberg").load("data_vault_lake.test.writeTest1")
    display(dfOut)
    

    Error from paragraph 6:

    UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 47) (10.139.64.20 executor 1): java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:160)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$1(FileSystemWithMetrics.scala:77)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:77)
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:156)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:82)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:91)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:76)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:584)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
        at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
        at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
        at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:109)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
        at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:122)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:160)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:64)
        at scala.Option.exists(Option.scala:376)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:99)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$5(UnsafeRowBatchUtils.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$3(UnsafeRowBatchUtils.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$1(UnsafeRowBatchUtils.scala:68)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
        at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$2(Collector.scala:214)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
        at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
        at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
        at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3908)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3830)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3817)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3817)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1695)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1680)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1680)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4154)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4066)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4054)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1357)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1345)
        at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3000)
        at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:355)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:299)
        at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:384)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:381)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:122)
        at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:131)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:90)
        at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:78)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:546)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:540)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:557)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:400)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:400)
        at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:318)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:558)
        at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:555)
        at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3780)
        at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3771)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4727)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4725)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:392)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:700)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:277)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:164)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:637)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4725)
        at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3770)
        at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:322)
        at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:100)
        at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:848)
        at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1491)
        at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:286)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
        at py4j.Gateway.invoke(Gateway.java:306)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.UnsupportedOperationException: Byte-buffer read unsupported by org.apache.hadoop.fs.BufferedFSInputStream
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:160)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$1(FileSystemWithMetrics.scala:77)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
        at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:77)
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:156)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream$H2Reader.read(H2SeekableInputStream.java:82)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:91)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.H2SeekableInputStream.readFully(H2SeekableInputStream.java:76)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:584)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
        at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
        at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
        at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
        at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:109)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:41)
        at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:122)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:160)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:64)
        at scala.Option.exists(Option.scala:376)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:99)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:64)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$5(UnsafeRowBatchUtils.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$3(UnsafeRowBatchUtils.scala:88)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.$anonfun$encodeUnsafeRows$1(UnsafeRowBatchUtils.scala:68)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
        at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$2(Collector.scala:214)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
        at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
        at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
        at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    
  • Could you try installing org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3 via Maven and check whether it helps?

  • We tried 1.4.3, 1.5.0, 1.5.1, 1.5.2 and 1.6.0. Everything works fine on the Databricks 12.2 LTS runtime, but its Python version is 3.9. We are developing on 3.10, so we need to upgrade our Databricks runtime version to get there.

  • Also, the Spark version targeted by the 1.4.3 Iceberg runtime (3.3) is not compatible with the Databricks 14.3 LTS runtime (Spark 3.5).

  • The problem was with byte-buffer reads on the input stream, which are not supported there by Apache Iceberg.

    We updated our cluster configuration to the following:

    DB runtime: 14.3 LTS (Scala 2.12, Spark 3.5)

    Cluster Spark config:

    fs.azure                         org.apache.hadoop.fs.azure.NativeAzureFileSystem
    spark.hadoop.fs.wasbs.impl       org.apache.hadoop.fs.azure.NativeAzureFileSystem
    spark.sql.catalog.spark_catalog  org.apache.iceberg.spark.SparkSessionCatalog
    fs.wasbs.impl                    org.apache.hadoop.fs.azure.NativeAzureFileSystem
    spark.sql.extensions             org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

    Libraries: azure-storage:8.6.6, hadoop-azure:3.4.0, iceberg-spark-runtime-3.5_2.12:1.5.2

    We found a bug filed on the Iceberg project; once it is fixed, this issue should be resolved: https://github.com/apache/iceberg/issues/10808
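
    For comparison, a minimal sketch of the same settings expressed on a SparkConf, in the style of the notebook's init_spark_config above (a sketch only; on Databricks these were set in the cluster's Spark config rather than in code, and the values are copied from this reply):

    # Sketch: mirror the cluster-level settings from this reply on a SparkConf.
    from pyspark import SparkConf

    conf = SparkConf()
    conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    conf.set("spark.hadoop.fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")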

  • When deploying to production, how do I read the db.properties file (or any other conf file) in my project?

    I am getting this error...

    24/05/09 16:34:32 INFO Client:
             client token: N/A
             diagnostics: User class threw exception: java.io.FileNotFoundException: File file:/app/hadoop/yarn/local/usercache/user/appcache/application_1715046519048_2131/container_e37_1715046519048_2131_02_000001/user/db.properties does not exist
            at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:641)
            at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:930)
            at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:631)
    

    First I kept it locally, but that did not work, so then I kept it in an HDFS directory.

    Here is my Scala code:

    sparkContext.addFile("hdfs://user/db.properties")
    val propertiesFile = SparkFiles.get("hdfs://user/db.properties")
    val properties = new Properties()
    properties.load(new FileInputStream(propertiesFile))
    

    Here is the script I use to run the Spark application. My script has the necessary details, but this is the part dealing with the files:

    [spark-submit.sh]
    
     --files hdfs://user/db.properties \
    

    And I also keep db.properties outside the src folder, like this:

    project-root/
      db.properties
      src/
          main/
              scala/
                  mysqlTohive.scala

    In production I keep such files at a different path:

    path-home/user/spark/    [the files below are there]
    --> mysqlTohive.scala
    --> db.properties
    
  • Still getting the same error... using the extra / ['hdfs:///user/db.properties'] and in my script.sh like this: --files hdfs:///user/db.properties \

  • Yes, in my HDFS it is at /user/db.properties, and in production, where my jar and files are, it is at this path: home/user/spark/

  • It sounds like the file is not actually where you are telling the program to look. If you have access to the HDFS utilities, e.g. if you can SSH into the cluster, I would use bin/hdfs dfs -ls <path> to explore the path. Otherwise, you can try listing the files from within your Spark program:

    import org.apache.hadoop.fs.{FileSystem, Path}
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.listStatus(new Path("hdfs://"))
    

    If you are actually not keeping a copy of the file in HDFS, but it is available locally wherever spark-submit executes (e.g. it actually gets bootstrapped onto the driver node in the cluster), then you may just need to use a local, absolute or relative path (i.e. without the hdfs:// prefix):

    properties.load(new java.io.FileInputStream(SparkFiles.get("db.properties"))) // or /path-home/user/spark/ etc.
    
  • vsh 2 months ago

    I tried this too..... it doesn't work........... val properties = new Properties(); val propertiesFile = new FileInputStream("db.properties"); properties.load(propertiesFile); propertiesFile.close()

  • I am new to Prometheus, and I'm currently stuck on a problem. I need to implement Prometheus metrics for a service. The language we use is Scala and the HTTP lib is spray.

    The simple start:

    JvmMetrics.builder.register()
    val server = HTTPServer.builder.port(9345).buildAndStart
    

    Then the Prometheus HTTP server just dies immediately, for no reason. No errors, it simply stops every time; I've spent far too much time on this problem.

    What I decided to do instead is scrape the metrics from the defaultRegistry and return them through my own HTTP route.

    This is where I need help. I can't find anywhere how to convert a Seq[MetricSnapshot] into a string like:

    # HELP jvm_buffer_pool_capacity_bytes Bytes capacity of a given JVM buffer pool.
    # TYPE jvm_buffer_pool_capacity_bytes gauge
    jvm_buffer_pool_capacity_bytes{pool="direct"} 16384.0
    jvm_buffer_pool_capacity_bytes{pool="mapped"} 0.0
    jvm_buffer_pool_capacity_bytes{pool="mapped - 'non-volatile memory'"} 0.0
    

    If anyone knows how to do this, or knows of some useful library, please let me know. I'm pretty sure I'm not the only one who wants to get Prometheus metrics straight from the registry.

    PS: if you happen to know of any other solution, please leave a comment as well.

  • Don't do this yourself, use a library! You would be better off investigating why the Prometheus HTTP server stops immediately, or switching to OpenTelemetry with its Prometheus exporter module.

  • Where/how are you starting the HTTP server? I don't know Spray, but this has to be started eagerly in some kind of "service" singleton, not in some random method.
